Tuesday, January 20, 2026

How to Design a Fully Streaming Voice Agent with End-to-End Latency Budgets, Incremental ASR, LLM Streaming, and Real-Time TTS


In this tutorial, we build an end-to-end streaming voice agent that mirrors how modern low-latency conversational systems operate in real time. We simulate the complete pipeline, from chunked audio input and streaming speech recognition to incremental language model reasoning and streamed text-to-speech output, while explicitly tracking latency at every stage. By working with strict latency budgets and observing metrics such as time to first token and time to first audio, we focus on the practical engineering trade-offs that shape responsive voice-based user experiences. Check out the FULL CODES here.

import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import List, AsyncIterator
from enum import Enum
import matplotlib.pyplot as plt


@dataclass
class LatencyMetrics:
   audio_chunk_received: float = 0.0
   asr_started: float = 0.0
   asr_partial: float = 0.0
   asr_complete: float = 0.0
   llm_started: float = 0.0
   llm_first_token: float = 0.0
   llm_complete: float = 0.0
   tts_started: float = 0.0
   tts_first_chunk: float = 0.0
   tts_complete: float = 0.0


   def get_time_to_first_audio(self) -> float:
       return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0


   def get_total_latency(self) -> float:
       return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0


@dataclass
class LatencyBudgets:
   asr_processing: float = 0.1
   asr_finalization: float = 0.3
   llm_first_token: float = 0.5
   llm_token_generation: float = 0.02
   tts_first_chunk: float = 0.2
   tts_chunk_generation: float = 0.05
   time_to_first_audio: float = 1.0


class AgentState(Enum):
   LISTENING = "listening"
   PROCESSING_SPEECH = "processing_speech"
   THINKING = "pondering"
   SPEAKING = "talking"
   INTERRUPTED = "interrupted"

We define the core data structures and state representations that let us track latency across the entire voice pipeline. We formalize timing signals for ASR, LLM, and TTS to ensure consistent measurement across all stages. We also establish a clear agent state machine that governs how the system transitions within a conversational turn. Check out the FULL CODES here.
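To make the relationship between measurements and budgets concrete, here is a minimal helper (our own addition, not part of the tutorial code) that checks a single turn's recorded timings against its budget:

def check_budgets(metrics: LatencyMetrics, budgets: LatencyBudgets) -> dict:
    # Each entry is True when the measured latency fits within its budget.
    return {
        "llm_first_token": (metrics.llm_first_token - metrics.llm_started) <= budgets.llm_first_token,
        "tts_first_chunk": (metrics.tts_first_chunk - metrics.tts_started) <= budgets.tts_first_chunk,
        "time_to_first_audio": metrics.get_time_to_first_audio() <= budgets.time_to_first_audio,
    }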

class AudioInputStream:
   def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
       self.sample_rate = sample_rate
       self.chunk_duration_ms = chunk_duration_ms
       self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)


   async def stream_audio(self, text: str) -> AsyncIterator[np.ndarray]:
       chars_per_second = (150 * 5) / 60
       duration_seconds = len(text) / chars_per_second
       num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)


       for _ in range(num_chunks):
           chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
           await asyncio.sleep(self.chunk_duration_ms / 1000)
           yield chunk

We simulate real-time audio input by breaking speech into fixed-duration chunks that arrive asynchronously. We model realistic speaking rates and streaming behavior to mimic live microphone input. We use this stream as the foundation for testing the downstream latency-sensitive components. Check out the FULL CODES here.
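As a quick sanity check, a small driver like the sketch below (assumed, not part of the original code) consumes the simulated stream and confirms that chunks arrive in roughly real time:

async def count_chunks(text: str) -> None:
    # Consume the simulated microphone stream and report its pacing.
    stream = AudioInputStream()
    start = time.time()
    chunks = 0
    async for _ in stream.stream_audio(text):
        chunks += 1
    print(f"{chunks} chunks ({chunks * stream.chunk_duration_ms} ms of audio) "
          f"arrived in {time.time() - start:.2f} s")

# asyncio.run(count_chunks("Hello, how are you today?"))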

class StreamingASR:
   def __init__(self, latency_budget: float = 0.1):
       self.latency_budget = latency_budget
       self.silence_threshold = 0.5


   async def transcribe_stream(
       self,
       audio_stream: AsyncIterator[np.ndarray],
       ground_truth: str
   ) -> AsyncIterator[tuple[str, bool]]:
       words = ground_truth.split()
       words_transcribed = 0
       silence_duration = 0.0
       chunk_count = 0


       async for chunk in audio_stream:
           chunk_count += 1
           await asyncio.sleep(self.latency_budget)


           if chunk_count % 3 == 0 and words_transcribed < len(words):
               words_transcribed += 1
               yield " ".join(words[:words_transcribed]), False


           audio_power = np.mean(np.abs(chunk))
           silence_duration = silence_duration + 0.1 if audio_power < 0.05 else 0.0


           if silence_duration >= self.silence_threshold:
               await asyncio.sleep(0.2)
               yield ground_truth, True
               return


       yield ground_truth, True

We implement a streaming ASR module that produces partial transcriptions before emitting a final result. We progressively reveal words to reflect how modern ASR systems operate in real time. We also introduce silence-based finalization to approximate end-of-utterance detection. Check out the FULL CODES here.
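A standalone usage sketch (our assumption about how you might exercise this class, not from the tutorial) prints each partial hypothesis with a timestamp, followed by the final transcript:

async def demo_asr(text: str) -> None:
    # Feed the simulated audio stream into the streaming recognizer.
    stream = AudioInputStream()
    asr = StreamingASR()
    start = time.time()
    async for hypothesis, is_final in asr.transcribe_stream(stream.stream_audio(text), text):
        label = "final  " if is_final else "partial"
        print(f"[{time.time() - start:5.2f}s] {label}: {hypothesis}")

# asyncio.run(demo_asr("What's the weather like?"))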

class StreamingLLM:
   def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
       self.time_to_first_token = time_to_first_token
       self.tokens_per_second = tokens_per_second


   async def generate_response(self, prompt: str) -> AsyncIterator[str]:
       responses = {
           "howdy": "Whats up! How can I provide help to immediately?",
           "climate": "The climate is sunny with a temperature of 72°F.",
           "time": "The present time is 2:30 PM.",
           "default": "I perceive. Let me provide help to with that."
       }


       response = responses["default"]
       for key in responses:
           if key in prompt.lower():
               response = responses[key]
               break


       await asyncio.sleep(self.time_to_first_token)


       for word in response.split():
           yield word + " "
           await asyncio.sleep(1.0 / self.tokens_per_second)


class StreamingTTS:
   def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
       self.time_to_first_chunk = time_to_first_chunk
       self.chars_per_second = chars_per_second


   async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
       first_chunk = True
       buffer = ""


       async for text in text_stream:
           buffer += text
           if len(buffer) >= 20 or first_chunk:
               if first_chunk:
                   await asyncio.sleep(self.time_to_first_chunk)
                   first_chunk = False


               duration = len(buffer) / self.chars_per_second
               yield np.random.randn(int(16000 * duration)).astype(np.float32) * 0.1
               buffer = ""
               await asyncio.sleep(duration * 0.5)

In this snippet, we model a streaming language model and a streaming text-to-speech engine working together. We generate responses token by token to capture time-to-first-token behavior. We then convert incremental text into audio chunks to simulate early and continuous speech synthesis. Check out the FULL CODES here.
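To see the early-start effect in isolation, the sketch below (again our addition, not part of the tutorial code) pipes the LLM token stream straight into the TTS engine and measures how quickly the first audio chunk becomes available:

async def demo_llm_tts(prompt: str) -> None:
    # Chain the token stream into synthesis without waiting for the full response.
    llm = StreamingLLM()
    tts = StreamingTTS()
    start = time.time()
    first_audio = None
    async for _ in tts.synthesize_stream(llm.generate_response(prompt)):
        if first_audio is None:
            first_audio = time.time() - start
    print(f"first audio chunk after {first_audio:.2f} s")

# asyncio.run(demo_llm_tts("hello"))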

class StreamingVoiceAgent:
   def __init__(self, latency_budgets: LatencyBudgets):
       self.budgets = latency_budgets
       self.audio_stream = AudioInputStream()
       self.asr = StreamingASR(latency_budgets.asr_processing)
       self.llm = StreamingLLM(
           latency_budgets.llm_first_token,
           1.0 / latency_budgets.llm_token_generation
       )
       self.tts = StreamingTTS(
           latency_budgets.tts_first_chunk,
           1.0 / latency_budgets.tts_chunk_generation
       )
       self.state = AgentState.LISTENING
       self.metrics_history: List[LatencyMetrics] = []


   async def process_turn(self, user_input: str) -> LatencyMetrics:
       metrics = LatencyMetrics()
       start_time = time.time()


       metrics.audio_chunk_received = time.time() - start_time
       audio_gen = self.audio_stream.stream_audio(user_input)


       metrics.asr_started = time.time() - start_time
       async for text, final in self.asr.transcribe_stream(audio_gen, user_input):
           if final:
               metrics.asr_complete = time.time() - start_time
               transcription = text


       metrics.llm_started = time.time() - start_time
       response = ""
       async for token in self.llm.generate_response(transcription):
           if not metrics.llm_first_token:
               metrics.llm_first_token = time.time() - start_time
           response += token


       metrics.llm_complete = time.time() - start_time
       metrics.tts_started = time.time() - start_time


       async def text_stream():
           for word in response.split():
               yield word + " "


       async for _ in self.tts.synthesize_stream(text_stream()):
           if not metrics.tts_first_chunk:
               metrics.tts_first_chunk = time.time() - start_time


       metrics.tts_complete = time.time() - start_time
       self.metrics_history.append(metrics)
       return metrics

We orchestrate the full voice agent by wiring audio input, ASR, LLM, and TTS into a single asynchronous stream. We record precise timestamps at every transition to compute the key latency metrics. We treat each user turn as an isolated experiment to enable systematic performance analysis. Check out the FULL CODES here.
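Once process_turn returns, a small reporting helper (hypothetical, not in the original code) can translate the raw timestamps into the quantities we care about and compare them with the budgets:

def print_turn_summary(metrics: LatencyMetrics, budgets: LatencyBudgets) -> None:
    ttft = metrics.llm_first_token - metrics.asr_complete   # first LLM token after the final transcript
    ttfa = metrics.get_time_to_first_audio()                # first TTS audio after the final transcript
    print(f"ASR final transcript : {metrics.asr_complete:.2f}s")
    print(f"LLM first token      : +{ttft:.2f}s (budget {budgets.llm_first_token:.2f}s)")
    print(f"First audio chunk    : +{ttfa:.2f}s (budget {budgets.time_to_first_audio:.2f}s)")
    print(f"Total turn latency   : {metrics.get_total_latency():.2f}s")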

async def run_demo():
   budgets = LatencyBudgets(
       asr_processing=0.08,
       llm_first_token=0.3,
       llm_token_generation=0.02,
       tts_first_chunk=0.15,
       time_to_first_audio=0.8
   )


   agent = StreamingVoiceAgent(budgets)


   inputs = [
       "Hello, how are you today?",
       "What's the weather like?",
       "Can you tell me the time?"
   ]


   for text in inputs:
       await agent.process_turn(text)
       await asyncio.sleep(1)


if __name__ == "__main__":
   asyncio.run(run_demo())

We run the entire system across multiple conversational turns to observe latency consistency and variance. We apply aggressive latency budgets to stress the pipeline under realistic constraints. We use these runs to validate whether the system meets its responsiveness targets across interactions.
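One possible way to close the loop (our suggestion; the tutorial imports matplotlib but does not show a plotting step) is to visualize time to first audio across the recorded turns against the budget line:

def plot_time_to_first_audio(history: List[LatencyMetrics], budgets: LatencyBudgets) -> None:
    # One bar per turn, with the budget drawn as a reference line.
    values = [m.get_time_to_first_audio() for m in history]
    plt.figure(figsize=(6, 3))
    plt.bar(range(1, len(values) + 1), values, label="measured")
    plt.axhline(budgets.time_to_first_audio, color="red", linestyle="--", label="budget")
    plt.xlabel("turn")
    plt.ylabel("time to first audio (s)")
    plt.legend()
    plt.tight_layout()
    plt.show()

# e.g., at the end of run_demo(): plot_time_to_first_audio(agent.metrics_history, budgets)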

In conclusion, we demonstrated how a fully streaming voice agent can be orchestrated as a single asynchronous pipeline with clear stage boundaries and measurable performance guarantees. We showed that combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived latency, even when total computation time remains non-trivial. This approach helps us reason systematically about turn-taking, responsiveness, and optimization levers, and it provides a solid foundation for extending the system toward real-world deployments using production ASR, LLM, and TTS models.




