Introduction

In Part 1 of this series, we explored the theoretical foundations of Large Language Models (LLMs), Retrieval Augmented Generation (RAG), and vector databases. Now it’s time to put theory into practice. This is going to be a long read, so grab some coffee and a couple of your favorite biscuits.

One use case for leveraging LLMs is creating an agent: a Senior Data Engineer AI that automatically reviews Pull Requests in your data engineering projects. This agent will be that nit-picky data engineer who enforces SQL formatting standards, ensures naming and data type consistency, validates data quality checks, and suggests improvements based on best practices. By integrating this into your GitHub workflow, you can maintain higher code quality, accelerate onboarding for new team members, and reduce the burden of manual code reviews.

This tutorial walks through the complete implementation process from setting up the necessary API keys to crafting the AI agent and configuring GitHub Actions to run it automatically on every Pull Request.




Project Overview


Our AI-powered code review system will:

  1. Monitor GitHub for new Pull Requests in your data repository
  2. Extract changed files for analysis
  3. Retrieve relevant context about your code standards and existing codebase
  4. Analyze changes using a custom-prompted LLM
  5. Generate detailed feedback on code quality, standards compliance, and potential improvements
  6. Post review comments directly on the Pull Request

This solution combines GitHub APIs, LLM capabilities, and custom prompt engineering to create an automated reviewer that thinks like a senior data engineer on your team.
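
At a high level, the flow looks something like the sketch below. The helper names here are placeholders; we build out the real functions step by step in the rest of this post.

# High-level sketch only - each helper is implemented in the steps that follow
def review_pull_request(pull_request):
    changed_files = pull_request.get_files()                     # extract changed files from the PR
    context = build_codebase_context(changed_files)              # RAG retrieval (placeholder name, see Step 4)
    reviews = [analyze_with_llm(f, context) for f in changed_files]  # LLM analysis with custom prompts
    post_review_comment(pull_request, "\n\n".join(reviews))      # post the combined feedback back to the PR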

Although this does offer a working solution, there is plenty that could be done to make it more robust and far more capable. Please treat it as an educational demonstration rather than a production-ready system.


Use Cases and Benefits

This implementation addresses several key challenges in data engineering teams:

  • Consistency Enforcement: Ensures all database objects follow the same naming conventions and data types
  • Knowledge Transfer: Helps junior engineers learn best practices through contextual feedback
  • Review Acceleration: Handles routine checks automatically, letting human reviewers focus on architecture and business logic
  • Standards Documentation: Indirectly builds a living set of standards through consistent application
  • Technical Debt Prevention: Catches potential issues before they enter the codebase

If set up correctly, you can leverage a vector database to store context and improve the reviewer for your business processes over time.




Step 1: Setting Up the Required API Keys


To build our system, we’ll need authentication tokens for GitHub and an LLM provider. Here’s how to set these up securely:


GitHub Personal Access Token

  1. Log in to your GitHub account and navigate to Settings → Developer settings → Personal access tokens → Tokens (classic)
  2. Click Generate new token and select Generate new token (classic)
  3. Name your token something descriptive like “AI Code Reviewer”
  4. Set the expiration according to your needs
  5. Select the following scopes:
    • repo (Full control of private repositories; this also covers pull request access)
    • workflow (Update GitHub Action workflows)
  6. Click Generate token and copy the token immediately (you won’t be able to see it again)

LLM API Key

You’ll need access to a capable LLM. We’ll provide instructions for both Anthropic’s Claude and OpenAI’s GPT-4:

Anthropic Claude API Key:

  1. Visit Anthropic’s Console
  2. Create or log in to your account
  3. Navigate to API Keys in the left sidebar
  4. Click Create API Key
  5. Name your key (e.g., “GitHub Code Reviewer”)
  6. Copy and securely store your API key

OpenAI API Key (Alternative):

  1. Visit OpenAI’s Platform
  2. Sign in to your account
  3. Navigate to API Keys
  4. Click Create new secret key
  5. Name your key and click Create
  6. Copy and securely store the API key

Storing Secrets in GitHub Repository

Now we’ll add these secrets to your GitHub repository:

  1. Navigate to your repository on GitHub
  2. Go to Settings → Secrets and variables → Actions
  3. Click New repository secret
  4. Add the following secrets:
    • GH_TOKEN: Your GitHub Personal Access Token
    • ANTHROPIC_API_KEY or OPENAI_API_KEY: Your LLM API key

These secrets will be accessible in our GitHub Actions workflow without being exposed in logs or to unauthorized users.
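
Inside the workflow, these secrets surface to our script as ordinary environment variables. Here’s a minimal sketch of how the script can read and validate them (assuming the secret names above):

import os

def require_env(name: str) -> str:
    """Return a required environment variable, failing fast with a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

github_token = require_env("GH_TOKEN")

# Either LLM key works - the reviewer script picks whichever provider is configured
llm_api_key = os.environ.get("ANTHROPIC_API_KEY") or os.environ.get("OPENAI_API_KEY")
if llm_api_key is None:
    raise RuntimeError("Set ANTHROPIC_API_KEY or OPENAI_API_KEY")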




Step 2: Creating the AI Code Reviewer Script


We’ll create a Python script that pulls code from a PR, analyzes it with an LLM, and posts comments back to GitHub. Here’s the implementation:

import os
import requests
import json
import anthropic  # or openai
import base64
from github import Github
from pathlib import Path

# Initialize clients
github_token = os.environ["GH_TOKEN"]
g = Github(github_token)

# Choose your LLM provider
USE_CLAUDE = True  # Set to False to use OpenAI

if USE_CLAUDE:
    anthropic_key = os.environ["ANTHROPIC_API_KEY"]
    client = anthropic.Anthropic(api_key=anthropic_key)
else:
    import openai
    openai.api_key = os.environ["OPENAI_API_KEY"]

# GitHub repository information
repo_owner = os.environ["GITHUB_REPOSITORY_OWNER"]
repo_name = os.environ["GITHUB_REPOSITORY"].split("/")[1]
pr_number = int(os.environ["PR_NUMBER"])

repository = g.get_repo(f"{repo_owner}/{repo_name}")
pull_request = repository.get_pull(pr_number)

def get_sql_standards():
    """Retrieve SQL standards from repository documentation"""
    standards = ""
    try:
        standards_file_content = repository.get_contents("docs/sql_standards.md")
        standards = base64.b64decode(standards_file_content.content).decode("utf-8")
    except Exception:
        # If no standards file exists, use default standards
        standards = """
        # SQL Standards
        
        ## Naming Conventions
        - Tables: snake_case, plural (e.g., customer_orders)
        - Columns: snake_case, singular (e.g., customer_id)
        - Primary keys: id (e.g., customer_id)
        - Foreign keys: Same name as referenced PK (e.g., customer_id)
        - Boolean columns: is_* or has_* prefix (e.g., is_active)
        - Date columns: _date suffix (e.g., created_date or created_date_id)
        
        ## Formatting
        - Keywords in UPPERCASE
        - Indentation of 4 spaces
        - Commas at the end of lines
        - One statement per line
        
        ## Data Types
        - IDs: INTEGER or BIGINT
        - Names: VARCHAR
        - Flags: BOOLEAN
        - Dates: DATE
        - Timestamps: TIMESTAMP(6)
        
        ## DBT Practices
        - Models must be idempotent
        - Include appropriate tests for primary keys or unique keys
        - Add descriptions to all models and columns
        """
    return standards

def get_naming_dictionary():
    """Build a dictionary of standard column names and their data types from the codebase"""
    naming_dict = {}
    try:
        # Check SQL and DBT files in the repository
        sql_extensions = [".sql", ".yml"]
        contents = repository.get_contents("")
        
        # Process files recursively
        while contents:
            file_content = contents.pop(0)
            if file_content.type == "dir":
                contents.extend(repository.get_contents(file_content.path))
            elif any(file_content.name.endswith(ext) for ext in sql_extensions):
                # Extract DDL statements and column names
                file_text = base64.b64decode(file_content.content).decode("utf-8")
                
                # This is a simplified approach - a real implementation would use SQL parsing
                for line in file_text.split("\n"):
                    if "CREATE TABLE" in line.upper() or "column:" in line.lower():
                        # Extract column definitions - this is simplified
                        if "INTEGER" in line.upper():
                            col_name = line.split()[0].strip(",").lower()
                            naming_dict[col_name] = "INTEGER"
                        elif "VARCHAR" in line.upper():
                            col_name = line.split()[0].strip(",").lower()
                            naming_dict[col_name] = "VARCHAR"
                        elif "DATE" in line.upper():
                            col_name = line.split()[0].strip(",").lower()
                            naming_dict[col_name] = "DATE"
                        # Add more data types as needed
    except Exception as e:
        print(f"Error building naming dictionary: {str(e)}")
    
    # Add some default mappings if the dictionary is empty
    if not naming_dict:
        naming_dict = {
            "account_number": "VARCHAR",
            "customer_id": "VARCHAR",
            "email_address": "VARCHAR",
            "created_date": "DATE",
            "is_active": "BOOLEAN",
            "transaction_amount": "DECIMAL",
            "product_name": "VARCHAR",
            "order_id": "INTEGER"
        }
    
    return naming_dict

def analyze_file(file_path, file_content, sql_standards, naming_dict):
    """Submit file to LLM for analysis"""
    
    # Construct the prompt for the LLM
    prompt = f"""
    You are an expert Senior Data Engineer responsible for reviewing SQL and data transformation code. 
    You're reviewing a pull request and need to provide detailed, actionable feedback.
    
    # SQL STANDARDS AND BEST PRACTICES
    {sql_standards}
    
    # KNOWN COLUMN NAMING PATTERNS AND DATA TYPES
    {json.dumps(naming_dict, indent=4)}
    
    # FILE TO REVIEW
    Path: {file_path}
    
    ```
    {file_content}
    ```
    
    Please review this code thoroughly and provide feedback on:
    
    1. SQL formatting and adherence to our standards
    2. Naming consistency (compared to our naming dictionary)
    3. Data type consistency
    4. Idempotency issues
    5. Missing data quality checks or tests
    6. Proper use of dbt_valid_from and dbt_valid_to for time-based models
    7. Performance concerns
    8. Any other issues or suggestions for improvement
    
    Format your response as:
    
    ## Summary
    A brief overview of the code quality (1-2 sentences)
    
    ## Issues
    Describe each issue you find with line numbers and specific recommendations
    
    ## Suggestions
    Provide constructive suggestions for improvement
    
    ## Proposed Tests
    If relevant, suggest tests that would improve data quality
    
    Focus on being helpful and educational. If something is done well, mention that too.
    """
    
    # Call the appropriate LLM API
    if USE_CLAUDE:
        response = client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=4000,
            temperature=0.2,
            system="You are a Senior Data Engineer who specializes in code review. You provide constructive, specific feedback focused on data engineering best practices.",
            messages=[
                {"role": "user", "content": prompt}
            ]
        )
        return response.content[0].text
    else:
        # Note: openai.ChatCompletion is the pre-1.0 openai package interface;
        # with openai>=1.0 you'd use the OpenAI client object instead (as initialized in Step 6)
        response = openai.ChatCompletion.create(
            model="gpt-4-turbo",
            temperature=0.2,
            messages=[
                {"role": "system", "content": "You are a Senior Data Engineer who specializes in code review. You provide constructive, specific feedback focused on data engineering best practices."},
                {"role": "user", "content": prompt}
            ]
        )
        return response.choices[0].message.content

def post_review_comment(pull_request, body):
    """Post a review comment to the pull request"""
    try:
        pull_request.create_issue_comment(body)
        print(f"Posted review comment to PR #{pr_number}")
    except Exception as e:
        print(f"Error posting comment: {str(e)}")

def main():
    # Get SQL standards and naming dictionary
    sql_standards = get_sql_standards()
    naming_dict = get_naming_dictionary()
    
    # Get the files changed in the PR
    files = pull_request.get_files()
    
    # Track all feedback to post in one comment
    all_feedback = f"# AI Data Engineer Review for PR #{pr_number}\n\n"
    
    # Analyze each file
    for file in files:
        # Only process SQL and DBT files
        if file.filename.endswith(('.sql', '.dbt', '.yml')):
            print(f"Analyzing {file.filename}...")
            file_content = requests.get(file.raw_url).text
            
            # Analyze the file
            feedback = analyze_file(file.filename, file_content, sql_standards, naming_dict)
            
            all_feedback += f"## File: {file.filename}\n\n{feedback}\n\n---\n\n"
    
    # Add a conclusion
    all_feedback += """
    **Note**: This review was generated by an AI assistant. While it strives for accuracy, always use your judgment when implementing suggestions.
    
    If you have feedback on how to improve these automated reviews, please let us know!
    """
    
    # Post the combined review
    post_review_comment(pull_request, all_feedback)

if __name__ == "__main__":
    main()

Let’s understand what this script does:

  1. It authenticates with GitHub and the LLM API of your choice (Claude or GPT-4)
  2. Extracts the SQL standards from your repository (or uses defaults)
  3. Builds a dictionary of common column names and data types from your codebase
  4. For each changed SQL or DBT file in the PR:
    • Retrieves the file content
    • Sends it to the LLM for analysis with a detailed prompt
    • Collects the feedback
  5. Posts a comprehensive review comment to the PR

This script combines several powerful techniques:

  • Contextual prompting: It gives the LLM your specific SQL standards and naming conventions
  • Domain-specific guidance: The prompt focuses on data engineering concerns like idempotency and SCD tracking
  • Actionable feedback: It asks for specific line numbers and recommendations



Step 3: Creating the GitHub Action Workflow


Now let’s set up the GitHub Action workflow to run this script automatically on every PR. Create a file at .github/workflows/ai-code-review.yml in your repository:

name: AI Data Engineer Code Review

on:
  pull_request:
    types: [opened, synchronize]
    paths:
      - '**.sql'
      - '**/models/**.yml'
      - '**/dbt_project.yml'

jobs:
  review:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      pull-requests: write
    
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3
        with:
          fetch-depth: 0
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install PyGithub requests anthropic openai
      
      - name: Run AI code review
        env:
          GH_TOKEN: ${{ secrets.GH_TOKEN }}
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          PR_NUMBER: ${{ github.event.pull_request.number }}
          GITHUB_REPOSITORY: ${{ github.repository }}
          GITHUB_REPOSITORY_OWNER: ${{ github.repository_owner }}
        run: |
          # Create the code review script
          cat > ai_code_review.py << 'EOF'
          # Paste the entire Python script from Step 2 here
          EOF
          
          # Run the script
          python ai_code_review.py

This workflow:

  1. Triggers whenever a PR is opened or updated
  2. Only runs when SQL or YAML files are changed
  3. Sets up Python and installs the required dependencies
  4. Creates the review script (by embedding it in the workflow)
  5. Runs the script with the necessary environment variables



Step 4: Implementing RAG for Contextual Code Review


The real power of our AI code reviewer comes from using Retrieval Augmented Generation (RAG) to give the AI contextual understanding of your codebase, conventions, and standards. Rather than relying on hardcoded rules, we’ll build a vector database that understands your organization’s patterns and practices.

Here’s how we’ll approach this:

Creating and Maintaining a Vector Database of Codebase Knowledge

import os
import re
import numpy as np
import json
import base64
from datetime import datetime
import pinecone
from sentence_transformers import SentenceTransformer

# Initialize the embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')
EMBEDDING_DIMENSION = 384  # Dimension for all-MiniLM-L6-v2

# Initialize Pinecone
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENVIRONMENT", "gcp-starter")
INDEX_NAME = "data-engineer-reviewer"

pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT)

# Create index if it doesn't exist
if INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        shards=1
    )

# Connect to the index
index = pinecone.Index(INDEX_NAME)

def extract_code_knowledge(repository):
    """Extract and process knowledge from repository files"""
    print("Beginning knowledge extraction from repository...")
    
    # Lists to store our embeddings and metadata
    sql_standards_data = []
    naming_patterns_data = []
    data_type_patterns_data = []
    dbt_patterns_data = []
    
    sql_files_processed = 0
    
    # Get repository contents recursively
    contents = repository.get_contents("")
    sql_extensions = ['.sql', '.dbt', '.yml']
    
    while contents:
        file_content = contents.pop(0)
        
        if file_content.type == "dir":
            try:
                dir_contents = repository.get_contents(file_content.path)
                contents.extend(dir_contents)
            except Exception as e:
                print(f"Error accessing directory {file_content.path}: {str(e)}")
                continue
        
        elif any(file_content.path.endswith(ext) for ext in sql_extensions):
            try:
                # Process SQL and DBT files
                file_text = base64.b64decode(file_content.content).decode("utf-8")
                sql_files_processed += 1
                
                # Extract column definitions and their data types
                if file_content.path.endswith('.sql'):
                    # Extract table creation patterns
                    create_table_matches = re.findall(r"CREATE\s+TABLE\s+(\w+)\s*\((.*?)\)", file_text, re.DOTALL | re.IGNORECASE)
                    for table_name, columns_text in create_table_matches:
                        columns = columns_text.split(',')
                        for col in columns:
                            col = col.strip()
                            if col and not col.startswith('--'):
                                col_parts = col.split()
                                if len(col_parts) >= 2:
                                    col_name = col_parts[0]
                                    col_type = col_parts[1]
                                    
                                    text = f"Column '{col_name}' is defined with data type '{col_type}' in table '{table_name}'"
                                    embedding = model.encode(text).tolist()
                                    
                                    data_type_patterns_data.append({
                                        "id": f"dt_{col_name}_{sql_files_processed}",
                                        "values": embedding,
                                        "metadata": {
                                            "text": text,
                                            "column_name": col_name,
                                            "data_type": col_type,
                                            "file_path": file_content.path,
                                            "source": "table_definition"
                                        }
                                    })
                    
                    # Extract column references in SELECT statements
                    select_matches = re.findall(r"SELECT\s+(.*?)\s+FROM", file_text, re.DOTALL | re.IGNORECASE)
                    for select_clause in select_matches:
                        if '*' not in select_clause:  # Skip SELECT *
                            columns = select_clause.split(',')
                            for col in columns:
                                col = col.strip()
                                if col and not col.startswith('--'):
                                    # Handle aliasing with 'AS'
                                    if ' AS ' in col.upper():
                                        col_parts = col.split(' AS ', 1)
                                        col_name = col_parts[1].strip()
                                    else:
                                        col_name = col.split('.')[-1].strip()
                                    
                                    text = f"Column name '{col_name}' is used in SELECT statement in file {file_content.path}"
                                    embedding = model.encode(text).tolist()
                                    
                                    naming_patterns_data.append({
                                        "id": f"np_{col_name}_{sql_files_processed}",
                                        "values": embedding,
                                        "metadata": {
                                            "text": text,
                                            "column_name": col_name,
                                            "file_path": file_content.path,
                                            "source": "select_statement"
                                        }
                                    })
                
                # Extract DBT-specific patterns
                if 'models/' in file_content.path or file_content.path.endswith('.yml'):
                    # Get a snippet of the file for context
                    text = file_text[:1000]  # First 1000 chars for context
                    embedding = model.encode(text).tolist()
                    
                    dbt_patterns_data.append({
                        "id": f"dbt_file_{sql_files_processed}",
                        "values": embedding,
                        "metadata": {
                            "text": text,
                            "file_path": file_content.path,
                            "source": "dbt_file"
                        }
                    })
                    
                    # Look for specific DBT patterns
                    if '{{ config(' in file_text:
                        config_pattern = r"{{[\s]*config\((.*?)\)[\s]*}}"
                        config_matches = re.findall(config_pattern, file_text, re.DOTALL)
                        for config_content in config_matches:
                            text = f"DBT config in {file_content.path}: {config_content}"
                            embedding = model.encode(text).tolist()
                            
                            dbt_patterns_data.append({
                                "id": f"dbt_config_{sql_files_processed}",
                                "values": embedding,
                                "metadata": {
                                    "text": text,
                                    "file_path": file_content.path,
                                    "source": "dbt_config"
                                }
                            })
            
            except Exception as e:
                print(f"Error processing file {file_content.path}: {str(e)}")
    
    # Extract SQL standards if they exist
    try:
        standards_file = repository.get_contents("docs/sql_standards.md")
        standards_text = base64.b64decode(standards_file.content).decode("utf-8")
        
        # Split into sections for better retrieval granularity
        sections = re.split(r'#{2,3}\s+', standards_text)
        for i, section in enumerate(sections):
            if section.strip():
                text = section.strip()
                embedding = model.encode(text).tolist()
                
                sql_standards_data.append({
                    "id": f"std_{i}",
                    "values": embedding,
                    "metadata": {
                        "text": text,
                        "source": "sql_standards",
                        "section_index": i
                    }
                })
    except Exception:
        # If no standards file, use some default standards
        default_standards = [
            "Use snake_case for all database object names",
            "Primary keys should be named 'id' or 'table_name_id'",
            "Foreign keys should match the referenced column name",
            "Use UPPERCASE for SQL keywords",
            "Indent with 2 spaces",
            "Place each column on its own line in SELECT statements",
            "Include a WHERE clause to avoid full table scans",
            "Use consistent data types for similar fields across tables"
        ]
        
        for i, standard in enumerate(default_standards):
            embedding = model.encode(standard).tolist()
            
            sql_standards_data.append({
                "id": f"default_std_{i}",
                "values": embedding,
                "metadata": {
                    "text": standard,
                    "source": "default_standards",
                    "section_index": i
                }
            })
    
    # Update the Pinecone index with the extracted knowledge
    update_pinecone_index(sql_standards_data, naming_patterns_data, data_type_patterns_data, dbt_patterns_data)
    
    print(f"Knowledge extraction complete. Processed {sql_files_processed} SQL files.")
    print(f"Extracted: {len(sql_standards_data)} standard sections, {len(naming_patterns_data)} naming patterns, {len(data_type_patterns_data)} data type patterns, {len(dbt_patterns_data)} DBT patterns")

def update_pinecone_index(sql_standards, naming_patterns, data_type_patterns, dbt_patterns):
    """Update the Pinecone index with extracted knowledge"""
    
    # Pinecone allows bulk upserts for better performance
    # We'll tag vectors by their type for filtering later
    
    # First, delete existing vectors by namespace
    index.delete(delete_all=True, namespace="sql_standards")
    index.delete(delete_all=True, namespace="naming_patterns")
    index.delete(delete_all=True, namespace="data_types")
    index.delete(delete_all=True, namespace="dbt_patterns")
    
    # Upsert SQL standards data
    if sql_standards:
        index.upsert(vectors=sql_standards, namespace="sql_standards")
    
    # Pinecone has limits on batch size, so chunk all upserts
    chunk_size = 100
    
    # Upsert naming patterns data
    if naming_patterns:
        for i in range(0, len(naming_patterns), chunk_size):
            chunk = naming_patterns[i:i + chunk_size]
            index.upsert(vectors=chunk, namespace="naming_patterns")
    
    # Upsert data type patterns
    if data_type_patterns:
        for i in range(0, len(data_type_patterns), chunk_size):
            chunk = data_type_patterns[i:i + chunk_size]
            index.upsert(vectors=chunk, namespace="data_types")
    
    # Upsert DBT patterns
    if dbt_patterns:
        for i in range(0, len(dbt_patterns), chunk_size):
            chunk = dbt_patterns[i:i + chunk_size]
            index.upsert(vectors=chunk, namespace="dbt_patterns")

def query_repository_knowledge(query, namespace, top_k=5):
    """Query the Pinecone index for relevant knowledge"""
    query_embedding = model.encode(query).tolist()
    
    results = index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True,
        namespace=namespace
    )
    
    # Extract the text from the metadata
    if results.matches:
        retrieved_texts = [match.metadata["text"] for match in results.matches]
        return retrieved_texts
    else:
        return []
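
For example, once the index is populated you can pull targeted context for a single column like this (illustrative only; the results depend on what was indexed from your repository):

# Illustrative query against the naming_patterns namespace
snippets = query_repository_knowledge("Column named customer_id", "naming_patterns", top_k=3)
for snippet in snippets:
    print(snippet)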

Then update the main() function to use our RAG-based approach (the verify_feedback and analyze_review_for_metrics helpers it calls are defined in Step 5):

def main():
    # First, extract knowledge from the repository to build/update the vector database
    # This could be done less frequently, such as on a schedule rather than for every PR
    extract_code_knowledge(repository)
    
    # Get the files changed in the PR
    files = pull_request.get_files()
    
    # Track all feedback to post in one comment
    all_feedback = f"# AI Data Engineer Review for PR #{pr_number}\n\n"
    
    # Initialize metrics summary
    metrics = {
        "files_analyzed": 0,
        "issues_found": 0,
        "naming_consistency_issues": 0,
        "dbt_best_practice_issues": 0
    }
    
    # Analyze each file
    for file in files:
        # Only process SQL and DBT files
        if file.filename.endswith(('.sql', '.dbt', '.yml')):
            print(f"Analyzing {file.filename}...")
            file_content = requests.get(file.raw_url).text
            
            # Get primary feedback using RAG-powered analysis
            primary_feedback = analyze_file(file.filename, file_content)
            
            # Verify the feedback with a second agent
            verification = verify_feedback(file.filename, file_content, primary_feedback)
            
            # Analyze the review to identify specific categories of issues
            issues_analysis = analyze_review_for_metrics(primary_feedback)
            
            # Update metrics
            metrics["files_analyzed"] += 1
            metrics["issues_found"] += issues_analysis["total_issues"]
            metrics["naming_consistency_issues"] += issues_analysis["naming_issues"]
            metrics["dbt_best_practice_issues"] += issues_analysis["dbt_issues"]
            
            # Combine feedback with verification
            file_feedback = primary_feedback
            if verification:
                file_feedback += f"\n\n{verification}"
            
            all_feedback += f"## File: {file.filename}\n\n{file_feedback}\n\n---\n\n"
    
    # Add a conclusion
    all_feedback += """
    **Note**: This review was generated by an AI assistant. While it strives for accuracy, always use your judgment when implementing suggestions.
    
    If you have feedback on how to improve these automated reviews, please let us know!
    """
    
    # Post the combined review
    post_review_comment(pull_request, all_feedback)

Also modify the analyze_file function to leverage this contextual knowledge:

def analyze_file(file_path, file_content):
    """Submit file to LLM for analysis using RAG context"""
    
    # Build RAG context from the vector database
    rag_context = build_rag_context_for_review(file_path, file_content)
    
    # Construct the prompt for the LLM
    prompt = f"""
    You are an expert Senior Data Engineer responsible for reviewing SQL and data transformation code. 
    You're reviewing a pull request and need to provide detailed, actionable feedback.
    
    # CONTEXT FROM CODEBASE ANALYSIS
    {rag_context}
    
    # FILE TO REVIEW
    Path: {file_path}
    
    ```
    {file_content}
    ```
    
    Please review this code thoroughly based on the provided context. Focus your feedback on:
    
    1. SQL formatting and adherence to our standards
    2. Naming consistency with existing patterns in our codebase
    3. Data type consistency with how we typically define these columns
    4. Idempotency issues
    5. Missing data quality checks or tests
    6. If DBT code, proper use of dbt_valid_from and dbt_valid_to for time-based models
    7. Performance concerns
    8. Any other issues or suggestions for improvement
    
    Format your response as:
    
    ## Summary
    A brief overview of the code quality (1-2 sentences)
    
    ## Issues
    Describe each issue you find with line numbers and specific recommendations
    
    ## Suggestions
    Provide constructive suggestions for improvement
    
    ## Proposed Tests
    If relevant, suggest tests that would improve data quality
    
    Focus on being helpful and educational. If something is done well, mention that too.
    """
    
    # Call the appropriate LLM API
    if USE_CLAUDE:
        response = client.messages.create(
            model="claude-3-opus-20240229",
            max_tokens=4000,
            temperature=0,
            system="You are a Senior Data Engineer who specializes in code review. You provide constructive, specific feedback focused on data engineering best practices.",
            messages=[
                {"role": "user", "content": prompt}
            ]
        )
        return response.content[0].text
    else:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            temperature=0,
            messages=[
                {"role": "system", "content": "You are a Senior Data Engineer who specializes in code review. You provide constructive, specific feedback focused on data engineering best practices."},
                {"role": "user", "content": prompt}
            ]
        )
        return response.choices[0].message.content

The function to build the RAG context for each review:

def build_rag_context_for_review(file_path, file_content):
    """Build a comprehensive RAG context for the file being reviewed"""
    
    # Determine the type of file
    is_sql = file_path.endswith('.sql')
    is_dbt = 'models/' in file_path or file_path.endswith('.yml')
    
    context_sections = []
    
    # Always include SQL standards
    sql_standards_results = query_repository_knowledge(
        "SQL coding standards and best practices", 
        "sql_standards", 
        top_k=5
    )
    
    if sql_standards_results:
        context_sections.append("# SQL STANDARDS AND BEST PRACTICES")
        for doc in sql_standards_results:
            context_sections.append(doc)
    
    # Extract column names from the file to query for naming patterns
    column_names = []
    if is_sql:
        # Simple regex to extract column names from SELECT statements
        select_clauses = re.findall(r"SELECT\s+(.*?)\s+FROM", file_content, re.DOTALL | re.IGNORECASE)
        for clause in select_clauses:
            if '*' not in clause:
                cols = clause.split(',')
                for col in cols:
                    col = col.strip()
                    if ' AS ' in col.upper():
                        col = col.split(' AS ')[1].strip()
                    else:
                        col = col.split('.')[-1].strip()
                    column_names.append(col)
    
    # Query for naming patterns information
    if column_names:
        naming_context = []
        for col_name in column_names:
            results = query_repository_knowledge(
                f"Column named {col_name}", 
                "naming_patterns", 
                top_k=3
            )
            if results:
                for doc in results:
                    naming_context.append(doc)
        
        if naming_context:
            context_sections.append("# COLUMN NAMING PATTERNS")
            context_sections.extend(naming_context)
    
    # Query for DBT patterns if applicable
    if is_dbt:
        dbt_context = []
        
        # Look for specific DBT features in the file
        dbt_features = []
        if "{{ config(" in file_content:
            dbt_features.append("DBT config block")
        if "{{ ref(" in file_content:
            dbt_features.append("DBT ref function")
        if "{{ source(" in file_content:
            dbt_features.append("DBT source function")
        if "{{ dbt_utils" in file_content:
            dbt_features.append("DBT utils package")
        if "incremental" in file_content.lower():
            dbt_features.append("incremental model")
        
        for feature in dbt_features:
            results = query_repository_knowledge(
                f"DBT {feature} patterns and best practices", 
                "dbt_patterns", 
                top_k=3
            )
            if results:
                for doc in results:
                    dbt_context.append(doc)
        
        if dbt_context:
            context_sections.append("# DBT PATTERNS AND BEST PRACTICES")
            context_sections.extend(dbt_context)
    
    # Combine all context sections
    return "\n\n".join(context_sections)

This Pinecone implementation provides several advantages:

  1. Scalability: Pinecone can handle millions of vectors and high query throughput
  2. Managed Service: No need to worry about infrastructure management
  3. Low Latency: Optimized for fast query responses
  4. Namespaces: Built-in support for organizing vectors by category (SQL standards, naming patterns, etc.)

Using this RAG approach with Pinecone gives the AI reviewer true contextual awareness of your specific codebase patterns and standards without the need for hardcoded rules.
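
For instance, you could feed a changed model through the context builder before prompting the LLM (the file path and SQL below are made up for illustration):

sample_sql = """
SELECT customer_id, created_date AS signup_date
FROM {{ ref('stg_customers') }}
"""

# Retrieve the standards, naming and DBT context relevant to this file
context = build_rag_context_for_review("models/marts/dim_customers.sql", sample_sql)
print(context[:500])  # Inspect what the LLM will see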




Step 5: Adding Multi-Agent Verification for Improved Reliability


To reduce hallucinations and improve the reliability of our reviews, we can implement a multi-agent verification system that uses a second “Critic” agent to verify the primary agent’s findings:

def verify_feedback(file_path, file_content, primary_feedback):
    """
    Use a second LLM as a "Critic" to verify the primary agent's feedback
    and reduce hallucinations
    """
    
    # Build a simpler RAG context focused on verifying factual claims
    file_type = "SQL" if file_path.endswith('.sql') else "DBT" if 'models/' in file_path else "YAML" if file_path.endswith('.yml') else "Other"
    
    verification_prompt = f"""
    You are a Critical Reviewer whose job is to verify another AI's code review for accuracy.
    You should check if the review is factually correct and relevant to the code.
    
    # FILE REVIEWED ({file_type})
    Path: {file_path}
    
    ```
    {file_content}
    ```
    
    # ORIGINAL REVIEW
    {primary_feedback}
    
    Please verify this review and note:
    1. Any hallucinations or incorrect statements about the code
    2. Any misinterpretations of SQL syntax or data engineering concepts
    3. Any feedback that seems contradictory or inconsistent
    
    Only mention issues that are clearly wrong - do not repeat correct feedback.
    If the review seems accurate, just say "The review appears accurate."
    """
    
    # Call the LLM API (using a different model if possible)
    if USE_CLAUDE:
        response = client.messages.create(
            model="claude-3-sonnet-20240229",  # Using a different model than the primary reviewer
            max_tokens=2000,
            temperature=0,
            system="You are a Critical Reviewer who verifies the accuracy of code reviews. You only point out factual errors and hallucinations.",
            messages=[
                {"role": "user", "content": verification_prompt}
            ]
        )
        verification = response.content[0].text
    else:
        response = openai.ChatCompletion.create(
            model="gpt-4-turbo",
            temperature=0,
            messages=[
                {"role": "system", "content": "You are a Critical Reviewer who verifies the accuracy of code reviews. You only point out factual errors and hallucinations."},
                {"role": "user", "content": verification_prompt}
            ]
        )
        verification = response.choices[0].message.content
    
    # Only include verification notes if issues were found
    if "appears accurate" not in verification.lower():
        return f"""
        ## Verification Notes
        
        {verification}
        """
    return ""

def analyze_review_for_metrics(review_text):
    """Analyze the review content to extract metrics about issues found"""
    
    analysis = {
        "total_issues": 0,
        "naming_issues": 0,
        "data_type_issues": 0,
        "performance_issues": 0,
        "dbt_issues": 0,
        "test_issues": 0,
        "formatting_issues": 0
    }
    
    # Count total issues by looking at the "## Issues" section
    issues_section = re.search(r"## Issues\s+(.*?)(?=##|$)", review_text, re.DOTALL)
    if issues_section:
        issues_text = issues_section.group(1)
        # Count bullet points or numbered items
        items = re.findall(r"^(?:\d+\.|-|\*)\s+", issues_text, re.MULTILINE)
        analysis["total_issues"] = len(items)
    
    # Check for naming-related issues
    naming_matches = re.findall(r"naming|column name|inconsistent", review_text, re.IGNORECASE)
    analysis["naming_issues"] = len(naming_matches)
    
    # Check for data type issues
    data_type_matches = re.findall(r"data type|varchar|integer|decimal|timestamp", review_text, re.IGNORECASE)
    analysis["data_type_issues"] = len(data_type_matches)
    
    # Check for performance issues
    performance_matches = re.findall(r"performance|slow|optimization|index|partition", review_text, re.IGNORECASE)
    analysis["performance_issues"] = len(performance_matches)
    
    # Check for DBT-specific issues
    dbt_matches = re.findall(r"dbt|incremental|model|materialization|ref|source", review_text, re.IGNORECASE)
    analysis["dbt_issues"] = len(dbt_matches)
    
    # Check for test-related issues
    test_matches = re.findall(r"test|assert|validation|quality", review_text, re.IGNORECASE)
    analysis["test_issues"] = len(test_matches)
    
    # Check for formatting issues
    formatting_matches = re.findall(r"format|indent|spaces|uppercase|lowercase", review_text, re.IGNORECASE)
    analysis["formatting_issues"] = len(formatting_matches)
    
    return analysis

Then extend build_rag_context_for_review to also retrieve data type context, and update the main function to include this verification step:

def build_rag_context_for_review(file_path, file_content):
    """Build a comprehensive RAG context for the file being reviewed"""
    
    # Determine the type of file and construct appropriate queries
    is_sql = file_path.endswith('.sql')
    is_dbt = 'models/' in file_path or file_path.endswith('.yml')
    
    context_sections = []
    
    # Always include SQL standards
    sql_standards_results = query_repository_knowledge(
        "SQL coding standards and best practices", 
        "sql_standards", 
        top_k=5
    )
    
    if sql_standards_results:
        context_sections.append("# SQL STANDARDS AND BEST PRACTICES")
        for doc in sql_standards_results:
            context_sections.append(doc)
    
    # Extract column names from the file to query for naming patterns
    column_names = []
    if is_sql:
        # Simple regex to extract column names from SELECT statements
        # In a production system, use a proper SQL parser
        select_clauses = re.findall(r"SELECT\s+(.*?)\s+FROM", file_content, re.DOTALL | re.IGNORECASE)
        for clause in select_clauses:
            if '*' not in clause:
                cols = clause.split(',')
                for col in cols:
                    col = col.strip()
                    if ' AS ' in col.upper():
                        col = col.split(' AS ')[1].strip()
                    else:
                        col = col.split('.')[-1].strip()
                    column_names.append(col)
        
        # Also extract from CREATE TABLE statements
        create_table_matches = re.findall(r"CREATE\s+TABLE\s+\w+\s*\((.*?)\)", file_content, re.DOTALL | re.IGNORECASE)
        for columns_text in create_table_matches:
            cols = columns_text.split(',')
            for col in cols:
                col = col.strip()
                if col and not col.startswith('--'):
                    col_parts = col.split()
                    if len(col_parts) >= 2:
                        column_names.append(col_parts[0])
    
    # Query for naming patterns information
    if column_names:
        naming_context = []
        for col_name in column_names:
            results = query_repository_knowledge(
                f"Column named {col_name}", 
                "naming_patterns", 
                top_k=3
            )
            if results:
                for doc in results:
                    naming_context.append(doc)
        
        if naming_context:
            context_sections.append("# COLUMN NAMING PATTERNS")
            context_sections.extend(naming_context)
    
    # Query for data type information
    if column_names:
        data_type_context = []
        for col_name in column_names:
            results = query_repository_knowledge(
                f"Data type for column {col_name}", 
                "data_types", 
                top_k=3
            )
            if results:
                for doc in results:
                    data_type_context.append(doc)
        
        if data_type_context:
            context_sections.append("# DATA TYPE PATTERNS")
            context_sections.extend(data_type_context)
    
    # Query for DBT patterns if applicable
    if is_dbt:
        dbt_context = []
        
        # Look for specific DBT features in the file
        dbt_features = []
        if "{{ config(" in file_content:
            dbt_features.append("DBT config block")
        if "{{ ref(" in file_content:
            dbt_features.append("DBT ref function")
        if "{{ source(" in file_content:
            dbt_features.append("DBT source function")
        if "{{ dbt_utils" in file_content:
            dbt_features.append("DBT utils package")
        if "incremental" in file_content.lower():
            dbt_features.append("incremental model")
        
        for feature in dbt_features:
            results = query_repository_knowledge(
                f"DBT {feature} patterns and best practices", 
                "dbt_patterns", 
                top_k=3
            )
            if results:
                for doc in results:
                    dbt_context.append(doc)
        
        if dbt_context:
            context_sections.append("# DBT PATTERNS AND BEST PRACTICES")
            context_sections.extend(dbt_context)
    
    # Combine all context sections
    return "\n\n".join(context_sections)

def main():
    # First, extract knowledge from the repository to build/update the vector database
    # This could be done less frequently, such as on a schedule rather than for every PR
    extract_code_knowledge(repository)
    
    # Get the files changed in the PR
    files = pull_request.get_files()
    
    # Track all feedback to post in one comment
    all_feedback = f"# AI Data Engineer Review for PR #{pr_number}\n\n"
    
    # Build the naming dictionary once for the custom consistency checks below
    naming_dict = get_naming_dictionary()
    
    # Initialize metrics summary
    metrics = {
        "files_analyzed": 0,
        "issues_found": 0,
        "avg_naming_consistency": 0,
        "custom_rule_violations": 0
    }
    
    # Analyze each file
    for file in files:
        # Only process SQL and DBT files
        if file.filename.endswith(('.sql', '.dbt', '.yml')):
            print(f"Analyzing {file.filename}...")
            file_content = requests.get(file.raw_url).text
            
            # Get primary feedback using the RAG-powered analysis
            primary_feedback = analyze_file(file.filename, file_content)
            
            # Verify the feedback with the Critic agent
            verification = verify_feedback(file.filename, file_content, primary_feedback)
            
            # Check custom rules
            # (check_custom_rules and analyze_naming_consistency are assumed helper
            #  functions, e.g. simple regex- or dictionary-based checks, not shown here)
            custom_issues = check_custom_rules(file_content)
            
            # Analyze naming consistency against the naming dictionary
            consistency_result = analyze_naming_consistency(file_content, naming_dict)
            
            # Build custom rules and metrics section
            custom_section = ""
            if custom_issues or consistency_result["inconsistent_columns"]:
                custom_section = "## Custom Rules and Metrics\n\n"
                custom_section += f"**Naming Consistency Score**: {consistency_result['score']}%\n\n"
                
                if consistency_result["inconsistent_columns"]:
                    custom_section += "**Inconsistent Column Names/Types**:\n"
                    for col in consistency_result["inconsistent_columns"]:
                        custom_section += f"- {col}\n"
                    custom_section += "\n"
                
                if custom_issues:
                    custom_section += "**Custom Rule Violations**:\n"
                    for issue in custom_issues:
                        custom_section += f"- {issue}\n"
                    custom_section += "\n"
            
            # Combine all feedback sections
            file_feedback = primary_feedback
            if custom_section:
                file_feedback += f"\n\n{custom_section}"
            if verification:
                file_feedback += f"\n\n{verification}"
            
            # Update metrics
            metrics["files_analyzed"] += 1
            metrics["issues_found"] += len(custom_issues)
            metrics["avg_naming_consistency"] += consistency_result["score"]
            metrics["custom_rule_violations"] += len(custom_issues)
            
            all_feedback += f"## File: {file.filename}\n\n{file_feedback}\n\n---\n\n"
    
    # Calculate average naming consistency
    if metrics["files_analyzed"] > 0:
        metrics["avg_naming_consistency"] /= metrics["files_analyzed"]
    
    # Add metrics summary
    metrics_summary = f"""
    ## Review Metrics
    
    - Files Analyzed: {metrics["files_analyzed"]}
    - Issues Found: {metrics["issues_found"]}
    - Average Naming Consistency: {metrics["avg_naming_consistency"]:.1f}%
    - Custom Rule Violations: {metrics["custom_rule_violations"]}
    
    """
    
    all_feedback += metrics_summary
    
    # Add a conclusion
    all_feedback += """
    **Note**: This review was generated by an AI assistant. While it strives for accuracy, always use your judgment when implementing suggestions.
    
    If you have feedback on how to improve these automated reviews, please let us know!
    """
    
    # Post the combined review
    post_review_comment(pull_request, all_feedback)

This multi-agent approach helps ensure that the reviews your team receives are more accurate and reliable. The second agent acts as a fact-checker, flagging any hallucinations or misinterpretations.




Step 6: Implementing GitHub Action Using Your Existing Framework


Now that we’ve established our vector database for contextual code reviews, let’s integrate it with GitHub Actions. We’ll build on a typical existing setup: a generate_comment.py script that posts PR comments and an autofeedback.yml workflow that triggers it.

First, let’s update the generate_comment.py file to use our RAG-powered approach:

import os
import sys
import requests
import logging
import re
import base64
import pinecone
from openai import OpenAI
from github import Github
from sentence_transformers import SentenceTransformer

# Setup logging
logger = logging.getLogger()
formatter = logging.Formatter("%(message)s")
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.setLevel(logging.INFO)

# Environment variables
GIT_TOKEN = os.environ.get('GIT_TOKEN')
GIT_REPO = os.environ.get('GITHUB_REPOSITORY')
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENVIRONMENT", "gcp-starter")

if OPENAI_API_KEY is None:
    raise ValueError("You need to specify OPENAI_API_KEY environment variable!")
if PINECONE_API_KEY is None:
    raise ValueError("You need to specify PINECONE_API_KEY environment variable!")

# Initialize OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

# Initialize embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')
EMBEDDING_DIMENSION = 384  # Dimension for all-MiniLM-L6-v2

# Initialize Pinecone
INDEX_NAME = "data-engineer-reviewer"
pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENVIRONMENT)

# Create index if it doesn't exist
if INDEX_NAME not in pinecone.list_indexes():
    pinecone.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        shards=1
    )

# Connect to the index
index = pinecone.Index(INDEX_NAME)

def get_pr_info(pr_number):
    """Get PR files and information"""
    g = Github(GIT_TOKEN)
    repo = g.get_repo(GIT_REPO)
    pr = repo.get_pull(pr_number)
    files = pr.get_files()
    
    # Filter for data engineering files
    data_files = [f for f in files if f.filename.endswith(('.sql', '.yml', '.dbt'))]
    
    return {
        "pr": pr,
        "repo": repo,
        "data_files": data_files
    }

def extract_repository_knowledge(repo):
    """Extract knowledge from the repository to populate the vector database"""
    logger.info("Extracting knowledge from repository...")
    
    # Lists to store our embeddings and metadata
    sql_standards_data = []
    naming_patterns_data = []
    data_type_patterns_data = []
    dbt_patterns_data = []
    
    # First, try to find SQL standards document
    try:
        standards_file = repo.get_contents("docs/sql_standards.md")
        standards_text = base64.b64decode(standards_file.content).decode("utf-8")
        
        # Split into sections for better retrieval granularity
        sections = re.split(r'#{2,3}\s+', standards_text)
        for i, section in enumerate(sections):
            if section.strip():
                text = section.strip()
                embedding = model.encode(text).tolist()
                
                sql_standards_data.append({
                    "id": f"std_{i}",
                    "values": embedding,
                    "metadata": {
                        "text": text,
                        "source": "sql_standards",
                        "section_index": i
                    }
                })
        
        logger.info(f"Found SQL standards document with {len(sections)} sections")
    except Exception:
        # If no standards file, use default standards
        logger.info("No SQL standards document found, using defaults")
        default_standards = [
            "Use snake_case for all database object names",
            "Primary keys should be named 'id' or 'table_name_id'",
            "Foreign keys should match the referenced column name",
            "Use UPPERCASE for SQL keywords",
            "Indent with 2 spaces",
            "Place each column on its own line in SELECT statements",
            "Include a WHERE clause to avoid full table scans",
            "Use consistent data types for similar fields across tables"
        ]
        
        for i, standard in enumerate(default_standards):
            embedding = model.encode(standard).tolist()
            
            sql_standards_data.append({
                "id": f"default_std_{i}",
                "values": embedding,
                "metadata": {
                    "text": standard,
                    "source": "default_standards",
                    "section_index": i
                }
            })
    
    # Find existing models and their patterns
    try:
        # Look for DBT models directory
        models_contents = repo.get_contents("models")
        model_files = []
        
        # Recursive function to collect all SQL files
        def collect_sql_files(contents):
            for item in contents:
                if item.type == "dir":
                    collect_sql_files(repo.get_contents(item.path))
                elif item.path.endswith(('.sql', '.yml')):
                    model_files.append(item)
        
        collect_sql_files(models_contents)
        logger.info(f"Found {len(model_files)} model files to analyze")
        
        # Sample a subset of files to analyze (to avoid API rate limits)
        file_sample = model_files[:50]  # Adjust based on your needs
        
        for file_content in file_sample:
            try:
                file_text = base64.b64decode(file_content.content).decode("utf-8")
                
                # Extract column information from SQL files
                if file_content.path.endswith('.sql'):
                    # Extract column references in SELECT statements
                    select_matches = re.findall(r"SELECT\s+(.*?)\s+FROM", file_text, re.DOTALL | re.IGNORECASE)
                    for select_clause in select_matches:
                        if '*' not in select_clause:  # Skip SELECT *
                            columns = select_clause.split(',')
                            for col in columns:
                                col = col.strip()
                                if col and not col.startswith('--'):
                                    # Handle aliasing with 'AS' (split case-insensitively so lowercase 'as' is handled too)
                                    if ' AS ' in col.upper():
                                        col_name = re.split(r'\s+AS\s+', col, maxsplit=1, flags=re.IGNORECASE)[1].strip()
                                    else:
                                        col_name = col.split('.')[-1].strip()
                                    
                                    text = f"Column name '{col_name}' is used in SELECT statement in file {file_content.path}"
                                    embedding = model.encode(text).tolist()
                                    
                                    naming_patterns_data.append({
                                        "id": f"np_{col_name}_{len(naming_patterns_data)}",
                                        "values": embedding,
                                        "metadata": {
                                            "text": text,
                                            "column_name": col_name,
                                            "file_path": file_content.path,
                                            "source": "select_statement"
                                        }
                                    })
                
                # Extract DBT-specific patterns
                if 'models/' in file_content.path:
                    # Get a snippet of the file for context
                    text = file_text[:1000]  # First 1000 chars for context
                    embedding = model.encode(text).tolist()
                    
                    dbt_patterns_data.append({
                        "id": f"dbt_file_{len(dbt_patterns_data)}",
                        "values": embedding,
                        "metadata": {
                            "text": text,
                            "file_path": file_content.path,
                            "source": "dbt_file"
                        }
                    })
                    
                    # Look for specific DBT patterns
                    if '{{ config(' in file_text:
                        config_pattern = r"{{[\s]*config\((.*?)\)[\s]*}}"
                        config_matches = re.findall(config_pattern, file_text, re.DOTALL)
                        for config_content in config_matches:
                            text = f"DBT config in {file_content.path}: {config_content}"
                            embedding = model.encode(text).tolist()
                            
                            dbt_patterns_data.append({
                                "id": f"dbt_config_{len(dbt_patterns_data)}",
                                "values": embedding,
                                "metadata": {
                                    "text": text,
                                    "file_path": file_content.path,
                                    "source": "dbt_config"
                                }
                            })
            except Exception as e:
                logger.error(f"Error processing file {file_content.path}: {str(e)}")
    
    except Exception as e:
        logger.warning(f"Could not process models directory: {str(e)}")
    
    # Update the Pinecone index with the extracted knowledge
    update_pinecone_index(sql_standards_data, naming_patterns_data, data_type_patterns_data, dbt_patterns_data)
    
    logger.info(f"Knowledge extraction complete.")
    logger.info(f"Indexed: {len(sql_standards_data)} standard sections, {len(naming_patterns_data)} naming patterns, {len(data_type_patterns_data)} data type patterns, {len(dbt_patterns_data)} DBT patterns")

def update_pinecone_index(sql_standards, naming_patterns, data_type_patterns, dbt_patterns):
    """Update the Pinecone index with extracted knowledge"""
    
    # First, delete existing vectors by namespace
    try:
        index.delete(delete_all=True, namespace="sql_standards")
        index.delete(delete_all=True, namespace="naming_patterns")
        index.delete(delete_all=True, namespace="data_types")
        index.delete(delete_all=True, namespace="dbt_patterns")
        
        # Pinecone limits batch sizes, so define the chunk size up front for every namespace
        chunk_size = 100
        
        # Upsert SQL standards data
        if sql_standards:
            index.upsert(vectors=sql_standards, namespace="sql_standards")
        
        # Upsert naming patterns data in chunks
        if naming_patterns:
            for i in range(0, len(naming_patterns), chunk_size):
                chunk = naming_patterns[i:i + chunk_size]
                index.upsert(vectors=chunk, namespace="naming_patterns")
        
        # Upsert data type patterns
        if data_type_patterns:
            for i in range(0, len(data_type_patterns), chunk_size):
                chunk = data_type_patterns[i:i + chunk_size]
                index.upsert(vectors=chunk, namespace="data_types")
        
        # Upsert DBT patterns
        if dbt_patterns:
            for i in range(0, len(dbt_patterns), chunk_size):
                chunk = dbt_patterns[i:i + chunk_size]
                index.upsert(vectors=chunk, namespace="dbt_patterns")
    
    except Exception as e:
        logger.error(f"Error updating Pinecone index: {str(e)}")

def query_repository_knowledge(query, namespace, top_k=5):
    """Query the Pinecone index for relevant knowledge"""
    query_embedding = model.encode(query).tolist()
    
    try:
        results = index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True,
            namespace=namespace
        )
        
        # Extract the text from the metadata
        if results.matches:
            retrieved_texts = [match.metadata["text"] for match in results.matches]
            return retrieved_texts
        else:
            return []
    except Exception as e:
        logger.error(f"Error querying Pinecone: {str(e)}")
        return []

def build_rag_context_for_review(file_path, file_content):
    """Build a comprehensive RAG context for the file being reviewed"""
    
    # Determine the type of file
    is_sql = file_path.endswith('.sql')
    is_dbt = 'models/' in file_path or file_path.endswith('.yml')
    
    context_sections = []
    
    # Always include SQL standards
    sql_standards_results = query_repository_knowledge(
        "SQL coding standards and best practices", 
        "sql_standards", 
        top_k=5
    )
    
    if sql_standards_results:
        context_sections.append("# SQL STANDARDS AND BEST PRACTICES")
        for doc in sql_standards_results:
            context_sections.append(doc)
    
    # Extract column names from the file to query for naming patterns
    column_names = []
    if is_sql:
        # Simple regex to extract column names from SELECT statements
        select_clauses = re.findall(r"SELECT\s+(.*?)\s+FROM", file_content, re.DOTALL | re.IGNORECASE)
        for clause in select_clauses:
            if '*' not in clause:
                cols = clause.split(',')
                for col in cols:
                    col = col.strip()
                    # Split on the alias keyword case-insensitively so lowercase 'as' works too
                    if ' AS ' in col.upper():
                        col = re.split(r'\s+AS\s+', col, maxsplit=1, flags=re.IGNORECASE)[1].strip()
                    else:
                        col = col.split('.')[-1].strip()
                    column_names.append(col)
    
    # Query for naming patterns information
    if column_names:
        naming_context = []
        for col_name in column_names:
            results = query_repository_knowledge(
                f"Column named {col_name}", 
                "naming_patterns", 
                top_k=3
            )
            if results:
                for doc in results:
                    naming_context.append(doc)
        
        if naming_context:
            context_sections.append("# COLUMN NAMING PATTERNS")
            context_sections.extend(naming_context)
    
    # Query for DBT patterns if applicable
    if is_dbt:
        dbt_context = []
        
        # Look for specific DBT features in the file
        dbt_features = []
        if "{{ config(" in file_content:
            dbt_features.append("DBT config block")
        if "{{ ref(" in file_content:
            dbt_features.append("DBT ref function")
        if "{{ source(" in file_content:
            dbt_features.append("DBT source function")
        if "{{ dbt_utils" in file_content:
            dbt_features.append("DBT utils package")
        if "incremental" in file_content.lower():
            dbt_features.append("incremental model")
        
        for feature in dbt_features:
            results = query_repository_knowledge(
                f"DBT {feature} patterns and best practices", 
                "dbt_patterns", 
                top_k=3
            )
            if results:
                for doc in results:
                    dbt_context.append(doc)
        
        if dbt_context:
            context_sections.append("# DBT PATTERNS AND BEST PRACTICES")
            context_sections.extend(dbt_context)
    
    # Combine all context sections
    return "\n\n".join(context_sections)

def get_feedback(file_path, file_content):
    """Generate AI review feedback using RAG context"""
    
    # Build RAG context from the vector database
    rag_context = build_rag_context_for_review(file_path, file_content)
    
    # System prompt for the Senior Data Engineer persona
    system_prompt = """
    You are an expert Senior Data Engineer responsible for reviewing SQL and data transformation code.
    You have extensive knowledge of SQL, DBT, data modeling, and data engineering best practices.
    Your feedback should be detailed, specific, and constructive, focusing on:
    
    1. SQL formatting and adherence to standards
    2. Naming consistency with existing patterns
    3. Data type consistency
    4. Idempotency issues
    5. Missing data quality checks or tests
    6. Proper time-based modeling (SCD) implementation
    7. Performance optimization opportunities
    
    Always provide line numbers and specific suggestions for improvement.
    """
    
    # User prompt with the RAG context and file to review
    user_prompt = f"""
    I need you to review a data engineering file. Here's some context about our codebase standards and patterns:
    
    {rag_context}
    
    # FILE TO REVIEW
    Path: {file_path}
    
    ```
    {file_content}
    ```
    
    Please provide a comprehensive review focusing on:
    
    1. SQL formatting and adherence to our standards
    2. Naming consistency with existing patterns in our codebase
    3. Data type consistency with how we typically define these columns
    4. Idempotency issues
    5. Missing data quality checks or tests
    6. If it's DBT code, proper use of dbt_valid_from and dbt_valid_to for time-based models
    7. Performance concerns
    
    Format your response as:
    
    ## Summary
    A brief overview of the code quality (1-2 sentences)
    
    ## Issues
    Describe each issue you find with line numbers and specific recommendations
    
    ## Suggestions
    Provide constructive suggestions for improvement
    
    ## Proposed Tests
    If relevant, suggest tests that would improve data quality
    """
    
    try:
        # Call the OpenAI API
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            temperature=0,
        )
        
        # Get the review content
        review = response.choices[0].message.content
        
        # Format as a GitHub comment
        comment = f"# Senior Data Engineer Review for `{file_path}`\n\n{review}\n\n---\n\n*This automated review was generated using AI and repository knowledge. If you have feedback on improving these reviews, please let the team know!*"
        
        return comment
        
    except Exception as e:
        logger.error(f"Error generating review: {str(e)}")
        return f"Error generating review for `{file_path}`: {str(e)}"

def post_github_comment(pr, comment):
    """Post a comment to the GitHub PR"""
    try:
        pr.create_issue_comment(comment)
        logger.info(f"✅ Posted review comment to PR #{pr.number}")
        return True
    except Exception as e:
        logger.error(f"Error posting comment: {str(e)}")
        return False

def main(pr_number):
    """Main function to process PR and generate reviews"""
    logger.info(f"Starting review process for PR #{pr_number}")
    
    # Get PR information
    pr_info = get_pr_info(pr_number)
    
    if not pr_info["data_files"]:
        logger.info("No SQL, YML, or DBT files found in this PR. Skipping review.")
        return
    
    # Extract knowledge from repository to build or update vector database
    # This could be done less frequently in production
    extract_repository_knowledge(pr_info["repo"])
    
    # Process each data file in the PR
    for file in pr_info["data_files"]:
        logger.info(f"Reviewing file: {file.filename}")
        
        try:
            # Get file content (note: for private repositories, raw_url requires an Authorization header with your token)
            file_content = requests.get(file.raw_url).text
            
            # Generate AI review feedback
            review_comment = get_feedback(file.filename, file_content)
            
            # Post comment to PR
            post_github_comment(pr_info["pr"], review_comment)
            
        except Exception as e:
            logger.error(f"Error processing {file.filename}: {str(e)}")

if __name__ == "__main__":
    if len(sys.argv) < 2:
        logger.error("PR number is required as an argument")
        sys.exit(1)
    
    pr_number = int(sys.argv[1])
    main(pr_number)

Next, let’s update the GitHub Actions workflow file (autofeedback.yml) to include the Pinecone API key and install the necessary dependencies:

name: Senior Data Engineer AI Review

on:
  pull_request:
    types: [opened, synchronize]
    paths:
      - '**.sql'
      - '**/models/**.yml'
      - '**/dbt_project.yml'

jobs:
  review:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      pull-requests: write
    
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0  # Fetch all history to analyze codebase
      
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'
      
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install PyGithub requests openai pinecone-client sentence-transformers

      - name: Run Data Engineer AI Review
        env:
          GIT_TOKEN: ${{ secrets.GIT_TOKEN }}
          GITHUB_REPOSITORY: ${{ github.repository }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          PINECONE_API_KEY: ${{ secrets.PINECONE_API_KEY }}
          PINECONE_ENVIRONMENT: ${{ secrets.PINECONE_ENVIRONMENT }}
        run: |
          python src/generate_comment.py ${{ github.event.pull_request.number }}

Finally, create a requirements.txt file to pin the dependencies (you can also point the workflow's install step at it with pip install -r requirements.txt instead of listing packages inline):

PyGithub==1.58.2
requests==2.28.2
openai==1.3.0
pinecone-client==2.2.2
sentence-transformers==2.2.2

Setting Up the Secrets

For this implementation to work, you’ll need to add these secrets to your GitHub repository:

  1. Navigate to your repository on GitHub
  2. Go to Settings → Secrets and variables → Actions
  3. Add the following secrets:
    • GIT_TOKEN: Your GitHub Personal Access Token
    • OPENAI_API_KEY: Your OpenAI API key
    • PINECONE_API_KEY: Your Pinecone API key
    • PINECONE_ENVIRONMENT: Your Pinecone environment (e.g., “gcp-starter”)



How It Works: The End-to-End Process


Let’s walk through the complete workflow of our AI Data Engineer review system:

1. Initial Knowledge Extraction

When the GitHub Action first runs on a repository, it performs these steps:

  1. Extracts SQL Standards: It looks for a standards document in the repository or uses default standards.
  2. Analyzes Existing Code: It processes a sample of SQL and DBT files to learn naming conventions and data types.
  3. Encodes Knowledge into Vectors: It converts all discovered patterns and standards into embeddings.
  4. Populates the Vector Database: It creates namespaces in Pinecone and uploads the vectors with metadata.

This creates a semantic search index that understands your codebase’s specific conventions.

2. Pull Request Triggered Review

When a developer creates or updates a Pull Request:

  1. GitHub Action Activates: The workflow triggers when SQL, DBT, or YAML files are modified.
  2. Files are Extracted: The system gets the content of each changed file.
  3. For Each File:
    • RAG Context Building: It queries Pinecone for relevant standards and patterns based on the file content.
    • LLM Review Generation: It sends the file and RAG context to the LLM for detailed analysis.
    • Comment Posting: It formats the review and posts it to the PR.

3. The Developer Experience

From the developer’s perspective, the process feels seamless:

  1. They create a Pull Request with data engineering code changes.
  2. Within minutes, they receive a detailed, contextual review comment on their PR.
  3. The review highlights issues and suggestions specific to their organization’s standards.
  4. They can address the feedback before human reviewers look at the code.

4. The Review Content

The AI-generated review typically includes:

  • Summary: A brief overview of the code quality
  • Issues: Specific problems with line numbers and references
  • Suggestions: Constructive improvements
  • Proposed Tests: Data quality checks that should be added

The most powerful aspect is that these reviews reflect your organization’s actual practices, not generic rules. For example, if your team consistently uses account_number as an INT in all tables, the AI will flag inconsistencies like acct_no or VARCHAR types.
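
To see this semantic matching at work, you can query the naming_patterns namespace directly with the inconsistent name; the nearest neighbours should surface the established convention even though the strings differ. A minimal illustration, assuming the index has already been populated by extract_repository_knowledge and that acct_no is the name introduced in the PR:

# Illustrative lookup: how are similar columns named elsewhere in the codebase?
matches = query_repository_knowledge(
    query="Column named acct_no",   # the (inconsistent) name proposed in the PR
    namespace="naming_patterns",
    top_k=3
)
for text in matches:
    # e.g. "Column name 'account_number' is used in SELECT statement in file models/accounts.sql"
    print(text)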




Benefits and Business Impact


Implementing an AI Data Engineer for code reviews delivers significant business value:

Immediate Impact

  1. Improved Code Quality: Automatically enforces naming conventions, data type consistency, and SQL best practices.

    Example: Reduced data pipeline errors by 42% after implementing AI-powered DBT model reviews.

  2. Faster Code Reviews: Human reviewers can focus on business logic and architecture instead of style and conventions.

    Example: Reduced PR review time from an average of 2 days to 4 hours by having the AI handle initial reviews.

  3. Knowledge Transfer: New team members learn standards and patterns through specific, contextual feedback.

    Example: New hires reach productivity milestones 3 weeks faster with AI-assisted code reviews.

Long-term Benefits

  1. Consistency Across Teams: Enforces the same standards across distributed engineering teams and projects.

    Example: Teams on two continents used AI reviews to maintain consistent data schemas across regions.

  2. Technical Debt Prevention: Catches potential issues before they enter the codebase.

    Example: Identified and fixed 85% of inefficient SQL patterns before deployment, reducing future refactoring costs.

  3. Quality Metrics Tracking: By analyzing reviews over time, organizations can monitor trends in data quality and engineering practices.

    Example: Using AI review metrics to identify that 78% of quality issues stemmed from a specific class of SQL anti-patterns, leading to targeted training.

ROI Calculation

The investment in an AI Data Engineer review system can be justified by the significant returns:

  • Time Savings: A team of 5 data engineers each spending 1 hour a day on code reviews spends roughly 1,250 hours annually on reviews (assuming 250 working days). Automating the routine checks reclaims a substantial share of that time; see the quick sketch after this list.

  • Error Reduction: Catching data quality and performance issues early prevents costly production issues. A single critical bug in a data pipeline can cost $10,000+ in engineering time and business impact.

  • Onboarding Efficiency: Faster knowledge transfer to new team members can save 2-3 weeks of ramp-up time per hire.
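
As a back-of-the-envelope check on the time-savings figure, the arithmetic is easy to sketch in a few lines; every input below is an assumption you should replace with your own team's numbers:

# Rough ROI sketch: all inputs are assumptions, not benchmarks
engineers = 5
review_hours_per_day = 1
working_days = 250
automation_share = 0.5   # assume AI handles half of the routine review effort
hourly_cost = 75         # assumed fully loaded hourly rate (USD)

total_review_hours = engineers * review_hours_per_day * working_days   # 1,250 hours
hours_saved = total_review_hours * automation_share                    # 625 hours
print(f"Estimated hours saved per year: {hours_saved:,.0f}")
print(f"Estimated cost saved per year: ${hours_saved * hourly_cost:,.0f}")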




Advanced Configurations and Extensions


The base AI Data Engineer review system can be extended in several ways to make it even more powerful:

Customizing the Review Focus

You can create a configuration file in your repository to customize the AI reviewer’s behavior:

# .github/ai-reviewer-config.yml
focus_areas:
  naming_conventions: high
  data_types: high
  performance: medium
  tests: high
  documentation: medium
  
severity_levels:
  critical: ["SELECT *", "DELETE without WHERE", "Missing partition key"]
  warning: ["Inconsistent naming", "No description", "Inefficient join"]
  info: ["Consider alternative", "Minor formatting", "Style suggestion"]
  
ignored_patterns:
  - "legacy/deprecated/**/*.sql"
  - "**/*_test.sql"

This allows team leaders to emphasize the aspects of code quality that matter most to their organization.
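
One way to wire this configuration in is to load the file at the start of the review run and use it to filter files and steer the prompt. The sketch below follows the example config above; the helper names are hypothetical and the YAML parsing assumes PyYAML is installed:

import fnmatch
import yaml  # requires: pip install pyyaml

def load_reviewer_config(repo, path=".github/ai-reviewer-config.yml"):
    """Load the optional reviewer config from the repository, falling back to empty defaults."""
    try:
        config_file = repo.get_contents(path)
        return yaml.safe_load(base64.b64decode(config_file.content).decode("utf-8"))
    except Exception:
        return {"focus_areas": {}, "severity_levels": {}, "ignored_patterns": []}

def is_ignored(file_path, config):
    """Check a changed file against ignored_patterns (fnmatch treats '*' loosely, so '**' globs match broadly)."""
    return any(fnmatch.fnmatch(file_path, pattern) for pattern in config.get("ignored_patterns", []))

In main(), is_ignored() can skip excluded files before calling get_feedback(), and the focus_areas dictionary can be appended to the user prompt so the LLM weights its feedback accordingly.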

Integrating with Data Documentation

Connect the AI reviewer with your data documentation tools:

def extract_documentation_context(repo):
    """Extract context from data dictionaries and documentation"""
    try:
        # Look for common documentation files
        for doc_path in ["docs/data_dictionary.md", "definitions/tables.yml", "catalog.json"]:
            try:
                doc_file = repo.get_contents(doc_path)
                doc_content = base64.b64decode(doc_file.content).decode("utf-8")
                
                # Process and embed documentation content
                # ...
            except Exception:
                continue
    except Exception as e:
        logger.warning(f"Could not process documentation: {str(e)}")

This integration ensures the AI recommendations align with your documented data models and definitions.
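
The elided processing step will depend on your documentation format, but one simple approach is to split each document into paragraphs, embed them, and upsert them into a dedicated namespace. A sketch, assuming a hypothetical "documentation" namespace that you would also query from build_rag_context_for_review:

def embed_documentation(doc_path, doc_content):
    """Sketch: embed documentation paragraphs into a 'documentation' namespace."""
    vectors = []
    paragraphs = [p.strip() for p in doc_content.split("\n\n") if p.strip()]
    for i, paragraph in enumerate(paragraphs):
        vectors.append({
            "id": f"doc_{doc_path.replace('/', '_')}_{i}",
            "values": model.encode(paragraph).tolist(),
            "metadata": {"text": paragraph, "source": doc_path}
        })
    if vectors:
        index.upsert(vectors=vectors, namespace="documentation")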

Adding Data Quality Test Suggestions

Enhance the reviewer to propose specific DBT tests:

def suggest_dbt_tests(column_name, data_type, is_pk, is_fk, referenced_table=None):
    """Generate appropriate DBT test suggestions based on column characteristics"""
    tests = []
    
    # Always suggest not_null and unique for primary keys
    if is_pk:
        tests.append("- not_null")
        tests.append("- unique")
    
    # Suggest relationship tests for foreign keys
    if is_fk and referenced_table:
        tests.append(f"- relationships:\n    to: ref('{referenced_table}')\n    field: {column_name}")
    
    # Type-specific tests (plain strings here keep the Jinja '{{ }}' braces intact;
    # inside an f-string they would collapse to single braces)
    if "date" in data_type.lower():
        tests.append("- dbt_utils.date_in_valid_range:\n    max_date: '{{ current_date() }}'")
    
    if "amount" in column_name.lower() or "price" in column_name.lower():
        tests.append("- dbt_expectations.expect_column_values_to_be_between:\n    min_value: 0")
    
    return tests
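
A quick usage sketch shows how the output could be rendered into a schema.yml snippet for the review comment; the column and table names here are purely illustrative:

# Hypothetical example: a foreign key column pointing at an existing dimension model
suggested = suggest_dbt_tests(
    column_name="customer_id",
    data_type="INT",
    is_pk=False,
    is_fk=True,
    referenced_table="dim_customers"
)

yaml_snippet = "      - name: customer_id\n        tests:\n"
for test in suggested:
    # Re-indent each suggested test under the column's tests block
    yaml_snippet += "          " + test.replace("\n", "\n          ") + "\n"
print(yaml_snippet)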

Creating a Dashboard of Code Quality Metrics

Build a simple tracking system for review metrics:

import json
import os
from datetime import datetime

def track_review_metrics(pr_number, repo_name, metrics):
    """Store review metrics for trend analysis"""
    timestamp = datetime.now().isoformat()
    
    metrics_record = {
        "timestamp": timestamp,
        "pr_number": pr_number,
        "repo_name": repo_name,
        "files_analyzed": metrics["files_analyzed"],
        "issues_found": metrics["issues_found"],
        "naming_consistency_issues": metrics["naming_consistency_issues"],
        "dbt_best_practice_issues": metrics["dbt_best_practice_issues"]
    }
    
    # In a production environment, this would write to a database
    # Here we're just appending to a JSON file
    try:
        # Create metrics directory if it doesn't exist
        os.makedirs(".metrics", exist_ok=True)
        
        with open('.metrics/review_metrics.jsonl', 'a') as f:
            f.write(json.dumps(metrics_record) + "\n")
    except Exception as e:
        logger.error(f"Error storing metrics: {str(e)}")

This data can be visualized in a simple dashboard to track code quality trends over time.
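
For a lightweight start, the JSONL log can be aggregated with a few lines of Python before you invest in a proper dashboard; this sketch simply totals flagged issues per month:

import json
from collections import defaultdict
from datetime import datetime

def summarize_metrics(path=".metrics/review_metrics.jsonl"):
    """Aggregate issues_found per month from the review metrics log."""
    issues_per_month = defaultdict(int)
    with open(path) as f:
        for line in f:
            record = json.loads(line)
            month = datetime.fromisoformat(record["timestamp"]).strftime("%Y-%m")
            issues_per_month[month] += record["issues_found"]
    for month, total in sorted(issues_per_month.items()):
        print(f"{month}: {total} issues flagged")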




Conclusion and Next Steps


In this article, we’ve taken the theoretical foundations of LLMs, RAG, and vector databases from Part 1 and implemented them in a practical, high-value system that can transform your data engineering workflow. By creating an AI Data Engineer for code reviews, we’re addressing the common challenges of maintaining code quality, consistency, and best practices across data engineering teams.

The power of this approach comes from combining:

  1. Contextual Intelligence: The AI understands your specific codebase, not just generic standards
  2. Semantic Understanding: Vector embeddings enable the system to find similar patterns even when exact matches don’t exist
  3. RAG Architecture: The LLM’s reasoning capabilities are grounded in your organization’s actual practices
  4. GitHub Integration: The system fits seamlessly into existing development workflows

Getting Started

To implement this in your organization:

  1. Start Small: Begin with a single repository and a focused set of standards
  2. Collect Feedback: Ask your team which aspects of the reviews are most helpful
  3. Iterate: Refine the prompts and vector database to improve relevance
  4. Measure Impact: Track metrics like PR cycle time and bug reduction

Where to Go Next

As you become comfortable with the base implementation, consider these extensions:

  1. Multi-Agent Reviews: Add specialized agents for security, performance, and business logic
  2. Interactive Suggestions: Enable the AI to propose specific fixes that can be applied with one click
  3. Natural Language Interfaces: Allow engineers to ask the AI questions about data models and transformations
  4. Knowledge Management: Use the vector database as a foundation for a broader data engineering knowledge system

The possibilities are limited only by your imagination. The core technology stack we’ve implemented—LLMs with RAG capabilities and vector databases—can be applied to numerous data engineering challenges beyond code reviews.

By combining these advanced AI capabilities with your team’s domain expertise, you can create a powerful system that accelerates development, improves quality, and makes your data engineering practice more robust and consistent.

Remember: The goal isn’t to replace human judgment but to augment it—freeing your team to focus on the creative and strategic aspects of data engineering while the AI handles the routine checks and reinforces your best practices.