On this tutorial, we construct an end-to-end streaming voice agent that mirrors how fashionable low-latency conversational methods function in actual time. We simulate the entire pipeline, from chunked audio enter and streaming speech recognition to incremental language mannequin reasoning and streamed text-to-speech output, whereas explicitly monitoring latency at each stage. By working with strict latency budgets and observing metrics comparable to time to first token and time to first audio, we give attention to the sensible engineering trade-offs that form responsive voice-based person experiences. Take a look at the FULL CODES right here.
import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import Record, AsyncIterator
from enum import Enum
import matplotlib.pyplot as plt
@dataclass
class LatencyMetrics:
audio_chunk_received: float = 0.0
asr_started: float = 0.0
asr_partial: float = 0.0
asr_complete: float = 0.0
llm_started: float = 0.0
llm_first_token: float = 0.0
llm_complete: float = 0.0
tts_started: float = 0.0
tts_first_chunk: float = 0.0
tts_complete: float = 0.0
def get_time_to_first_audio(self) -> float:
return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0
def get_total_latency(self) -> float:
return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0
@dataclass
class LatencyBudgets:
asr_processing: float = 0.1
asr_finalization: float = 0.3
llm_first_token: float = 0.5
llm_token_generation: float = 0.02
tts_first_chunk: float = 0.2
tts_chunk_generation: float = 0.05
time_to_first_audio: float = 1.0
class AgentState(Enum):
LISTENING = "listening"
PROCESSING_SPEECH = "processing_speech"
THINKING = "pondering"
SPEAKING = "talking"
INTERRUPTED = "interrupted"
We outline the core knowledge constructions and state representations that permit us to trace latency throughout all the voice pipeline. We formalize timing indicators for ASR, LLM, and TTS to make sure constant measurement throughout all levels. We additionally set up a transparent agent state machine that guides how the system transitions throughout a conversational flip. Take a look at the FULL CODES right here.
class AudioInputStream:
def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
self.sample_rate = sample_rate
self.chunk_duration_ms = chunk_duration_ms
self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)
async def stream_audio(self, textual content: str) -> AsyncIterator[np.ndarray]:
chars_per_second = (150 * 5) / 60
duration_seconds = len(textual content) / chars_per_second
num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)
for _ in vary(num_chunks):
chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
await asyncio.sleep(self.chunk_duration_ms / 1000)
yield chunk
We simulate real-time audio enter by breaking speech into fixed-duration chunks that arrive asynchronously. We mannequin life like talking charges and streaming conduct to imitate reside microphone enter. We use this stream as the inspiration for testing downstream latency-sensitive parts. Take a look at the FULL CODES right here.
class StreamingASR:
def __init__(self, latency_budget: float = 0.1):
self.latency_budget = latency_budget
self.silence_threshold = 0.5
async def transcribe_stream(
self,
audio_stream: AsyncIterator[np.ndarray],
ground_truth: str
) -> AsyncIterator[tuple[str, bool]]:
phrases = ground_truth.break up()
words_transcribed = 0
silence_duration = 0.0
chunk_count = 0
async for chunk in audio_stream:
chunk_count += 1
await asyncio.sleep(self.latency_budget)
if chunk_count % 3 == 0 and words_transcribed < len(phrases):
words_transcribed += 1
yield " ".be a part of(phrases[:words_transcribed]), False
audio_power = np.imply(np.abs(chunk))
silence_duration = silence_duration + 0.1 if audio_power < 0.05 else 0.0
if silence_duration >= self.silence_threshold:
await asyncio.sleep(0.2)
yield ground_truth, True
return
yield ground_truth, True
We implement a streaming ASR module that produces partial transcriptions earlier than emitting a remaining end result. We progressively reveal phrases to mirror how fashionable ASR methods function in actual time. We additionally introduce silence-based finalization to approximate end-of-utterance detection. Take a look at the FULL CODES right here.
class StreamingLLM:
def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
self.time_to_first_token = time_to_first_token
self.tokens_per_second = tokens_per_second
async def generate_response(self, immediate: str) -> AsyncIterator[str]:
responses = {
"howdy": "Whats up! How can I provide help to immediately?",
"climate": "The climate is sunny with a temperature of 72°F.",
"time": "The present time is 2:30 PM.",
"default": "I perceive. Let me provide help to with that."
}
response = responses["default"]
for key in responses:
if key in immediate.decrease():
response = responses[key]
break
await asyncio.sleep(self.time_to_first_token)
for phrase in response.break up():
yield phrase + " "
await asyncio.sleep(1.0 / self.tokens_per_second)
class StreamingTTS:
def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
self.time_to_first_chunk = time_to_first_chunk
self.chars_per_second = chars_per_second
async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
first_chunk = True
buffer = ""
async for textual content in text_stream:
buffer += textual content
if len(buffer) >= 20 or first_chunk:
if first_chunk:
await asyncio.sleep(self.time_to_first_chunk)
first_chunk = False
length = len(buffer) / self.chars_per_second
yield np.random.randn(int(16000 * length)).astype(np.float32) * 0.1
buffer = ""
await asyncio.sleep(length * 0.5)
On this snippet, we mannequin a streaming language mannequin and a streaming text-to-speech engine working collectively. We generate responses token by token to seize time-to-first-token conduct. We then convert incremental textual content into audio chunks to simulate early and steady speech synthesis. Take a look at the FULL CODES right here.
class StreamingVoiceAgent:
def __init__(self, latency_budgets: LatencyBudgets):
self.budgets = latency_budgets
self.audio_stream = AudioInputStream()
self.asr = StreamingASR(latency_budgets.asr_processing)
self.llm = StreamingLLM(
latency_budgets.llm_first_token,
1.0 / latency_budgets.llm_token_generation
)
self.tts = StreamingTTS(
latency_budgets.tts_first_chunk,
1.0 / latency_budgets.tts_chunk_generation
)
self.state = AgentState.LISTENING
self.metrics_history: Record[LatencyMetrics] = []
async def process_turn(self, user_input: str) -> LatencyMetrics:
metrics = LatencyMetrics()
start_time = time.time()
metrics.audio_chunk_received = time.time() - start_time
audio_gen = self.audio_stream.stream_audio(user_input)
metrics.asr_started = time.time() - start_time
async for textual content, remaining in self.asr.transcribe_stream(audio_gen, user_input):
if remaining:
metrics.asr_complete = time.time() - start_time
transcription = textual content
metrics.llm_started = time.time() - start_time
response = ""
async for token in self.llm.generate_response(transcription):
if not metrics.llm_first_token:
metrics.llm_first_token = time.time() - start_time
response += token
metrics.llm_complete = time.time() - start_time
metrics.tts_started = time.time() - start_time
async def text_stream():
for phrase in response.break up():
yield phrase + " "
async for _ in self.tts.synthesize_stream(text_stream()):
if not metrics.tts_first_chunk:
metrics.tts_first_chunk = time.time() - start_time
metrics.tts_complete = time.time() - start_time
self.metrics_history.append(metrics)
return metrics
We orchestrate the total voice agent by wiring audio enter, ASR, LLM, and TTS right into a single asynchronous stream. We document exact timestamps at every transition to compute essential latency metrics. We deal with every person flip as an remoted experiment to allow systematic efficiency evaluation. Take a look at the FULL CODES right here.
async def run_demo():
budgets = LatencyBudgets(
asr_processing=0.08,
llm_first_token=0.3,
llm_token_generation=0.02,
tts_first_chunk=0.15,
time_to_first_audio=0.8
)
agent = StreamingVoiceAgent(budgets)
inputs = [
"Hello, how are you today?",
"What's the weather like?",
"Can you tell me the time?"
]
for textual content in inputs:
await agent.process_turn(textual content)
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(run_demo())
We run all the system throughout a number of conversational turns to watch latency consistency and variance. We apply aggressive latency budgets to emphasize the pipeline underneath life like constraints. We use these runs to validate whether or not the system meets responsiveness targets throughout interactions.
In conclusion, we demonstrated how a totally streaming voice agent will be orchestrated as a single asynchronous pipeline with clear stage boundaries and measurable efficiency ensures. We confirmed that combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived latency, even when complete computation time stays non-trivial. This strategy helps us purpose systematically about turn-taking, responsiveness, and optimization levers, and it offers a strong basis for extending the system towards real-world deployments utilizing manufacturing ASR, LLM, and TTS fashions.
Take a look at the FULL CODES right here. Additionally, be at liberty to comply with us on Twitter and don’t overlook to affix our 100k+ ML SubReddit and Subscribe to our Publication. Wait! are you on telegram? now you may be a part of us on telegram as nicely.
Asif Razzaq is the CEO of Marktechpost Media Inc.. As a visionary entrepreneur and engineer, Asif is dedicated to harnessing the potential of Synthetic Intelligence for social good. His most up-to-date endeavor is the launch of an Synthetic Intelligence Media Platform, Marktechpost, which stands out for its in-depth protection of machine studying and deep studying information that’s each technically sound and simply comprehensible by a large viewers. The platform boasts of over 2 million month-to-month views, illustrating its recognition amongst audiences.
