Atlas - LiveKit Adapter
Language: Python 3.9+
Source: ~/PycharmProjects/AI/atlas/src/voice_pipeline
Purpose
Atlas is the central orchestrator of the Voice AI pipeline. It runs as a LiveKit Agent plugin, connecting the media layer (LiveKit) to the AI services (STT, LLM, TTS). Every voice call passes through Atlas.
Responsibilities
| Responsibility | Implementation |
|---|---|
| LiveKit job handling | VoicePipelineService.entrypoint(), called when the agent joins a room |
| Agent config fetch | fetch_agent_config() → Compass GET /tenants/agents/{agent_id} |
| Audio pipeline | RoomIO → VAD → STT → LLM → TTS → RoomIO |
| Tool execution | Local tools + MCP servers via MCPManager |
| Billing publish | UsageMetricsCollector → Billing Service post-call |
| Error handling & escalation | SessionErrorHandler → human agent handoff |
| Observability | OpenTelemetry + Langfuse tracing |
Pipeline Architecture
graph LR
LK[LiveKit\nRoom] -->|Audio| VAD[VAD\nSilero]
VAD -->|Speech frames| SA[Stream\nAdapter]
SA -->|Audio| STT[STT Service]
STT -->|Transcript| LLM[LLM Service]
LLM -->|Tool calls| Tools[MCP / Local Tools]
Tools -->|Results| LLM
LLM -->|Response text| TTS[TTS Service]
TTS -->|Audio| LK
LLM -->|Escalation| HA[Human Agent\nQueue]
LK -->|Call end| BL[Billing\nService]
Session Lifecycle
sequenceDiagram
participant LK as LiveKit
participant Atlas
participant Compass
participant STT
participant LLM
participant TTS
participant Billing
LK->>Atlas: Job assigned (room joined)
Atlas->>LK: ctx.connect()
Atlas->>Atlas: wait_for_room_metadata() — parse tenant_id, agent_id, call_id
Atlas->>Compass: GET /tenants/agents/{agent_id}\n(x-tenantId header)
Compass-->>Atlas: Agent config (prompt, voice_config, skills, greetings)
Atlas->>Atlas: Build PipelineConfig\n(STT/LLM/TTS providers from voice_config)
Atlas->>Atlas: Instantiate VoiceAgent / RealtimeAgent
Atlas->>LK: session.start()
Atlas->>TTS: Speak base_greeting
loop Conversation turn
LK->>Atlas: Audio
Atlas->>STT: Audio frames (post-VAD)
STT-->>Atlas: Transcript
Atlas->>LLM: Transcript + ChatContext
LLM-->>Atlas: Response text (or tool call)
opt Tool call
Atlas->>Tools: Execute tool
Tools-->>Atlas: Result
Atlas->>LLM: Tool result
LLM-->>Atlas: Final response text
end
Atlas->>TTS: Response text
TTS-->>Atlas: Audio
Atlas->>LK: Audio (to caller)
end
Atlas->>Billing: POST /usage/events (AI_CALL_COMPLETED)
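The first steps of the lifecycle (parsing the room metadata, then requesting the agent record from Compass) can be sketched as below. This is a minimal illustration, not the actual Atlas code: it assumes the room metadata is a JSON object with snake_case keys, and `CallIdentity`, `parse_room_metadata`, and `build_agent_config_request` are hypothetical names. The request is only built, never sent.

```python
import json
from dataclasses import dataclass
from urllib.request import Request


@dataclass(frozen=True)
class CallIdentity:
    tenant_id: str
    agent_id: str
    call_id: str


def parse_room_metadata(raw: str) -> CallIdentity:
    """Parse room metadata (assumed to be JSON) into the three IDs Atlas needs."""
    data = json.loads(raw)
    missing = [k for k in ("tenant_id", "agent_id", "call_id") if not data.get(k)]
    if missing:
        raise ValueError(f"room metadata missing: {', '.join(missing)}")
    return CallIdentity(data["tenant_id"], data["agent_id"], data["call_id"])


def build_agent_config_request(base_url: str, ident: CallIdentity) -> Request:
    """Build (but do not send) GET /tenants/agents/{agent_id} with the tenant header."""
    url = f"{base_url.rstrip('/')}/tenants/agents/{ident.agent_id}"
    return Request(url, headers={"x-tenantId": ident.tenant_id}, method="GET")
```

Failing fast on missing IDs keeps a misconfigured room from reaching the provider setup stage with a partial identity.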
Agent Config Usage
Atlas fetches the agent record from Compass at the start of every call and uses these fields:
| Field | Used for |
|---|---|
| agent_prompt | LLM system message (STT-LLM-TTS mode) |
| prompt_realtime | Immutable system prompt for audio-native models |
| language | STT language config, language detection |
| base_greeting | First utterance to caller |
| metadata.voice_config | Provider selection for STT, LLM, TTS |
| metadata.voice_config.pipeline_mode | stt_llm_tts (default) or realtime |
| agent_skills | Registered as RawFunctionTools for the LLM |
| tool_prompts | Per-tool instruction overrides |
| mcp_server_url | MCP tool orchestration endpoint |
| human_in_loop | Escalation trigger condition |
| language_styles | Per-language prompt style guides |
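As a rough sketch of how these fields might feed the pipeline setup, the snippet below picks the pipeline mode and the matching system prompt from an agent record. The `PipelineConfig` fields and the `build_pipeline_config` helper are assumptions made for illustration; the real class in Atlas may hold different fields, and provider fallbacks are deliberately not resolved here.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class PipelineConfig:
    mode: str
    system_prompt: str
    stt_provider: Optional[str]
    llm_model: Optional[str]
    tts_provider: Optional[str]


def build_pipeline_config(agent: Dict[str, Any]) -> PipelineConfig:
    """Derive the pipeline settings from a Compass agent record (a plain dict)."""
    voice = (agent.get("metadata") or {}).get("voice_config") or {}
    mode = voice.get("pipeline_mode", "stt_llm_tts")  # documented default
    if mode not in ("stt_llm_tts", "realtime"):
        raise ValueError(f"unknown pipeline_mode: {mode!r}")
    # Audio-native models use prompt_realtime; the classic pipeline uses agent_prompt.
    prompt_key = "prompt_realtime" if mode == "realtime" else "agent_prompt"
    return PipelineConfig(
        mode=mode,
        system_prompt=agent.get(prompt_key, ""),
        stt_provider=voice.get("stt_provider"),  # None means "use the fallback"
        llm_model=voice.get("llm_model"),
        tts_provider=voice.get("tts_provider"),
    )
```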
STT Providers
Configured via voice_config.stt_provider in the agent record.
| Provider | Notes |
|---|---|
| openai | gpt-4o-transcribe (recommended, streaming) |
| google | chirp_3, latest_long (multilingual) |
| groq | whisper-large-v3-turbo (fast) |
| deepgram | nova-3 (fallback default) |
| elevenlabs | scribe_v2_realtime (99+ languages) |
| cartesia | ink-whisper (code-switching) |
| gladia | Solaria-1 (per-utterance language detection) |
| huggingface | Whisper via Inference API (on-premise / Arabic) |
| baseten | Whisper large v3, streaming |
VAD: Silero VAD filters audio before STT. A Whisper hallucination filter suppresses echo/silence artefacts within a 3 s post-TTS window.
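One way to picture the post-TTS window is a small filter that remembers when the agent last stopped speaking and drops suspicious transcripts that arrive within 3 s of that moment. The sketch below is an assumption about the mechanism, not the Atlas implementation: the `HallucinationFilter` class, the phrase list, and the choice to always drop empty transcripts are all hypothetical.

```python
import time
from typing import Callable, Optional

POST_TTS_WINDOW_S = 3.0
# Hypothetical examples of phrases Whisper-style models may emit on silence or echo.
SUSPECT_PHRASES = {"thank you", "thanks for watching", "you"}


class HallucinationFilter:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._tts_ended_at: Optional[float] = None

    def mark_tts_end(self) -> None:
        """Record the moment the agent finished speaking."""
        self._tts_ended_at = self._clock()

    def should_drop(self, transcript: str) -> bool:
        """Return True if the transcript looks like an echo/silence artefact."""
        text = transcript.strip().lower().strip(".!?")
        if not text:
            return True  # nothing usable was transcribed
        if self._tts_ended_at is None:
            return False  # the agent has not spoken yet
        in_window = self._clock() - self._tts_ended_at <= POST_TTS_WINDOW_S
        return in_window and text in SUSPECT_PHRASES
```

Injecting the clock keeps the window logic testable without sleeping.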
LLM Providers
Configured via voice_config.llm_model in the agent record.
| Provider | Models |
|---|---|
| google | gemini-2.5-flash, gemini-3-flash (default) |
| openai | gpt-4o, gpt-4o-mini |
| Custom | HuggingFace / Together AI / Groq via OpenAI-compatible endpoint |
Context management: Sliding window over conversation history. Tools are injected per-turn from agent_skills.
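A sliding window can be sketched as keeping the system message plus the most recent turns. The version below counts messages and pins every system message; both choices are assumptions, since the source does not say whether Atlas limits by message count or by tokens. `sliding_window` is a hypothetical helper name.

```python
from typing import Dict, List

Message = Dict[str, str]


def sliding_window(history: List[Message], max_messages: int) -> List[Message]:
    """Keep all system messages plus the last `max_messages` other messages."""
    if max_messages < 0:
        raise ValueError("max_messages must be >= 0")
    system = [m for m in history if m["role"] == "system"]
    rest = [m for m in history if m["role"] != "system"]
    # Guard the zero case: rest[-0:] would return the whole list.
    recent = rest[-max_messages:] if max_messages > 0 else []
    return system + recent
```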
TTS Providers
Configured via voice_config.tts_provider and voice_config.voice in the agent record.
| Provider | Notes |
|---|---|
| openai | gpt-4o-mini-tts (default) |
| gemini | gemini-2.5-flash-preview-tts (fast) |
| elevenlabs | eleven_turbo_v2_5 (natural, multilingual) |
| google | Chirp3-HD (streaming, Indian languages) |
| cartesia | sonic-2/3 (emotion / speed control) |
| hume | octave v1/v2 (expressive) |
| huggingface | Chatterbox (on-premise) |
| on_premise | Custom API at localhost:5000 |
Text processing: Abbreviation expansion (Redis-cached), number normalisation, markdown stripping before synthesis.
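The three text-processing steps could look roughly like the following. This is a simplified sketch under stated assumptions: a plain dict stands in for the Redis-cached abbreviation table, number normalisation only covers standalone integers from 0 to 999 in English, and `prepare_for_tts` and `int_to_words` are hypothetical names.

```python
import re
from typing import Dict

ONES = ["zero", "one", "two", "three", "four", "five", "six", "seven", "eight",
        "nine", "ten", "eleven", "twelve", "thirteen", "fourteen", "fifteen",
        "sixteen", "seventeen", "eighteen", "nineteen"]
TENS = ["", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy",
        "eighty", "ninety"]


def int_to_words(n: int) -> str:
    """Spell out an integer in the range 0..999 in English."""
    if not 0 <= n <= 999:
        raise ValueError("only 0..999 is supported in this sketch")
    if n < 20:
        return ONES[n]
    if n < 100:
        tens, ones = divmod(n, 10)
        return TENS[tens] + (f"-{ONES[ones]}" if ones else "")
    hundreds, rest = divmod(n, 100)
    return f"{ONES[hundreds]} hundred" + (f" {int_to_words(rest)}" if rest else "")


def prepare_for_tts(text: str, abbreviations: Dict[str, str]) -> str:
    # 1. Markdown stripping: keep link text, drop emphasis/code/heading marks.
    text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
    text = re.sub(r"[*_`#]+", "", text)
    # 2. Abbreviation expansion on whole words only.
    for abbr, full in abbreviations.items():
        text = re.sub(rf"\b{re.escape(abbr)}\b", lambda _m, full=full: full, text)
    # 3. Number normalisation: standalone 1-3 digit integers only; decimals
    #    and longer numbers are left untouched.
    text = re.sub(r"(?<![\w.,])\d{1,3}(?![\w]|[.,]\d)",
                  lambda m: int_to_words(int(m.group())), text)
    return re.sub(r"\s+", " ", text).strip()
```

Markdown is stripped first so that emphasis marks do not get in the way of the whole-word abbreviation matching.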
Agent Types
| Type | Module | When used |
|---|---|---|
| VoiceAgent | agents.py | Standard STT→LLM→TTS voice conversation |
| RealtimeAgent | agents.py | Audio-native models (Gemini Live, OpenAI Realtime); lower latency |
| OutboundAgent | agents.py | Outbound/campaign calls; fetches call script from Call Service |
| PureIVRAgent | agents.py | Menu-based IVR with no conversational AI |
Tool Execution
Atlas supports three tool sources:
1. Local tools (always available)
| Tool | Purpose |
|---|---|
| transfer_to_human_agent | Trigger escalation to human agent queue |
| switch_language | Change conversation language mid-call |
| end_call | Gracefully terminate the call |
| search_knowledge | Query agent's knowledge base |
2. Agent skills from Compass
agent_skills are registered as RawFunctionTools. Calls to them are routed through MCPManager to the configured mcp_server_url.
3. MCP servers
MCP servers are discovered from each agent's config. MCPManager deduplicates tool calls within a 30 s window to prevent concurrent duplicate execution.
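The deduplication idea can be illustrated with a small asyncio helper: identical calls (same tool name and arguments) made within 30 s share one execution and one result. This is a sketch of the general technique, not MCPManager's actual code; `ToolCallDeduplicator`, its key format, and the `execute` callback are hypothetical.

```python
import asyncio
import json
import time
from typing import Any, Awaitable, Callable, Dict, Tuple

DEDUP_WINDOW_S = 30.0
Executor = Callable[[str, Dict[str, Any]], Awaitable[Any]]


class ToolCallDeduplicator:
    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (start time, future holding the running or finished call)
        self._calls: Dict[str, Tuple[float, "asyncio.Future[Any]"]] = {}

    @staticmethod
    def _key(tool: str, args: Dict[str, Any]) -> str:
        # sort_keys makes the key independent of argument order.
        return tool + ":" + json.dumps(args, sort_keys=True)

    async def call(self, tool: str, args: Dict[str, Any], execute: Executor) -> Any:
        now = self._clock()
        # Evict entries older than the window.
        self._calls = {k: v for k, v in self._calls.items()
                       if now - v[0] < DEDUP_WINDOW_S}
        key = self._key(tool, args)
        if key not in self._calls:
            self._calls[key] = (now, asyncio.ensure_future(execute(tool, args)))
        # Duplicates await the same future, so the tool runs once; if it
        # raised, every duplicate sees the same exception.
        return await self._calls[key][1]
```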
Error Handling & Escalation
Managed by SessionErrorHandler (error_handler.py):
| Scenario | Response |
|---|---|
| Recoverable LLM error | Speak filler ("Bear with me one moment") while SDK retries |
| Recoverable STT/TTS error | Silent retry |
| Non-recoverable (1st failure) | "Please repeat that?" |
| Non-recoverable (2nd failure) | Force transfer to human agent queue |
Escalation messages are language-aware (EN, AR, HI). The human_in_loop condition from Compass is also evaluated by the LLM each turn.
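The scenario table amounts to a small decision policy, which might be sketched as follows. `ErrorPolicy` and `Action` are hypothetical names, and the sketch assumes the non-recoverable failure counter is kept per session and never reset; SessionErrorHandler may behave differently.

```python
from enum import Enum


class Action(Enum):
    SPEAK_FILLER = "speak_filler"            # "Bear with me one moment"
    SILENT_RETRY = "silent_retry"
    ASK_REPEAT = "ask_repeat"                # "Please repeat that?"
    TRANSFER_TO_HUMAN = "transfer_to_human"


class ErrorPolicy:
    """One instance per call session (assumption)."""

    def __init__(self) -> None:
        self._fatal_count = 0

    def decide(self, source: str, recoverable: bool) -> Action:
        """Map an error from 'llm', 'stt', or 'tts' to the response in the table."""
        if recoverable:
            # Only LLM retries are audible to the caller; STT/TTS retry silently.
            return Action.SPEAK_FILLER if source == "llm" else Action.SILENT_RETRY
        self._fatal_count += 1
        if self._fatal_count == 1:
            return Action.ASK_REPEAT
        return Action.TRANSFER_TO_HUMAN
```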
Billing
UsageMetricsCollector tracks per-call metrics and publishes to the Billing Service at call end.
Event published: AI_CALL_COMPLETED
{
"eventType": "AI_CALL_COMPLETED",
"callId": "uuid",
"tenantId": "string",
"metrics": {
"durationSeconds": 120,
"stt": {
"provider": "openai",
"model": "gpt-4o-transcribe",
"durationSeconds": 45,
"transcriptChars": 1200
},
"llm": {
"provider": "google",
"model": "gemini-2.5-flash",
"inputTokens": 500,
"outputTokens": 150,
"turnCount": 5
},
"tts": {
"provider": "openai",
"characters": 800
}
}
}
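For illustration, the payload above could be assembled and wrapped in a POST to /usage/events (the path shown in the session lifecycle) roughly as follows. The helper names are hypothetical, the JSON content type is an assumption, and the request is only constructed, not sent; how UsageMetricsCollector actually publishes is not shown in this document.

```python
import json
from typing import Any, Dict
from urllib.request import Request


def build_call_completed_event(call_id: str, tenant_id: str, duration_seconds: int,
                               stt: Dict[str, Any], llm: Dict[str, Any],
                               tts: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble an AI_CALL_COMPLETED event in the documented shape."""
    return {
        "eventType": "AI_CALL_COMPLETED",
        "callId": call_id,
        "tenantId": tenant_id,
        "metrics": {
            "durationSeconds": duration_seconds,
            "stt": stt,
            "llm": llm,
            "tts": tts,
        },
    }


def build_usage_request(billing_url: str, event: Dict[str, Any]) -> Request:
    """Build (but do not send) POST {billing_url}/usage/events."""
    body = json.dumps(event).encode("utf-8")
    return Request(
        f"{billing_url.rstrip('/')}/usage/events",
        data=body,
        headers={"Content-Type": "application/json"},  # assumed content type
        method="POST",
    )
```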
Observability
- OpenTelemetry traces exported to Langfuse (LANGFUSE_* env vars)
- Session ID = call_id, User ID = caller phone number
- VAD diagnostics: frame arrival rates, Silero inference stats, speech/silence transitions
- STT diagnostics: per-utterance RMS/peak, hallucination filter decisions, ghost segment detection
- Tool execution: all tool inputs/outputs logged at DEBUG level
Key Environment Variables
LIVEKIT_URL=...
LIVEKIT_API_KEY=...
LIVEKIT_API_SECRET=...
LIVEKIT_AI_PROVISIONING_API_URL=http://compass-service:8000
CALL_SERVICE_URL=http://call-service:8000
BILLING_SERVICE_URL=http://billing-service:8080
TOOL_SERVICE_URL=http://tool-management-service:8009
STT_PROVIDER=openai
LLM_PROVIDER=google
TTS_PROVIDER=openai
WORKER_NUM_IDLE_PROCESSES=5
WORKER_LOAD_THRESHOLD=0.75
LANGFUSE_PUBLIC_KEY=...
LANGFUSE_SECRET_KEY=...
LANGFUSE_HOST=...
INSIGHTS_ENABLED=true
INSIGHTS_REDIS_URL=redis://...
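A worker might read a subset of these variables into a typed settings object at startup, along the lines of the sketch below. `WorkerSettings` and `load_settings` are hypothetical, and treating the values in the listing above as defaults, as well as treating the load threshold as a fraction between 0 and 1, are assumptions made for illustration.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class WorkerSettings:
    livekit_url: str
    compass_url: str
    num_idle_processes: int
    load_threshold: float
    insights_enabled: bool


def load_settings(env: Mapping[str, str] = os.environ) -> WorkerSettings:
    """Read and validate worker settings; required URLs raise KeyError if unset."""
    threshold = float(env.get("WORKER_LOAD_THRESHOLD", "0.75"))
    if not 0.0 < threshold <= 1.0:
        raise ValueError("WORKER_LOAD_THRESHOLD must be in (0, 1]")
    return WorkerSettings(
        livekit_url=env["LIVEKIT_URL"],
        compass_url=env["LIVEKIT_AI_PROVISIONING_API_URL"],
        num_idle_processes=int(env.get("WORKER_NUM_IDLE_PROCESSES", "5")),
        load_threshold=threshold,
        insights_enabled=env.get("INSIGHTS_ENABLED", "false").strip().lower()
        in ("1", "true", "yes"),
    )
```

Passing the environment as a mapping makes the loader easy to exercise with a plain dict instead of the real process environment.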