DSGo - Disgo is coming to a repo near you!
DSPy-Go Production Reference: The Complete Technical Guide
Architectural Foundation: From Prompting to Programming
The Paradigm Shift
DSPy transforms LLM development from artisanal prompt engineering to systematic software engineering. Prompts become learnable parameters in a computational graph, optimized against defined metrics rather than hand-tuned through trial and error.
Core Philosophy
- Declarative Contracts: Define what, not how
- Systematic Optimization: Replace guesswork with formal optimization loops
- Composable Modules: Build complex systems from simple, testable units
- Measurable Outcomes: Every decision backed by quantitative metrics
Part 1: Core Programming Model - Building the Computational Graph
1.1 Signatures: Declarative Task Definition
Signatures are atomic units of task definition - typed schemas that specify intent without dictating implementation.
// BasicQA is the simplest signature: a single input field and a single
// output field. The dspy tag marks each field's role (input or output);
// desc carries a human-readable description of the field.
type BasicQA struct {
Question string `dspy:"input" desc:"User's question"`
Answer string `dspy:"output" desc:"Short factual answer, 1-5 words"`
}
// ClassifyAndExtract is a signature with constrained outputs: Category
// carries a literal tag listing Sales, Support and Technical, and Urgency
// carries min/max tags bounding it to 1-5. Whether (and how) these tags are
// enforced depends on the signature parser, which is not shown here.
type ClassifyAndExtract struct {
Message string `dspy:"input"`
Category string `dspy:"output" literal:"Sales,Support,Technical"`
Urgency int `dspy:"output" min:"1" max:"5"`
Entities []string `dspy:"output" desc:"Company names identified"`
}
// AnalysisSignature maps one input document to several structured outputs:
// a summary (max_words tag: 100), a list of key points (min_items/max_items
// tags: 3-7) and a sentiment score bounded to [0.0, 1.0] by min/max tags.
type AnalysisSignature struct {
Document string `dspy:"input"`
Summary string `dspy:"output" max_words:"100"`
KeyPoints []string `dspy:"output" min_items:"3" max_items:"7"`
Sentiment float64 `dspy:"output" min:"0.0" max:"1.0"`
}
Best Practices: Use constraint tags (`literal`, `min`, `max`) for validation.
1.2 Modules: Execution Strategy Units
Core Module Types
// Predict: Direct transformation of inputs into outputs
predict := modules.NewPredict(signature)
// ChainOfThought: Adds reasoning field automatically
cot := modules.NewChainOfThought(signature)
// Transforms: input -> rationale -> output
// ReAct: Tool-augmented reasoning; the slice lists the tools the module may call
react := modules.NewReAct(signature, []core.Tool{
calculator,
searchTool,
codeExecutor,
})
// ProgramOfThought: Mathematical reasoning
pot := modules.NewProgramOfThought(signature)
Module Composition Pattern
// RAGPipeline composes five modules into a retrieve-then-generate pipeline:
// query rewriting, retrieval, reranking, answer synthesis and validation.
type RAGPipeline struct {
	queryRewriter *modules.ChainOfThought
	retriever     *modules.Retrieve
	reranker      *modules.Predict
	synthesizer   *modules.ChainOfThought
	validator     *modules.Predict
}

// Forward runs the pipeline for one question and returns the validator's
// outputs.
//
// Each stage's error is returned immediately, wrapped with the stage name.
// As a result, a failed retrieval never feeds nil documents into the
// reranker, and a failed synthesis is never "validated".
func (p *RAGPipeline) Forward(ctx context.Context, question string) (map[string]interface{}, error) {
	// 1. Rewrite query for better retrieval.
	rewritten, err := p.queryRewriter.Process(ctx, map[string]interface{}{
		"question": question,
	})
	if err != nil {
		return nil, fmt.Errorf("rewrite query: %w", err)
	}

	// 2. Retrieve documents.
	docs, err := p.retriever.Process(ctx, map[string]interface{}{
		"query": rewritten["improved_query"],
		"top_k": 10,
	})
	if err != nil {
		return nil, fmt.Errorf("retrieve: %w", err)
	}

	// 3. Rerank for relevance against the original (not rewritten) question.
	ranked, err := p.reranker.Process(ctx, map[string]interface{}{
		"query":     question,
		"documents": docs["documents"],
	})
	if err != nil {
		return nil, fmt.Errorf("rerank: %w", err)
	}

	// 4. Synthesize answer.
	answer, err := p.synthesizer.Process(ctx, map[string]interface{}{
		"question":  question,
		"documents": ranked["top_documents"],
	})
	if err != nil {
		return nil, fmt.Errorf("synthesize: %w", err)
	}

	// 5. Validate factuality against the same ranked documents.
	validated, err := p.validator.Process(ctx, map[string]interface{}{
		"answer":    answer["answer"],
		"documents": ranked["top_documents"],
	})
	if err != nil {
		return nil, fmt.Errorf("validate: %w", err)
	}
	return validated, nil
}
Part 2: Compilation Engine - Systematic Optimization
2.1 Metrics: Quantifying Success
// exactMatchMetric is a binary metric: 1.0 when the prediction's "answer"
// equals the example's "answer" (Go == on the interface values), else 0.0.
func exactMatchMetric(example, prediction map[string]interface{}) float64 {
	want, got := example["answer"], prediction["answer"]
	if want != got {
		return 0.0
	}
	return 1.0
}
// gradedMetric scores a prediction with partial credit:
//   - 0.3 when prediction["answer"] is non-nil (presence),
//   - a further 0.5 when that answer equals example["answer"] (correctness),
//   - 0.2 when meetsFormatRequirements(prediction) reports true (format).
//
// Correctness is only credited when an answer is present. Otherwise a
// prediction with no answer would earn 0.5 against an example that also has
// none, because nil == nil.
func gradedMetric(example, prediction map[string]interface{}) float64 {
	score := 0.0
	if answer := prediction["answer"]; answer != nil {
		// Presence (30%).
		score += 0.3
		// Correctness (50%).
		if answer == example["answer"] {
			score += 0.5
		}
	}
	// Format compliance (20%).
	if meetsFormatRequirements(prediction) {
		score += 0.2
	}
	return score
}
// FactualityJudge is an LLM-as-judge metric: a ChainOfThought module decides
// whether an answer is factual instead of a string comparison.
type FactualityJudge struct {
	judge *modules.ChainOfThought
}

// Evaluate asks the judge whether prediction["answer"] is factual, given the
// example's "context" and "question". It returns the judge's confidence when
// the verdict is factual, and 30% of that confidence otherwise.
//
// A judge error, or a verdict whose "confidence" is not a float64 or whose
// "is_factual" is not a bool, scores 0.0 instead of panicking. LLM output is
// not guaranteed to be well-typed, and a metric must not crash an
// optimization run.
func (f *FactualityJudge) Evaluate(example, prediction map[string]interface{}) float64 {
	result, err := f.judge.Process(context.Background(), map[string]interface{}{
		"statement": prediction["answer"],
		"context":   example["context"],
		"question":  example["question"],
	})
	if err != nil {
		return 0.0
	}
	confidence, ok := result["confidence"].(float64)
	if !ok {
		return 0.0
	}
	isFactual, ok := result["is_factual"].(bool)
	if !ok {
		return 0.0
	}
	if isFactual {
		return confidence
	}
	return confidence * 0.3 // Penalize non-factual answers
}
// feedbackMetric pairs gradedMetric's score with textual feedback for GEPA:
// failure analysis for scores below 0.5, improvement suggestions for scores
// in [0.5, 0.8), and an empty string for scores of 0.8 or more.
func feedbackMetric(example, prediction map[string]interface{}) (float64, string) {
	score := gradedMetric(example, prediction)
	switch {
	case score < 0.5:
		return score, analyzeFailure(example, prediction)
	case score < 0.8:
		return score, suggestImprovements(example, prediction)
	default:
		return score, ""
	}
}
2.2 Optimizer Configurations
// BootstrapFewShot: Standard workhorse
// Caps the prompt at 8 bootstrapped and 3 labeled demonstrations.
bootstrap := optimizers.NewBootstrapFewShot(
dataset,
metricFunc,
optimizers.WithMaxBootstrappedDemos(8),
optimizers.WithMaxLabeledDemos(3),
)
// MIPROv2: Bayesian optimization (TPE search, 20 trials, gamma 0.25)
mipro := optimizers.NewMIPRO(
metricFunc,
optimizers.WithMode(optimizers.StandardMode),
optimizers.WithNumTrials(20),
optimizers.WithTPEGamma(0.25),
)
// GEPA: Reflective evolution with feedback
// FeedbackMetric must return (score, feedback string), like feedbackMetric.
gepa := optimizers.NewGEPA(&optimizers.GEPAConfig{
PopulationSize: 20,
MaxGenerations: 15,
SelectionStrategy: "adaptive_pareto",
MutationRate: 0.3,
CrossoverRate: 0.7,
ReflectionFreq: 3, // LLM reflection every 3 generations
ElitismRate: 0.1,
FeedbackMetric: feedbackMetric, // Uses textual feedback
})
// SIMBA: Introspective mini-batch ascent
// Mini-batches of 8, at most 12 steps, 6 candidates per step.
simba := optimizers.NewSIMBA(
optimizers.WithSIMBABatchSize(8),
optimizers.WithSIMBAMaxSteps(12),
optimizers.WithSIMBANumCandidates(6),
optimizers.WithIntrospection(true), // Enable self-analysis
)
2.3 Teacher-Student Optimization Pattern
// TeacherStudentOptimizer compiles a program for an inexpensive student
// model. A larger teacher model generates the training data and proposes
// prompts. The optimizer is a *optimizers.MIPRO, the same MIPRO type used by
// RAGOptimizer and produced by optimizers.NewMIPRO.
type TeacherStudentOptimizer struct {
	teacher   core.LLM // Large, expensive model
	student   core.LLM // Small, efficient model
	optimizer *optimizers.MIPRO
}
// Compile runs the teacher-student recipe:
//   - the teacher enriches the dataset and acts as the prompt model;
//   - the student is the task model being optimized;
//   - the compiled program is then distilled on the teacher data.
//
// An error from the optimizer's Compile is returned unchanged.
func (t *TeacherStudentOptimizer) Compile(ctx context.Context, program core.Module, dataset core.Dataset) (*CompiledProgram, error) {
	teacherData := t.generateTeacherData(ctx, dataset)

	// Teacher proposes prompts; student executes them.
	t.optimizer.SetPromptModel(t.teacher)
	t.optimizer.SetTaskModel(t.student)

	compiled, err := t.optimizer.Compile(ctx, program, teacherData, nil)
	if err != nil {
		return nil, err
	}
	return t.distill(ctx, compiled, teacherData), nil
}
// generateTeacherData builds a new in-memory dataset. Each example of
// dataset is paired with the teacher's generated output, the reasoning
// extracted from that output, and the teacher's confidence assessment.
func (t *TeacherStudentOptimizer) generateTeacherData(ctx context.Context, dataset core.Dataset) core.Dataset {
	enriched := datasets.NewInMemoryDataset()
	for _, ex := range dataset.GetExamples() {
		out := t.teacher.Generate(ctx, ex.Input)
		record := map[string]interface{}{
			"input":      ex.Input,
			"output":     out,
			"reasoning":  t.extractReasoning(out),
			"confidence": t.assessConfidence(out),
		}
		enriched.AddExample(record)
	}
	return enriched
}
Part 3: Runtime Environment - Reliability & Continuous Learning
3.1 Runtime Self-Correction Architecture
// Production deployment with layered reliability
// ProductionSystem wraps a compiled program with two fallback strategies
// (BestOfN, then Refine) and a RuntimeMonitor. The monitor scores outputs
// and records results.
type ProductionSystem struct {
compiled *CompiledProgram // Optimized base program
bestOfN *modules.BestOfN // Diversity-based retry
refine *modules.Refine // Feedback-based iteration
monitor *RuntimeMonitor // Performance tracking
}
// Process answers input through up to three escalating layers:
//  1. the compiled program, accepted when its quality score exceeds 0.8;
//  2. BestOfN, tried only when layer 1 failed or scored below 0.6, and
//     accepted when it scores above 0.8;
//  3. Refine, tried whenever the latest attempt still scores below 0.8.
//
// An attempt that returned an error scores 0. This keeps errored (possibly
// nil) outputs away from AssessQuality and forces escalation. Each output is
// scored exactly once. Only requests that are not accepted at layer 1 or 2
// are recorded with the monitor.
func (p *ProductionSystem) Process(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error) {
	const (
		acceptAbove = 0.8
		retryBelow  = 0.6
	)
	score := func(out map[string]interface{}, err error) float64 {
		if err != nil {
			return 0
		}
		return p.monitor.AssessQuality(out)
	}

	// Layer 1: Try compiled program.
	output, err := p.compiled.Process(ctx, input)
	quality := score(output, err)
	if quality > acceptAbove {
		return output, nil
	}

	// Layer 2: BestOfN for diversity.
	if quality < retryBelow {
		output, err = p.bestOfN.Process(ctx, input)
		quality = score(output, err)
		if quality > acceptAbove {
			return output, nil
		}
	}

	// Layer 3: Refine with feedback.
	if quality < acceptAbove {
		output, err = p.refine.Process(ctx, input)
	}

	// Record for continuous learning.
	p.monitor.Record(input, output, err)
	return output, err
}
3.2 Observability: Full Execution Tracing
// ExecutionTracer runs modules with execution-state tracing enabled. It
// persists each resulting Trace via storage and sends its metrics to
// exporter.
type ExecutionTracer struct {
storage TraceStorage
exporter MetricsExporter
}
// TraceExecution executes program on input with execution-state tracking
// switched on. It returns the program's output and error unchanged, plus a
// Trace of the run: ID, start time, duration, input/output/error, per-step
// details and metrics. The trace is saved and its metrics are exported
// whether or not the program failed.
func (t *ExecutionTracer) TraceExecution(ctx context.Context, program core.Module, input map[string]interface{}) (map[string]interface{}, *Trace, error) {
	tracedCtx := core.WithExecutionState(ctx)

	startedAt := time.Now()
	output, runErr := program.Process(tracedCtx, input)
	elapsed := time.Since(startedAt)

	state := core.GetExecutionState(tracedCtx)
	trace := &Trace{
		ID:        uuid.New().String(),
		Timestamp: startedAt,
		Duration:  elapsed,
		Input:     input,
		Output:    output,
		Error:     runErr,
		Steps:     t.extractSteps(state),
		Metrics:   t.calculateMetrics(state),
	}

	t.storage.Save(trace)
	t.exporter.Export(trace.Metrics)
	return output, trace, runErr
}
// extractSteps converts every module step recorded in state into a Step.
// The result is always non-nil; it is empty when no steps were recorded.
func (t *ExecutionTracer) extractSteps(state *core.ExecutionState) []Step {
	recorded := state.GetAllSteps()
	steps := make([]Step, 0, len(recorded))
	for _, ms := range recorded {
		step := Step{
			Module:     ms.ModuleID,
			Prompt:     ms.Prompt,
			Response:   ms.Response,
			TokenCount: ms.TokenCount,
			Latency:    ms.Duration,
			ToolCalls:  ms.ToolCalls,
		}
		steps = append(steps, step)
	}
	return steps
}
3.3 Active Learning: Smart Tool Registry
// AdaptiveAgent is a ReAct agent that learns tool preferences from outcomes.
// It uses a SmartToolRegistry to rank tools, a BayesianLearner to model
// success, and a BufferMemory passed to each run.
type AdaptiveAgent struct {
reactor *modules.ReAct
registry *tools.SmartToolRegistry
learner *BayesianLearner
memory *memory.BufferMemory
}
// ExecuteWithLearning runs task through the ReAct agent with the three tools
// the registry currently ranks best, then learns from the outcome:
//   - every tool that was used is promoted on success and demoted on failure;
//   - the Bayesian learner is updated;
//   - both learned models are saved to tool_preferences.json and
//     bayesian_model.json.
//
// The agent's result and error are returned unchanged.
func (a *AdaptiveAgent) ExecuteWithLearning(ctx context.Context, task string) (map[string]interface{}, error) {
	// Trace execution so tool usage can be extracted afterwards.
	ctx = core.WithExecutionState(ctx)

	picked := a.registry.SelectBest(ctx, task, 3)
	result, err := a.reactor.Process(ctx, map[string]interface{}{
		"task":   task,
		"tools":  picked,
		"memory": a.memory.Get(ctx),
	})

	usage := a.extractToolUsage(core.GetExecutionState(ctx))
	succeeded := err == nil && a.assessSuccess(result)
	if succeeded {
		for tool, u := range usage {
			a.registry.Promote(tool, u.Effectiveness)
		}
	} else {
		for tool, u := range usage {
			a.registry.Demote(tool, u.FailureReason)
		}
	}

	a.learner.Update(task, picked, succeeded)

	// NOTE(review): state is persisted on every call, to fixed relative paths,
	// and any errors from Save are not checked here.
	a.registry.Save("tool_preferences.json")
	a.learner.Save("bayesian_model.json")
	return result, err
}
Part 4: Production Patterns & Best Practices
4.1 Deployment Architecture
// Configuration management
// ProductionConfig holds deployment settings. ProductionServer.Initialize
// reads CompiledPath, CacheDir and MaxWorkers. TimeoutSeconds and
// RetryPolicy are not read there; presumably they apply per request
// (TODO confirm).
type ProductionConfig struct {
CompiledPath string
CacheDir string
MaxWorkers int
TimeoutSeconds int
RetryPolicy RetryConfig
}
// Production server with all optimizations
// ProductionServer owns the compiled program plus its supporting runtime:
// a multi-level cache, a request pool, a runtime monitor and a continuous
// optimizer. All of them are built by Initialize.
type ProductionServer struct {
config ProductionConfig
program *CompiledProgram
cache *cache.MultiLevelCache
pool *RequestPool
monitor *RuntimeMonitor
optimizer *ContinuousOptimizer
}
// Initialize loads the compiled program from config.CompiledPath and builds:
//   - the cache: WithMemoryCache(1000), a disk tier in config.CacheDir, and
//     semantic dedup;
//   - the request pool: config.MaxWorkers workers, a 10ms batch window, and
//     signature grouping;
//   - the runtime monitor: Prometheus, tracing, and anomaly detection;
//   - a continuous optimizer running SIMBA every 24h, with
//     WithMinExamples(1000).
//
// NOTE(review): it always returns nil, so a failure inside modules.Load is
// never surfaced. If Load can fail, propagate its error here.
func (s *ProductionServer) Initialize() error {
// Load compiled program
s.program = modules.Load(s.config.CompiledPath)
// Configure caching
s.cache = cache.NewMultiLevelCache(
cache.WithMemoryCache(1000),
cache.WithDiskCache(s.config.CacheDir),
cache.WithSemanticDedup(true),
)
// Setup request pooling
s.pool = NewRequestPool(
WithMaxWorkers(s.config.MaxWorkers),
WithBatchWindow(10 * time.Millisecond),
WithSignatureGrouping(true),
)
// Initialize monitoring
s.monitor = NewRuntimeMonitor(
WithPrometheus(true),
WithTracing(true),
WithAnomalyDetection(true),
)
// Setup continuous optimization
s.optimizer = NewContinuousOptimizer(
WithReoptimizationInterval(24 * time.Hour),
WithMinExamples(1000),
WithOptimizer(optimizers.NewSIMBA()),
)
return nil
}
4.2 Performance Optimization Checklist
- [ ] Use teacher-student pattern for cost reduction
- [ ] Optimize with 200+ diverse examples minimum
- [ ] Test multiple optimizers (MIPRO, GEPA, SIMBA)
- [ ] Save compiled artifacts for production
- [ ] Implement request pooling (10-50x reduction)
- [ ] Enable semantic caching (avoid duplicate work)
- [ ] Use BestOfN/Refine selectively based on criticality
- [ ] Parallelize independent operations
- [ ] Enable execution tracing in production
- [ ] Implement feedback loops for tool selection
- [ ] Schedule periodic reoptimization
- [ ] Monitor drift in performance metrics
4.3 Common Patterns by Use Case
Pattern: Log Analysis Pipeline
// LogAnalyzer chains three modules: classify the error, investigate its root
// cause with tools (ReAct), then iteratively refine a fix recommendation.
type LogAnalyzer struct {
classifier *modules.ChainOfThought // Categorize errors
rootCauser *modules.ReAct // Find root cause
recommender *modules.Refine // Suggest fixes
}
Pattern: Incident Response System
// IncidentResponder detects the incident type, drives the response through a
// ChainWorkflow, and narrates a report.
type IncidentResponder struct {
detector *modules.Predict // Detect incident type
orchestrator *workflows.ChainWorkflow // Execute response
narrator *modules.ChainOfThought // Generate report
}
Pattern: Code Review Agent
// CodeReviewer runs three independent review passes (style, security,
// performance) and merges their findings with a Refine module.
type CodeReviewer struct {
styleChecker *modules.Predict // Check style guide
securityScanner *modules.ReAct // Security analysis
perfAnalyzer *modules.ChainOfThought // Performance review
synthesizer *modules.Refine // Combine feedback
}
Part 5: Retrieval-Augmented Generation (RAG) Architecture
5.0 Getting Started: Minimal RAG Pattern
Before diving into production complexity, here's the simplest viable RAG implementation:
// Minimal RAG Signature
// SimpleRAGSignature takes a question plus retrieved context and produces an
// answer that should rely on that context.
type SimpleRAGSignature struct {
Question string `dspy:"input" desc:"User question"`
Context string `dspy:"input" desc:"Retrieved documents"`
Answer string `dspy:"output" desc:"Answer using context"`
}
// BasicRAGModule is the minimal retrieve-then-predict RAG module, the Go
// counterpart of DSPy's basic Python RAG example.
type BasicRAGModule struct {
retrieve *modules.Retrieve
predict *modules.Predict
}
// NewBasicRAG builds a BasicRAGModule from two parts: a retriever that
// returns the top 3 documents, and a Predict module over a
// (question, context) -> answer signature.
func NewBasicRAG() *BasicRAGModule {
	qaSignature := core.NewSignature(
		[]core.InputField{
			{Field: core.NewField("question")},
			{Field: core.NewField("context")},
		},
		[]core.OutputField{
			{Field: core.NewField("answer")},
		},
	)
	return &BasicRAGModule{
		retrieve: modules.NewRetrieve(modules.WithTopK(3)),
		predict:  modules.NewPredict(qaSignature),
	}
}
// Forward answers question in three steps:
//  1. retrieve passages;
//  2. join them with newlines into one context string;
//  3. ask the predictor for an answer.
//
// Module errors are returned, wrapped with the failing stage. Outputs of an
// unexpected type are reported as errors instead of panicking in a type
// assertion.
func (r *BasicRAGModule) Forward(ctx context.Context, question string) (string, error) {
	// 1. Retrieve documents.
	docs, err := r.retrieve.Process(ctx, map[string]interface{}{
		"query": question,
	})
	if err != nil {
		return "", fmt.Errorf("retrieve: %w", err)
	}
	passages, ok := docs["passages"].([]string)
	if !ok {
		return "", fmt.Errorf("retrieve: passages has type %T, want []string", docs["passages"])
	}

	// 2. Join documents as context. The local is not named "context" so it
	// does not shadow the context package.
	evidence := strings.Join(passages, "\n")

	// 3. Generate answer using context.
	result, err := r.predict.Process(ctx, map[string]interface{}{
		"question": question,
		"context":  evidence,
	})
	if err != nil {
		return "", fmt.Errorf("predict: %w", err)
	}
	answer, ok := result["answer"].(string)
	if !ok {
		return "", fmt.Errorf("predict: answer has type %T, want string", result["answer"])
	}
	return answer, nil
}
// Usage
rag := NewBasicRAG()
answer, _ := rag.Forward(ctx, "What is DSPy?")
Progressive Enhancement Path
Step 1: Add Basic Assertions
// citationAssertion is a simple citation check, the Go counterpart of a
// Python lambda assertion. It reports whether output's "answer" quotes any
// line of its "context". A line counts when it is longer than 10 bytes and
// its first 10 bytes appear verbatim in the answer.
//
// A missing or non-string "answer" or "context" fails the assertion instead
// of panicking.
func citationAssertion(output map[string]interface{}) bool {
	const snippetLen = 10
	answer, ok := output["answer"].(string)
	if !ok {
		return false
	}
	contextText, ok := output["context"].(string)
	if !ok {
		return false
	}
	// Check if answer contains any snippet from context.
	for _, line := range strings.Split(contextText, "\n") {
		if len(line) > snippetLen && strings.Contains(answer, line[:snippetLen]) {
			return true
		}
	}
	return false
}
// Wrap with assertion: citationAssertion is checked against basicRAG's
// output. The string is the assertion's message, presumably reported when
// the check fails (confirm against modules.NewWithAssertion).
assertedRAG := modules.NewWithAssertion(
basicRAG,
citationAssertion,
"Answer must cite context",
)
Step 2: Apply Optimization
// Optimize with MIPRO in light mode (the Go counterpart of the Python
// MIPROv2 example). Compile's error is checked rather than discarded, so a
// failed optimization is never mistaken for an optimized program.
optimizer := optimizers.NewMIPRO(
	metricFunc,
	optimizers.WithMode(optimizers.LightMode),
)
optimizedRAG, err := optimizer.Compile(
	ctx,
	basicRAG,
	trainExamples, // 20 Q/A pairs
	nil,
)
if err != nil {
	return err
}
Step 3: Add Multi-Chain Comparison
// Test different k values: NewMultiChainRAG configures dense (k=3), sparse
// (k=5) and hybrid (k=10) chains. Process runs them in parallel and returns
// the answer chosen by the consensus voter.
multiRAG := NewMultiChainRAG()
bestAnswer, err := multiRAG.Process(ctx, question)
if err != nil {
	return err
}
5.1 Production RAG Module
RAG combines retrieval with generation, creating a knowledge-grounded inference pipeline. DSPy-Go provides first-class support through composable modules.
// RAG Signature with context grounding
// RAGSignature asks for an answer to Question based on the retrieved
// Context, plus Citations quoting the parts of the context that support it.
type RAGSignature struct {
Question string `dspy:"input" desc:"User question requiring factual answer"`
Context string `dspy:"input" desc:"Retrieved documents as evidence"`
Answer string `dspy:"output" desc:"Answer grounded in context"`
Citations []string `dspy:"output" desc:"Specific quotes from context"`
}
// Production RAG Module
// RAGModule is the retrieve -> rerank -> generate -> validate pipeline,
// fronted by a semantic cache keyed by the question text.
type RAGModule struct {
retriever *modules.Retrieve
reranker *modules.Predict
generator *modules.ChainOfThought
validator *modules.Predict
cache *cache.SemanticCache
}
// NewRAGModule wires the production RAG pipeline around vectorDB:
//   - a retriever returning the top 5 documents above a 0.7 similarity
//     threshold;
//   - a Predict reranker producing ranked_docs and relevance_scores;
//   - a ChainOfThought generator over RAGSignature;
//   - a citation validator;
//   - a semantic cache created with size 10000 (presumably entries).
func NewRAGModule(vectorDB VectorDatabase) *RAGModule {
	rerankInputs := []core.InputField{
		{Field: core.NewField("query")},
		{Field: core.NewField("documents")},
	}
	rerankOutputs := []core.OutputField{
		{Field: core.NewField("ranked_docs")},
		{Field: core.NewField("relevance_scores")},
	}
	return &RAGModule{
		retriever: modules.NewRetrieve(
			modules.WithVectorDB(vectorDB),
			modules.WithTopK(5),
			modules.WithSimilarityThreshold(0.7),
		),
		reranker:  modules.NewPredict(core.NewSignature(rerankInputs, rerankOutputs)),
		generator: modules.NewChainOfThought(RAGSignature{}),
		validator: modules.NewPredict(CitationValidatorSignature{}),
		cache:     cache.NewSemanticCache(10000),
	}
}
// Forward answers question through the pipeline:
//  1. semantic-cache lookup;
//  2. retrieval;
//  3. reranking;
//  4. ChainOfThought generation over the ranked documents joined by blank
//     lines;
//  5. citation validation.
//
// The result holds the validator's "answer", "citations" and "confidence".
// It is cached only when confidence is a float64 above 0.8.
//
// Stage errors are returned, wrapped with the stage name. A reranker that
// does not return []string documents is reported as an error. A missing or
// non-float confidence simply skips caching. Neither case panics.
func (r *RAGModule) Forward(ctx context.Context, question string) (map[string]interface{}, error) {
	// Check cache first; entries of an unexpected type are treated as misses.
	if cached, ok := r.cache.Get(question).(map[string]interface{}); ok {
		return cached, nil
	}

	// 1. Retrieve documents.
	docs, err := r.retriever.Process(ctx, map[string]interface{}{
		"query": question,
	})
	if err != nil {
		return nil, fmt.Errorf("retrieve: %w", err)
	}

	// 2. Rerank for relevance.
	ranked, err := r.reranker.Process(ctx, map[string]interface{}{
		"query":     question,
		"documents": docs["passages"],
	})
	if err != nil {
		return nil, fmt.Errorf("rerank: %w", err)
	}
	rankedDocs, ok := ranked["ranked_docs"].([]string)
	if !ok {
		return nil, fmt.Errorf("rerank: ranked_docs has type %T, want []string", ranked["ranked_docs"])
	}

	// 3. Generate answer with context. The local is not named "context" so
	// it does not shadow the context package.
	evidence := strings.Join(rankedDocs, "\n\n")
	answer, err := r.generator.Process(ctx, map[string]interface{}{
		"question": question,
		"context":  evidence,
	})
	if err != nil {
		return nil, fmt.Errorf("generate: %w", err)
	}

	// 4. Validate citations.
	validated, err := r.validator.Process(ctx, map[string]interface{}{
		"answer":    answer["answer"],
		"citations": answer["citations"],
		"context":   evidence,
	})
	if err != nil {
		return nil, fmt.Errorf("validate: %w", err)
	}

	result := map[string]interface{}{
		"answer":     validated["answer"],
		"citations":  validated["citations"],
		"confidence": validated["confidence"],
	}

	// Cache high-quality answers only.
	if confidence, ok := validated["confidence"].(float64); ok && confidence > 0.8 {
		r.cache.Store(question, result)
	}
	return result, nil
}
5.2 RAG with Assertions and Constraints
// Citation Assertion ensures answers are grounded
type CitationAssertion struct {
minCitations int
maxLength int
}
func (c *CitationAssertion) Validate(output map[string]interface{}) bool {
citations := output["citations"].([]string)
answer := output["answer"].(string)
context := output["context"].(string)
// Must have minimum citations
if len(citations) < c.minCitations {
return false
}
// Each citation must exist in context
for _, citation := range citations {
if !strings.Contains(context, citation) {
return false
}
}
// Answer must reference cited material
for _, citation := range citations {
keywords := extractKeywords(citation)
if !containsAnyKeyword(answer, keywords) {
return false
}
}
return true
}
// EnforcedRAG wraps RAG with citation enforcement
// It keeps the base RAGModule, the CitationAssertion used to score
// candidates, and a Refine module that retries the base module until the
// citation reward is high enough.
type EnforcedRAG struct {
base *RAGModule
assertion *CitationAssertion
refiner *modules.Refine
}
// NewEnforcedRAG builds an EnforcedRAG on top of NewRAGModule(vectorDB).
// The assertion requires at least 2 citations and uses a 500-length limit.
// The Refine wrapper makes up to N=3 attempts with a 0.8 reward threshold.
func NewEnforcedRAG(vectorDB VectorDatabase) *EnforcedRAG {
	base := NewRAGModule(vectorDB)
	assertion := &CitationAssertion{minCitations: 2, maxLength: 500}

	// reward scores a candidate: 0 when it fails the citation assertion,
	// otherwise 0.2 per citation plus 0.3 for a coherent answer, capped at
	// 1.0. The type assertions below only run after Validate has accepted
	// pred.
	reward := func(args, pred map[string]interface{}) float64 {
		if !assertion.Validate(pred) {
			return 0.0
		}
		score := float64(len(pred["citations"].([]string))) / 5.0 // Max 5 citations
		if assessCoherence(pred["answer"].(string)) {
			score += 0.3
		}
		return math.Min(score, 1.0)
	}

	refiner := modules.NewRefine(base, modules.RefineConfig{
		N:         3,
		RewardFn:  reward,
		Threshold: 0.8,
	})

	return &EnforcedRAG{
		base:      base,
		assertion: assertion,
		refiner:   refiner,
	}
}
5.3 Optimized RAG with MIPRO
// RAGOptimizer uses MIPRO to tune retrieval and generation.
type RAGOptimizer struct {
	optimizer *optimizers.MIPRO
	dataset   *RAGDataset
}

// Optimize compiles rag with MIPRO against ragCompositeMetric and returns
// the optimized module.
//
// Compile's error is checked before its result is type-asserted: asserting
// on a nil program returned alongside an error would panic. A compiled
// program that is not a *RAGModule is reported as an error.
func (o *RAGOptimizer) Optimize(ctx context.Context, rag *RAGModule) (*RAGModule, error) {
	optimized, err := o.optimizer.Compile(ctx, rag, o.dataset, ragCompositeMetric)
	if err != nil {
		return nil, err
	}
	module, ok := optimized.(*RAGModule)
	if !ok {
		return nil, fmt.Errorf("optimize: compiled program has type %T, want *RAGModule", optimized)
	}
	return module, nil
}

// ragCompositeMetric scores one RAG prediction against its example:
//   - correctness (40%): prediction["answer"] is present and equals
//     example["answer"];
//   - citation accuracy (at most 30%): 0.1 per non-empty citation found
//     verbatim in example["context"], capped at three citations so this
//     component never exceeds its weight;
//   - relevance (30%): 0.3 x assessRelevance(question, answer).
//
// Missing or mistyped fields contribute nothing instead of panicking.
func ragCompositeMetric(example, prediction map[string]interface{}) float64 {
	const maxCreditedCitations = 3
	score := 0.0

	// Correctness (40%).
	if answer := prediction["answer"]; answer != nil && answer == example["answer"] {
		score += 0.4
	}

	// Citation accuracy (30%).
	contextText, _ := example["context"].(string)
	citations, _ := prediction["citations"].([]string)
	verified := 0
	for _, cite := range citations {
		if cite != "" && strings.Contains(contextText, cite) {
			verified++
		}
	}
	if verified > maxCreditedCitations {
		verified = maxCreditedCitations
	}
	score += 0.1 * float64(verified)

	// Relevance (30%).
	question, _ := example["question"].(string)
	predicted, _ := prediction["answer"].(string)
	score += assessRelevance(question, predicted) * 0.3
	return score
}
5.4 Multi-Chain RAG Comparison
// MultiChainRAG runs multiple retrieval strategies in parallel
// and hands the per-chain answers to a ConsensusVoter, which picks one.
type MultiChainRAG struct {
chains []RAGChain
voter *ConsensusVoter
}
// RAGChain configures one retrieval strategy: a name, the retriever to
// query, how many documents to request (TopK) and whether to rerank them.
type RAGChain struct {
Name string
Retriever *modules.Retrieve
TopK int
Rerank bool
}
// NewMultiChainRAG builds three chains of increasing retrieval depth:
// dense (k=3, no rerank), sparse (k=5, reranked) and hybrid (k=10,
// reranked). A consensus voter configured with 0.66 picks the answer.
//
// NOTE(review): no Retriever is assigned to any chain here; every
// RAGChain.Retriever must be populated before the chains are run.
func NewMultiChainRAG() *MultiChainRAG {
	chains := []RAGChain{
		{Name: "dense", TopK: 3},
		{Name: "sparse", TopK: 5, Rerank: true},
		{Name: "hybrid", TopK: 10, Rerank: true},
	}
	return &MultiChainRAG{chains: chains, voter: NewConsensusVoter(0.66)}
}
// Process runs every configured chain concurrently. Each chain retrieves
// TopK documents, optionally reranks them, and generates an answer. The
// consensus voter then picks the best of the answers that were produced.
//
// A chain with a nil Retriever, or whose retrieval fails, is skipped and its
// error kept. When no chain produces an answer, the first recorded error is
// returned. Each goroutine writes only its own index of results and errs, so
// no locking is needed; wg.Wait orders those writes before the reads below.
func (m *MultiChainRAG) Process(ctx context.Context, question string) (map[string]interface{}, error) {
	results := make([]map[string]interface{}, len(m.chains))
	errs := make([]error, len(m.chains))
	var wg sync.WaitGroup

	// Run chains in parallel.
	for i, chain := range m.chains {
		if chain.Retriever == nil {
			errs[i] = fmt.Errorf("chain %q: no retriever configured", chain.Name)
			continue
		}
		wg.Add(1)
		go func(idx int, c RAGChain) {
			defer wg.Done()
			// Retrieve with chain-specific strategy.
			docs, err := c.Retriever.Process(ctx, map[string]interface{}{
				"query": question,
				"top_k": c.TopK,
			})
			if err != nil {
				errs[idx] = fmt.Errorf("chain %q: retrieve: %w", c.Name, err)
				return
			}
			// Optional reranking.
			if c.Rerank {
				docs = rerank(ctx, question, docs)
			}
			// Generate answer.
			results[idx] = generateAnswer(ctx, question, docs)
		}(i, chain)
	}
	wg.Wait()

	// Vote only among chains that actually produced an answer.
	answers := make([]map[string]interface{}, 0, len(results))
	for _, r := range results {
		if r != nil {
			answers = append(answers, r)
		}
	}
	if len(answers) == 0 {
		for _, err := range errs {
			if err != nil {
				return nil, err
			}
		}
		return nil, fmt.Errorf("multi-chain RAG: no chain produced an answer")
	}
	return m.voter.SelectBest(answers), nil
}
5.5 Hybrid RAG with Structured Hints
// HybridRAGSignature includes metadata for better grounding
// Besides question and context it takes document metadata, the user's role
// and relevant tags. It produces an answer, a confidence in [0.0, 1.0]
// (min/max tags) and the IDs of the source documents.
type HybridRAGSignature struct {
Question string `dspy:"input"`
Context string `dspy:"input"`
Metadata map[string]string `dspy:"input" desc:"Document metadata"`
UserRole string `dspy:"input" desc:"User's role/permissions"`
Tags []string `dspy:"input" desc:"Relevant tags"`
Answer string `dspy:"output"`
Confidence float64 `dspy:"output" min:"0.0" max:"1.0"`
Sources []string `dspy:"output" desc:"Source document IDs"`
}
// ProductionHybridRAG with all enhancements
// It combines the vector and metadata stores, query preprocessing,
// multi-chain retrieval, citation enforcement, background reoptimization and
// monitoring.
type ProductionHybridRAG struct {
vectorDB VectorDatabase
metadataDB MetadataDatabase
preprocessor *QueryPreprocessor
multichain *MultiChainRAG
enforcer *EnforcedRAG
optimizer *RAGOptimizer
monitor *RAGMonitor
}
// Query answers input.Question through the hybrid pipeline: preprocessing,
// metadata lookup, multi-chain retrieval, citation enforcement and
// monitoring. It starts a background reoptimization when the monitor asks
// for one.
//
// Multi-chain retrieval errors are returned (wrapped) instead of being
// dropped. The enforcer's outputs are read with checked type assertions:
//   - a missing or non-string "answer" is an error;
//   - citations, confidence and sources fall back to their zero values when
//     absent.
func (p *ProductionHybridRAG) Query(ctx context.Context, input QueryInput) (*QueryOutput, error) {
	// Preprocess query.
	processed := p.preprocessor.Process(input.Question)

	// Extract metadata hints.
	metadata := p.metadataDB.GetMetadata(processed.Keywords)

	// Run multi-chain retrieval.
	candidates, err := p.multichain.Process(ctx, processed.Query)
	if err != nil {
		return nil, fmt.Errorf("multi-chain retrieval: %w", err)
	}

	// Enforce citations.
	validated := p.enforcer.Process(ctx, map[string]interface{}{
		"question":  input.Question,
		"context":   candidates["context"],
		"metadata":  metadata,
		"user_role": input.UserRole,
		"tags":      processed.Tags,
	})

	// Monitor performance.
	p.monitor.Record(input, validated)

	// Trigger reoptimization if needed. The run is detached from ctx: if ctx
	// is a per-request context, it is canceled when this request returns,
	// which would abort the optimization.
	// NOTE(review): Optimize's result is discarded and concurrent runs are not
	// prevented. Swap the optimized module in (under a lock) and debounce for
	// reoptimization to have any effect.
	if p.monitor.ShouldReoptimize() {
		go p.optimizer.Optimize(context.Background(), p.enforcer.base)
	}

	answer, ok := validated["answer"].(string)
	if !ok {
		return nil, fmt.Errorf("citation enforcement: answer has type %T, want string", validated["answer"])
	}
	citations, _ := validated["citations"].([]string)
	confidence, _ := validated["confidence"].(float64)
	sources, _ := validated["sources"].([]string)

	return &QueryOutput{
		Answer:     answer,
		Citations:  citations,
		Confidence: confidence,
		Sources:    sources,
	}, nil
}
5.6 RAG Backend Integration
// Vector Database Adapters
// VectorDBAdapter is the interface the retrieval layer needs from a search
// backend: top-k search for a query string, and bulk indexing of documents.
// NOTE(review): the adapters below only show Search; each also needs an
// Index method to satisfy this interface.
type VectorDBAdapter interface {
Search(ctx context.Context, query string, k int) ([]Document, error)
Index(ctx context.Context, docs []Document) error
}
// FAISS Adapter
// FAISSAdapter pairs a FAISS index with a DocumentMapper. The mapper embeds
// queries and maps index positions back to documents.
type FAISSAdapter struct {
index *faiss.Index
mapper *DocumentMapper
}
// Search embeds query, asks the FAISS index for its k nearest neighbours, and
// maps the returned indices back to documents. The neighbour distances are
// not used. Binding them to an unused variable would not compile, so they
// are discarded with the blank identifier.
func (f *FAISSAdapter) Search(ctx context.Context, query string, k int) ([]Document, error) {
	embedding := f.mapper.Embed(query)
	_, indices := f.index.Search(embedding, k)
	return f.mapper.GetDocuments(indices), nil
}
// Elasticsearch Adapter
// ElasticsearchAdapter searches a single Elasticsearch index with a
// full-text match query.
type ElasticsearchAdapter struct {
client *elastic.Client
index string
}
// Search runs a match query for query against the "content" field of
// e.index and returns at most k parsed documents. Client errors are
// returned unchanged.
func (e *ElasticsearchAdapter) Search(ctx context.Context, query string, k int) ([]Document, error) {
	match := elastic.NewMatchQuery("content", query)
	res, err := e.client.Search().Index(e.index).Query(match).Size(k).Do(ctx)
	if err != nil {
		return nil, err
	}
	return e.parseResults(res), nil
}
// Weaviate Adapter
// WeaviateAdapter runs nearText GraphQL queries against one Weaviate class.
type WeaviateAdapter struct {
client *weaviate.Client
class string
}
// Search runs a nearText GraphQL query for query against w.class and
// returns up to k parsed documents.
//
// The error is checked before parsing, so a failed request (which may come
// back with a nil result) is never handed to parseResults.
// NOTE(review): GraphQL-level errors are reported in the response body
// rather than through err; confirm parseResults handles them.
func (w *WeaviateAdapter) Search(ctx context.Context, query string, k int) ([]Document, error) {
	result, err := w.client.GraphQL().
		Get().
		WithClassName(w.class).
		WithNearText(&graphql.NearTextArgument{
			Concepts: []string{query},
		}).
		WithLimit(k).
		Do(ctx)
	if err != nil {
		return nil, err
	}
	return w.parseResults(result), nil
}
5.7 RAG Performance Optimization
// Optimized retrieval with caching and prefetching
// OptimizedRetriever fronts several search backends with a multi-level cache
// and a prefetcher for related queries.
type OptimizedRetriever struct {
vectorDB VectorDBAdapter
cache *cache.MultiLevelCache
prefetcher *Prefetcher
embedder *Embedder
}
// Retrieve returns documents for query, serving repeated queries from the
// cache. On a miss it triggers related-query prefetching, fans out across
// the dense, sparse and metadata searches, and caches the merged result.
//
// Empty results are not cached. parallelSearch discards per-source errors,
// so an empty merge can mean every backend failed; caching it would keep
// serving an outage after the backends recover. Cache entries of an
// unexpected type are treated as misses instead of panicking.
func (o *OptimizedRetriever) Retrieve(ctx context.Context, query string) ([]Document, error) {
	// Check cache.
	cacheKey := o.generateCacheKey(query)
	if cached, ok := o.cache.Get(cacheKey).([]Document); ok {
		return cached, nil
	}

	// Prefetch related queries.
	o.prefetcher.PrefetchRelated(query)

	// Parallel search across multiple indices.
	results := o.parallelSearch(ctx, query)

	// Cache only non-empty results.
	if len(results) > 0 {
		o.cache.Store(cacheKey, results)
	}
	return results, nil
}
// parallelSearch queries three sources concurrently, asking each for 5
// documents: the dense vector DB, the sparse (BM25) index and the metadata
// search. It then merges the three result sets.
// Per-source errors are discarded (`docs, _ :=`), so a failing source just
// contributes whatever docs it returned (likely nil) to the merge.
func (o *OptimizedRetriever) parallelSearch(ctx context.Context, query string) []Document {
var wg sync.WaitGroup
// Buffer equals the number of senders below, so no send ever blocks and
// wg.Wait() cannot deadlock while nothing is reading yet.
resultChan := make(chan []Document, 3)
// Search dense index
wg.Add(1)
go func() {
defer wg.Done()
docs, _ := o.vectorDB.Search(ctx, query, 5)
resultChan <- docs
}()
// Search sparse index (BM25)
wg.Add(1)
go func() {
defer wg.Done()
docs, _ := o.searchSparse(ctx, query, 5)
resultChan <- docs
}()
// Search metadata
wg.Add(1)
go func() {
defer wg.Done()
docs, _ := o.searchMetadata(ctx, query, 5)
resultChan <- docs
}()
wg.Wait()
// Close only after every sender is done; presumably mergeResults ranges
// over the channel until it is drained.
close(resultChan)
// Merge and deduplicate results
return o.mergeResults(resultChan)
}
RAG Deployment Checklist
Architecture Requirements
Optimization Steps
Performance Targets
Performance Benchmarks
| Operation | Before | After | Improvement |
|-----------|--------|-------|-------------|
| 1000 predictions | 300s | 8s | 37.5x |
| Demo selection | 5ms | 0.1ms | 50x |
| Assertion check | 300ms | 0.001ms | 300,000x |
| Token usage | 500K | 25K | 20x |
Implementation Checklist
Advanced Modules
ChainOfThought
// Wraps signature with an automatically added reasoning field.
cot := modules.NewChainOfThought(signature)
ReAct (Reasoning & Acting)
// Pairs the signature with the tools the agent may call while reasoning.
react := modules.NewReAct(signature, []core.Tool{calculator, search})
MultiChainComparison
NewMultiChainComparison(signature, numChains, temperature)
Refine
Parallel Module
Options: `WithMaxWorkers(n)`, `WithStopOnFirstError(bool)`
Critical Code Snippets
// Batch wrapper: runs p over inputs (presumably one output per input).
// Signature only; the body is elided in this reference.
func BatchPredict(p Predictor, inputs []string) []string
// Demo caching: a concurrency-safe map. sync.Map suits entries that are
// written once and read many times, such as cached demo sets.
var demoCache = &sync.Map{}
// Parallel execution: one goroutine per module, each writing only its own
// slot of outputs. A go statement needs a function call, so the closure is
// invoked with (i, m). Wait (e.g. with a sync.WaitGroup) before reading
// outputs.
go func(idx int, m Module) { outputs[idx] = m.Forward(input) }(i, m)
Configuration Guidelines
LLM Provider Configuration
Anthropic Claude
// API key first, then model ID. The constructor's error is discarded here
// for brevity; check it in real code.
llm, _ := llms.NewAnthropicLLM("api-key", core.ModelAnthropicSonnet)
Google Gemini (Multimodal)
// API key first, then model name. The constructor's error is discarded here
// for brevity; check it in real code.
llm, _ := llms.NewGeminiLLM("api-key", "gemini-pro")
OpenAI & Compatible APIs
// Standard OpenAI
// Note the argument order: model ID first, then API key. Each example
// discards the constructor's error (`_`) for brevity; check it in real code.
llm, _ := llms.NewOpenAI(core.ModelOpenAIGPT4, "api-key")
// LiteLLM (100+ LLMs): an OpenAI-compatible proxy reached via a custom base URL
llm, _ := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
llms.WithAPIKey("key"),
llms.WithOpenAIBaseURL("http://localhost:4000"))
// Azure OpenAI
// NOTE(review): Azure's REST API documents api-version as a query-string
// parameter and authenticates with an api-key header, and no key is
// configured here. Confirm this header-based setup against the dspy-go client.
llm, _ := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
llms.WithOpenAIBaseURL("https://resource.openai.azure.com"),
llms.WithOpenAIPath("/openai/deployments/model/chat/completions"),
llms.WithHeader("api-version", "2024-02-15-preview"))
Local Models
// Ollama: local server URL, then model name
// (errors discarded for brevity; check them in real code)
llm, _ := llms.NewOllamaLLM("http://localhost:11434", "llama2")
// LlamaCPP: local server URL
llm, _ := llms.NewLlamacppLLM("http://localhost:8080")
// LocalAI: reuses the OpenAI client with a local base URL and chat path
llm, _ := llms.NewOpenAILLM(core.ModelOpenAIGPT4,
llms.WithOpenAIBaseURL("http://localhost:8080"),
llms.WithOpenAIPath("/v1/chat/completions"))
Setting Default LLM
// Register llm as the default LLM
llms.SetDefaultLLM(llm)
// Or per-module
module.SetLLM(llm)
## Optimizers Deep Dive
### BootstrapFewShot
- Automatically selects high-quality examples from dataset
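- **Strategy**: Run the program on training examples → keep the traces that pass the metric → add up to K of them to the prompt as demos
- **Config**: Dataset, metric function, `WithMaxBootstrappedDemos(n)`, `WithMaxLabeledDemos(n)`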
- **Use when**: You have labeled training data
### MIPRO (Multi-prompt Instruction PRoposal Optimizer)
- Uses TPE (Tree-structured Parzen Estimator) search
- **Modes**: Light (fast), Standard, Heavy (thorough)
- **Config**: `WithMode(mode)`, `WithNumTrials(n)`, `WithTPEGamma(0.25)`