WebSocket Adapter
WebSocket agent adapter — sends audio to a WebSocket endpoint.
Note
Requires the ws extra: pip install "russo[ws]"
websocket
Requires the websockets package: pip install "russo[ws]"
WebSocketSession
Active WebSocket session with full-duplex communication.
Obtained via :meth:`WebSocketAgent.session`. Wraps a live websockets
connection and adds structured send/receive primitives plus event-triggered
sending.
Sending
- :meth:`send_text`: send a raw text frame
- :meth:`send_bytes`: send a raw binary frame
- :meth:`send_json`: JSON-encode an object and send as a text frame
Receiving
- :meth:`receive`: pull the next message (drains internal buffer first)
- :meth:`wait_for`: block until a matching message arrives; non-matching messages are pushed into an internal buffer so they are not lost
- :meth:`send_after`: convenience: wait for a trigger event, then immediately send, in a single call
Iteration
The session is async-iterable::
    async for msg in session:
        ...  # stops when the connection closes
Internal buffer
:meth:`wait_for` may read ahead to find a matching message. Any messages
that arrive before the match are kept in an internal FIFO buffer.
Subsequent calls to :meth:`receive` or async iteration drain that buffer
before touching the wire again.
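The read-ahead behaviour described above can be sketched with a toy model. This is not russo's implementation: `BufferedReader`, the `asyncio.Queue` standing in for the connection, and the choice to have `wait_for` read only from the wire are all assumptions made for illustration.

```python
import asyncio
from collections import deque
from typing import Any, Callable


class BufferedReader:
    """Toy model of the read-ahead buffer (hypothetical, not russo's code)."""

    def __init__(self, wire: asyncio.Queue) -> None:
        self._wire = wire        # stands in for the live connection
        self._buffer = deque()   # FIFO of messages skipped by wait_for

    async def receive(self, timeout: float = 30.0) -> Any:
        # Drain buffered messages first, then fall back to the wire.
        if self._buffer:
            return self._buffer.popleft()
        return await asyncio.wait_for(self._wire.get(), timeout)

    async def wait_for(self, predicate: Callable[[Any], bool], timeout: float = 30.0) -> Any:
        # Assumption: only new messages from the wire are tested here.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while True:
            remaining = max(deadline - loop.time(), 0)
            msg = await asyncio.wait_for(self._wire.get(), remaining)
            if predicate(msg):
                return msg
            self._buffer.append(msg)  # keep non-matching messages, in order


async def demo() -> list:
    wire = asyncio.Queue()
    for msg in ({"log": 1}, {"log": 2}, {"ready": True}):
        wire.put_nowait(msg)
    reader = BufferedReader(wire)
    matched = await reader.wait_for(lambda m: m.get("ready", False))
    first = await reader.receive()    # served from the buffer
    second = await reader.receive()   # served from the buffer
    return [matched, first, second]
```

In this sketch the two log messages that arrived before the match are returned by the later `receive` calls in their original order, which is the FIFO guarantee the section describes.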
Source code in src/russo/adapters/websocket.py
send_text
async
send_bytes
async
send_json
async
receive
async
Return the next incoming message.
Drains the internal buffer before reading from the wire.
| PARAMETER | DESCRIPTION |
|---|---|
| timeout | Seconds to wait. |

| RAISES | DESCRIPTION |
|---|---|
| TimeoutError | If timeout elapses with no message available. |
wait_for
async
Wait until a message satisfying predicate arrives.
Messages that do not match are pushed into the internal buffer so they
are not lost; subsequent :meth:`receive` calls or async iteration will
return them in order.
| PARAMETER | DESCRIPTION |
|---|---|
| predicate | Callable that receives a parsed message and returns True on a match. |
| timeout | Maximum seconds to wait (default 30). |

| RETURNS | DESCRIPTION |
|---|---|
| Any | The first message for which predicate returned True. |

| RAISES | DESCRIPTION |
|---|---|
| TimeoutError | If no matching message arrives within timeout. |

Example::
    # Block until the server signals readiness
    await session.wait_for(
        lambda m: isinstance(m, dict) and m.get("ready")
    )
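Because a session can deliver text, binary, or parsed JSON messages, predicates are easier to reuse when they guard against non-dict input. A minimal sketch of two predicate factories follows; `has_truthy_key`, `has_type`, and the `"type"` field name are hypothetical helpers for illustration, not part of russo.

```python
from typing import Any, Callable


def has_truthy_key(key: str) -> Callable[[Any], bool]:
    """Build a predicate matching dict messages whose `key` is truthy."""

    def predicate(message: Any) -> bool:
        # Non-dict frames (plain text, bytes) never match instead of raising.
        return isinstance(message, dict) and bool(message.get(key))

    return predicate


def has_type(expected: str, field: str = "type") -> Callable[[Any], bool]:
    """Build a predicate matching dict messages where message[field] == expected."""

    def predicate(message: Any) -> bool:
        return isinstance(message, dict) and message.get(field) == expected

    return predicate
```

Either factory can be passed wherever a predicate is expected, for example `await session.wait_for(has_truthy_key("ready"))`.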
send_after
async
send_after(predicate: Callable[[Any], bool], *, text: str | None = None, data: bytes | None = None, json_data: Any = None, timeout: float = 30.0) -> Any
Wait for a matching event, then send — in a single call.
Exactly one of text, data, or json_data should be provided.
Non-matching messages received while waiting are buffered (see
:meth:`wait_for`).
| PARAMETER | DESCRIPTION |
|---|---|
| predicate | Same semantics as :meth:`wait_for`. TYPE: Callable[[Any], bool] |
| text | Text frame to send after the trigger. TYPE: str \| None DEFAULT: None |
| data | Binary frame to send after the trigger. TYPE: bytes \| None DEFAULT: None |
| json_data | Object to JSON-encode and send after the trigger. TYPE: Any DEFAULT: None |
| timeout | Forwarded to :meth:`wait_for`. TYPE: float DEFAULT: 30.0 |

| RETURNS | DESCRIPTION |
|---|---|
| Any | The triggering message (the one matched by predicate). |

Example::
    # Send audio only after the server signals setup is complete
    await session.send_after(
        lambda m: isinstance(m, dict) and m.get("setupComplete"),
        json_data={"audio": base64_audio, "format": "wav"},
    )
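Conceptually, send_after composes wait_for with one of the three send methods. The sketch below illustrates that composition under stated assumptions: `send_after_sketch` and `RecordingSession` are hypothetical names, and raising ValueError when the payload arguments are ambiguous is a guess, since the documentation only says exactly one should be provided.

```python
import asyncio
from typing import Any, Callable, Optional


async def send_after_sketch(
    session: Any,
    predicate: Callable[[Any], bool],
    *,
    text: Optional[str] = None,
    data: Optional[bytes] = None,
    json_data: Any = None,
    timeout: float = 30.0,
) -> Any:
    """Hypothetical re-implementation: wait for the trigger, then send once."""
    provided = [value for value in (text, data, json_data) if value is not None]
    if len(provided) != 1:
        # Assumption: the real method may handle this case differently.
        raise ValueError("provide exactly one of text, data, or json_data")
    trigger = await session.wait_for(predicate, timeout=timeout)
    if text is not None:
        await session.send_text(text)
    elif data is not None:
        await session.send_bytes(data)
    else:
        await session.send_json(json_data)
    return trigger


class RecordingSession:
    """Stand-in session that replays canned messages and records sends."""

    def __init__(self, incoming: list) -> None:
        self.incoming = list(incoming)
        self.sent = []

    async def wait_for(self, predicate, timeout: float = 30.0):
        # Return the first canned message that matches; buffering is omitted.
        for message in self.incoming:
            if predicate(message):
                return message
        raise TimeoutError("no matching message")

    async def send_text(self, text):
        self.sent.append(("text", text))

    async def send_bytes(self, data):
        self.sent.append(("bytes", data))

    async def send_json(self, obj):
        self.sent.append(("json", obj))


async def demo():
    session = RecordingSession([{"log": "booting"}, {"setupComplete": True}])
    trigger = await send_after_sketch(
        session,
        lambda m: isinstance(m, dict) and m.get("setupComplete"),
        json_data={"audio": "AAE=", "format": "wav"},
    )
    return trigger, session.sent
```

The return value is the triggering message, which lets the caller inspect whatever the server attached to its setup notification.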
WebSocketAgent
WebSocketAgent(*, url: str, parser: ResponseParser | None = None, headers: dict[str, str] | None = None, send_bytes: bool = False, audio_field: str = 'audio', format_field: str = 'format', response_timeout: float = 30.0, max_messages: int = 100, on_connect: Callable[[WebSocketSession], Awaitable[None]] | None = None, on_send: Callable[[Audio], str | bytes] | None = None, is_complete: Callable[[list[Any]], bool] | None = None, aggregate: Callable[[list[Any]], Any] | None = None, open_timeout: float = 10.0, close_timeout: float = 5.0, extra_ws_kwargs: dict[str, Any] | None = None)
Agent adapter that communicates over WebSocket.
Supports two usage modes:
- High-level: call :meth:`run` with an :class:`~russo.Audio` object. The adapter connects, runs the on_connect hook (if set), sends the audio, collects responses until a completion condition is met, then parses tool calls.
- Session-based: use :meth:`session` as an async context manager to obtain a :class:`WebSocketSession` and drive the protocol yourself::
    async with agent.session() as sess:
        # Wait for server-ready before sending anything;
        # the isinstance guard skips text or binary frames
        await sess.wait_for(lambda m: isinstance(m, dict) and m.get("ready"))
        await sess.send_json({"audio": base64_data, "format": "wav"})
        async for msg in sess:
            if is_done(msg):
                break
Send modes
- json (default): {"audio": "<base64>", "format": "wav"}
- bytes: raw audio bytes on the wire (send_bytes=True)
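To make the default JSON frame concrete, here is a minimal sketch of how such a payload could be assembled. `build_json_frame` is a hypothetical helper, and it assumes standard Base64 encoding and that audio_field and format_field simply rename the two keys; the adapter's actual encoding may differ.

```python
import base64
import json


def build_json_frame(
    audio_bytes: bytes,
    audio_format: str = "wav",
    *,
    audio_field: str = "audio",
    format_field: str = "format",
) -> str:
    """Encode raw audio as a JSON text frame (illustrative, not russo's code)."""
    payload = {
        # Base64 keeps binary audio safe inside a JSON string.
        audio_field: base64.b64encode(audio_bytes).decode("ascii"),
        format_field: audio_format,
    }
    return json.dumps(payload)
```

With send_bytes=True no such wrapping is needed, since the audio bytes go out as a binary frame.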
Hooks (apply to both :meth:`run` and :meth:`session`)
- on_connect: called with the :class:`WebSocketSession` right after the connection opens, before anything else; use it to wait for a server-ready message
- on_send: transform an :class:`~russo.Audio` into the wire format your server expects (bypasses send_bytes / audio_field)
- is_complete: return True when :meth:`run` should stop collecting response messages
- aggregate: fold multiple response messages into a single object before parsing
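The is_complete and aggregate hooks are plain callables over the list of messages collected so far, so they can be written and tested without a connection. The sketch below assumes a hypothetical server protocol in which each message may carry a "tool_calls" list and the last one sets "done"; neither field name is defined by russo.

```python
from typing import Any


def is_complete(messages: list[Any]) -> bool:
    """Stop collecting once any dict message carries a truthy "done" flag."""
    return any(isinstance(m, dict) and m.get("done") for m in messages)


def aggregate(messages: list[Any]) -> dict[str, Any]:
    """Fold all dict messages into one object by concatenating tool calls."""
    tool_calls: list[Any] = []
    for message in messages:
        if isinstance(message, dict):
            # Messages without the (assumed) "tool_calls" key contribute nothing.
            tool_calls.extend(message.get("tool_calls", []))
    return {"tool_calls": tool_calls}
```

Both functions match the signatures shown in the constructor (`Callable[[list[Any]], bool]` and `Callable[[list[Any]], Any]`) and could be passed as `is_complete=is_complete, aggregate=aggregate`.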
Examples::
    import base64
    import json
    from russo.adapters.websocket import WebSocketAgent, WebSocketSession

    # Simple JSON protocol
    agent = WebSocketAgent(url="ws://localhost:8000/ws/agent")

    # Raw bytes (e.g. streaming PCM)
    agent = WebSocketAgent(url="ws://localhost:8000/ws/stream", send_bytes=True)

    # Wait for setupComplete, then send a custom format
    async def wait_ready(session: WebSocketSession) -> None:
        await session.wait_for(lambda m: isinstance(m, dict) and m.get("setupComplete"))

    agent = WebSocketAgent(
        url="ws://localhost:8000/ws/agent",
        on_connect=wait_ready,
        on_send=lambda audio: json.dumps({"pcm": base64.b64encode(audio.data).decode()}),
        is_complete=lambda msgs: any(isinstance(m, dict) and m.get("done") for m in msgs),
    )
session
async
session() -> AsyncIterator[WebSocketSession]
Open a WebSocket connection and yield a :class:`WebSocketSession`.
The connection is kept alive for the duration of the async with
block and closed cleanly on exit. If an on_connect hook is
configured it is invoked before the session is yielded, so the caller
always receives a fully-initialised session.
Example::
    async with agent.session() as sess:
        await sess.send_after(
            lambda m: isinstance(m, dict) and m.get("ready"),
            json_data={"audio": base64_audio},
        )
        result = await sess.receive(timeout=30.0)
run
async
run(audio: Audio) -> AgentResponse
Send audio over WebSocket and collect the response.
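The collection phase of run can be pictured as a bounded receive loop. The sketch below is a guess at the general shape, not the adapter's code: `collect_responses` is a hypothetical function, and it assumes that max_messages caps the number of collected messages and that a response_timeout with no message simply ends collection.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def collect_responses(
    receive: Callable[[], Awaitable[Any]],
    *,
    is_complete: Optional[Callable[[list], bool]] = None,
    response_timeout: float = 30.0,
    max_messages: int = 100,
) -> list:
    """Collect messages until complete, capped, or silent (illustrative only)."""
    messages: list = []
    while len(messages) < max_messages:
        try:
            msg = await asyncio.wait_for(receive(), response_timeout)
        except asyncio.TimeoutError:
            break  # assumption: silence is treated as the end of the response
        messages.append(msg)
        if is_complete is not None and is_complete(messages):
            break
    return messages


async def demo() -> list:
    queue = asyncio.Queue()  # stands in for the session's receive side
    for msg in ({"partial": 1}, {"done": True}, {"late": 1}):
        queue.put_nowait(msg)
    return await collect_responses(
        queue.get,
        is_complete=lambda msgs: any(isinstance(m, dict) and m.get("done") for m in msgs),
    )
```

In the demo the loop stops at the message carrying "done", so the later message is never consumed.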