Atlas — LiveKit Adapter

Language: Python 3.9+
Source: ~/PycharmProjects/AI/atlas/src/voice_pipeline

Purpose

Atlas is the central orchestrator of the Voice AI pipeline. It runs as a LiveKit Agent plugin, connecting the media layer (LiveKit) to the AI services (STT, LLM, TTS). Every voice call passes through Atlas.

Responsibilities

| Responsibility | Implementation |
|---|---|
| LiveKit job handling | VoicePipelineService.entrypoint(), called when the agent joins a room |
| Agent config fetch | fetch_agent_config() → Compass GET /tenants/agents/{agent_id} |
| Audio pipeline | RoomIO → VAD → STT → LLM → TTS → RoomIO |
| Tool execution | Local tools + MCP servers via MCPManager |
| Billing publish | UsageMetricsCollector → Billing Service post-call |
| Error handling & escalation | SessionErrorHandler → human agent handoff |
| Observability | OpenTelemetry + Langfuse tracing |

Pipeline Architecture

graph LR
    LK[LiveKit\nRoom] -->|Audio| VAD[VAD\nSilero]
    VAD -->|Speech frames| SA[Stream\nAdapter]
    SA -->|Audio| STT[STT Service]
    STT -->|Transcript| LLM[LLM Service]
    LLM -->|Tool calls| Tools[MCP / Local Tools]
    Tools -->|Results| LLM
    LLM -->|Response text| TTS[TTS Service]
    TTS -->|Audio| LK
    LLM -->|Escalation| HA[Human Agent\nQueue]
    LK -->|Call end| BL[Billing\nService]

Session Lifecycle

sequenceDiagram
    participant LK as LiveKit
    participant Atlas
    participant Compass
    participant STT
    participant LLM
    participant TTS
    participant Billing

    LK->>Atlas: Job assigned (room joined)
    Atlas->>LK: ctx.connect()
    Atlas->>Atlas: wait_for_room_metadata() — parse tenant_id, agent_id, call_id
    Atlas->>Compass: GET /tenants/agents/{agent_id}\n(x-tenantId header)
    Compass-->>Atlas: Agent config (prompt, voice_config, skills, greetings)
    Atlas->>Atlas: Build PipelineConfig\n(STT/LLM/TTS providers from voice_config)
    Atlas->>Atlas: Instantiate VoiceAgent / RealtimeAgent
    Atlas->>LK: session.start()
    Atlas->>TTS: Speak base_greeting

    loop Conversation turn
        LK->>Atlas: Audio
        Atlas->>STT: Audio frames (post-VAD)
        STT-->>Atlas: Transcript
        Atlas->>LLM: Transcript + ChatContext
        LLM-->>Atlas: Response text (or tool call)
        opt Tool call
            Atlas->>Tools: Execute tool
            Tools-->>Atlas: Result
            Atlas->>LLM: Tool result
            LLM-->>Atlas: Final response text
        end
        Atlas->>TTS: Response text
        TTS-->>Atlas: Audio
        Atlas->>LK: Audio (to caller)
    end

    Atlas->>Billing: POST /usage/events (AI_CALL_COMPLETED)
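The first steps of the lifecycle above (parse the room metadata, then fetch the agent record from Compass) can be sketched as follows. This is a minimal illustration, not the Atlas implementation: it assumes the room metadata is a JSON object with `tenant_id`, `agent_id` and `call_id` keys, and the helper names `parse_room_metadata`, `build_agent_config_request` and `fetch_agent_config_sketch` are hypothetical. Only the path `/tenants/agents/{agent_id}` and the `x-tenantId` header come from the diagram.

```python
import json
import urllib.request
from typing import Any, Dict

REQUIRED_KEYS = ("tenant_id", "agent_id", "call_id")


def parse_room_metadata(raw: str) -> Dict[str, str]:
    """Extract the identifiers Atlas needs (assumes JSON metadata)."""
    data = json.loads(raw)
    missing = [key for key in REQUIRED_KEYS if not data.get(key)]
    if missing:
        raise ValueError(f"room metadata missing: {', '.join(missing)}")
    return {key: str(data[key]) for key in REQUIRED_KEYS}


def build_agent_config_request(
    base_url: str, tenant_id: str, agent_id: str
) -> urllib.request.Request:
    """Build GET /tenants/agents/{agent_id} with the tenant header."""
    url = f"{base_url.rstrip('/')}/tenants/agents/{agent_id}"
    return urllib.request.Request(
        url, headers={"x-tenantId": tenant_id}, method="GET"
    )


def fetch_agent_config_sketch(
    base_url: str, tenant_id: str, agent_id: str, timeout_s: float = 5.0
) -> Dict[str, Any]:
    """Send the request and decode the JSON body (hypothetical helper)."""
    request = build_agent_config_request(base_url, tenant_id, agent_id)
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return json.loads(response.read().decode("utf-8"))
```

Keeping request construction separate from the network call makes the tenant header and URL easy to unit-test without a running Compass instance.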

Agent Config Usage

Atlas fetches the agent record from Compass at the start of every call and uses these fields:

| Field | Used for |
|---|---|
| agent_prompt | LLM system message (STT-LLM-TTS mode) |
| prompt_realtime | Immutable system prompt for audio-native models |
| language | STT language config, language detection |
| base_greeting | First utterance to caller |
| metadata.voice_config | Provider selection for STT, LLM, TTS |
| metadata.voice_config.pipeline_mode | stt_llm_tts (default) or realtime |
| agent_skills | Registered as RawFunctionTools for the LLM |
| tool_prompts | Per-tool instruction overrides |
| mcp_server_url | MCP tool orchestration endpoint |
| human_in_loop | Escalation trigger condition |
| language_styles | Per-language prompt style guides |
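As a rough illustration of how these fields could feed the pipeline, the sketch below maps an agent record to a small config object. The `PipelineConfigSketch` class and `build_pipeline_config` function are hypothetical stand-ins and do not reflect the real PipelineConfig signature. The field names, the two pipeline modes, and the `stt_llm_tts` default are taken from the table; everything else is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

VALID_MODES = ("stt_llm_tts", "realtime")


@dataclass(frozen=True)
class PipelineConfigSketch:
    pipeline_mode: str
    system_prompt: str
    greeting: str
    stt_provider: Optional[str]
    llm_model: Optional[str]
    tts_provider: Optional[str]
    voice: Optional[str]


def build_pipeline_config(agent: Dict[str, Any]) -> PipelineConfigSketch:
    """Derive pipeline settings from a Compass agent record."""
    voice_config = (agent.get("metadata") or {}).get("voice_config") or {}
    mode = voice_config.get("pipeline_mode") or "stt_llm_tts"
    if mode not in VALID_MODES:
        raise ValueError(f"unsupported pipeline_mode: {mode!r}")
    # Realtime (audio-native) models use prompt_realtime; the classic
    # STT-LLM-TTS chain uses agent_prompt.
    prompt_field = "prompt_realtime" if mode == "realtime" else "agent_prompt"
    return PipelineConfigSketch(
        pipeline_mode=mode,
        system_prompt=agent.get(prompt_field, ""),
        greeting=agent.get("base_greeting", ""),
        stt_provider=voice_config.get("stt_provider"),
        llm_model=voice_config.get("llm_model"),
        tts_provider=voice_config.get("tts_provider"),
        voice=voice_config.get("voice"),
    )
```

Provider fields are left as `None` when absent rather than filled with guessed defaults, so the caller can decide which fallback applies.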

STT Providers

Configured via voice_config.stt_provider in the agent record.

| Provider | Notes |
|---|---|
| openai | gpt-4o-transcribe: recommended, streaming |
| google | chirp_3, latest_long: multilingual |
| groq | whisper-large-v3-turbo: fast |
| deepgram | nova-3: fallback default |
| elevenlabs | scribe_v2_realtime: 99+ languages |
| cartesia | ink-whisper: code-switching |
| gladia | Solaria-1: per-utterance language detection |
| huggingface | Whisper via Inference API: on-premise / Arabic |
| baseten | Whisper large v3, streaming |

VAD: Silero VAD filters audio before STT. A Whisper hallucination filter suppresses echo/silence artefacts within a 3 s post-TTS window.
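One way such a post-TTS filter could work is sketched below. The 3 s window comes from the text above; the phrase list, the matching rule, and the `HallucinationFilterSketch` class are assumptions made for illustration, and the real filter may use different signals (for example audio energy).

```python
from typing import Optional

POST_TTS_WINDOW_S = 3.0
# Illustrative phrases only; the real suppression list is not documented here.
SUSPECT_PHRASES = frozenset({"thank you", "thanks for watching", "you"})


class HallucinationFilterSketch:
    """Drop likely echo/silence artefacts shortly after TTS playback."""

    def __init__(self, window_s: float = POST_TTS_WINDOW_S) -> None:
        self._window_s = window_s
        self._tts_ended_at: Optional[float] = None

    def mark_tts_finished(self, now: float) -> None:
        """Record when the agent stopped speaking (seconds, any clock)."""
        self._tts_ended_at = now

    def should_drop(self, transcript: str, now: float) -> bool:
        text = transcript.strip().lower().strip(".!?, ")
        if not text:
            return True  # empty transcripts are never useful
        in_window = (
            self._tts_ended_at is not None
            and 0.0 <= now - self._tts_ended_at <= self._window_s
        )
        # Outside the window, short phrases are treated as real speech.
        return in_window and text in SUSPECT_PHRASES
```

Timestamps are passed in explicitly so the window logic can be tested without sleeping.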


LLM Providers

Configured via voice_config.llm_model in the agent record.

| Provider | Models |
|---|---|
| google | gemini-2.5-flash, gemini-3-flash (default) |
| openai | gpt-4o, gpt-4o-mini |
| Custom | HuggingFace / Together AI / Groq via OpenAI-compatible endpoint |

Context management: Sliding window over conversation history. Tools are injected per-turn from agent_skills.
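A sliding window over the history can be sketched as below. The message shape (`role`/`content` dicts), the message-count limit, and the function name `apply_sliding_window` are assumptions; the actual window may be measured in tokens or turns instead.

```python
from typing import Dict, List

Message = Dict[str, str]


def apply_sliding_window(history: List[Message], max_messages: int) -> List[Message]:
    """Keep all system messages plus the most recent dialogue messages."""
    if max_messages < 0:
        raise ValueError("max_messages must be non-negative")
    system = [m for m in history if m["role"] == "system"]
    dialogue = [m for m in history if m["role"] != "system"]
    # Guard the zero case: dialogue[-0:] would return the whole list.
    recent = dialogue[-max_messages:] if max_messages else []
    return system + recent
```

A production version would also need to avoid cutting between a tool call and its result, which this sketch does not attempt.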


TTS Providers

Configured via voice_config.tts_provider and voice_config.voice in the agent record.

| Provider | Notes |
|---|---|
| openai | gpt-4o-mini-tts: default |
| gemini | gemini-2.5-flash-preview-tts: fast |
| elevenlabs | eleven_turbo_v2_5: natural, multilingual |
| google | Chirp3-HD: streaming, Indian languages |
| cartesia | sonic-2/3: emotion / speed control |
| hume | octave v1/v2: expressive |
| huggingface | Chatterbox: on-premise |
| on_premise | Custom API at localhost:5000 |

Text processing: Abbreviation expansion (Redis-cached), number normalisation, markdown stripping before synthesis.
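The three text-processing steps can be illustrated with a small pre-synthesis function. This is a sketch under several assumptions: the abbreviation table is a plain dict standing in for the Redis-cached lookup, number normalisation only spells out standalone integers from 0 to 99 in English, and the step order and all function names are hypothetical.

```python
import re
from typing import Mapping

_ONES = ["zero", "one", "two", "three", "four", "five", "six", "seven",
         "eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen",
         "fifteen", "sixteen", "seventeen", "eighteen", "nineteen"]
_TENS = ["", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy",
         "eighty", "ninety"]


def spell_number(n: int) -> str:
    """Spell an integer in the range 0..99."""
    if n < 20:
        return _ONES[n]
    tens, ones = divmod(n, 10)
    return _TENS[tens] if ones == 0 else f"{_TENS[tens]}-{_ONES[ones]}"


def strip_markdown(text: str) -> str:
    text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)  # keep link labels
    # Remove leading heading and bullet markers on each line.
    text = re.sub(r"^[ \t]{0,3}(#{1,6}|[-*+])[ \t]+", "", text, flags=re.MULTILINE)
    return re.sub(r"[*_`]+", "", text)  # drop emphasis and code markers


def expand_abbreviations(text: str, table: Mapping[str, str]) -> str:
    if not table:
        return text
    pattern = r"\b(" + "|".join(re.escape(k) for k in table) + r")\b"
    return re.sub(pattern, lambda m: table[m.group(1)], text)


def normalise_numbers(text: str) -> str:
    # Only standalone 1-2 digit integers; decimals and longer numbers are left alone.
    pattern = r"(?<![\w.,])\d{1,2}(?!\w|[.,]\d)"
    return re.sub(pattern, lambda m: spell_number(int(m.group())), text)


def prepare_for_tts(text: str, abbreviations: Mapping[str, str]) -> str:
    return normalise_numbers(expand_abbreviations(strip_markdown(text), abbreviations))
```

For example, `prepare_for_tts("**Your appt** is in 3 days", {"appt": "appointment"})` yields `"Your appointment is in three days"`.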


Agent Types

| Type | Module | When used |
|---|---|---|
| VoiceAgent | agents.py | Standard STT→LLM→TTS voice conversation |
| RealtimeAgent | agents.py | Audio-native models (Gemini Live, OpenAI Realtime), lower latency |
| OutboundAgent | agents.py | Outbound/campaign calls; fetches the call script from Call Service |
| PureIVRAgent | agents.py | Menu-based IVR with no conversational AI |

Tool Execution

Atlas supports three tool sources:

1. Local tools (always available)

| Tool | Purpose |
|---|---|
| transfer_to_human_agent | Trigger escalation to human agent queue |
| switch_language | Change conversation language mid-call |
| end_call | Gracefully terminate the call |
| search_knowledge | Query agent's knowledge base |

2. Agent skills from Compass
agent_skills are registered as RawFunctionTools. Calls are routed via MCPManager to the configured mcp_server_url.

3. MCP servers
MCP servers are discovered from each agent's config. MCPManager deduplicates tool calls within a 30 s window to prevent concurrent duplicate execution.
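The deduplication behaviour described above could look roughly like the sketch below, which shares one in-flight or recently finished task between identical calls. The 30 s window is from the text; the `ToolCallDeduplicator` class, the key format (tool name plus canonical JSON arguments), and the `execute` callback are assumptions and do not describe the actual MCPManager API.

```python
import asyncio
import json
import time
from typing import Any, Awaitable, Callable, Dict, Tuple

Executor = Callable[[str, Dict[str, Any]], Awaitable[Any]]


class ToolCallDeduplicator:
    """Share one execution between identical tool calls inside a time window."""

    def __init__(self, window_s: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._window_s = window_s
        self._clock = clock
        self._recent: Dict[Tuple[str, str], Tuple[float, "asyncio.Future[Any]"]] = {}

    async def call(self, name: str, args: Dict[str, Any], execute: Executor) -> Any:
        now = self._clock()
        # Forget entries that have aged out of the window.
        self._recent = {
            key: entry for key, entry in self._recent.items()
            if now - entry[0] < self._window_s
        }
        key = (name, json.dumps(args, sort_keys=True))
        entry = self._recent.get(key)
        if entry is not None:
            return await entry[1]  # reuse the running or finished call
        task = asyncio.ensure_future(execute(name, args))
        self._recent[key] = (now, task)
        return await task
```

Because the stored value is the task itself, a duplicate that arrives while the first call is still running waits for the same result instead of triggering a second execution.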


Error Handling & Escalation

Managed by SessionErrorHandler (error_handler.py):

| Scenario | Response |
|---|---|
| Recoverable LLM error | Speak filler ("Bear with me one moment") while SDK retries |
| Recoverable STT/TTS error | Silent retry |
| Non-recoverable (1st failure) | Prompt the caller: "Please repeat that?" |
| Non-recoverable (2nd failure) | Force transfer to human agent queue |

Escalation messages are language-aware (EN, AR, HI). The human_in_loop condition from Compass is also evaluated by the LLM each turn.
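The escalation rules in the table can be expressed as a small policy object. This is a sketch of the decision logic only: the `Action` enum and `SessionErrorPolicy` class are hypothetical names, and the real SessionErrorHandler may track failures differently (for example, resetting the counter after a successful turn).

```python
from enum import Enum


class Action(Enum):
    SPEAK_FILLER = "speak_filler"            # "Bear with me one moment"
    SILENT_RETRY = "silent_retry"
    ASK_REPEAT = "ask_repeat"                # "Please repeat that?"
    TRANSFER_TO_HUMAN = "transfer_to_human"


class SessionErrorPolicy:
    """Map an error to the response listed in the table above."""

    def __init__(self) -> None:
        self.non_recoverable_failures = 0

    def on_error(self, source: str, recoverable: bool) -> Action:
        """source is 'llm', 'stt' or 'tts' (assumed labels)."""
        if recoverable:
            # Only LLM retries are covered by a spoken filler.
            return Action.SPEAK_FILLER if source == "llm" else Action.SILENT_RETRY
        self.non_recoverable_failures += 1
        if self.non_recoverable_failures == 1:
            return Action.ASK_REPEAT
        return Action.TRANSFER_TO_HUMAN
```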


Billing

UsageMetricsCollector tracks per-call metrics and publishes to the Billing Service at call end.

Event published: AI_CALL_COMPLETED

{
  "eventType": "AI_CALL_COMPLETED",
  "callId": "uuid",
  "tenantId": "string",
  "metrics": {
    "durationSeconds": 120,
    "stt": {
      "provider": "openai",
      "model": "gpt-4o-transcribe",
      "durationSeconds": 45,
      "transcriptChars": 1200
    },
    "llm": {
      "provider": "google",
      "model": "gemini-2.5-flash",
      "inputTokens": 500,
      "outputTokens": 150,
      "turnCount": 5
    },
    "tts": {
      "provider": "openai",
      "characters": 800
    }
  }
}
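A per-call collector that produces the payload above might be shaped like this sketch. The JSON field names match the example event; the `UsageSketch` class, its methods, and the way counters are accumulated are assumptions and do not describe the real UsageMetricsCollector.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class UsageSketch:
    call_id: str
    tenant_id: str
    stt_provider: str
    stt_model: str
    llm_provider: str
    llm_model: str
    tts_provider: str
    stt_seconds: int = 0
    transcript_chars: int = 0
    input_tokens: int = 0
    output_tokens: int = 0
    turn_count: int = 0
    tts_characters: int = 0

    def record_llm_turn(self, input_tokens: int, output_tokens: int) -> None:
        """Accumulate token usage for one conversation turn."""
        self.input_tokens += input_tokens
        self.output_tokens += output_tokens
        self.turn_count += 1

    def to_event(self, duration_seconds: int) -> Dict[str, Any]:
        """Build the AI_CALL_COMPLETED body sent at call end."""
        return {
            "eventType": "AI_CALL_COMPLETED",
            "callId": self.call_id,
            "tenantId": self.tenant_id,
            "metrics": {
                "durationSeconds": duration_seconds,
                "stt": {
                    "provider": self.stt_provider,
                    "model": self.stt_model,
                    "durationSeconds": self.stt_seconds,
                    "transcriptChars": self.transcript_chars,
                },
                "llm": {
                    "provider": self.llm_provider,
                    "model": self.llm_model,
                    "inputTokens": self.input_tokens,
                    "outputTokens": self.output_tokens,
                    "turnCount": self.turn_count,
                },
                "tts": {
                    "provider": self.tts_provider,
                    "characters": self.tts_characters,
                },
            },
        }
```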

Observability

  • OpenTelemetry traces exported to Langfuse (LANGFUSE_* env vars)
  • Session ID = call_id, User ID = caller phone number
  • VAD diagnostics: frame arrival rates, Silero inference stats, speech/silence transitions
  • STT diagnostics: per-utterance RMS/peak, hallucination filter decisions, ghost segment detection
  • Tool execution: all tool inputs/outputs logged at DEBUG level

Key Environment Variables

LIVEKIT_URL=...
LIVEKIT_API_KEY=...
LIVEKIT_API_SECRET=...

LIVEKIT_AI_PROVISIONING_API_URL=http://compass-service:8000
CALL_SERVICE_URL=http://call-service:8000
BILLING_SERVICE_URL=http://billing-service:8080
TOOL_SERVICE_URL=http://tool-management-service:8009

STT_PROVIDER=openai
LLM_PROVIDER=google
TTS_PROVIDER=openai

WORKER_NUM_IDLE_PROCESSES=5
WORKER_LOAD_THRESHOLD=0.75

LANGFUSE_PUBLIC_KEY=...
LANGFUSE_SECRET_KEY=...
LANGFUSE_HOST=...

INSIGHTS_ENABLED=true
INSIGHTS_REDIS_URL=redis://...
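
Loading and validating a few of these variables at worker start-up could look like the sketch below. The variable names are taken from the listing; the `WorkerSettings` class, the choice of which variables are required, and the fallback values (copied from the sample listing above) are assumptions rather than documented defaults.

```python
import os
from dataclasses import dataclass
from typing import Mapping

_TRUE_VALUES = {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class WorkerSettings:
    livekit_url: str
    compass_url: str
    billing_url: str
    num_idle_processes: int
    load_threshold: float
    insights_enabled: bool


def load_settings(env: Mapping[str, str] = os.environ) -> WorkerSettings:
    """Read settings from the environment, failing fast on missing values."""

    def required(name: str) -> str:
        value = env.get(name, "").strip()
        if not value:
            raise RuntimeError(f"missing required environment variable: {name}")
        return value

    # Fallbacks mirror the sample listing; they are assumed, not documented.
    threshold = float(env.get("WORKER_LOAD_THRESHOLD", "0.75"))
    if not 0.0 < threshold <= 1.0:
        raise ValueError("WORKER_LOAD_THRESHOLD must be in (0, 1]")
    return WorkerSettings(
        livekit_url=required("LIVEKIT_URL"),
        compass_url=required("LIVEKIT_AI_PROVISIONING_API_URL"),
        billing_url=required("BILLING_SERVICE_URL"),
        num_idle_processes=int(env.get("WORKER_NUM_IDLE_PROCESSES", "5")),
        load_threshold=threshold,
        insights_enabled=env.get("INSIGHTS_ENABLED", "false").strip().lower() in _TRUE_VALUES,
    )
```

Passing the environment as a mapping keeps the loader testable with a plain dict.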