All Burki SDKs support real-time WebSocket streaming of live call transcripts and campaign progress. You can use these streams to build real-time dashboards, monitoring tools, and analytics systems.

Overview

Burki provides two real-time streaming endpoints:

| Feature | Description | Use Cases |
| --- | --- | --- |
| Live Transcripts | Stream real-time transcriptions during active calls | Call monitoring, live captions, sentiment analysis |
| Campaign Progress | Stream campaign execution updates | Progress dashboards, real-time analytics, alerting |

Live Transcript Streaming

Stream transcripts as they are generated during calls. Typical uses include:
  • Call monitoring dashboards
  • Live supervisor interfaces
  • Real-time sentiment analysis
  • Accessibility features (live captions)
  • Training and coaching tools

Event Types

| Event Type | Description |
| --- | --- |
| connection_established | WebSocket connection successful |
| transcript | New transcript segment from user or assistant |
| call_status | Call status change (in-progress, completed, failed) |
| error | Error message |
| pong | Response to ping (keepalive) |
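
Because every message carries a type field, one way to keep handling code organized is a small dispatch table keyed by event type. The sketch below works on plain dictionaries rather than SDK event objects; the `EventRouter` class and its method names are hypothetical helpers written for this illustration, not part of any Burki SDK.

```python
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class EventRouter:
    """Route decoded event messages to handlers keyed by their "type" field."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self.unhandled: List[str] = []

    def on(self, event_type: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for one event type."""
        def register(handler: Handler) -> Handler:
            self._handlers[event_type] = handler
            return handler
        return register

    def dispatch(self, message: Dict[str, Any]) -> bool:
        """Call the matching handler; return False if none is registered."""
        event_type = str(message.get("type"))
        handler = self._handlers.get(event_type)
        if handler is None:
            self.unhandled.append(event_type)
            return False
        handler(message)
        return True


router = EventRouter()
transcript_lines: List[str] = []


@router.on("transcript")
def handle_transcript(message: Dict[str, Any]) -> None:
    data = message["data"]
    transcript_lines.append(f"[{data['speaker']}]: {data['content']}")


@router.on("pong")
def handle_pong(message: Dict[str, Any]) -> None:
    pass  # keepalive reply; nothing to display


router.dispatch({"type": "transcript",
                 "data": {"speaker": "assistant", "content": "Hello"}})
router.dispatch({"type": "pong"})
router.dispatch({"type": "call_status"})  # no handler registered in this demo
print(transcript_lines, router.unhandled)
```

Recording unhandled types instead of raising keeps the consumer running if new event types appear later.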

Transcript Event Data

{
  "type": "transcript",
  "call_sid": "CA123abc...",
  "timestamp": "2026-01-19T12:00:05.123Z",
  "data": {
    "content": "Hello, how can I help you today?",
    "speaker": "assistant",
    "is_final": true,
    "confidence": 0.95,
    "segment_start": 5.2,
    "segment_end": 7.1,
    "created_at": "2026-01-19T12:00:05.123Z"
  }
}
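
If you consume the raw WebSocket messages rather than SDK event objects, a typed wrapper can make the payload easier to work with. The following is a minimal sketch that parses the sample message above with the standard library; the `TranscriptSegment` class and `parse_transcript_event` function are hypothetical names, and treating `segment_start` and `segment_end` as seconds is an assumption based on the sample values.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class TranscriptSegment:
    call_sid: str
    speaker: str
    content: str
    is_final: bool
    confidence: float
    segment_start: float
    segment_end: float

    @property
    def duration(self) -> float:
        """Length of the segment (assumed to be in seconds)."""
        return self.segment_end - self.segment_start


def parse_transcript_event(raw: str) -> TranscriptSegment:
    """Decode one JSON message; raise ValueError if it is not a transcript."""
    message = json.loads(raw)
    if message.get("type") != "transcript":
        raise ValueError(f"not a transcript event: {message.get('type')!r}")
    data = message["data"]
    return TranscriptSegment(
        call_sid=message["call_sid"],
        speaker=data["speaker"],
        content=data["content"],
        is_final=data["is_final"],
        confidence=data["confidence"],
        segment_start=data["segment_start"],
        segment_end=data["segment_end"],
    )


# The sample payload from this section
SAMPLE = """
{
  "type": "transcript",
  "call_sid": "CA123abc...",
  "timestamp": "2026-01-19T12:00:05.123Z",
  "data": {
    "content": "Hello, how can I help you today?",
    "speaker": "assistant",
    "is_final": true,
    "confidence": 0.95,
    "segment_start": 5.2,
    "segment_end": 7.1,
    "created_at": "2026-01-19T12:00:05.123Z"
  }
}
"""

segment = parse_transcript_event(SAMPLE)
print(f"[{segment.speaker}] {segment.content} ({segment.duration:.1f}s)")
```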

Implementation Examples

The Python SDK uses async context managers for streaming:
import asyncio
from burki import BurkiClient

async def monitor_call(call_sid: str):
    client = BurkiClient(api_key="your-api-key")
    
    async with client.realtime.live_transcript(call_sid=call_sid) as stream:
        async for event in stream:
            if event.type == "transcript":
                speaker = event.data.speaker
                content = event.data.content
                print(f"[{speaker}]: {content}")
                
            elif event.type == "call_status":
                print(f"Call status: {event.status}")
                if event.status == "completed":
                    break
                    
            elif event.type == "error":
                print(f"Error: {event.message}")
                break

# Run the monitor
asyncio.run(monitor_call("CA123abc..."))

Features:

  • Automatic connection management with context manager
  • Async iteration for events
  • Clean disconnection on exit

Campaign Progress Streaming

Monitor campaign execution in real time. Typical uses include:
  • Campaign progress dashboards
  • Real-time analytics displays
  • Automated alerting systems
  • Performance monitoring tools

Event Types

| Event Type | Description |
| --- | --- |
| connection_established | WebSocket connection successful |
| progress | Periodic progress update with counts |
| contact_completed | Individual contact call completed |
| campaign_completed | Campaign finished (all contacts processed) |
| error | Error message |

Progress Event Data

{
  "type": "progress",
  "campaign_id": 123,
  "timestamp": "2026-01-19T12:05:00.000Z",
  "data": {
    "total_contacts": 100,
    "completed_contacts": 45,
    "successful_contacts": 38,
    "failed_contacts": 7,
    "pending_contacts": 55,
    "success_rate": 84.4
  }
}
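
The counts in the sample payload appear to be related: completed is successful plus failed, pending is total minus completed, and the success rate is successful divided by completed. These relationships are inferred from the sample values only, not from a documented formula, so treat the sketch below as a sanity check rather than a specification. The `derive_progress` function is a hypothetical helper.

```python
from typing import Dict, Union

Number = Union[int, float]


def derive_progress(total: int, successful: int, failed: int) -> Dict[str, Number]:
    """Recompute the derived fields of a progress payload from raw counts."""
    completed = successful + failed
    pending = total - completed
    # Assumption: the rate is a percentage of completed (not total) contacts
    success_rate = round(100 * successful / completed, 1) if completed else 0.0
    return {
        "total_contacts": total,
        "completed_contacts": completed,
        "successful_contacts": successful,
        "failed_contacts": failed,
        "pending_contacts": pending,
        "success_rate": success_rate,
    }


# The "data" object from the sample progress event
SAMPLE_DATA = {
    "total_contacts": 100,
    "completed_contacts": 45,
    "successful_contacts": 38,
    "failed_contacts": 7,
    "pending_contacts": 55,
    "success_rate": 84.4,
}

derived = derive_progress(total=100, successful=38, failed=7)
print(derived == SAMPLE_DATA)
```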

Contact Completed Event

{
  "type": "contact_completed",
  "campaign_id": 123,
  "timestamp": "2026-01-19T12:05:30.000Z",
  "data": {
    "phone_number": "+14155551234",
    "name": "John Smith",
    "outcome": "success",
    "duration": 125,
    "call_sid": "CA456def..."
  }
}
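
A consumer that receives many contact_completed events will often want a running summary rather than the individual records. Here is a small sketch of such a tally over plain dictionaries. `OutcomeTally` is a hypothetical helper; the second event, including its "no_answer" outcome value and phone number, is invented for the demo, and `duration` is assumed to be in seconds.

```python
from collections import Counter
from typing import Any, Dict


class OutcomeTally:
    """Running counts of contact outcomes and average call duration."""

    def __init__(self) -> None:
        self.outcomes: Counter = Counter()
        self.total_duration = 0
        self.count = 0

    def add(self, event: Dict[str, Any]) -> None:
        """Fold one event into the tally; other event types are ignored."""
        if event.get("type") != "contact_completed":
            return
        data = event["data"]
        self.outcomes[data["outcome"]] += 1
        self.total_duration += data["duration"]
        self.count += 1

    @property
    def average_duration(self) -> float:
        return self.total_duration / self.count if self.count else 0.0


tally = OutcomeTally()
tally.add({"type": "contact_completed",
           "data": {"phone_number": "+14155551234", "outcome": "success",
                    "duration": 125}})
# Hypothetical second event; "no_answer" is an invented outcome value
tally.add({"type": "contact_completed",
           "data": {"phone_number": "+14155550100", "outcome": "no_answer",
                    "duration": 35}})
tally.add({"type": "progress", "data": {}})  # ignored by the tally

print(dict(tally.outcomes), tally.average_duration)
```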

Implementation Examples

import asyncio
from burki import BurkiClient

async def monitor_campaign(campaign_id: int):
    client = BurkiClient(api_key="your-api-key")
    
    async with client.realtime.campaign_progress(campaign_id=campaign_id) as stream:
        async for event in stream:
            if event.type == "progress":
                completed = event.data.completed_contacts
                total = event.data.total_contacts
                rate = event.data.success_rate
                print(f"Progress: {completed}/{total} ({rate:.1f}% success)")
                
            elif event.type == "contact_completed":
                phone = event.data.phone_number
                outcome = event.data.outcome
                print(f"Completed: {phone} - {outcome}")
                
            elif event.type == "campaign_completed":
                print("\nCampaign finished!")
                print(f"Total contacts: {event.data.total_contacts}")
                print(f"Success rate: {event.data.success_rate:.1f}%")
                break

# Run the monitor
asyncio.run(monitor_campaign(123))

Connection Management

Keepalive / Ping

Send periodic pings to keep the WebSocket connection alive, especially for long-running sessions:
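import asyncio
import contextlib

async def with_keepalive(stream, interval=30):
    """Yield events from `stream` while sending a ping every `interval` seconds."""
    async def ping_loop():
        while True:
            await asyncio.sleep(interval)
            await stream.send_ping()
    
    # Run the pings in the background while events are consumed
    ping_task = asyncio.create_task(ping_loop())
    
    try:
        async for event in stream:
            yield event
    finally:
        # Stop the ping task and wait for the cancellation to complete
        ping_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await ping_task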
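
Pings keep the connection open, but they do not by themselves tell you when the other side has stopped answering. One possible complement is a watchdog that records when the last event (including pong) arrived and reports the connection as stale after a timeout. The sketch below is pure bookkeeping with an injectable clock; `PongWatchdog` and the 90-second default are illustrative choices, not SDK features.

```python
import time
from typing import Callable


class PongWatchdog:
    """Track the time of the last inbound event to detect a silent connection."""

    def __init__(self, timeout: float = 90.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._timeout = timeout
        self._clock = clock
        self._last_seen = clock()

    def observe(self, event_type: str) -> None:
        """Record activity; any inbound event, including pong, proves liveness."""
        self._last_seen = self._clock()

    def is_stale(self) -> bool:
        """True if nothing has arrived for longer than the timeout."""
        return self._clock() - self._last_seen > self._timeout


# Demo with a fake clock so the example runs instantly
now = [0.0]
watchdog = PongWatchdog(timeout=90.0, clock=lambda: now[0])

now[0] = 60.0
watchdog.observe("pong")         # pong received at t=60
now[0] = 120.0
fresh = not watchdog.is_stale()  # 60s of silence: still within the timeout
now[0] = 151.0
stale = watchdog.is_stale()      # 91s of silence: exceeds the timeout
print(fresh, stale)
```

In a real consumer you would call `observe` for each event from the stream and check `is_stale` from the ping loop, reconnecting when it returns True.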

Reconnection Strategy

Implement exponential backoff for automatic reconnection:
import asyncio
from burki import BurkiClient

async def resilient_stream(call_sid: str, max_retries=5):
    client = BurkiClient(api_key="your-api-key")
    retries = 0
    
    while retries < max_retries:
        try:
            async with client.realtime.live_transcript(call_sid=call_sid) as stream:
                retries = 0  # Reset on successful connection
                async for event in stream:
                    yield event
            # The stream ended without an error: stop instead of reconnecting
            return
                    
        except Exception as e:
            retries += 1
            if retries >= max_retries:
                raise
            
            # Exponential backoff: 2s, 4s, 8s, ... capped at 60s
            delay = min(2 ** retries, 60)
            print(f"Reconnecting in {delay}s (attempt {retries}): {e}")
            await asyncio.sleep(delay)
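
To make the backoff schedule concrete, the sketch below computes the delay used above, min(2 ** retries, 60), for the first seven attempts. It also shows an optional "full jitter" variant, which picks a random delay between zero and the computed value so that many clients do not reconnect at the same instant. Jitter is a common addition and is not part of the example above; both function names are hypothetical.

```python
import random


def backoff_delay(attempt: int, base: int = 2, cap: int = 60) -> int:
    """Exponential delay in seconds for a 1-based attempt number, capped."""
    return min(base ** attempt, cap)


def jittered_delay(attempt: int, rng: random.Random) -> float:
    """Full-jitter variant: uniform random delay in [0, backoff_delay]."""
    return rng.uniform(0, backoff_delay(attempt))


schedule = [backoff_delay(n) for n in range(1, 8)]
print(schedule)  # [2, 4, 8, 16, 32, 60, 60]

rng = random.Random(0)  # seeded only to make the demo repeatable
jittered = [jittered_delay(n, rng) for n in range(1, 8)]
```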

Best Practices

Use Streaming When:
  • You need real-time updates (< 1 second latency)
  • Building live dashboards or monitoring interfaces
  • Processing events as they happen (e.g., sentiment analysis)
  • Monitoring long-running operations (campaigns, calls)
Use Polling When:
  • Periodic status checks are sufficient (intervals longer than 30 seconds)
  • Simple status display without live updates
  • Lower resource usage is a priority
  • Connection stability is a concern
Always Clean Up:
  • Close streams when done (close(), disconnect(), or context manager exit)
  • Cancel keepalive tasks/goroutines
  • Handle disconnection events gracefully
Memory Considerations:
  • Don’t store all events in memory for long streams
  • Process or persist events as they arrive
  • Use bounded buffers if accumulating data
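
As one way to apply the bounded-buffer advice, `collections.deque` with a `maxlen` discards the oldest item automatically when a new one arrives. The `RecentEvents` wrapper below is a hypothetical helper for illustration.

```python
from collections import deque
from typing import Any, Deque, List


class RecentEvents:
    """Keep only the most recent `maxlen` events in memory."""

    def __init__(self, maxlen: int = 100) -> None:
        self._events: Deque[Any] = deque(maxlen=maxlen)

    def add(self, event: Any) -> None:
        # When the deque is full, appending drops the oldest entry
        self._events.append(event)

    def snapshot(self) -> List[Any]:
        """Return a copy of the buffered events, oldest first."""
        return list(self._events)


buffer = RecentEvents(maxlen=3)
for sequence_number in range(5):
    buffer.add({"seq": sequence_number})

print(buffer.snapshot())  # [{'seq': 2}, {'seq': 3}, {'seq': 4}]
```
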
Connection Limits:
  • Maximum 10 concurrent connections per call
  • Maximum 100 messages per second per connection
  • Consider connection pooling for multiple streams
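
To stay under the per-call connection limit on the client side, one option is to guard stream creation with a semaphore. The sketch below uses `asyncio.Semaphore` and a fake stream that just sleeps; `ConnectionLimiter` and `fake_stream` are hypothetical names, and in real code the body of the `async with` block would open and consume a stream.

```python
import asyncio

MAX_CONNECTIONS_PER_CALL = 10  # limit stated in the list above


class ConnectionLimiter:
    """Async context manager that caps the number of concurrent holders."""

    def __init__(self, limit: int) -> None:
        self._semaphore = asyncio.Semaphore(limit)
        self.active = 0
        self.peak = 0

    async def __aenter__(self) -> "ConnectionLimiter":
        await self._semaphore.acquire()
        self.active += 1
        self.peak = max(self.peak, self.active)
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.active -= 1
        self._semaphore.release()


async def fake_stream(limiter: ConnectionLimiter) -> None:
    async with limiter:
        await asyncio.sleep(0.01)  # stands in for consuming a stream


async def demo(n_streams: int = 25) -> int:
    # Create the limiter inside the running event loop
    limiter = ConnectionLimiter(MAX_CONNECTIONS_PER_CALL)
    await asyncio.gather(*(fake_stream(limiter) for _ in range(n_streams)))
    return limiter.peak


peak = asyncio.run(demo())
print(f"Peak concurrent streams: {peak}")
```
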
Handle All Error Types:
  • Connection errors (network issues)
  • Authentication errors (invalid API key)
  • Not found errors (invalid call_sid or campaign_id)
  • Rate limit errors (too many connections)
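
The four categories above usually call for different reactions: network and rate-limit failures may succeed on a later attempt, while an invalid API key or identifier will not. The sketch below models that decision with locally defined exception classes. All class names here are hypothetical stand-ins; check your SDK's documentation for the exception types it actually raises.

```python
class StreamError(Exception):
    """Base class for the illustrative error types below."""


class ConnectionLost(StreamError):
    """Network issue; the connection dropped or could not be opened."""


class AuthenticationFailed(StreamError):
    """The API key was rejected."""


class ResourceNotFound(StreamError):
    """The call_sid or campaign_id does not exist."""


class RateLimited(StreamError):
    """Too many connections or messages."""


# Errors that may succeed if the same request is tried again later
RETRYABLE = (ConnectionLost, RateLimited)


def should_retry(error: BaseException) -> bool:
    """Return True for transient failures, False for permanent ones."""
    return isinstance(error, RETRYABLE)


for error in (ConnectionLost(), AuthenticationFailed(),
              ResourceNotFound(), RateLimited()):
    action = "retry with backoff" if should_retry(error) else "stop and report"
    print(f"{type(error).__name__}: {action}")
```
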
Graceful Degradation:
  • Fall back to polling if streaming fails
  • Show stale data with “updating…” indicator
  • Log errors for debugging
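
A fallback from streaming to polling can be sketched as an async generator that yields streamed events until the connection fails and then switches to periodic polls. The example below uses stand-in functions instead of real SDK calls: `open_stream` and `poll_once` are hypothetical callables you would supply, and catching `ConnectionError` is an assumption about how a failure would surface. Each event is tagged with its source so a UI can mark polled data as possibly stale.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional

Event = Dict[str, Any]


async def events_with_fallback(
    open_stream: Callable[[], AsyncIterator[Event]],
    poll_once: Callable[[], Awaitable[Event]],
    poll_interval: float = 30.0,
    max_polls: Optional[int] = None,
) -> AsyncIterator[Event]:
    """Yield streamed events; on connection failure, switch to polling."""
    try:
        async for event in open_stream():
            yield {**event, "source": "stream"}
        return  # stream ended normally; nothing to fall back from
    except ConnectionError as error:
        print(f"Streaming failed ({error}); falling back to polling")

    polls = 0
    while max_polls is None or polls < max_polls:
        snapshot = await poll_once()
        yield {**snapshot, "source": "poll"}
        polls += 1
        await asyncio.sleep(poll_interval)


async def flaky_stream() -> AsyncIterator[Event]:
    """Stand-in stream that delivers one event and then fails."""
    yield {"completed_contacts": 1}
    raise ConnectionError("socket closed")


async def fake_poll() -> Event:
    """Stand-in for a REST status request."""
    return {"completed_contacts": 2}


async def demo() -> List[str]:
    sources = []
    async for event in events_with_fallback(
            flaky_stream, fake_poll, poll_interval=0.01, max_polls=2):
        sources.append(event["source"])
    return sources


sources = asyncio.run(demo())
print(sources)  # ['stream', 'poll', 'poll']
```
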
User Experience:
  • Show connection status to users
  • Indicate when data may be stale
  • Provide manual refresh option
API Key Protection:
  • Never expose API keys in client-side code (browser)
  • Use server-side proxies for browser applications
  • Store keys in environment variables
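
A minimal sketch of loading the key from the environment on the server side follows. The variable name `BURKI_API_KEY` matches the one used in the Go example on this page; the `load_api_key` helper itself is hypothetical.

```python
import os


def load_api_key(env_var: str = "BURKI_API_KEY") -> str:
    """Read the API key from the environment; fail loudly if it is missing."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"{env_var} is not set; refusing to start")
    return key


# Typical use at application startup:
#     api_key = load_api_key()
```

Failing at startup avoids discovering a missing key only when the first stream is opened.
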
Data Privacy:
  • Transcripts may contain sensitive information
  • Implement appropriate access controls
  • Consider data retention policies
Connection Security:
  • Always use WSS (WebSocket Secure) in production
  • Validate SSL certificates
  • Implement authentication tokens if needed
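
If your code builds WebSocket URLs itself, for example in a server-side proxy, a small guard can reject non-secure URLs before connecting. The sketch below uses only the standard library. `require_wss` is a hypothetical helper and the URL is a placeholder, not a real Burki endpoint. Python's `ssl.create_default_context()` already enables certificate and hostname verification, which the second function confirms.

```python
import ssl
from urllib.parse import urlparse


def require_wss(url: str) -> str:
    """Return the URL unchanged if it uses wss://, otherwise raise ValueError."""
    parsed = urlparse(url)
    if parsed.scheme != "wss":
        raise ValueError(f"expected a wss:// URL, got scheme {parsed.scheme!r}")
    if not parsed.hostname:
        raise ValueError("URL has no hostname")
    return url


def strict_tls_context() -> ssl.SSLContext:
    """Default TLS context; verifies certificates and hostnames."""
    context = ssl.create_default_context()
    if context.verify_mode != ssl.CERT_REQUIRED or not context.check_hostname:
        raise RuntimeError("TLS verification is unexpectedly disabled")
    return context


# Placeholder URL for illustration only
checked_url = require_wss("wss://example.invalid/stream")
tls_context = strict_tls_context()
```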

Use Case Examples

Call Monitoring Dashboard

Build a real-time call center monitoring interface:
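// Assumes `client` is an initialized Burki client and `RealtimeStream` is the
// stream type provided by the JavaScript SDK.
class CallMonitoringDashboard {
  private activeStreams = new Map<string, RealtimeStream>();
  
  async monitorCall(callSid: string, displayElement: HTMLElement) {
    const stream = client.realtime.liveTranscript(callSid);
    await stream.connect();
    
    this.activeStreams.set(callSid, stream);
    
    try {
      for await (const event of stream) {
        if (event.type === 'transcript') {
          this.updateTranscriptDisplay(displayElement, event);
        } else if (event.type === 'call_status' && event.status === 'completed') {
          break;
        }
      }
    } finally {
      // Always disconnect, even if the loop throws
      this.stopMonitoring(callSid);
    }
  }
  
  // Minimal illustrative renderer: appends one line per transcript segment
  private updateTranscriptDisplay(
    displayElement: HTMLElement,
    event: { data: { speaker: string; content: string } },
  ) {
    const line = document.createElement('div');
    line.textContent = `[${event.data.speaker}]: ${event.data.content}`;
    displayElement.appendChild(line);
  }
  
  stopMonitoring(callSid: string) {
    const stream = this.activeStreams.get(callSid);
    if (stream) {
      stream.disconnect();
      this.activeStreams.delete(callSid);
    }
  }
  
  stopAll() {
    for (const stream of this.activeStreams.values()) {
      stream.disconnect();
    }
    this.activeStreams.clear();
  }
}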

Real-time Sentiment Analysis

Analyze call sentiment as transcripts arrive:
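from collections import deque

from burki import BurkiClient
from textblob import TextBlob

async def alert_supervisor(call_sid: str, content: str, score: float):
    # Placeholder: replace with your own notification logic
    print(f"ALERT {call_sid}: {content!r} (sentiment {score:.2f})")

async def analyze_sentiment(call_sid: str):
    client = BurkiClient(api_key="your-api-key")
    # Keep only the last 10 scores so memory stays bounded on long calls
    recent_scores = deque(maxlen=10)
    
    async with client.realtime.live_transcript(call_sid=call_sid) as stream:
        async for event in stream:
            if event.type == "transcript" and event.data.speaker == "user":
                # Analyze user sentiment (polarity ranges from -1.0 to 1.0)
                blob = TextBlob(event.data.content)
                score = blob.sentiment.polarity
                recent_scores.append(score)
                
                # Alert on negative sentiment
                if score < -0.5:
                    await alert_supervisor(call_sid, event.data.content, score)
                
                # Rolling average over the most recent scores
                avg = sum(recent_scores) / len(recent_scores)
                print(f"Rolling sentiment: {avg:.2f}")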

Campaign Progress Webhook

Forward campaign events to external systems:
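// Requires the standard packages bytes, encoding/json, log, net/http, os, and
// time, plus the Burki Go SDK (referenced here as burki).
func forwardCampaignEvents(campaignID int, webhookURL string) error {
    client := burki.NewClient(os.Getenv("BURKI_API_KEY"))
    stream := client.Realtime.CampaignProgress(campaignID)
    
    // Return the error instead of calling log.Fatal so the caller can decide
    if err := stream.Connect(); err != nil {
        return err
    }
    defer stream.Close()

    httpClient := &http.Client{Timeout: 10 * time.Second}
    
    for {
        select {
        case event := <-stream.Events:
            // Serialize the event and forward it to the webhook
            jsonData, err := json.Marshal(event)
            if err != nil {
                log.Printf("Marshal error: %v", err)
                continue
            }
            resp, err := httpClient.Post(webhookURL, "application/json", 
                bytes.NewReader(jsonData))
            if err != nil {
                log.Printf("Webhook error: %v", err)
                continue
            }
            resp.Body.Close()
            
        case <-stream.Done:
            return nil
        }
    }
}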

Next Steps

  • Python SDK: Full Python SDK documentation
  • JavaScript SDK: Full JavaScript/TypeScript SDK documentation
  • Go SDK: Full Go SDK documentation
  • Live Transcript API: REST API WebSocket reference