Adapters
Built-in agent adapters for different invocation styles.
adapters
GeminiAgent
GeminiAgent(*, client: Any | None = None, model: str = 'gemini-2.0-flash', tools: list[Any] | None = None, system_instruction: str | None = None, config: Any | None = None)
Agent adapter that wraps a `google.genai.Client` object directly.
Sends audio to Gemini via `client.aio.models.generate_content()`
and auto-parses function-call responses with :class:`GeminiResponseParser`.
Usage::

    from google import genai

    client = genai.Client(api_key="...")
    agent = GeminiAgent(
        client=client,
        model="gemini-2.0-flash",
        tools=[book_flight_declaration],
    )
    response = await agent.run(audio)

For Vertex AI::

    client = genai.Client(vertexai=True, project="my-project", location="us-central1")
    agent = GeminiAgent(client=client, model="gemini-2.0-flash", tools=[...])
Source code in src/russo/adapters/gemini.py
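Here is what "auto-parses function-call responses" means in practice. This is a minimal sketch that pulls function calls out of a Gemini `generateContent` response in its REST JSON shape (`candidates[].content.parts[].functionCall`). The helper `extract_function_calls` and its `(name, args)` tuple output are hypothetical. This is not the implementation of :class:`GeminiResponseParser`, which may operate on SDK response objects rather than plain dicts.

```python
from typing import Any


def extract_function_calls(response: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
    """Collect (name, args) pairs from a Gemini generateContent JSON response.

    Hypothetical helper: walks candidates[].content.parts[].functionCall
    and ignores text parts.
    """
    calls: list[tuple[str, dict[str, Any]]] = []
    for candidate in response.get("candidates", []):
        for part in candidate.get("content", {}).get("parts", []):
            fc = part.get("functionCall")
            if fc is not None:
                calls.append((fc["name"], fc.get("args", {})))
    return calls


sample = {
    "candidates": [
        {
            "content": {
                "role": "model",
                "parts": [
                    {"functionCall": {"name": "book_flight",
                                      "args": {"from": "NYC", "to": "SFO"}}},
                ],
            }
        }
    ]
}
print(extract_function_calls(sample))  # [('book_flight', {'from': 'NYC', 'to': 'SFO'})]
```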
run
async
run(audio: Audio) -> AgentResponse
Send audio to Gemini and parse the tool-call response.
Source code in src/russo/adapters/gemini.py
GeminiLiveAgent
GeminiLiveAgent(*, client: Any | None = None, session: Any | None = None, model: str = 'gemini-live-2.5-flash-native-audio', tools: list[Any] | None = None, system_instruction: str | None = None, config: Any | None = None, response_timeout: float = 30.0)
Agent adapter for Gemini's Live API (streaming/real-time).
Connects via client.aio.live.connect(), sends audio via
send_realtime_input, and collects function-call responses.
Accepts either a google.genai.Client (new session per run) or
a pre-existing Live session.
Source code in src/russo/adapters/gemini.py
run
async
run(audio: Audio) -> AgentResponse
Send audio to a Live session and collect function calls.
Source code in src/russo/adapters/gemini.py
HttpAgent
HttpAgent(*, url: str, parser: ResponseParser | None = None, method: str = 'POST', headers: dict[str, str] | None = None, audio_field: str = 'audio', format_field: str = 'format', timeout: float = 60.0)
Agent adapter that sends audio to an HTTP endpoint.
Sends audio as base64-encoded JSON and parses the response using an optional ResponseParser.
Usage::

    import russo

    agent = HttpAgent(
        url="http://localhost:8000/voice-agent",
        parser=russo.parsers.GeminiResponseParser(),
    )
    response = await agent.run(audio)
Source code in src/russo/adapters/http.py
run
async
run(audio: Audio) -> AgentResponse
Send audio to the HTTP endpoint and parse the response.
Source code in src/russo/adapters/http.py
OpenAIAgent
OpenAIAgent(*, client: Any, model: str = 'gpt-4o-audio-preview', tools: list[Any] | None = None, system_prompt: str | None = None, extra_create_kwargs: dict[str, Any] | None = None)
Agent adapter that wraps an `AsyncOpenAI` client for Chat Completions.
Sends audio via the Chat Completions API and auto-parses tool-call
responses with :class:`OpenAIResponseParser`.
Usage::

    from openai import AsyncOpenAI

    client = AsyncOpenAI()
    agent = OpenAIAgent(
        client=client,
        model="gpt-4o-audio-preview",
        tools=[{
            "type": "function",
            "function": {
                "name": "book_flight",
                "parameters": {"type": "object", "properties": {...}},
            },
        }],
    )
    response = await agent.run(audio)
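| PARAMETER | DESCRIPTION |
|---|---|
| `client` | An `AsyncOpenAI` client instance. TYPE: `Any` |
| `model` | Model name supporting audio input. TYPE: `str` |
| `tools` | OpenAI tool definitions (function-calling format). TYPE: `list[Any] \| None` |
| `system_prompt` | Optional system message prepended to the conversation. TYPE: `str \| None` |
| `extra_create_kwargs` | Additional keyword arguments forwarded to the Chat Completions request. TYPE: `dict[str, Any] \| None` |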
Source code in src/russo/adapters/openai.py
run
async
run(audio: Audio) -> AgentResponse
Send audio via Chat Completions and parse the tool-call response.
Source code in src/russo/adapters/openai.py
OpenAIRealtimeAgent
OpenAIRealtimeAgent(*, client: Any | None = None, connection: Any | None = None, model: str = 'gpt-4o-realtime-preview', tools: list[Any] | None = None, response_timeout: float = 30.0)
Agent adapter for OpenAI's Realtime API.
Connects via the SDK's client.beta.realtime.connect() interface,
sends audio, and collects response.function_call_arguments.done events.
Accepts either:
- An `AsyncOpenAI` client (a new connection is created per `run()` call)
- A pre-existing realtime connection (reused as-is; no session configuration is sent)
Usage with client::

    from openai import AsyncOpenAI

    client = AsyncOpenAI()
    agent = OpenAIRealtimeAgent(
        client=client,
        model="gpt-4o-realtime-preview",
        tools=[{
            "type": "function",
            "name": "book_flight",
            "description": "Book a flight",
            "parameters": {"type": "object", "properties": {...}},
        }],
    )
    response = await agent.run(audio)

Usage with pre-existing connection::

    async with client.beta.realtime.connect(model="gpt-4o-realtime-preview") as conn:
        agent = OpenAIRealtimeAgent(connection=conn)
        response = await agent.run(audio)
Note
The Realtime API expects pcm16 audio at 24 kHz mono by default. If you pass WAV audio, the adapter automatically strips the WAV header to extract raw PCM frames.
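The standard-library `wave` module can strip a WAV header and return the raw PCM frames. This sketch uses a hypothetical helper, `wav_to_pcm16`. It also checks that the input is already 16-bit mono at 24 kHz and raises otherwise. This page does not say whether the adapter validates or resamples, so treat that check as a design choice of the sketch.

```python
import io
import wave


def wav_to_pcm16(wav_bytes: bytes, expected_rate: int = 24_000) -> bytes:
    """Return the raw PCM frames inside a WAV container (hypothetical helper)."""
    with wave.open(io.BytesIO(wav_bytes), "rb") as wav:
        if wav.getsampwidth() != 2:
            raise ValueError("expected 16-bit samples (pcm16)")
        if wav.getnchannels() != 1:
            raise ValueError("expected mono audio")
        if wav.getframerate() != expected_rate:
            raise ValueError(f"expected {expected_rate} Hz, got {wav.getframerate()} Hz")
        # readframes() returns only the sample data, without the RIFF header.
        return wav.readframes(wav.getnframes())


def make_wav(pcm: bytes, rate: int = 24_000) -> bytes:
    """Wrap raw 16-bit mono PCM in a WAV header (used for the demo)."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wav:
        wav.setnchannels(1)
        wav.setsampwidth(2)
        wav.setframerate(rate)
        wav.writeframes(pcm)
    return buf.getvalue()


pcm = b"\x00\x00\xff\x7f" * 240  # 480 samples = 20 ms at 24 kHz
assert wav_to_pcm16(make_wav(pcm)) == pcm
```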
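| PARAMETER | DESCRIPTION |
|---|---|
| `client` | An `AsyncOpenAI` client; a new connection is opened for each `run()` call. TYPE: `Any \| None` |
| `connection` | A pre-existing realtime connection, such as one opened with `client.beta.realtime.connect()`. TYPE: `Any \| None` |
| `model` | Realtime model name. TYPE: `str` |
| `tools` | Tool definitions sent during session configuration. TYPE: `list[Any] \| None` |
| `response_timeout` | Maximum seconds to wait for a complete response. TYPE: `float` |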
Source code in src/russo/adapters/openai.py
run
async
run(audio: Audio) -> AgentResponse
Send audio via the Realtime API and collect function calls.
Source code in src/russo/adapters/openai.py
WebSocketAgent
WebSocketAgent(*, url: str, parser: ResponseParser | None = None, headers: dict[str, str] | None = None, send_bytes: bool = False, audio_field: str = 'audio', format_field: str = 'format', response_timeout: float = 30.0, max_messages: int = 100, on_connect: Callable[[WebSocketSession], Awaitable[None]] | None = None, on_send: Callable[[Audio], str | bytes] | None = None, is_complete: Callable[[list[Any]], bool] | None = None, aggregate: Callable[[list[Any]], Any] | None = None, open_timeout: float = 10.0, close_timeout: float = 5.0, extra_ws_kwargs: dict[str, Any] | None = None)
Agent adapter that communicates over WebSocket.
Supports two usage modes:
High-level: call :meth:`run` with an :class:`~russo.Audio` object.
The adapter connects, runs the `on_connect` hook (if set), sends the
audio, collects responses until a completion condition is met, then
parses tool calls.
Session-based: use :meth:`session` as an async context manager to
obtain a :class:`WebSocketSession` and drive the protocol yourself::

    async with agent.session() as sess:
        # Wait for server-ready before sending anything
        await sess.wait_for(lambda m: isinstance(m, dict) and m.get("ready"))
        await sess.send_json({"audio": base64_data, "format": "wav"})
        async for msg in sess:
            if is_done(msg):
                break
Send modes
- json (default): `{"audio": "<base64>", "format": "wav"}`
- bytes: raw audio bytes on the wire (`send_bytes=True`)
Hooks (apply to both :meth:`run` and :meth:`session`)
- `on_connect`: called with the :class:`WebSocketSession` right after the connection opens, before anything else; use it to wait for a server-ready message
- `on_send`: transform an :class:`~russo.Audio` into the wire format your server expects (bypasses `send_bytes` / `audio_field`)
- `is_complete`: return `True` when :meth:`run` should stop collecting response messages
- `aggregate`: fold multiple response messages into a single object before parsing
Examples::

    import base64
    import json

    from russo.adapters import WebSocketAgent, WebSocketSession

    # Simple JSON protocol
    agent = WebSocketAgent(url="ws://localhost:8000/ws/agent")

    # Raw bytes (e.g. streaming PCM)
    agent = WebSocketAgent(url="ws://localhost:8000/ws/stream", send_bytes=True)

    # Wait for setupComplete, then send a custom format
    async def wait_ready(session: WebSocketSession) -> None:
        await session.wait_for(lambda m: isinstance(m, dict) and m.get("setupComplete"))

    agent = WebSocketAgent(
        url="ws://localhost:8000/ws/agent",
        on_connect=wait_ready,
        on_send=lambda audio: json.dumps({"pcm": base64.b64encode(audio.data).decode()}),
        is_complete=lambda msgs: any(isinstance(m, dict) and m.get("done") for m in msgs),
    )
Source code in src/russo/adapters/websocket.py
session
async
session() -> AsyncIterator[WebSocketSession]
Open a WebSocket connection and yield a :class:`WebSocketSession`.
The connection is kept alive for the duration of the `async with`
block and closed cleanly on exit. If an `on_connect` hook is
configured, it is invoked before the session is yielded, so the caller
always receives a fully initialised session.
Example::

    async with agent.session() as sess:
        await sess.send_after(
            lambda m: isinstance(m, dict) and m.get("ready"),
            json_data={"audio": base64_audio},
        )
        result = await sess.receive(timeout=30.0)
Source code in src/russo/adapters/websocket.py
run
async
run(audio: Audio) -> AgentResponse
Send audio over WebSocket and collect the response.
Source code in src/russo/adapters/websocket.py
WebSocketSession
Active WebSocket session with full-duplex communication.
Obtained via :meth:`WebSocketAgent.session`. Wraps a live `websockets`
connection and adds structured send/receive primitives plus event-triggered
sending.
Sending
- :meth:`send_text`: send a raw text frame
- :meth:`send_bytes`: send a raw binary frame
- :meth:`send_json`: JSON-encode an object and send it as a text frame
Receiving
- :meth:`receive`: pull the next message (drains the internal buffer first)
- :meth:`wait_for`: block until a matching message arrives; non-matching messages are pushed into an internal buffer so they are not lost
- :meth:`send_after`: convenience that waits for a trigger event and then immediately sends, in a single call
Iteration
The session is async-iterable::

    async for msg in session:
        ...  # stops when the connection closes
Internal buffer
:meth:`wait_for` may read ahead to find a matching message. Any messages
that arrive before the match are kept in an internal FIFO buffer.
Subsequent calls to :meth:`receive` or async iteration drain that buffer
before touching the wire again.
Source code in src/russo/adapters/websocket.py
send_text
async
send_bytes
async
send_json
async
receive
async
Return the next incoming message.
Drains the internal buffer before reading from the wire.
| PARAMETER | DESCRIPTION |
|---|---|
| `timeout` | Seconds to wait. |

| RAISES | DESCRIPTION |
|---|---|
| `TimeoutError` | If `timeout` elapses with no message available. |
Source code in src/russo/adapters/websocket.py
wait_for
async
Wait until a message satisfying `predicate` arrives.
Messages that do not match are pushed into the internal buffer so they
are not lost; subsequent :meth:`receive` calls or async iteration will
return them in order.
| PARAMETER | DESCRIPTION |
|---|---|
| `predicate` | Callable that receives a parsed message and returns `True` if it matches. |
| `timeout` | Maximum seconds to wait (default 30). |

| RETURNS | DESCRIPTION |
|---|---|
| `Any` | The first message for which `predicate` returned `True`. |

| RAISES | DESCRIPTION |
|---|---|
| `TimeoutError` | If no matching message arrives within `timeout`. |
Example::

    # Block until the server signals readiness
    await session.wait_for(
        lambda m: isinstance(m, dict) and m.get("ready")
    )
Source code in src/russo/adapters/websocket.py
send_after
async
send_after(predicate: Callable[[Any], bool], *, text: str | None = None, data: bytes | None = None, json_data: Any = None, timeout: float = 30.0) -> Any
Wait for a matching event, then send, in a single call.
Exactly one of `text`, `data`, or `json_data` should be provided.
Non-matching messages received while waiting are buffered (see
:meth:`wait_for`).
| PARAMETER | DESCRIPTION |
|---|---|
| `predicate` | Same semantics as :meth:`wait_for`. TYPE: `Callable[[Any], bool]` |
| `text` | Text frame to send after the trigger. TYPE: `str \| None` |
| `data` | Binary frame to send after the trigger. TYPE: `bytes \| None` |
| `json_data` | Object to JSON-encode and send after the trigger. TYPE: `Any` |
| `timeout` | Forwarded to :meth:`wait_for`. TYPE: `float` |

| RETURNS | DESCRIPTION |
|---|---|
| `Any` | The triggering message (the one matched by `predicate`). |
Example::

    # Send audio only after the server signals setup is complete
    await session.send_after(
        lambda m: isinstance(m, dict) and m.get("setupComplete"),
        json_data={"audio": base64_audio, "format": "wav"},
    )
Source code in src/russo/adapters/websocket.py
callable
Callable agent adapter — wraps async functions as Agents.
Re-exports from _helpers for organizational clarity. The actual implementation lives in russo._helpers to keep the public API flat (russo.agent decorator).
agent
agent(fn: Callable[[Audio], Coroutine[Any, Any, AgentResponse]]) -> _CallableAgent
Decorator to turn an async function into an Agent.
Usage::

    import russo

    @russo.agent
    async def my_agent(audio: russo.Audio) -> russo.AgentResponse:
        # call_my_api is a placeholder for your own client code
        result = await call_my_api(audio.data)
        return russo.AgentResponse(tool_calls=[...])
Source code in src/russo/_helpers.py
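Conceptually, the decorator wraps a coroutine function in an object whose `run()` awaits it. The sketch below models that with hypothetical stand-ins: `FakeResponse` for :class:`russo.AgentResponse`, raw `bytes` for :class:`russo.Audio`, and `CallableAgent` for `_CallableAgent`. The early `TypeError` for non-async functions is a choice of this sketch, not documented behaviour.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class FakeResponse:
    """Stand-in for russo.AgentResponse (hypothetical)."""

    tool_calls: list[dict[str, Any]] = field(default_factory=list)


AgentFn = Callable[[bytes], Awaitable[FakeResponse]]


class CallableAgent:
    """Simplified model of the object the decorator returns."""

    def __init__(self, fn: AgentFn) -> None:
        self._fn = fn

    async def run(self, audio: bytes) -> FakeResponse:
        # run() just awaits the wrapped coroutine function.
        return await self._fn(audio)


def agent(fn: AgentFn) -> CallableAgent:
    # This sketch rejects plain (sync) functions up front.
    if not inspect.iscoroutinefunction(fn):
        raise TypeError("agent() expects an async function")
    return CallableAgent(fn)


@agent
async def my_agent(audio: bytes) -> FakeResponse:
    return FakeResponse(tool_calls=[{"name": "book_flight", "audio_bytes": len(audio)}])


result = asyncio.run(my_agent.run(b"\x00" * 10))
print(result.tool_calls)  # [{'name': 'book_flight', 'audio_bytes': 10}]
```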
gemini
Gemini SDK agent adapters — wraps a google-genai Client as an Agent.
Provides two adapters:
- GeminiAgent: standard `generate_content` (request/response)
- GeminiLiveAgent: Live API over WebSocket (streaming/real-time)
Both send audio directly to a Gemini model via the SDK; no HTTP endpoint is needed.
Requires the google-genai package (already a core dependency).
http
HTTP agent adapter — sends audio to an API endpoint.
openai
OpenAI SDK agent adapters — wraps OpenAI clients as Agents.
Provides two adapters:
- OpenAIAgent: standard Chat Completions with audio input (`gpt-4o-audio-preview` and similar models)
- OpenAIRealtimeAgent: Realtime API over WebSocket (`gpt-4o-realtime-preview`)
Requires the openai package: pip install russo[openai]
websocket
WebSocket agent adapter — sends audio to a WebSocket endpoint.
Requires the websockets package: pip install russo[ws]