Production AI System Architecture
Design and build production-grade AI systems. Learn about scalability, reliability, monitoring, cost optimization, and advanced deployment patterns.
Prerequisites:
- Advanced AI Engineering experience
- System design knowledge
- Production deployment experience
Overview
Building AI systems that work in production requires careful architecture, monitoring, and optimization. This expert-level tutorial covers designing, deploying, and maintaining production AI systems at scale.
AI System Architecture Patterns
Production AI systems need robust architectures that handle scale, reliability, and cost.

Key Patterns:
- Microservices architecture
- Async processing with queues
- Caching layers
- Load balancing
- Circuit breakers
- Rate limiting

Architecture Components:
- API Gateway: entry point, routing, auth
- Request Queue: handles traffic spikes
- Processing Workers: execute AI operations
- Cache Layer: reduces API calls
- Database: stores results and user data
- Monitoring: tracks performance and errors

Design Principles:
- Stateless services
- Horizontal scalability
- Graceful degradation
- Idempotent operations
- Event-driven architecture
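Rate limiting appears in the pattern list above but is not illustrated later in this tutorial. Below is a minimal in-process token-bucket sketch; the class and parameter names are illustrative, and a real deployment would usually enforce limits at the API gateway or in a shared store like Redis rather than in process memory.

```python
import time

class TokenBucket:
    """Token-bucket rate limiter: allows bursts up to `capacity`,
    refilling at `rate` tokens per second."""

    def __init__(self, rate, capacity):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def allow(self):
        """Return True if a request may proceed, consuming one token."""
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Usage: a burst of 3 is allowed, the 4th immediate request is refused
bucket = TokenBucket(rate=1, capacity=3)
results = [bucket.allow() for _ in range(4)]
```

The same shape works per user or per API key by keeping one bucket per key.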
Scalability and Performance
AI systems face unique scalability challenges due to API costs and latency.

Strategies:
- Async processing for long operations
- Request batching to reduce API calls
- Response caching
- Connection pooling
- CDN for static content
- Database optimization

Handling Traffic:
- Queue-based architecture
- Auto-scaling workers
- Rate limiting per user
- Request prioritization
- Load balancing
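Request prioritization, listed above, can be sketched with a heap-based queue; the class and priority scheme here are illustrative (for example, paid-tier requests could be given a lower priority number so they are served first).

```python
import heapq
import itertools

class PriorityRequestQueue:
    """Min-heap queue: lower priority number is served first.
    A monotonic counter breaks ties so equal-priority requests stay FIFO."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()

    def push(self, priority, request):
        heapq.heappush(self._heap, (priority, next(self._counter), request))

    def pop(self):
        _, _, request = heapq.heappop(self._heap)
        return request

# Usage: the paid-user request jumps ahead of earlier free-user requests
q = PriorityRequestQueue()
q.push(1, "free-user prompt A")
q.push(0, "paid-user prompt")
q.push(1, "free-user prompt B")
order = [q.pop() for _ in range(3)]
```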
# Async AI processing with queues
from celery import Celery
from langchain.llms import OpenAI

# Celery app for async tasks, using Redis as the message broker
app = Celery('ai_worker', broker='redis://localhost:6379')

@app.task
def process_ai_request(user_id, prompt, request_id):
    """Process an AI request asynchronously."""
    try:
        llm = OpenAI(temperature=0.7)
        result = llm.predict(prompt)
        # Store the result (store_result is an application-specific helper)
        store_result(request_id, result, user_id)
        # Notify the user (WebSocket, webhook, etc.)
        notify_user(user_id, request_id, "completed")
        return result
    except Exception as e:
        notify_user(user_id, request_id, "failed", str(e))
        raise

# Request batching
import asyncio

async def batch_process_requests(requests):
    """Batch multiple requests to reduce API calls."""
    llm = OpenAI(temperature=0.7)
    # Group similar requests (group_similar_requests is application-specific)
    batches = group_similar_requests(requests)
    results = []
    for batch in batches:
        # Process each batch concurrently
        batch_results = await asyncio.gather(*[
            process_single_request(llm, req) for req in batch
        ])
        results.extend(batch_results)
    return results

# Response caching
from functools import lru_cache
import hashlib
import json

def cache_key(prompt, model, temperature):
    """Generate a deterministic cache key from request parameters."""
    data = json.dumps({
        "prompt": prompt,
        "model": model,
        "temperature": temperature
    }, sort_keys=True)
    return hashlib.md5(data.encode()).hexdigest()

@lru_cache(maxsize=1000)
def cached_llm_call(key, prompt):
    """In-process cached LLM call; lru_cache keys on both arguments,
    so the key captures model and temperature alongside the prompt."""
    llm = OpenAI(temperature=0.7)
    return llm.predict(prompt)

# Usage
key = cache_key("What is AI?", "gpt-3.5-turbo", 0.7)
result = cached_llm_call(key, "What is AI?")  # Served from cache on the second call

Async processing, batching, and caching are essential for scalable AI systems. These patterns reduce costs, improve performance, and handle traffic spikes.
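The lru_cache wrapper above is per-process and never expires entries, which matters when prompts go stale or workers are scaled horizontally. Production systems usually use a shared cache such as Redis with a time-to-live. A minimal in-process TTL cache sketch is below; the class name is illustrative, and Redis (`SET key value EX ttl`) would replace the dictionary in a real deployment.

```python
import time

class TTLCache:
    """Minimal TTL cache; a shared store like Redis would replace
    the in-process dict in production."""

    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (expiry_timestamp, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expiry, value = entry
        if time.monotonic() > expiry:
            # Entry expired: drop it and report a miss
            del self._store[key]
            return None
        return value

    def set(self, key, value):
        self._store[key] = (time.monotonic() + self.ttl, value)

# Usage
cache = TTLCache(ttl_seconds=3600)
cache.set("what-is-ai:gpt-3.5-turbo:0.7", "AI is ...")
hit = cache.get("what-is-ai:gpt-3.5-turbo:0.7")
miss = cache.get("unseen-key")
```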
Reliability and Error Handling
Production systems must handle failures gracefully.

Error Types:
- Transient errors (network, rate limits)
- Permanent errors (invalid input, auth failures)
- Partial failures (timeouts, incomplete responses)

Strategies:
- Retries with exponential backoff
- Circuit breakers
- Fallback mechanisms
- Dead letter queues
- Health checks
- Graceful degradation
# Retry with exponential backoff
import time
from functools import wraps
from langchain.llms import OpenAI

def retry_with_backoff(max_retries=3, initial_delay=1, backoff_factor=2):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = initial_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Re-raise on the final attempt; otherwise back off and retry
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(delay)
                    delay *= backoff_factor
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def call_ai_api(prompt):
    """AI API call with automatic retry."""
    llm = OpenAI(temperature=0.7)
    return llm.predict(prompt)

# Circuit breaker pattern
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            # After the timeout, let one trial call through
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
            else:
                raise Exception("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
            if self.state == "half-open":
                # Trial call succeeded: close the circuit again
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise

# Fallback mechanism
def get_ai_response_with_fallback(prompt):
    """AI response with fallback to a simpler model."""
    try:
        # Try the primary model
        llm = OpenAI(model_name="gpt-4", temperature=0.7)
        return llm.predict(prompt)
    except Exception:
        # Fall back to a cheaper model
        try:
            llm = OpenAI(model_name="gpt-3.5-turbo", temperature=0.7)
            return llm.predict(prompt)
        except Exception:
            # Final fallback: a static response
            return "I'm sorry, I'm having trouble processing your request right now."

Robust error handling with retries, circuit breakers, and fallbacks ensures production systems remain available even when components fail.
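Dead letter queues, listed among the strategies above, catch messages that keep failing after retries so they can be inspected later instead of blocking the main queue. Brokers such as RabbitMQ and SQS provide this natively; the sketch below is a minimal in-process illustration, and the handler and queue names are hypothetical.

```python
from collections import deque

def process_with_dlq(messages, handler, max_attempts=3):
    """Run handler over each message; after max_attempts failures,
    move the message to a dead letter queue instead of retrying forever."""
    queue = deque((msg, 1) for msg in messages)
    dead_letter_queue = []
    processed = []
    while queue:
        msg, attempts = queue.popleft()
        try:
            processed.append(handler(msg))
        except Exception:
            if attempts >= max_attempts:
                # Give up: park the message for later inspection
                dead_letter_queue.append(msg)
            else:
                # Requeue for another attempt
                queue.append((msg, attempts + 1))
    return processed, dead_letter_queue

# Usage: "bad" always fails and ends up in the DLQ
def handler(msg):
    if msg == "bad":
        raise ValueError("cannot process")
    return msg.upper()

processed, dlq = process_with_dlq(["ok", "bad", "fine"], handler)
```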
Monitoring and Observability
Comprehensive monitoring is essential for production AI systems.

What to Monitor:
- API latency and response times
- Token usage and costs
- Error rates and types
- User activity and patterns
- System resource usage
- Model performance metrics

Tools:
- Application Performance Monitoring (APM)
- Logging (structured logs)
- Metrics (Prometheus, Datadog)
- Tracing (distributed tracing)
- Alerting (PagerDuty, etc.)
# Comprehensive monitoring
import time
import logging
from functools import wraps
from prometheus_client import Counter, Histogram, Gauge
from langchain.llms import OpenAI

# Metrics
api_calls = Counter('ai_api_calls_total', 'Total API calls', ['model', 'status'])
api_latency = Histogram('ai_api_latency_seconds', 'API call latency')
token_usage = Counter('ai_tokens_total', 'Total tokens used', ['type'])
active_requests = Gauge('ai_active_requests', 'Active requests')

# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def monitor_ai_call(func):
    """Decorator to monitor AI API calls.

    Token metrics assume the wrapped function returns the raw API
    response dict with a 'usage' field; llm.predict returns a plain
    string, so the isinstance checks below skip token tracking then."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        active_requests.inc()
        try:
            result = func(*args, **kwargs)
            # Record success metrics
            latency = time.time() - start_time
            api_latency.observe(latency)
            api_calls.labels(model=kwargs.get('model', 'unknown'), status='success').inc()
            logger.info("AI call succeeded", extra={
                "latency": latency,
                "model": kwargs.get('model'),
                "tokens": result.get('usage', {}).get('total_tokens', 0)
                          if isinstance(result, dict) else 0
            })
            # Track token usage when the raw response dict is available
            if isinstance(result, dict) and 'usage' in result:
                token_usage.labels(type='prompt').inc(result['usage'].get('prompt_tokens', 0))
                token_usage.labels(type='completion').inc(result['usage'].get('completion_tokens', 0))
            return result
        except Exception as e:
            # Record failure
            api_calls.labels(model=kwargs.get('model', 'unknown'), status='error').inc()
            logger.error("AI call failed", extra={
                "error": str(e),
                "model": kwargs.get('model')
            })
            raise
        finally:
            active_requests.dec()
    return wrapper

@monitor_ai_call
def call_ai_api(prompt, model="gpt-3.5-turbo"):
    """Monitored AI API call."""
    llm = OpenAI(model_name=model)
    return llm.predict(prompt)

# Health check endpoint
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/health')
def health_check():
    """Health check endpoint (the check_* helpers are application-specific)."""
    checks = {
        "api": check_api_connectivity(),
        "database": check_database(),
        "cache": check_cache(),
    }
    # Compute overall status after the individual checks are collected
    checks["status"] = "healthy" if all(checks.values()) else "unhealthy"
    status_code = 200 if checks["status"] == "healthy" else 503
    return jsonify(checks), status_code

Comprehensive monitoring tracks performance, costs, errors, and system health. This enables proactive issue detection and optimization.
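Alerting rules are typically defined on latency percentiles rather than averages, since a few slow requests can hide behind a healthy mean. In practice Prometheus or Datadog evaluate such rules against the histogram above; the sketch below only illustrates the idea, and the threshold and function names are illustrative.

```python
def percentile(samples, p):
    """Nearest-rank percentile of a list of latency samples (seconds)."""
    ordered = sorted(samples)
    # Nearest-rank method: ceil(n * p / 100)-th value, 1-indexed
    rank = max(1, -(-len(ordered) * p // 100))  # ceiling division
    return ordered[int(rank) - 1]

def should_alert(latencies, p95_threshold=2.0):
    """Fire an alert when p95 latency exceeds the threshold."""
    return percentile(latencies, 95) > p95_threshold

# Usage: nine fast calls hide one very slow one; the mean looks fine,
# but the p95 exposes it
latencies = [0.4] * 9 + [5.0]
p95 = percentile(latencies, 95)
alert = should_alert(latencies)
```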
Cost Optimization
AI systems can be expensive, so cost optimization is crucial.

Strategies:
- Use appropriate models (GPT-3.5 vs GPT-4)
- Cache responses aggressively
- Batch requests when possible
- Optimize prompts (shorter = cheaper)
- Set usage limits per user
- Monitor and alert on costs
- Use streaming for better UX
- Consider fine-tuned models for specific tasks
# Cost tracking and optimization
class CostTracker:
    def __init__(self, budget_per_day=100):
        self.budget_per_day = budget_per_day
        self.daily_spend = 0
        # Illustrative prices in USD per 1K tokens; check current pricing
        self.model_costs = {
            "gpt-4": 0.03,
            "gpt-3.5-turbo": 0.002
        }

    def estimate_cost(self, model, prompt_tokens, completion_tokens):
        """Estimate the cost of an API call."""
        cost_per_1k = self.model_costs.get(model, 0.002)
        total_tokens = prompt_tokens + completion_tokens
        return (total_tokens / 1000) * cost_per_1k

    def should_use_cheaper_model(self, prompt):
        """Decide whether a cheaper model is sufficient."""
        # Simple heuristic: use GPT-3.5 for short, simple prompts
        return len(prompt) < 500

    def track_spend(self, cost):
        """Track spending and enforce the daily budget."""
        self.daily_spend += cost
        if self.daily_spend > self.budget_per_day:
            raise Exception("Daily budget exceeded")
        return self.daily_spend

# Smart model selection
def get_ai_response_optimized(prompt, tracker):
    """Get an AI response with cost optimization.

    The tracker must be shared across requests (e.g. backed by Redis)
    so spending actually accumulates; the cache helpers and call_ai_api
    are application-specific, and call_ai_api is assumed to return the
    raw response dict with a 'usage' field."""
    # Choose a model based on prompt complexity
    if tracker.should_use_cheaper_model(prompt):
        model = "gpt-3.5-turbo"
    else:
        model = "gpt-4"
    # Check the cache first
    cached = get_from_cache(prompt, model)
    if cached:
        return cached
    # Make the API call
    result = call_ai_api(prompt, model=model)
    # Estimate and track cost
    cost = tracker.estimate_cost(
        model,
        result['usage']['prompt_tokens'],
        result['usage']['completion_tokens']
    )
    tracker.track_spend(cost)
    # Cache the result
    cache_result(prompt, model, result)
    return result

Cost optimization is essential for sustainable AI systems. Track spending, use appropriate models, cache aggressively, and set budgets.
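Per-user usage limits, listed in the strategies above, can be enforced with a simple daily token ledger. The sketch below keeps the ledger in process memory for illustration; in production it would live in Redis or the database so limits survive restarts and apply across workers. The class and limit values are illustrative.

```python
from collections import defaultdict
import datetime

class UserUsageLimiter:
    """Track tokens per user per day and refuse requests over the limit.
    In production the ledger would live in Redis or a database."""

    def __init__(self, daily_token_limit=100_000):
        self.daily_token_limit = daily_token_limit
        self._usage = defaultdict(int)  # (user_id, date) -> tokens used

    def _key(self, user_id):
        # One bucket per user per calendar day
        return (user_id, datetime.date.today().isoformat())

    def record(self, user_id, tokens):
        self._usage[self._key(user_id)] += tokens

    def allow(self, user_id):
        return self._usage[self._key(user_id)] < self.daily_token_limit

# Usage
limiter = UserUsageLimiter(daily_token_limit=1000)
limiter.record("alice", 400)
allowed_before = limiter.allow("alice")   # 400 tokens: under the limit
limiter.record("alice", 700)
allowed_after = limiter.allow("alice")    # 1100 tokens: over the limit
```

Checking `allow` before each request and calling `record` with the response's token counts closes the loop with the CostTracker above.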
Conclusion
Production AI systems require careful architecture, monitoring, and optimization. Focus on scalability, reliability, observability, and cost management. Start with solid foundations, then optimize based on real usage patterns. Remember: production systems are never done - continuous improvement is essential.