gary.info

here be dragons

DSGo - Disgo is coming to a repo near you!


DSPy-Go Production Reference: The Complete Technical Guide

Architectural Foundation: From Prompting to Programming

The Paradigm Shift

DSPy transforms LLM development from artisanal prompt engineering to systematic software engineering. Prompts become learnable parameters in a computational graph, optimized against defined metrics rather than hand-tuned through trial and error.

Core Philosophy

  • Declarative Contracts: Define what, not how
  • Systematic Optimization: Replace guesswork with formal optimization loops
  • Composable Modules: Build complex systems from simple, testable units
  • Measurable Outcomes: Every decision backed by quantitative metrics

    Part 1: Core Programming Model - Building the Computational Graph

    1.1 Signatures: Declarative Task Definition

    Signatures are atomic units of task definition - typed schemas that specify intent without dictating implementation.

    // Simple signature with field descriptions
    type BasicQA struct {
        Question string `dspy:"input" desc:"User's question"`
        Answer   string `dspy:"output" desc:"Short factual answer, 1-5 words"`
    }
    
    // Complex signature with constraints
    type ClassifyAndExtract struct {
        Message  string   `dspy:"input"`
        Category string   `dspy:"output" literal:"Sales,Support,Technical"`
        Urgency  int      `dspy:"output" min:"1" max:"5"`
        Entities []string `dspy:"output" desc:"Company names identified"`
    }
    
    // Multi-stage signature with dependencies
    type AnalysisSignature struct {
        Document  string   `dspy:"input"`
        Summary   string   `dspy:"output" max_words:"100"`
        KeyPoints []string `dspy:"output" min_items:"3" max_items:"7"`
        Sentiment float64  `dspy:"output" min:"0.0" max:"1.0"`
    }

    Best Practices:

  • Keep descriptions clear but concise - avoid over-prescriptive constraints
  • Use type constraints (literal, min, max) for validation
  • Define the transformation intent, not the implementation method

    1.2 Modules: Execution Strategy Units

    Core Module Types

    // Predict: Direct transformation
    predict := modules.NewPredict(signature)
    
    // ChainOfThought: Adds reasoning field automatically
    cot := modules.NewChainOfThought(signature)
    // Transforms: input -> rationale -> output
    
    // ReAct: Tool-augmented reasoning
    react := modules.NewReAct(signature, []core.Tool{
        calculator,
        searchTool,
        codeExecutor,
    })
    
    // ProgramOfThought: Mathematical reasoning
    pot := modules.NewProgramOfThought(signature)
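
    The signature argument to these constructors can come from a tagged struct (section 1.1) or be built programmatically. A minimal sketch, using the core.NewSignature constructor that appears in the RAG examples of Part 5:

    // Build a question -> answer signature programmatically, then bind it
    // to a Predict module (constructor shape taken from Part 5).
    sig := core.NewSignature(
        []core.InputField{
            {Field: core.NewField("question")},
        },
        []core.OutputField{
            {Field: core.NewField("answer")},
        },
    )
    qa := modules.NewPredict(sig)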

    Module Composition Pattern

    type RAGPipeline struct {
        queryRewriter   *modules.ChainOfThought
        retriever       *modules.Retrieve
        reranker        *modules.Predict
        synthesizer     *modules.ChainOfThought
        validator       *modules.Predict
    }
    
    func (p *RAGPipeline) Forward(ctx context.Context, question string) (map[string]interface{}, error) {
        // 1. Rewrite query for better retrieval
        rewritten, err := p.queryRewriter.Process(ctx, map[string]interface{}{
            "question": question,
        })
        if err != nil {
            return nil, err
        }
        
        // 2. Retrieve documents
        docs, err := p.retriever.Process(ctx, map[string]interface{}{
            "query": rewritten["improved_query"],
            "top_k": 10,
        })
        if err != nil {
            return nil, err
        }
        
        // 3. Rerank for relevance
        ranked, err := p.reranker.Process(ctx, map[string]interface{}{
            "query":     question,
            "documents": docs["documents"],
        })
        if err != nil {
            return nil, err
        }
        
        // 4. Synthesize answer
        answer, err := p.synthesizer.Process(ctx, map[string]interface{}{
            "question":  question,
            "documents": ranked["top_documents"],
        })
        if err != nil {
            return nil, err
        }
        
        // 5. Validate factuality
        return p.validator.Process(ctx, map[string]interface{}{
            "answer":    answer["answer"],
            "documents": ranked["top_documents"],
        })
    }

    Part 2: Compilation Engine - Systematic Optimization

    2.1 Metrics: Quantifying Success

    // Binary metric for exact match
    func exactMatchMetric(example, prediction map[string]interface{}) float64 {
        if prediction["answer"] == example["answer"] {
            return 1.0
        }
        return 0.0
    }
    
    // Graduated scoring with partial credit
    func gradedMetric(example, prediction map[string]interface{}) float64 {
        score := 0.0
        
        // Presence check (30%)
        if prediction["answer"] != nil {
            score += 0.3
        }
        
        // Correctness (50%)
        if prediction["answer"] == example["answer"] {
            score += 0.5
        }
        
        // Format compliance (20%)
        if meetsFormatRequirements(prediction) {
            score += 0.2
        }
        
        return score
    }
    
    // LLM-as-Judge for nuanced evaluation
    type FactualityJudge struct {
        judge *modules.ChainOfThought
    }
    
    func (f *FactualityJudge) Evaluate(ctx context.Context, example, prediction map[string]interface{}) float64 {
        result, err := f.judge.Process(ctx, map[string]interface{}{
            "statement": prediction["answer"],
            "context":   example["context"],
            "question":  example["question"],
        })
        if err != nil {
            return 0.0 // treat judge failures as non-factual
        }
        
        confidence := result["confidence"].(float64)
        isFactual := result["is_factual"].(bool)
        
        if isFactual {
            return confidence
        }
        return confidence * 0.3 // Penalize non-factual answers
    }
    
    // Feedback-driven metric for GEPA
    func feedbackMetric(example, prediction map[string]interface{}) (float64, string) {
        score := gradedMetric(example, prediction)
        
        feedback := ""
        if score < 0.5 {
            feedback = analyzeFailure(example, prediction)
        } else if score < 0.8 {
            feedback = suggestImprovements(example, prediction)
        }
        
        return score, feedback
    }

    2.2 Optimizer Configurations

    // BootstrapFewShot: Standard workhorse
    bootstrap := optimizers.NewBootstrapFewShot(
        dataset,
        metricFunc,
        optimizers.WithMaxBootstrappedDemos(8),
        optimizers.WithMaxLabeledDemos(3),
    )
    
    // MIPROv2: Bayesian optimization
    mipro := optimizers.NewMIPRO(
        metricFunc,
        optimizers.WithMode(optimizers.StandardMode),
        optimizers.WithNumTrials(20),
        optimizers.WithTPEGamma(0.25),
    )
    
    // GEPA: Reflective evolution with feedback
    gepa := optimizers.NewGEPA(&optimizers.GEPAConfig{
        PopulationSize:    20,
        MaxGenerations:    15,
        SelectionStrategy: "adaptive_pareto",
        MutationRate:      0.3,
        CrossoverRate:     0.7,
        ReflectionFreq:    3,  // LLM reflection every 3 generations
        ElitismRate:       0.1,
        FeedbackMetric:    feedbackMetric, // Uses textual feedback
    })
    
    // SIMBA: Introspective mini-batch ascent
    simba := optimizers.NewSIMBA(
        optimizers.WithSIMBABatchSize(8),
        optimizers.WithSIMBAMaxSteps(12),
        optimizers.WithSIMBANumCandidates(6),
        optimizers.WithIntrospection(true), // Enable self-analysis
    )
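
    Whichever optimizer you choose, the compile call has the same shape. A minimal sketch, assuming a program module and a trainset dataset; the Compile signature mirrors the MIPRO example in Part 5, and Save is an assumed persistence helper (the counterpart of the modules.Load call in Part 4):

    compiled, err := mipro.Compile(ctx, program, trainset, nil)
    if err != nil {
        log.Fatalf("compilation failed: %v", err)
    }
    
    // Persist the optimized program for production loading (assumed API)
    if err := compiled.Save("compiled_program.json"); err != nil {
        log.Fatalf("save failed: %v", err)
    }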

    2.3 Teacher-Student Optimization Pattern

    type TeacherStudentOptimizer struct {
        teacher    core.LLM  // Large, expensive model
        student    core.LLM  // Small, efficient model
        optimizer  *optimizers.MIPRO
    }
    
    func (t *TeacherStudentOptimizer) Compile(ctx context.Context, program core.Module, dataset core.Dataset) (*CompiledProgram, error) {
        // Phase 1: Generate high-quality training data with teacher
        teacherData := t.generateTeacherData(ctx, dataset)
        
        // Phase 2: Use teacher for prompt generation
        t.optimizer.SetPromptModel(t.teacher)
        t.optimizer.SetTaskModel(t.student)
        
        // Phase 3: Compile student with teacher's knowledge
        compiled, err := t.optimizer.Compile(ctx, program, teacherData, nil)
        if err != nil {
            return nil, err
        }
        
        // Phase 4: Distillation - further optimize student
        distilled := t.distill(ctx, compiled, teacherData)
        
        return distilled, nil
    }
    
    func (t *TeacherStudentOptimizer) generateTeacherData(ctx context.Context, dataset core.Dataset) core.Dataset {
        enriched := datasets.NewInMemoryDataset()
        
        for _, example := range dataset.GetExamples() {
            // Use teacher to generate high-quality outputs
            teacherOutput, err := t.teacher.Generate(ctx, example.Input)
            if err != nil {
                continue // skip examples the teacher fails on
            }
            
            // Add reasoning traces
            enriched.AddExample(map[string]interface{}{
                "input":     example.Input,
                "output":    teacherOutput,
                "reasoning": t.extractReasoning(teacherOutput),
                "confidence": t.assessConfidence(teacherOutput),
            })
        }
        
        return enriched
    }

    Part 3: Runtime Environment - Reliability & Continuous Learning

    3.1 Runtime Self-Correction Architecture

    // Production deployment with layered reliability
    type ProductionSystem struct {
        compiled   *CompiledProgram  // Optimized base program
        bestOfN    *modules.BestOfN  // Diversity-based retry
        refine     *modules.Refine   // Feedback-based iteration
        monitor    *RuntimeMonitor   // Performance tracking
    }
    
    func (p *ProductionSystem) Process(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error) {
        // Layer 1: Try compiled program
        output, err := p.compiled.Process(ctx, input)
        if err == nil && p.monitor.AssessQuality(output) > 0.8 {
            return output, nil
        }
        
        // Layer 2: BestOfN for diversity
        if err != nil || p.monitor.AssessQuality(output) < 0.6 {
            output, err = p.bestOfN.Process(ctx, input)
            if err == nil && p.monitor.AssessQuality(output) > 0.8 {
                return output, nil
            }
        }
        
        // Layer 3: Refine with feedback
        if p.monitor.AssessQuality(output) < 0.8 {
            output, err = p.refine.Process(ctx, input)
        }
        
        // Record for continuous learning
        p.monitor.Record(input, output, err)
        
        return output, err
    }
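
    A sketch of how the retry layers above might be constructed; the Refine configuration follows the RefineConfig shape shown in Part 5, while the BestOfN constructor and its option are assumptions:

    // Refine: feedback-based iteration (config shape from Part 5)
    refine := modules.NewRefine(
        compiled,
        modules.RefineConfig{
            N:         3,
            RewardFn:  qualityReward, // hypothetical reward function
            Threshold: 0.8,
        },
    )
    
    // BestOfN: diversity-based retry (constructor and option assumed)
    bestOfN := modules.NewBestOfN(
        compiled,
        modules.WithN(5),
    )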

    3.2 Observability: Full Execution Tracing

    type ExecutionTracer struct {
        storage  TraceStorage
        exporter MetricsExporter
    }
    
    func (t *ExecutionTracer) TraceExecution(ctx context.Context, program core.Module, input map[string]interface{}) (map[string]interface{}, *Trace, error) {
        // Enable tracing
        ctx = core.WithExecutionState(ctx)
        
        // Execute with timing
        start := time.Now()
        output, err := program.Process(ctx, input)
        duration := time.Since(start)
        
        // Extract trace
        state := core.GetExecutionState(ctx)
        trace := &Trace{
            ID:        uuid.New().String(),
            Timestamp: start,
            Duration:  duration,
            Input:     input,
            Output:    output,
            Error:     err,
            Steps:     t.extractSteps(state),
            Metrics:   t.calculateMetrics(state),
        }
        
        // Store and export
        t.storage.Save(trace)
        t.exporter.Export(trace.Metrics)
        
        return output, trace, err
    }
    
    func (t *ExecutionTracer) extractSteps(state *core.ExecutionState) []Step {
        steps := []Step{}
        
        for _, moduleStep := range state.GetAllSteps() {
            steps = append(steps, Step{
                Module:     moduleStep.ModuleID,
                Prompt:     moduleStep.Prompt,
                Response:   moduleStep.Response,
                TokenCount: moduleStep.TokenCount,
                Latency:    moduleStep.Duration,
                ToolCalls:  moduleStep.ToolCalls,
            })
        }
        
        return steps
    }

    3.3 Active Learning: Smart Tool Registry

    type AdaptiveAgent struct {
        reactor  *modules.ReAct
        registry *tools.SmartToolRegistry
        learner  *BayesianLearner
        memory   *memory.BufferMemory
    }
    
    func (a *AdaptiveAgent) ExecuteWithLearning(ctx context.Context, task string) (map[string]interface{}, error) {
        // Trace execution for learning
        ctx = core.WithExecutionState(ctx)
        
        // Select tools based on learned preferences
        selectedTools := a.registry.SelectBest(ctx, task, 3)
        
        // Execute with selected tools
        result, err := a.reactor.Process(ctx, map[string]interface{}{
            "task":   task,
            "tools":  selectedTools,
            "memory": a.memory.Get(ctx),
        })
        
        // Extract performance data
        trace := core.GetExecutionState(ctx)
        toolUsage := a.extractToolUsage(trace)
        
        // Update tool preferences based on outcome
        success := err == nil && a.assessSuccess(result)
        for tool, usage := range toolUsage {
            if success {
                a.registry.Promote(tool, usage.Effectiveness)
            } else {
                a.registry.Demote(tool, usage.FailureReason)
            }
        }
        
        // Bayesian update for future predictions
        a.learner.Update(task, selectedTools, success)
        
        // Persist learned state
        a.registry.Save("tool_preferences.json")
        a.learner.Save("bayesian_model.json")
        
        return result, err
    }

    Part 4: Production Patterns & Best Practices

    4.1 Deployment Architecture

    // Configuration management
    type ProductionConfig struct {
        CompiledPath   string
        CacheDir       string
        MaxWorkers     int
        TimeoutSeconds int
        RetryPolicy    RetryConfig
    }
    
    // Production server with all optimizations
    type ProductionServer struct {
        config    ProductionConfig
        program   *CompiledProgram
        cache     *cache.MultiLevelCache
        pool      *RequestPool
        monitor   *RuntimeMonitor
        optimizer *ContinuousOptimizer
    }
    
    func (s *ProductionServer) Initialize() error {
        // Load compiled program
        program, err := modules.Load(s.config.CompiledPath)
        if err != nil {
            return err
        }
        s.program = program
        
        // Configure caching
        s.cache = cache.NewMultiLevelCache(
            cache.WithMemoryCache(1000),
            cache.WithDiskCache(s.config.CacheDir),
            cache.WithSemanticDedup(true),
        )
        
        // Setup request pooling
        s.pool = NewRequestPool(
            WithMaxWorkers(s.config.MaxWorkers),
            WithBatchWindow(10 * time.Millisecond),
            WithSignatureGrouping(true),
        )
        
        // Initialize monitoring
        s.monitor = NewRuntimeMonitor(
            WithPrometheus(true),
            WithTracing(true),
            WithAnomalyDetection(true),
        )
        
        // Setup continuous optimization
        s.optimizer = NewContinuousOptimizer(
            WithReoptimizationInterval(24 * time.Hour),
            WithMinExamples(1000),
            WithOptimizer(optimizers.NewSIMBA()),
        )
        
        return nil
    }

    4.2 Performance Optimization Checklist

    Compile-Time Optimizations

  • [ ] Use teacher-student pattern for cost reduction
  • [ ] Optimize with 200+ diverse examples minimum
  • [ ] Test multiple optimizers (MIPRO, GEPA, SIMBA)
  • [ ] Save compiled artifacts for production

    Runtime Optimizations

  • [ ] Implement request pooling (10-50x reduction)
  • [ ] Enable semantic caching (avoid duplicate work)
  • [ ] Use BestOfN/Refine selectively based on criticality
  • [ ] Parallelize independent operations

    Continuous Learning

  • [ ] Enable execution tracing in production
  • [ ] Implement feedback loops for tool selection
  • [ ] Schedule periodic reoptimization
  • [ ] Monitor drift in performance metrics

    4.3 Common Patterns by Use Case

    Pattern: Log Analysis Pipeline

    type LogAnalyzer struct {
        classifier   *modules.ChainOfThought  // Categorize errors
        rootCauser   *modules.ReAct          // Find root cause
        recommender  *modules.Refine         // Suggest fixes
    }

    Pattern: Incident Response System

    type IncidentResponder struct {
        detector     *modules.Predict        // Detect incident type
        orchestrator *workflows.ChainWorkflow // Execute response
        narrator     *modules.ChainOfThought // Generate report
    }

    Pattern: Code Review Agent

    type CodeReviewer struct {
        styleChecker     *modules.Predict      // Check style guide
        securityScanner  *modules.ReAct       // Security analysis
        perfAnalyzer     *modules.ChainOfThought // Performance review
        synthesizer      *modules.Refine      // Combine feedback
    }
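
    These pattern structs are skeletons; the wiring is the same Forward-style composition shown in Part 1. A sketch for the CodeReviewer (field names from the struct above; input/output keys are illustrative):

    func (c *CodeReviewer) Review(ctx context.Context, diff string) (map[string]interface{}, error) {
        // Each stage is an ordinary module call over the same diff
        style, err := c.styleChecker.Process(ctx, map[string]interface{}{"diff": diff})
        if err != nil {
            return nil, err
        }
        
        security, err := c.securityScanner.Process(ctx, map[string]interface{}{"diff": diff})
        if err != nil {
            return nil, err
        }
        
        perf, err := c.perfAnalyzer.Process(ctx, map[string]interface{}{"diff": diff})
        if err != nil {
            return nil, err
        }
        
        // Merge the three reviews into one piece of feedback
        return c.synthesizer.Process(ctx, map[string]interface{}{
            "style":       style["feedback"],
            "security":    security["findings"],
            "performance": perf["analysis"],
        })
    }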

    Part 5: Retrieval-Augmented Generation (RAG) Architecture

    5.0 Getting Started: Minimal RAG Pattern

    Before diving into production complexity, here's the simplest viable RAG implementation:

    // Minimal RAG Signature
    type SimpleRAGSignature struct {
        Question string `dspy:"input" desc:"User question"`
        Context  string `dspy:"input" desc:"Retrieved documents"`
        Answer   string `dspy:"output" desc:"Answer using context"`
    }
    
    // Basic RAG Module - Direct equivalent to Python example
    type BasicRAGModule struct {
        retrieve *modules.Retrieve
        predict  *modules.Predict
    }
    
    func NewBasicRAG() *BasicRAGModule {
        // Simple retriever with k=3 documents
        retriever := modules.NewRetrieve(
            modules.WithTopK(3),
        )
        
        signature := core.NewSignature(
            []core.InputField{
                {Field: core.NewField("question")},
                {Field: core.NewField("context")},
            },
            []core.OutputField{
                {Field: core.NewField("answer")},
            },
        )
        
        return &BasicRAGModule{
            retrieve: retriever,
            predict:  modules.NewPredict(signature),
        }
    }
    
    func (r *BasicRAGModule) Forward(ctx context.Context, question string) (string, error) {
        // 1. Retrieve documents
        docs, err := r.retrieve.Process(ctx, map[string]interface{}{
            "query": question,
        })
        if err != nil {
            return "", err
        }
        
        // 2. Join documents as context (renamed to avoid shadowing the context package)
        passages := docs["passages"].([]string)
        contextText := strings.Join(passages, "\n")
        
        // 3. Generate answer using context
        result, err := r.predict.Process(ctx, map[string]interface{}{
            "question": question,
            "context":  contextText,
        })
        if err != nil {
            return "", err
        }
        
        return result["answer"].(string), nil
    }
    
    // Usage
    rag := NewBasicRAG()
    answer, _ := rag.Forward(ctx, "What is DSPy?")

    Progressive Enhancement Path

    Step 1: Add Basic Assertions

    // Simple citation assertion - equivalent to Python lambda
    func citationAssertion(output map[string]interface{}) bool {
        answer := output["answer"].(string)
        context := output["context"].(string)
        
        // Check if answer contains any snippet from context
        for _, line := range strings.Split(context, "\n") {
            if len(line) > 10 && strings.Contains(answer, line[:10]) {
                return true
            }
        }
        return false
    }
    
    // Wrap with assertion
    assertedRAG := modules.NewWithAssertion(
        basicRAG,
        citationAssertion,
        "Answer must cite context",
    )

    Step 2: Apply Optimization

    // Optimize with MIPROv2 - direct equivalent to Python
    optimizer := optimizers.NewMIPRO(
        metricFunc,
        optimizers.WithMode(optimizers.LightMode),
    )
    
    optimizedRAG, _ := optimizer.Compile(
        ctx,
        basicRAG,
        trainExamples, // 20 Q/A pairs
        nil,
    )

    Step 3: Add Multi-Chain Comparison

    // Test different k values
    multiRAG := NewMultiChainRAG(
        []int{3, 5, 10}, // Different k values
        basicRAG,
    )
    
    bestAnswer := multiRAG.SelectBest(ctx, question)

    5.1 Production RAG Pipeline

    RAG combines retrieval with generation, creating a knowledge-grounded inference pipeline. DSPy-Go provides first-class support through composable modules.

    // RAG Signature with context grounding
    type RAGSignature struct {
        Question  string   `dspy:"input" desc:"User question requiring factual answer"`
        Context   string   `dspy:"input" desc:"Retrieved documents as evidence"`
        Answer    string   `dspy:"output" desc:"Answer grounded in context"`
        Citations []string `dspy:"output" desc:"Specific quotes from context"`
    }
    
    // Production RAG Module
    type RAGModule struct {
        retriever   *modules.Retrieve
        reranker    *modules.Predict
        generator   *modules.ChainOfThought
        validator   *modules.Predict
        cache       *cache.SemanticCache
    }
    
    func NewRAGModule(vectorDB VectorDatabase) *RAGModule {
        // Configure retriever with vector database
        retriever := modules.NewRetrieve(
            modules.WithVectorDB(vectorDB),
            modules.WithTopK(5),
            modules.WithSimilarityThreshold(0.7),
        )
        
        // Reranker for relevance optimization
        rerankerSig := core.NewSignature(
            []core.InputField{
                {Field: core.NewField("query")},
                {Field: core.NewField("documents")},
            },
            []core.OutputField{
                {Field: core.NewField("ranked_docs")},
                {Field: core.NewField("relevance_scores")},
            },
        )
        
        return &RAGModule{
            retriever: retriever,
            reranker:  modules.NewPredict(rerankerSig),
            generator: modules.NewChainOfThought(RAGSignature{}),
            validator: modules.NewPredict(CitationValidatorSignature{}),
            cache:     cache.NewSemanticCache(10000),
        }
    }
    
    func (r *RAGModule) Forward(ctx context.Context, question string) (map[string]interface{}, error) {
        // Check cache first
        if cached := r.cache.Get(question); cached != nil {
            return cached.(map[string]interface{}), nil
        }
        
        // 1. Retrieve documents
        docs, err := r.retriever.Process(ctx, map[string]interface{}{
            "query": question,
        })
        if err != nil {
            return nil, err
        }
        
        // 2. Rerank for relevance
        ranked, err := r.reranker.Process(ctx, map[string]interface{}{
            "query":     question,
            "documents": docs["passages"],
        })
        if err != nil {
            return nil, err
        }
        
        // 3. Generate answer with context (renamed to avoid shadowing the context package)
        contextText := strings.Join(ranked["ranked_docs"].([]string), "\n\n")
        answer, err := r.generator.Process(ctx, map[string]interface{}{
            "question": question,
            "context":  contextText,
        })
        if err != nil {
            return nil, err
        }
        
        // 4. Validate citations
        validated, err := r.validator.Process(ctx, map[string]interface{}{
            "answer":    answer["answer"],
            "citations": answer["citations"],
            "context":   contextText,
        })
        if err != nil {
            return nil, err
        }
        
        result := map[string]interface{}{
            "answer":     validated["answer"],
            "citations":  validated["citations"],
            "confidence": validated["confidence"],
        }
        
        // Cache high-quality answers
        if conf, ok := validated["confidence"].(float64); ok && conf > 0.8 {
            r.cache.Store(question, result)
        }
        
        return result, nil
    }

    5.2 RAG with Assertions and Constraints

    // Citation Assertion ensures answers are grounded
    type CitationAssertion struct {
        minCitations int
        maxLength    int
    }
    
    func (c *CitationAssertion) Validate(output map[string]interface{}) bool {
        citations := output["citations"].([]string)
        answer := output["answer"].(string)
        context := output["context"].(string)
        
        // Answer must respect the configured length cap
        if len(answer) > c.maxLength {
            return false
        }
        
        // Must have minimum citations
        if len(citations) < c.minCitations {
            return false
        }
        
        // Each citation must exist in context
        for _, citation := range citations {
            if !strings.Contains(context, citation) {
                return false
            }
        }
        
        // Answer must reference cited material
        for _, citation := range citations {
            keywords := extractKeywords(citation)
            if !containsAnyKeyword(answer, keywords) {
                return false
            }
        }
        
        return true
    }
    
    // EnforcedRAG wraps RAG with citation enforcement
    type EnforcedRAG struct {
        base      *RAGModule
        assertion *CitationAssertion
        refiner   *modules.Refine
    }
    
    func NewEnforcedRAG(vectorDB VectorDatabase) *EnforcedRAG {
        base := NewRAGModule(vectorDB)
        assertion := &CitationAssertion{
            minCitations: 2,
            maxLength:    500,
        }
        
        // Refine with citation reward
        refiner := modules.NewRefine(
            base,
            modules.RefineConfig{
                N: 3,
                RewardFn: func(args, pred map[string]interface{}) float64 {
                    if !assertion.Validate(pred) {
                        return 0.0
                    }
                    
                    // Score based on citation quality
                    citations := pred["citations"].([]string)
                    score := float64(len(citations)) / 5.0 // Max 5 citations
                    
                    // Bonus for answer coherence
                    if assessCoherence(pred["answer"].(string)) {
                        score += 0.3
                    }
                    
                    return math.Min(score, 1.0)
                },
                Threshold: 0.8,
            },
        )
        
        return &EnforcedRAG{
            base:      base,
            assertion: assertion,
            refiner:   refiner,
        }
    }

    5.3 Optimized RAG with MIPRO

    // RAGOptimizer uses MIPRO to tune retrieval and generation
    type RAGOptimizer struct {
        optimizer *optimizers.MIPRO
        dataset   *RAGDataset
    }
    
    func (o *RAGOptimizer) Optimize(ctx context.Context, rag *RAGModule) (*RAGModule, error) {
        // Define composite metric
        metric := func(example, prediction map[string]interface{}) float64 {
            score := 0.0
            
            // Correctness (40%)
            if prediction["answer"] == example["answer"] {
                score += 0.4
            }
            
            // Citation accuracy (30%, capped so it cannot exceed its weight)
            citations := prediction["citations"].([]string)
            citationScore := 0.0
            for _, cite := range citations {
                if strings.Contains(example["context"].(string), cite) {
                    citationScore += 0.1
                }
            }
            score += math.Min(citationScore, 0.3)
            
            // Relevance (30%)
            relevance := assessRelevance(
                example["question"].(string),
                prediction["answer"].(string),
            )
            score += relevance * 0.3
            
            return score
        }
        
        // Optimize with MIPRO
        optimized, err := o.optimizer.Compile(
            ctx,
            rag,
            o.dataset,
            metric,
        )
        
        return optimized.(*RAGModule), err
    }

    5.4 Multi-Chain RAG Comparison

    // MultiChainRAG runs multiple retrieval strategies in parallel
    type MultiChainRAG struct {
        chains []RAGChain
        voter  *ConsensusVoter
    }
    
    type RAGChain struct {
        Name      string
        Retriever *modules.Retrieve
        TopK      int
        Rerank    bool
    }
    
    func NewMultiChainRAG() *MultiChainRAG {
        return &MultiChainRAG{
            chains: []RAGChain{
                {Name: "dense", TopK: 3, Rerank: false},
                {Name: "sparse", TopK: 5, Rerank: true},
                {Name: "hybrid", TopK: 10, Rerank: true},
            },
            voter: NewConsensusVoter(0.66),
        }
    }
    
    func (m *MultiChainRAG) Process(ctx context.Context, question string) (map[string]interface{}, error) {
        results := make([]map[string]interface{}, len(m.chains))
        var wg sync.WaitGroup
        
        // Run chains in parallel
        for i, chain := range m.chains {
            wg.Add(1)
            go func(idx int, c RAGChain) {
                defer wg.Done()
                
                // Retrieve with chain-specific strategy
                docs, err := c.Retriever.Process(ctx, map[string]interface{}{
                    "query": question,
                    "top_k": c.TopK,
                })
                if err != nil {
                    return // leave this chain's slot empty on failure
                }
                
                // Optional reranking
                if c.Rerank {
                    docs = rerank(ctx, question, docs)
                }
                
                // Generate answer
                answer := generateAnswer(ctx, question, docs)
                results[idx] = answer
            }(i, chain)
        }
        
        wg.Wait()
        
        // Vote on best answer
        return m.voter.SelectBest(results), nil
    }

    5.5 Hybrid RAG with Structured Hints

    // HybridRAGSignature includes metadata for better grounding
    type HybridRAGSignature struct {
        Question     string            `dspy:"input"`
        Context      string            `dspy:"input"`
        Metadata     map[string]string `dspy:"input" desc:"Document metadata"`
        UserRole     string            `dspy:"input" desc:"User's role/permissions"`
        Tags         []string          `dspy:"input" desc:"Relevant tags"`
        Answer       string            `dspy:"output"`
        Confidence   float64           `dspy:"output" min:"0.0" max:"1.0"`
        Sources      []string          `dspy:"output" desc:"Source document IDs"`
    }
    
    // ProductionHybridRAG with all enhancements
    type ProductionHybridRAG struct {
        vectorDB     VectorDatabase
        metadataDB   MetadataDatabase
        preprocessor *QueryPreprocessor
        multichain   *MultiChainRAG
        enforcer     *EnforcedRAG
        optimizer    *RAGOptimizer
        monitor      *RAGMonitor
    }
    
    func (p *ProductionHybridRAG) Query(ctx context.Context, input QueryInput) (*QueryOutput, error) {
        // Preprocess query
        processed := p.preprocessor.Process(input.Question)
        
        // Extract metadata hints
        metadata := p.metadataDB.GetMetadata(processed.Keywords)
        
        // Run multi-chain retrieval
        candidates, err := p.multichain.Process(ctx, processed.Query)
        if err != nil {
            return nil, err
        }
        
        // Enforce citations
        validated, err := p.enforcer.Process(ctx, map[string]interface{}{
            "question":  input.Question,
            "context":   candidates["context"],
            "metadata":  metadata,
            "user_role": input.UserRole,
            "tags":      processed.Tags,
        })
        if err != nil {
            return nil, err
        }
        
        // Monitor performance
        p.monitor.Record(input, validated)
        
        // Trigger reoptimization if needed
        if p.monitor.ShouldReoptimize() {
            go p.optimizer.Optimize(ctx, p.enforcer.base)
        }
        
        return &QueryOutput{
            Answer:     validated["answer"].(string),
            Citations:  validated["citations"].([]string),
            Confidence: validated["confidence"].(float64),
            Sources:    validated["sources"].([]string),
        }, nil
    }

    5.6 RAG Backend Integration

    // Vector Database Adapters
    type VectorDBAdapter interface {
        Search(ctx context.Context, query string, k int) ([]Document, error)
        Index(ctx context.Context, docs []Document) error
    }
    
    // FAISS Adapter
    type FAISSAdapter struct {
        index  *faiss.Index
        mapper *DocumentMapper
    }
    
    func (f *FAISSAdapter) Search(ctx context.Context, query string, k int) ([]Document, error) {
        embedding := f.mapper.Embed(query)
        _, indices := f.index.Search(embedding, k) // distances unused here
        return f.mapper.GetDocuments(indices), nil
    }
    
    // Elasticsearch Adapter
    type ElasticsearchAdapter struct {
        client *elastic.Client
        index  string
    }
    
    func (e *ElasticsearchAdapter) Search(ctx context.Context, query string, k int) ([]Document, error) {
        searchResult, err := e.client.Search().
            Index(e.index).
            Query(elastic.NewMatchQuery("content", query)).
            Size(k).
            Do(ctx)
        
        if err != nil {
            return nil, err
        }
        
        return e.parseResults(searchResult), nil
    }
    
    // Weaviate Adapter
    type WeaviateAdapter struct {
        client *weaviate.Client
        class  string
    }
    
    func (w *WeaviateAdapter) Search(ctx context.Context, query string, k int) ([]Document, error) {
        result, err := w.client.GraphQL().
            Get().
            WithClassName(w.class).
            WithNearText(&graphql.NearTextArgument{
                Concepts: []string{query},
            }).
            WithLimit(k).
            Do(ctx)
        
        return w.parseResults(result), err
    }

    5.7 RAG Performance Optimization

    // Optimized retrieval with caching and prefetching
    type OptimizedRetriever struct {
        vectorDB    VectorDBAdapter
        cache       *cache.MultiLevelCache
        prefetcher  *Prefetcher
        embedder    *Embedder
    }
    
    func (o *OptimizedRetriever) Retrieve(ctx context.Context, query string) ([]Document, error) {
        // Check cache
        cacheKey := o.generateCacheKey(query)
        if cached := o.cache.Get(cacheKey); cached != nil {
            return cached.([]Document), nil
        }
        
        // Prefetch related queries
        o.prefetcher.PrefetchRelated(query)
        
        // Parallel search across multiple indices
        results := o.parallelSearch(ctx, query)
        
        // Cache results
        o.cache.Store(cacheKey, results)
        
        return results, nil
    }
    
    func (o *OptimizedRetriever) parallelSearch(ctx context.Context, query string) []Document {
        var wg sync.WaitGroup
        resultChan := make(chan []Document, 3)
        
        // Search dense index
        wg.Add(1)
        go func() {
            defer wg.Done()
            docs, _ := o.vectorDB.Search(ctx, query, 5)
            resultChan <- docs
        }()
        
        // Search sparse index (BM25)
        wg.Add(1)
        go func() {
            defer wg.Done()
            docs, _ := o.searchSparse(ctx, query, 5)
            resultChan <- docs
        }()
        
        // Search metadata
        wg.Add(1)
        go func() {
            defer wg.Done()
            docs, _ := o.searchMetadata(ctx, query, 5)
            resultChan <- docs
        }()
        
        wg.Wait()
        close(resultChan)
        
        // Merge and deduplicate results
        return o.mergeResults(resultChan)
    }

    RAG Deployment Checklist

    Architecture Requirements

  • [x] Vector database integration (FAISS/Elasticsearch/Weaviate)
  • [x] Multi-stage pipeline (retrieve → rerank → generate → validate)
  • [x] Citation enforcement through assertions
  • [x] Caching layer for semantic deduplication
  • [x] Parallel retrieval strategies

    Optimization Steps

  • Collect Q&A pairs: 20+ examples with ground truth
  • Run MIPRO optimization: Tune retrieval and generation jointly
  • Enable multi-chain comparison: Test k=3, 5, 10 retrievals
  • Add citation assertions: Ensure grounded answers
  • Deploy with monitoring: Track relevance and accuracy

    Performance Targets

  • Retrieval latency: < 50ms
  • Generation latency: < 500ms
  • Citation accuracy: > 90%
  • Cache hit rate: > 40%
  • Overall accuracy: > 85%

    Performance Benchmarks

    | Operation        | Before | After   | Improvement |
    |------------------|--------|---------|-------------|
    | 1000 predictions | 300s   | 8s      | 37.5x       |
    | Demo selection   | 5ms    | 0.1ms   | 50x         |
    | Assertion check  | 300ms  | 0.001ms | 300,000x    |
    | Token usage      | 500K   | 25K     | 20x         |

    Implementation Checklist

  • Batch all LLM calls - Process in chunks of 50-100
  • Compile demonstrations once - Store embeddings permanently
  • Convert assertions to patterns - Regex first, LLM never (sketch below)
  • Pool requests by signature - 10ms batching window
  • Preload modules predictively - Start before needed
  • Cache aggressively - Memory cheaper than API calls
  • Parallelize everything - Goroutines are free
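
    The "regex first, LLM never" rule in practice, as a sketch (the pattern and field names are illustrative):

    // A compiled regex check costs microseconds; an LLM round-trip costs
    // hundreds of milliseconds. Prefer patterns for mechanical checks.
    var citationPattern = regexp.MustCompile(`\[\d+\]`) // e.g. "[1]", "[2]"
    
    func hasCitationMarkers(output map[string]interface{}) bool {
        answer, ok := output["answer"].(string)
        return ok && citationPattern.MatchString(answer)
    }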

    Advanced Modules

    ChainOfThought

  • Adds "reasoning" field automatically to signatures
  • Generates step-by-step rationale before final answer
  • Optimization: Cache reasoning patterns for similar inputs
  • Example: cot := modules.NewChainOfThought(signature)

    ReAct (Reasoning & Acting)

  • Integrates tools with LLM reasoning loop
  • Max 5 iterations by default (configurable)
  • Pattern: Thought → Action → Observation → Loop
  • Tool Integration: Register custom tools implementing the core.Tool interface (example below)
  • Example: react := modules.NewReAct(signature, []core.Tool{calculator, search})
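
    A sketch of a custom tool; the exact core.Tool interface is an assumption here (a Name/Description/Execute shape), so check the package definition before copying:

    // CalculatorTool: minimal custom tool sketch (interface shape assumed).
    type CalculatorTool struct{}
    
    func (c *CalculatorTool) Name() string        { return "calculator" }
    func (c *CalculatorTool) Description() string { return "Evaluates arithmetic expressions" }
    
    func (c *CalculatorTool) Execute(ctx context.Context, input string) (string, error) {
        result, err := evalExpression(input) // hypothetical arithmetic helper
        if err != nil {
            return "", fmt.Errorf("calculator: %w", err)
        }
        return strconv.FormatFloat(result, 'f', -1, 64), nil
    }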

    MultiChainComparison

  • Compares N reasoning attempts (default 3)
  • Synthesizes holistic evaluation
  • Config: NewMultiChainComparison(signature, numChains, temperature)
  • Output: Combined rationale + best solution

    Refine

  • Multiple attempts with varying temperatures
  • Custom reward functions for quality assessment
  • Config: N attempts, reward threshold, temperature range
  • Pattern: Generate → Score → Select best

    Parallel Module

  • Wraps any module for concurrent batch processing
  • Config: WithMaxWorkers(n), WithStopOnFirstError(bool)
  • Performance: Linear speedup with worker count
  • Maintains input order in results (usage sketch below)
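
    A minimal usage sketch, assuming the constructor and option names from the bullets above (the batch-input key is also an assumption):

    parallel := modules.NewParallel(
        predict,
        modules.WithMaxWorkers(8),
        modules.WithStopOnFirstError(false),
    )
    
    // Results come back in input order
    results, err := parallel.Process(ctx, map[string]interface{}{
        "batch_inputs": batchInputs, // []map[string]interface{}
    })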

    Performance Anti-Patterns

  • Sequential prediction loops
  • Re-generating demonstrations
  • LLM calls for assertion checking
  • Synchronous module loading
  • Unbatched API requests
  • Not using request pooling for same signatures
  • Missing parallel execution for independent ops
  • No caching of compiled modules

    Critical Code Snippets

    // Batch wrapper
    func BatchPredict(p Predictor, inputs []string) []string
    
    // Demo caching
    var demoCache = &sync.Map{}
    
    // Parallel execution
    go func(idx int, m Module) { outputs[idx] = m.Forward(input) }(i, m)
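
    Fleshing out the batch wrapper stub above, as a sketch (the Predictor interface is an assumption):

    // Predictor is assumed: any module exposing a single-shot Predict call.
    type Predictor interface {
        Predict(ctx context.Context, input string) string
    }
    
    // BatchPredict fans inputs out across a bounded worker pool and
    // preserves input order in the output slice.
    func BatchPredict(ctx context.Context, p Predictor, inputs []string, workers int) []string {
        outputs := make([]string, len(inputs))
        sem := make(chan struct{}, workers) // bound concurrency
        var wg sync.WaitGroup
        for i, in := range inputs {
            wg.Add(1)
            go func(idx int, input string) {
                defer wg.Done()
                sem <- struct{}{}
                defer func() { <-sem }()
                outputs[idx] = p.Predict(ctx, input)
            }(i, in)
        }
        wg.Wait()
        return outputs
    }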

    Configuration Guidelines

  • Batch size: 50-100 items
  • Pool timeout: 10ms
  • Cache TTL: Infinite for demos
  • Goroutine pool: 2x CPU cores
  • Retry limit: 2 with exponential backoff (sketch below)
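
    The retry guideline as a sketch (the wrapped call is a stand-in for any provider request):

    // Retry with exponential backoff: 2 retries, doubling the delay each time.
    func withRetry(ctx context.Context, call func() error) error {
        delay := 100 * time.Millisecond
        var err error
        for attempt := 0; attempt <= 2; attempt++ {
            if err = call(); err == nil {
                return nil
            }
            select {
            case <-time.After(delay):
                delay *= 2
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        return err
    }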

    LLM Provider Configuration

    Anthropic Claude

    llm, _ := llms.NewAnthropicLLM("api-key", core.ModelAnthropicSonnet)

    Google Gemini (Multimodal)

    llm, _ := llms.NewGeminiLLM("api-key", "gemini-pro")

    OpenAI & Compatible APIs

    // Standard OpenAI
    llm, _ := llms.NewOpenAI(core.ModelOpenAIGPT4, "api-key")
    
    // LiteLLM (100+ LLMs)
    llm, _ := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
        llms.WithAPIKey("key"),
        llms.WithOpenAIBaseURL("http://localhost:4000"))
    
    // Azure OpenAI
    llm, _ := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
        llms.WithOpenAIBaseURL("https://resource.openai.azure.com"),
        llms.WithOpenAIPath("/openai/deployments/model/chat/completions"),
        llms.WithHeader("api-version", "2024-02-15-preview"))

    Local Models

    // Ollama
    llm, _ := llms.NewOllamaLLM("http://localhost:11434", "llama2")
    
    // LlamaCPP
    llm, _ := llms.NewLlamacppLLM("http://localhost:8080")
    
    // LocalAI
    llm, _ := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
        llms.WithOpenAIBaseURL("http://localhost:8080"),
        llms.WithOpenAIPath("/v1/chat/completions"))

    Setting Default LLM

    llms.SetDefaultLLM(llm)
    
    // Or per-module
    module.SetLLM(llm)

    Optimizers Deep Dive

    BootstrapFewShot

  • Automatically selects high-quality examples from the dataset
  • Strategy: Score examples → Select top K → Add to prompt
  • Config: Dataset, metric function, num_examples
  • Use when: You have labeled training data

    MIPRO (Multi-step Interactive Prompt Optimization)

  • Uses TPE (Tree-structured Parzen Estimator) search
  • Modes: Light (fast), Standard, Heavy (thorough)
  • Config: WithNumTrials(n), WithTPEGamma(0.25)
  • Use when: Need systematic prompt optimization

    SIMBA (Stochastic Introspective Mini-Batch Ascent)

  • Introspective learning with self-analysis
  • Config: Batch size, max steps, num candidates
  • Features: Introspection log, state tracking
  • Use when: Need insights into optimization process

    GEPA (Generative Evolutionary Prompt Adaptation)

  • Multi-objective Pareto optimization
  • 7 Dimensions: Success rate, quality, efficiency, robustness, generalization, diversity, innovation
  • Config: Population size, generations, mutation/crossover rates
  • LLM Self-Reflection: Every N generations for prompt critique
  • Use when: Need optimal trade-offs across multiple objectives

    COPRO (Collaborative Prompt Optimization)

  • Multi-module collaborative optimization
  • Use when: Optimizing interconnected modules (hypothetical sketch below)
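
    A hypothetical construction sketch, mirroring the option style of the other optimizers above; the real COPRO constructor and options may differ:

    copro := optimizers.NewCOPRO(
        metricFunc,
        optimizers.WithBreadth(5), // candidate prompts per module per round (assumed option)
        optimizers.WithDepth(3),   // rounds of collaborative refinement (assumed option)
    )
    
    compiled, err := copro.Compile(ctx, multiModuleProgram, trainset, nil)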