    Multi-Agent SQL Assistant, Part 2: Building a RAG Manager

    By ProfitlyAI · November 6, 2025 · 23 min read


    In my previous blog post, I explored how you can build a multi-agent SQL assistant using CrewAI and Streamlit. The user could query a SQLite database in natural language. The AI agents would generate a SQL query based on the user input, review it, and check it for compliance before running it against the database to get results. I also implemented a human-in-the-loop checkpoint to maintain control, and displayed the LLM costs associated with every query for transparency and cost control. While the prototype worked well and generated good results for my small demo database, I knew this would not be enough for real-life databases. In the previous setup, I was sending the whole database schema as context along with the user input. As database schemas grow larger, passing the full schema to the LLM increases token usage, slows down response times, and makes hallucination more likely. I needed a way to feed only relevant schema snippets to the LLM. That is where RAG (Retrieval Augmented Generation) comes in.

    In this blog post, I build a RAG manager and add several RAG strategies to my SQL assistant to compare their performance on metrics like response time and token usage. The assistant now supports four RAG strategies:

    • No RAG: Passes the full schema (baseline for comparison)
    • Keyword RAG: Uses domain-specific keyword matching to select relevant tables
    • FAISS RAG: Leverages semantic vector similarity via FAISS with all-MiniLM-L6-v2 embeddings
    • Chroma RAG: A persistent vector store solution with ChromaDB for scalable, production-grade search

    For this project, I focused only on RAG methods that are practical, lightweight, and cost-efficient (free). You can add any number of implementations on top and choose the best one for your case. To facilitate experimentation and evaluation, I built an interactive performance comparison tool that evaluates token reduction, table count, response time, and query accuracy across all four strategies.

    Screenshot of the app, by Author

    Building a RAG Manager

    The rag_manager.py file contains the full implementation of the RAG manager. First, I created a BaseRAG class – a template that I use for all my different RAG strategies. It makes sure every RAG approach follows the same structure. Any new strategy will have two things: a method to fetch the relevant schema based on the user query, and another method that explains what the approach is about. By using an abstract base class (ABC), I keep the code clean, modular, and easy to extend later.

    import sqlite3
    import time
    from abc import ABC, abstractmethod
    from typing import Dict, List, Any, Optional
    
    # DB_PATH (the SQLite file path) and get_structured_schema() are defined
    # elsewhere in the project - they come from the Part 1 setup.
    
    class BaseRAG(ABC):
        """Base class for all RAG implementations."""
        
        def __init__(self, db_path: str = DB_PATH):
            self.db_path = db_path
            self.name = self.__class__.__name__
            
        @abstractmethod
        def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
            """Get relevant schema for the user query."""
            pass
        
        @abstractmethod
        def get_approach_info(self) -> Dict[str, Any]:
            """Get information about this RAG approach."""
            pass
    

    No RAG Strategy

    This is basically the same approach that I used previously, where I sent the full database schema as context to the LLM without any filtering or optimization. This approach is best for very small schemas (ideally fewer than 10 tables).

    class NoRAG(BaseRAG):
        """No RAG - returns the full schema."""
        
        def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
            return get_structured_schema(self.db_path)
        
        def get_approach_info(self) -> Dict[str, Any]:
            return {
                "name": "No RAG (Full Schema)",
                "description": "Uses the full database schema",
                "pros": ["Simple", "Always complete", "No setup required"],
                "cons": ["High token usage", "Slower for large schemas"],
                "best_for": "Small schemas (< 10 tables)"
            }
    

    Keyword RAG Strategy

    In the keyword RAG approach, I use a set of predefined keywords mapped to each table in the schema. When a user asks something, the system checks for keyword matches in the query and picks out only the most relevant tables. This way, I don’t send the full schema to the LLM – saving tokens and speeding things up. It works well when your schema is familiar and your queries are business-related or follow common patterns.

    The _build_table_keywords(self) method is the core of the keyword matching logic. It contains a hardcoded keyword mapping for each table in the schema. It helps associate user query terms (like “sales”, “brand”, “customer”) with the most likely relevant tables.

    class KeywordRAG(BaseRAG):
        """Keyword-based RAG using business context matching."""
        
        def __init__(self, db_path: str = DB_PATH):
            super().__init__(db_path)
            self.table_keywords = self._build_table_keywords()
        
        def _build_table_keywords(self) -> Dict[str, List[str]]:
            """Build keyword mappings for each table."""
            return {
                'products': ['product', 'item', 'catalog', 'price', 'category', 'brand', 'sales', 'sold'],
                'product_variants': ['variant', 'product', 'sku', 'color', 'size', 'brand', 'sales', 'sold'],
                'customers': ['customer', 'user', 'client', 'buyer', 'person', 'email', 'name'],
                'orders': ['order', 'purchase', 'transaction', 'sale', 'buy', 'total', 'amount', 'sales'],
                'order_items': ['item', 'product', 'quantity', 'line', 'detail', 'sales', 'sold', 'brand'],
                'payments': ['payment', 'pay', 'money', 'revenue', 'amount'],
                'inventory': ['inventory', 'stock', 'quantity', 'warehouse', 'available'],
                'reviews': ['review', 'rating', 'feedback', 'comment', 'opinion'],
                'suppliers': ['supplier', 'vendor', 'procurement', 'purchase'],
                'categories': ['category', 'type', 'classification', 'group'],
                'brands': ['brand', 'manufacturer', 'company', 'sales', 'sold', 'quantity', 'total'],
                'addresses': ['address', 'location', 'shipping', 'billing'],
                'shipments': ['shipment', 'delivery', 'shipping', 'tracking'],
                'discounts': ['discount', 'coupon', 'promotion', 'offer'],
                'warehouses': ['warehouse', 'facility', 'location', 'storage'],
                'employees': ['employee', 'staff', 'worker', 'person'],
                'departments': ['department', 'division', 'team'],
                'product_images': ['image', 'photo', 'picture', 'media'],
                'purchase_orders': ['purchase', 'procurement', 'supplier', 'order'],
                'purchase_order_items': ['purchase', 'procurement', 'supplier', 'item'],
                'order_discounts': ['discount', 'coupon', 'promotion', 'order'],
                'shipment_items': ['shipment', 'delivery', 'item', 'tracking']
            }
        
        def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
            import re
            
            # Score tables by keyword relevance
            query_words = set(re.findall(r'\b\w+\b', user_query.lower()))
            table_scores = {}
            
            for table_name, keywords in self.table_keywords.items():
                score = 0
                
                # Count keyword matches
                for keyword in keywords:
                    if keyword in query_words:
                        score += 3
                    # Partial matches
                    for query_word in query_words:
                        if keyword in query_word or query_word in keyword:
                            score += 1
                
                # Bonus for exact table name match
                if table_name.lower() in query_words:
                    score += 10
                
                table_scores[table_name] = score
            
            # Get top scoring tables
            sorted_tables = sorted(table_scores.items(), key=lambda x: x[1], reverse=True)
            relevant_tables = [table for table, score in sorted_tables[:max_tables] if score > 0]
            
            # Fallback to default tables if no matches
            if not relevant_tables:
                relevant_tables = self._get_default_tables(user_query)[:max_tables]
            
            # Build schema for selected tables
            return self._build_schema(relevant_tables)
        
        def _get_default_tables(self, user_query: str) -> List[str]:
            """Get default tables based on query patterns."""
            query_lower = user_query.lower()
            
            # Sales/revenue queries
            if any(word in query_lower for word in ['revenue', 'sales', 'total', 'amount', 'brand']):
                return ['orders', 'order_items', 'product_variants', 'products', 'brands']
            
            # Product queries
            if any(word in query_lower for word in ['product', 'item', 'catalog']):
                return ['products', 'product_variants', 'categories', 'brands']
            
            # Customer queries
            if any(word in query_lower for word in ['customer', 'user', 'buyer']):
                return ['customers', 'orders', 'addresses']
            
            # Default
            return ['products', 'customers', 'orders', 'order_items']
        
        def _build_schema(self, table_names: List[str]) -> str:
            """Build schema string for specified tables."""
            if not table_names:
                return get_structured_schema(self.db_path)
            
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            schema_lines = ["Available tables and columns:"]
            
            try:
                for table_name in table_names:
                    cursor.execute(f"PRAGMA table_info({table_name});")
                    columns = cursor.fetchall()
                    if columns:
                        col_names = [col[1] for col in columns]
                        schema_lines.append(f"- {table_name}: {', '.join(col_names)}")
            finally:
                conn.close()
            
            return '\n'.join(schema_lines)
        
        def get_approach_info(self) -> Dict[str, Any]:
            return {
                "name": "Keyword RAG",
                "description": "Uses business context keywords to match relevant tables",
                "pros": ["Fast", "No external dependencies", "Good for business queries"],
                "cons": ["Limited by predefined keywords", "May miss complex relationships"],
                "best_for": "Business queries with clear domain terms"
            }
    

    FAISS RAG Approach

    The FAISS RAG strategy is where things start getting smarter. Instead of dumping the whole schema, I embed each table’s metadata (columns, relationships, business context) into vectors using a sentence transformer. When the user asks a question, the query is embedded too, and FAISS performs a semantic search that matches on “meaning” instead of just keywords. It’s great for queries where users aren’t being very specific or when tables have related terms. I like FAISS because it’s free, runs locally, and gives fairly accurate results while saving tokens.
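
    Why an inner-product index rather than a distance index? After L2 normalization, the inner product of two vectors equals their cosine similarity. Here is a minimal standalone sketch of that idea (assuming faiss-cpu and numpy are installed; the random vectors are toy stand-ins for real table and query embeddings):

    import numpy as np
    import faiss
    
    # Toy stand-ins: 4 "table" embeddings and 1 "query" embedding of dimension 8
    table_vecs = np.random.rand(4, 8).astype('float32')
    query_vec = np.random.rand(1, 8).astype('float32')
    
    # In-place L2 normalization makes inner product == cosine similarity
    faiss.normalize_L2(table_vecs)
    faiss.normalize_L2(query_vec)
    
    index = faiss.IndexFlatIP(table_vecs.shape[1])  # inner-product (IP) index
    index.add(table_vecs)
    
    scores, ids = index.search(query_vec, 2)  # top-2 closest "tables"
    print(scores, ids)  # scores are cosine similarities in [-1, 1]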

    The only catch is that setting it up takes some extra steps, and it uses more memory than basic approaches. LLMs and embedding models don’t know what your tables mean unless you explain it to them. In the _get_business_context() method, we need to manually write a short description of what each table represents in the business.

    In the _extract_table_info() method, I pull in table names, column names, and foreign key relationships from SQLite’s PRAGMA queries to build a dictionary with structured information about each table. Finally, in the _create_table_description() method, comprehensive descriptions of each table are built to be embedded by a SentenceTransformer.

    class FAISSVectorRAG(BaseRAG):
        """FAISS-based vector RAG using sentence transformers."""
        
        def __init__(self, db_path: str = DB_PATH):
            super().__init__(db_path)
            self.model = None
            self.index = None
            self.table_info = {}
            self.table_names = []
            self._initialize()
        
        def _initialize(self):
            """Initialize the FAISS vector store and embeddings."""
            try:
                from sentence_transformers import SentenceTransformer
                import faiss
                import numpy as np
                
                print("🔄 Initializing FAISS Vector RAG...")
                
                # Load embedding model
                self.model = SentenceTransformer('all-MiniLM-L6-v2')
                print("✅ Loaded embedding model: all-MiniLM-L6-v2")
                
                # Extract table info and create embeddings
                self.table_info = self._extract_table_info()
                
                # Create embeddings for each table
                table_descriptions = []
                self.table_names = []
                
                for table_name, info in self.table_info.items():
                    description = self._create_table_description(table_name, info)
                    table_descriptions.append(description)
                    self.table_names.append(table_name)
                
                # Generate embeddings
                print(f"🔄 Generating embeddings for {len(table_descriptions)} tables...")
                embeddings = self.model.encode(table_descriptions)
                
                # Create FAISS index
                dimension = embeddings.shape[1]
                self.index = faiss.IndexFlatIP(dimension)  # Inner product for cosine similarity
                
                # Normalize embeddings for cosine similarity
                faiss.normalize_L2(embeddings)
                self.index.add(embeddings.astype('float32'))
                
                print(f"✅ FAISS Vector RAG initialized with {len(table_descriptions)} tables")
                
            except Exception as e:
                print(f"❌ Error initializing FAISS Vector RAG: {e}")
                self.model = None
                self.index = None
        
        def _extract_table_info(self) -> Dict[str, Dict]:
            """Extract detailed information about each table."""
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            table_info = {}
            
            try:
                # Get all table names
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
                tables = cursor.fetchall()
                
                for (table_name,) in tables:
                    info = {
                        'columns': [],
                        'foreign_keys': [],
                        'business_context': self._get_business_context(table_name)
                    }
                    
                    # Get column info
                    cursor.execute(f"PRAGMA table_info({table_name});")
                    columns = cursor.fetchall()
                    for col in columns:
                        info['columns'].append({
                            'name': col[1],
                            'type': col[2],
                            'primary_key': bool(col[5])
                        })
                    
                    # Get foreign key info
                    cursor.execute(f"PRAGMA foreign_key_list({table_name});")
                    fks = cursor.fetchall()
                    for fk in fks:
                        info['foreign_keys'].append({
                            'column': fk[3],
                            'references_table': fk[2],
                            'references_column': fk[4]
                        })
                    
                    table_info[table_name] = info
            
            finally:
                conn.close()
            
            return table_info
        
        def _get_business_context(self, table_name: str) -> str:
            """Get business context description for tables."""
            contexts = {
                'products': 'Product catalog with items, prices, categories, and brand information. Core inventory data.',
                'product_variants': 'Product variations like colors, sizes, SKUs. Links products to specific sellable items.',
                'customers': 'Customer profiles with personal information, contact details, and account status.',
                'orders': 'Purchase transactions with totals, dates, status, and customer relationships.',
                'order_items': 'Individual line items within orders. Contains quantities, prices, and product references.',
                'payments': 'Payment processing records with methods, amounts, and transaction status.',
                'inventory': 'Stock levels and warehouse quantities for product variants.',
                'reviews': 'Customer feedback, ratings, and product reviews.',
                'suppliers': 'Vendor information for procurement and supply chain management.',
                'categories': 'Product categorization hierarchy for organizing the catalog.',
                'brands': 'Brand information for products and marketing purposes.',
                'addresses': 'Customer shipping and billing address information.',
                'shipments': 'Delivery tracking and shipping status information.',
                'discounts': 'Promotional codes, coupons, and discount campaigns.',
                'warehouses': 'Storage facility locations and warehouse management.',
                'employees': 'Staff information and organizational structure.',
                'departments': 'Organizational divisions and team structure.',
                'product_images': 'Product photos and media assets.',
                'purchase_orders': 'Procurement orders from suppliers.',
                'purchase_order_items': 'Line items for supplier purchase orders.',
                'order_discounts': 'Applied discounts and promotions on orders.',
                'shipment_items': 'Individual items within shipment packages.'
            }
            
            return contexts.get(table_name, f'Database table for {table_name} related operations.')
        
        def _create_table_description(self, table_name: str, info: Dict) -> str:
            """Create a comprehensive description for embedding."""
            description = f"Table: {table_name}\n"
            description += f"Purpose: {info['business_context']}\n"
            
            # Add column information
            description += "Columns: "
            col_names = [col['name'] for col in info['columns']]
            description += ", ".join(col_names) + "\n"
            
            # Add relationship information
            if info['foreign_keys']:
                description += "Relationships: "
                relationships = []
                for fk in info['foreign_keys']:
                    relationships.append(f"links to {fk['references_table']} via {fk['column']}")
                description += "; ".join(relationships) + "\n"
            
            # Add common use cases based on table type
            use_cases = self._get_use_cases(table_name)
            if use_cases:
                description += f"Common queries: {use_cases}"
            
            return description
        
        def _get_use_cases(self, table_name: str) -> str:
            """Get common use cases for each table."""
            use_cases = {
                'products': 'product searches, catalog listings, price queries, inventory checks',
                'customers': 'customer lookup, registration analysis, geographic distribution',
                'orders': 'sales analysis, revenue tracking, order history, status monitoring',
                'order_items': 'product sales performance, revenue by product, order composition',
                'payments': 'payment processing, revenue reconciliation, payment method analysis',
                'brands': 'brand performance, sales by brand, brand comparison',
                'categories': 'category analysis, product organization, catalog structure'
            }
            
            return use_cases.get(table_name, 'general data queries and analysis')
        
        def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
            """Get relevant schema using vector similarity search."""
            if self.model is None or self.index is None:
                print("⚠️ FAISS not initialized, falling back to full schema")
                return get_structured_schema(self.db_path)
            
            try:
                import faiss
                import numpy as np
                
                # Generate query embedding
                query_embedding = self.model.encode([user_query])
                faiss.normalize_L2(query_embedding)
                
                # Search for similar tables
                scores, indices = self.index.search(query_embedding.astype('float32'), max_tables)
                
                # Get relevant table names
                relevant_tables = []
                for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
                    if idx < len(self.table_names) and score > 0.1:  # Minimum similarity threshold
                        relevant_tables.append(self.table_names[idx])
                
                # Fallback if no relevant tables found
                if not relevant_tables:
                    print("⚠️ No relevant tables found, using defaults")
                    relevant_tables = self._get_default_tables(user_query)[:max_tables]
                
                # Build schema for selected tables
                return self._build_schema(relevant_tables)
                
            except Exception as e:
                print(f"⚠️ Vector search failed: {e}, falling back to full schema")
                return get_structured_schema(self.db_path)
        
        def _get_default_tables(self, user_query: str) -> List[str]:
            """Get default tables based on query patterns."""
            query_lower = user_query.lower()
            
            if any(word in query_lower for word in ['revenue', 'sales', 'total', 'amount', 'brand']):
                return ['orders', 'order_items', 'product_variants', 'products', 'brands']
            elif any(word in query_lower for word in ['product', 'item', 'catalog']):
                return ['products', 'product_variants', 'categories', 'brands']
            elif any(word in query_lower for word in ['customer', 'user', 'buyer']):
                return ['customers', 'orders', 'addresses']
            else:
                return ['products', 'customers', 'orders', 'order_items']
        
        def _build_schema(self, table_names: List[str]) -> str:
            """Build schema string for specified tables."""
            if not table_names:
                return get_structured_schema(self.db_path)
            
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            schema_lines = ["Available tables and columns:"]
            
            try:
                for table_name in table_names:
                    cursor.execute(f"PRAGMA table_info({table_name});")
                    columns = cursor.fetchall()
                    if columns:
                        col_names = [col[1] for col in columns]
                        schema_lines.append(f"- {table_name}: {', '.join(col_names)}")
            finally:
                conn.close()
            
            return '\n'.join(schema_lines)
        
        def get_approach_info(self) -> Dict[str, Any]:
            return {
                "name": "FAISS Vector RAG",
                "description": "Uses semantic embeddings and vector similarity search",
                "pros": ["Semantic understanding", "Handles complex queries", "No API costs"],
                "cons": ["Requires model download", "Higher memory usage", "Setup complexity"],
                "best_for": "Complex queries, large schemas, semantic relationships"
            }
    

    Chroma RAG Strategy

    Chroma RAG is a more production-friendly version of FAISS because it offers persistent storage. Instead of keeping embeddings in memory, Chroma stores them locally, so even if I restart the app, the vector index is still there. Just like in FAISS, I still have to manually describe what each table does in business terms (in _get_business_context()). I embed my schema descriptions and store them in ChromaDB. Upon initialization, a sentence transformer (MiniLM) is loaded. If the vector store already exists, it is loaded. If not, I extract table info and descriptions and call _populate_collection() to generate and store the vectors. This process only needs to be done once, or whenever the schema changes.

    It’s fast, consistent across sessions, and easy to set up. I chose it because it’s free, doesn’t need external services, and works well for real-world use cases where you want to scale without worrying about losing the vector index or reprocessing everything every time.
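
    One design note on the initialization below: the code wraps get_collection() in a try/except to decide whether the collection must be created and populated. A sketch of an equivalent variant (assuming the same chromadb package) uses get_or_create_collection() and treats an empty collection as the signal to populate:

    import chromadb
    
    client = chromadb.PersistentClient(path="./data/chroma_db")
    collection = client.get_or_create_collection(
        name="schema_tables",
        metadata={"description": "Database schema table embeddings"}
    )
    
    # A count of zero means the collection was just created (or never populated),
    # so the table descriptions still need to be embedded and stored.
    if collection.count() == 0:
        print("Collection is empty - extract table info and populate it")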

    class ChromaVectorRAG(BaseRAG):
        """Chroma-based vector RAG using sentence transformers with persistent storage."""
        
        def __init__(self, db_path: str = DB_PATH):
            super().__init__(db_path)
            self.model = None
            self.chroma_client = None
            self.collection = None
            self.table_info = {}
            self.table_names = []
            self._initialize()
        
        def _initialize(self):
            """Initialize the Chroma vector store and embeddings."""
            try:
                import chromadb
                from sentence_transformers import SentenceTransformer
                
                print("🔄 Initializing Chroma Vector RAG...")
                
                # Load embedding model
                self.model = SentenceTransformer('all-MiniLM-L6-v2')
                print("✅ Loaded embedding model: all-MiniLM-L6-v2")
                
                # Initialize Chroma client (persistent storage)
                self.chroma_client = chromadb.PersistentClient(path="./data/chroma_db")
                
                # Get or create collection
                collection_name = "schema_tables"
                try:
                    self.collection = self.chroma_client.get_collection(collection_name)
                    print("✅ Loaded existing Chroma collection")
                except Exception:
                    # Create new collection if it doesn't exist
                    self.collection = self.chroma_client.create_collection(
                        name=collection_name,
                        metadata={"description": "Database schema table embeddings"}
                    )
                    print("✅ Created new Chroma collection")
                    
                    # Extract table info and create embeddings
                    self.table_info = self._extract_table_info()
                    self._populate_collection()
                
                # Load table names for reference
                self._load_table_names()
                
                print(f"✅ Chroma Vector RAG initialized with {len(self.table_names)} tables")
                
            except Exception as e:
                print(f"❌ Error initializing Chroma Vector RAG: {e}")
                self.model = None
                self.chroma_client = None
                self.collection = None
        
        def _extract_table_info(self) -> Dict[str, Dict]:
            """Extract detailed information about each table."""
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            table_info = {}
            
            try:
                # Get all table names
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
                tables = cursor.fetchall()
                
                for (table_name,) in tables:
                    info = {
                        'columns': [],
                        'foreign_keys': [],
                        'business_context': self._get_business_context(table_name)
                    }
                    
                    # Get column info
                    cursor.execute(f"PRAGMA table_info({table_name});")
                    columns = cursor.fetchall()
                    for col in columns:
                        info['columns'].append({
                            'name': col[1],
                            'type': col[2],
                            'primary_key': bool(col[5])
                        })
                    
                    # Get foreign key info
                    cursor.execute(f"PRAGMA foreign_key_list({table_name});")
                    fks = cursor.fetchall()
                    for fk in fks:
                        info['foreign_keys'].append({
                            'column': fk[3],
                            'references_table': fk[2],
                            'references_column': fk[4]
                        })
                    
                    table_info[table_name] = info
            
            finally:
                conn.close()
            
            return table_info
        
        def _get_business_context(self, table_name: str) -> str:
            """Get business context description for tables."""
            contexts = {
                'products': 'Product catalog with items, prices, categories, and brand information. Core inventory data.',
                'product_variants': 'Product variations like colors, sizes, SKUs. Links products to specific sellable items.',
                'customers': 'Customer profiles with personal information, contact details, and account status.',
                'orders': 'Purchase transactions with totals, dates, status, and customer relationships.',
                'order_items': 'Individual line items within orders. Contains quantities, prices, and product references.',
                'payments': 'Payment processing records with methods, amounts, and transaction status.',
                'inventory': 'Stock levels and warehouse quantities for product variants.',
                'reviews': 'Customer feedback, ratings, and product reviews.',
                'suppliers': 'Vendor information for procurement and supply chain management.',
                'categories': 'Product categorization hierarchy for organizing the catalog.',
                'brands': 'Brand information for products and marketing purposes.',
                'addresses': 'Customer shipping and billing address information.',
                'shipments': 'Delivery tracking and shipping status information.',
                'discounts': 'Promotional codes, coupons, and discount campaigns.',
                'warehouses': 'Storage facility locations and warehouse management.',
                'employees': 'Staff information and organizational structure.',
                'departments': 'Organizational divisions and team structure.',
                'product_images': 'Product photos and media assets.',
                'purchase_orders': 'Procurement orders from suppliers.',
                'purchase_order_items': 'Line items for supplier purchase orders.',
                'order_discounts': 'Applied discounts and promotions on orders.',
                'shipment_items': 'Individual items within shipment packages.'
            }
            
            return contexts.get(table_name, f'Database table for {table_name} related operations.')
        
        def _populate_collection(self):
            """Populate Chroma collection with table embeddings."""
            if not self.collection or not self.table_info:
                return
            
            documents = []
            metadatas = []
            ids = []
            
            for table_name, info in self.table_info.items():
                # Create comprehensive description
                description = self._create_table_description(table_name, info)
                
                documents.append(description)
                metadatas.append({
                    'table_name': table_name,
                    'column_count': len(info['columns']),
                    'has_foreign_keys': len(info['foreign_keys']) > 0,
                    'business_context': info['business_context']
                })
                ids.append(f"table_{table_name}")
            
            # Add to collection
            self.collection.add(
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            
            print(f"✅ Added {len(documents)} table embeddings to Chroma collection")
        
        def _create_table_description(self, table_name: str, info: Dict) -> str:
            """Create a comprehensive description for embedding."""
            description = f"Table: {table_name}\n"
            description += f"Purpose: {info['business_context']}\n"
            
            # Add column information
            description += "Columns: "
            col_names = [col['name'] for col in info['columns']]
            description += ", ".join(col_names) + "\n"
            
            # Add relationship information
            if info['foreign_keys']:
                description += "Relationships: "
                relationships = []
                for fk in info['foreign_keys']:
                    relationships.append(f"links to {fk['references_table']} via {fk['column']}")
                description += "; ".join(relationships) + "\n"
            
            # Add common use cases
            use_cases = self._get_use_cases(table_name)
            if use_cases:
                description += f"Common queries: {use_cases}"
            
            return description
        
        def _get_use_cases(self, table_name: str) -> str:
            """Get common use cases for each table."""
            use_cases = {
                'products': 'product searches, catalog listings, price queries, inventory checks',
                'customers': 'customer lookup, registration analysis, geographic distribution',
                'orders': 'sales analysis, revenue tracking, order history, status monitoring',
                'order_items': 'product sales performance, revenue by product, order composition',
                'payments': 'payment processing, revenue reconciliation, payment method analysis',
                'brands': 'brand performance, sales by brand, brand comparison',
                'categories': 'category analysis, product organization, catalog structure'
            }
            
            return use_cases.get(table_name, 'general data queries and analysis')
        
        def _load_table_names(self):
            """Load table names from the collection."""
            if not self.collection:
                return
            
            try:
                # Get all items from the collection
                results = self.collection.get()
                self.table_names = [metadata['table_name'] for metadata in results['metadatas']]
            except Exception as e:
                print(f"⚠️ Could not load table names from Chroma: {e}")
                self.table_names = []
        
        def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
            """Get relevant schema using Chroma vector similarity search."""
            if not self.collection:
                print("⚠️ Chroma not initialized, falling back to full schema")
                return get_structured_schema(self.db_path)
            
            try:
                # Search for similar tables
                results = self.collection.query(
                    query_texts=[user_query],
                    n_results=max_tables
                )
                
                # Extract relevant table names
                relevant_tables = []
                if results['metadatas'] and len(results['metadatas']) > 0:
                    for metadata in results['metadatas'][0]:
                        relevant_tables.append(metadata['table_name'])
                
                # Fallback if no relevant tables found
                if not relevant_tables:
                    print("⚠️ No relevant tables found, using defaults")
                    relevant_tables = self._get_default_tables(user_query)[:max_tables]
                
                # Build schema for selected tables
                return self._build_schema(relevant_tables)
                
            except Exception as e:
                print(f"⚠️ Chroma search failed: {e}, falling back to full schema")
                return get_structured_schema(self.db_path)
        
        def _get_default_tables(self, user_query: str) -> List[str]:
            """Get default tables based on query patterns."""
            query_lower = user_query.lower()
            
            if any(word in query_lower for word in ['revenue', 'sales', 'total', 'amount', 'brand']):
                return ['orders', 'order_items', 'product_variants', 'products', 'brands']
            elif any(word in query_lower for word in ['product', 'item', 'catalog']):
                return ['products', 'product_variants', 'categories', 'brands']
            elif any(word in query_lower for word in ['customer', 'user', 'buyer']):
                return ['customers', 'orders', 'addresses']
            else:
                return ['products', 'customers', 'orders', 'order_items']
        
        def _build_schema(self, table_names: List[str]) -> str:
            """Build schema string for specified tables."""
            if not table_names:
                return get_structured_schema(self.db_path)
            
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            schema_lines = ["Available tables and columns:"]
            
            try:
                for table_name in table_names:
                    cursor.execute(f"PRAGMA table_info({table_name});")
                    columns = cursor.fetchall()
                    if columns:
                        col_names = [col[1] for col in columns]
                        schema_lines.append(f"- {table_name}: {', '.join(col_names)}")
            finally:
                conn.close()
            
            return '\n'.join(schema_lines)
        
        def get_approach_info(self) -> Dict[str, Any]:
            return {
                "name": "Chroma Vector RAG",
                "description": "Uses Chroma DB for persistent vector storage with semantic search",
                "pros": ["Persistent storage", "Fast queries", "Scalable", "Easy management"],
                "cons": ["Requires disk space", "Initial setup time", "Additional dependency"],
                "best_for": "Production environments, persistent workflows, team collaboration"
            }
    

    Comparing the Different RAG Strategies

    The RAGManager class is the control center for switching between the different RAG strategies. Based on the user query, it picks the right approach, fetches the most relevant part of the schema, and tracks performance metrics like response time, token savings, and table count. It also has a compare function to benchmark all RAG strategies side by side, and it stores historical metrics so you can analyze how each one is doing over time. Super useful for testing what works best and keeping things optimized.

    All the different RAG strategy classes are initialized and stored in self.approaches. Each RAG approach is a class that inherits from BaseRAG, so they all share a consistent interface (get_relevant_schema() and get_approach_info()). This means you can easily plug in a new strategy (say Pinecone or Weaviate) as long as it extends BaseRAG, as sketched below.
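
    For illustration, here is a minimal sketch of what such a plug-in could look like. PineconeRAG and its _search_index() helper are hypothetical placeholders, not part of the project; a real version would also need its own _build_schema() or to reuse the pattern shown earlier:

    class PineconeRAG(BaseRAG):
        """Hypothetical sketch of a Pinecone-backed strategy behind the same interface."""
    
        def get_relevant_schema(self, user_query: str, max_tables: int = 5) -> str:
            # A real implementation would embed user_query, query a Pinecone index,
            # and build a schema string for the top-scoring tables.
            relevant_tables = self._search_index(user_query, max_tables)  # hypothetical helper
            return self._build_schema(relevant_tables)
    
        def get_approach_info(self) -> Dict[str, Any]:
            return {
                "name": "Pinecone RAG (sketch)",
                "description": "Managed vector database for schema retrieval",
                "pros": ["Fully managed", "Scales well"],
                "cons": ["External service", "API costs"],
                "best_for": "Hosted production deployments"
            }

    Registering it would then be one line in RAGManager.__init__: self.approaches['pinecone'] = PineconeRAG(db_path).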

    The method get_relevant_schema() returns the schema relevant to the query based on the chosen strategy. If an invalid strategy is passed, or something fails for any reason, it gracefully falls back to the 'keyword' strategy.

    The method compare_approaches() runs the same query through all of the RAG strategies. It measures:

    • Length of the resulting schema
    • % token reduction vs. the full schema
    • Response time
    • Number of tables returned

    This is really useful for benchmarking the strategies side by side and picking the one best suited to your use case.

    class RAGManager:
        """Manager for multiple RAG approaches."""
        
        def __init__(self, db_path: str = DB_PATH):
            self.db_path = db_path
            self.approaches = {
                'no_rag': NoRAG(db_path),
                'keyword': KeywordRAG(db_path),
                'faiss': FAISSVectorRAG(db_path),
                'chroma': ChromaVectorRAG(db_path)
            }
            self.performance_metrics = {}
        
        def get_available_approaches(self) -> Dict[str, Dict[str, Any]]:
            """Get information about all available RAG approaches."""
            return {
                approach_id: approach.get_approach_info() 
                for approach_id, approach in self.approaches.items()
            }
        
        def get_relevant_schema(self, user_query: str, approach: str = 'keyword', max_tables: int = 5) -> str:
            """Get relevant schema using the specified approach."""
            if approach not in self.approaches:
                print(f"⚠️ Unknown approach '{approach}', falling back to keyword")
                approach = 'keyword'
            
            start_time = time.time()
            
            try:
                schema = self.approaches[approach].get_relevant_schema(user_query, max_tables)
                
                # Record performance metrics
                end_time = time.time()
                self._record_performance(approach, user_query, schema, end_time - start_time)
                
                return schema
                
            except Exception as e:
                print(f"⚠️ Error with {approach} approach: {e}")
                # Fall back to the keyword approach
                if approach != 'keyword':
                    return self.get_relevant_schema(user_query, 'keyword', max_tables)
                else:
                    return get_structured_schema(self.db_path)
        
        def compare_approaches(self, user_query: str, max_tables: int = 5) -> Dict[str, Any]:
            """Compare all approaches for a given query."""
            results = {}
            full_schema = get_structured_schema(self.db_path)
            full_schema_length = len(full_schema)
            
            for approach_id, approach in self.approaches.items():
                start_time = time.time()
                
                try:
                    schema = approach.get_relevant_schema(user_query, max_tables)
                    end_time = time.time()
                    
                    results[approach_id] = {
                        'schema': schema,
                        'schema_length': len(schema),
                        'token_reduction': ((full_schema_length - len(schema)) / full_schema_length) * 100,
                        'response_time': end_time - start_time,
                        'table_count': len([line for line in schema.split('\n') if line.startswith('- ')]),
                        'success': True
                    }
                    
                except Exception as e:
                    results[approach_id] = {
                        'schema': '',
                        'schema_length': 0,
                        'token_reduction': 0,
                        'response_time': 0,
                        'table_count': 0,
                        'success': False,
                        'error': str(e)
                    }
            
            return results
        
        def _record_performance(self, approach: str, query: str, schema: str, response_time: float):
            """Record performance metrics for analysis."""
            if approach not in self.performance_metrics:
                self.performance_metrics[approach] = []
            
            full_schema_length = len(get_structured_schema(self.db_path))
            schema_length = len(schema)
            
            metrics = {
                'query': query,
                'schema_length': schema_length,
                'token_reduction': ((full_schema_length - schema_length) / full_schema_length) * 100,
                'response_time': response_time,
                'table_count': len([line for line in schema.split('\n') if line.startswith('- ')]),
                'timestamp': time.time()
            }
            
            self.performance_metrics[approach].append(metrics)
        
        def get_performance_summary(self) -> Dict[str, Any]:
            """Get a performance summary for all approaches."""
            summary = {}
            
            for approach, metrics_list in self.performance_metrics.items():
                if not metrics_list:
                    continue
                    
                avg_token_reduction = sum(m['token_reduction'] for m in metrics_list) / len(metrics_list)
                avg_response_time = sum(m['response_time'] for m in metrics_list) / len(metrics_list)
                avg_table_count = sum(m['table_count'] for m in metrics_list) / len(metrics_list)
                
                summary[approach] = {
                    'queries_processed': len(metrics_list),
                    'avg_token_reduction': round(avg_token_reduction, 1),
                    'avg_response_time': round(avg_response_time, 3),
                    'avg_table_count': round(avg_table_count, 1)
                }
            
            return summary
    
    
    # Convenience functions for backward compatibility
    def get_rag_enhanced_schema(user_query: str, db_path: str = DB_PATH, approach: str = 'keyword') -> str:
        """Get RAG-enhanced schema using the specified approach."""
        manager = RAGManager(db_path)
        return manager.get_relevant_schema(user_query, approach)
    
    
    # Global cached instance
    _rag_manager_instance = None
    
    def get_cached_rag_manager(db_path: str = DB_PATH) -> RAGManager:
        """Get the cached RAG manager instance."""
        global _rag_manager_instance
        if _rag_manager_instance is None:
            _rag_manager_instance = RAGManager(db_path)
        return _rag_manager_instance
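
    To give an idea of how the manager is driven outside the UI, here is a small usage sketch (the example question is made up; it assumes the code above is importable and the demo database exists):

    manager = get_cached_rag_manager()
    
    # Benchmark all four strategies on one question
    results = manager.compare_approaches("total sales by brand last month")
    for approach_id, r in results.items():
        if r['success']:
            print(f"{approach_id}: {r['token_reduction']:.1f}% token reduction, "
                  f"{r['table_count']} tables, {r['response_time']:.3f}s")
        else:
            print(f"{approach_id}: failed with {r['error']}")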

    The Streamlit app is fully integrated with this manager, so users can choose the strategy they want and see real-time results. You can check out the complete code on GitHub here. Here’s a working demo of the new app in action:

    RAG Implementation in Action, by Author
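
    Under the hood, the wiring is essentially a selectbox feeding the cached manager. A minimal sketch of that idea (widget labels and layout here are illustrative, not the app’s exact code):

    import streamlit as st
    from rag_manager import get_cached_rag_manager
    
    manager = get_cached_rag_manager()
    
    # Let the user pick a RAG strategy and show its description
    approaches = manager.get_available_approaches()
    choice = st.selectbox("RAG strategy", list(approaches.keys()))
    st.caption(approaches[choice]["description"])
    
    user_query = st.text_input("Ask a question about your data")
    if user_query:
        # Only the relevant schema snippet would be passed on to the LLM agents
        schema = manager.get_relevant_schema(user_query, approach=choice)
        st.code(schema)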

    Final Thoughts

    This isn’t the end; there’s still plenty to improve. I need to stress-test the assistant against a variety of attacks and reinforce the guardrails to reduce hallucinations and ensure data safety. It would also be good to build a role-based access system for data governance. Maybe replacing Streamlit with a frontend framework like React could make the app more scalable for real-world deployments. All of that, for next time.


    Before you go…

    Follow me so you don’t miss any new posts I write in the future; you will find more of my articles on my profile page. You can also connect with me on LinkedIn or X!



    Source link
