Skip to content

Adapters

Built-in agent adapters for different invocation styles.

adapters

Built-in agent adapters for different invocation styles.

GeminiAgent

GeminiAgent(*, client: Any | None = None, model: str = 'gemini-2.0-flash', tools: list[Any] | None = None, system_instruction: str | None = None, config: Any | None = None)

Agent adapter that wraps a google.genai.Client object directly.

Sends audio to Gemini via `client.aio.models.generate_content()` and automatically parses function-call responses with `GeminiResponseParser`.

Usage::

from google import genai

client = genai.Client(api_key="...")
agent = GeminiAgent(
    client=client,
    model="gemini-2.0-flash",
    tools=[book_flight_declaration],
)
response = await agent.run(audio)

For Vertex AI::

client = genai.Client(vertexai=True, project="my-project", location="us-central1")
agent = GeminiAgent(client=client, model="gemini-2.0-flash", tools=[...])
Source code in src/russo/adapters/gemini.py
def __init__(
    self,
    *,
    client: Any | None = None,
    model: str = "gemini-2.0-flash",
    tools: list[Any] | None = None,
    system_instruction: str | None = None,
    config: Any | None = None,
) -> None:
    """Configure a request/response Gemini agent.

    Args:
        client: A ``google.genai.Client``. When omitted, a default client is
            obtained from ``_make_client()``.
        model: Model name sent with every ``generate_content`` call.
        tools: Tool (function) declarations for the model.
        system_instruction: Optional system instruction for the model.
        config: Optional extra generation config object.
    """
    if client is None:
        client = _make_client()
    self.client = client
    self.model = model
    self.tools = tools
    self.system_instruction = system_instruction
    self.config = config
    # Responses from generate_content are decoded by this parser in run().
    self._parser = GeminiResponseParser()

run async

run(audio: Audio) -> AgentResponse

Send audio to Gemini and parse the tool-call response.

Source code in src/russo/adapters/gemini.py
async def run(self, audio: Audio) -> AgentResponse:
    """Send *audio* to Gemini with ``generate_content`` and parse tool calls.

    ``AudioManager.prepare_for_generate_content`` turns the audio into raw
    bytes plus a MIME type. These are wrapped in a single ``Part`` and sent
    with the config returned by ``_build_config``. The SDK response is then
    decoded by the instance's ``GeminiResponseParser``.
    """
    from google.genai import types

    payload, mime = AudioManager.prepare_for_generate_content(audio)
    audio_part = types.Part.from_bytes(data=payload, mime_type=mime)
    request_config = self._build_config(types)

    logger.debug("Sending %d bytes of %s audio to %s", len(payload), mime, self.model)

    models_api = self.client.aio.models
    reply = await models_api.generate_content(
        model=self.model,
        contents=[audio_part],
        config=request_config,
    )
    return self._parser.parse(reply)

GeminiLiveAgent

GeminiLiveAgent(*, client: Any | None = None, session: Any | None = None, model: str = 'gemini-live-2.5-flash-native-audio', tools: list[Any] | None = None, system_instruction: str | None = None, config: Any | None = None, response_timeout: float = 30.0)

Agent adapter for Gemini's Live API (streaming/real-time).

Connects via client.aio.live.connect(), sends audio via send_realtime_input, and collects function-call responses.

Accepts either a google.genai.Client (new session per run) or a pre-existing Live session.

Source code in src/russo/adapters/gemini.py
def __init__(
    self,
    *,
    client: Any | None = None,
    session: Any | None = None,
    model: str = "gemini-live-2.5-flash-native-audio",
    tools: list[Any] | None = None,
    system_instruction: str | None = None,
    config: Any | None = None,
    response_timeout: float = 30.0,
) -> None:
    """Configure a Gemini Live API agent.

    Args:
        client: A ``google.genai.Client`` used to open a new Live connection
            per ``run``. If neither *client* nor *session* is supplied, a
            default client is created with ``_make_client()``.
        session: An already-open Live session. When set, ``run`` uses it
            directly instead of connecting.
        model: Live model name used when connecting.
        tools: Tool declarations stored for building the connection config.
        system_instruction: Optional system instruction, stored likewise.
        config: Optional extra config object, stored likewise.
        response_timeout: Seconds to wait for a response. It is stored here
            and consumed by the response-collection code, which is not
            visible in this excerpt.
    """
    # Only fall back to a default client when the caller gave us nothing
    # to talk to at all.
    needs_default_client = session is None and client is None
    self.client = _make_client() if needs_default_client else client
    self.session = session
    self.model = model
    self.tools = tools
    self.system_instruction = system_instruction
    self.config = config
    self.response_timeout = response_timeout

run async

run(audio: Audio) -> AgentResponse

Send audio to a Live session and collect function calls.

Source code in src/russo/adapters/gemini.py
async def run(self, audio: Audio) -> AgentResponse:
    """Stream *audio* through a Live session and collect function calls.

    If a session was supplied at construction it is reused. Otherwise a new
    connection is opened with ``client.aio.live.connect`` for this call only
    and closed when the call finishes.
    """
    existing = self.session
    if existing is not None:
        return await self._run_on(existing, audio)

    from google.genai import types

    assert self.client is not None
    live_config = self._build_config(types)
    connector = self.client.aio.live.connect(model=self.model, config=live_config)
    async with connector as live_session:
        return await self._run_on(live_session, audio)

HttpAgent

HttpAgent(*, url: str, parser: ResponseParser | None = None, method: str = 'POST', headers: dict[str, str] | None = None, audio_field: str = 'audio', format_field: str = 'format', timeout: float = 60.0)

Agent adapter that sends audio to an HTTP endpoint.

Sends audio as base64-encoded JSON and parses the response using an optional ResponseParser.

Usage

agent = HttpAgent(
    url="http://localhost:8000/voice-agent",
    parser=russo.parsers.GeminiResponseParser(),
)
response = await agent.run(audio)

Source code in src/russo/adapters/http.py
def __init__(
    self,
    *,
    url: str,
    parser: ResponseParser | None = None,
    method: str = "POST",
    headers: dict[str, str] | None = None,
    audio_field: str = "audio",
    format_field: str = "format",
    timeout: float = 60.0,
) -> None:
    """Configure an HTTP agent.

    Args:
        url: Endpoint that receives the audio payload.
        parser: Optional ``ResponseParser``. Without one, ``run`` falls back
            to ``_default_parse``.
        method: HTTP method name (presumably applied by ``_send``).
        headers: Extra request headers. ``None`` or an empty mapping becomes
            a fresh ``{}``.
        audio_field: JSON key that carries the base64-encoded audio.
        format_field: JSON key that carries the audio format string.
        timeout: Request timeout in seconds (presumably applied by ``_send``).
    """
    # Where and how to send
    self.url = url
    self.method = method
    self.headers = headers if headers else {}
    self.timeout = timeout
    # Payload layout
    self.audio_field = audio_field
    self.format_field = format_field
    # Response handling
    self.parser = parser

run async

run(audio: Audio) -> AgentResponse

Send audio to the HTTP endpoint and parse the response.

Source code in src/russo/adapters/http.py
async def run(self, audio: Audio) -> AgentResponse:
    """POST *audio* as base64 JSON and turn the reply into an ``AgentResponse``.

    The payload maps ``audio_field`` to the base64-encoded audio bytes and
    ``format_field`` to ``audio.format``. It is sent with ``_send``. The raw
    reply goes to the configured parser when one is set (truthy), otherwise
    to ``_default_parse``.
    """
    encoded = base64.b64encode(audio.data).decode("ascii")
    body = {self.audio_field: encoded, self.format_field: audio.format}

    raw_response = await self._send(body)

    parse = self.parser.parse if self.parser else self._default_parse
    return parse(raw_response)

OpenAIAgent

OpenAIAgent(*, client: Any, model: str = 'gpt-4o-audio-preview', tools: list[Any] | None = None, system_prompt: str | None = None, extra_create_kwargs: dict[str, Any] | None = None)

Agent adapter that wraps an AsyncOpenAI client for Chat Completions.

Sends audio via the Chat Completions API and automatically parses tool-call responses with `OpenAIResponseParser`.

Usage::

from openai import AsyncOpenAI

client = AsyncOpenAI()
agent = OpenAIAgent(
    client=client,
    model="gpt-4o-audio-preview",
    tools=[{
        "type": "function",
        "function": {
            "name": "book_flight",
            "parameters": {"type": "object", "properties": {...}},
        },
    }],
)
response = await agent.run(audio)
PARAMETER DESCRIPTION
client

An openai.AsyncOpenAI instance.

TYPE: Any

model

Model name supporting audio input.

TYPE: str DEFAULT: 'gpt-4o-audio-preview'

tools

OpenAI tool definitions (function-calling format).

TYPE: list[Any] | None DEFAULT: None

system_prompt

Optional system message prepended to the conversation.

TYPE: str | None DEFAULT: None

extra_create_kwargs

Additional kwargs forwarded to client.chat.completions.create().

TYPE: dict[str, Any] | None DEFAULT: None

Source code in src/russo/adapters/openai.py
def __init__(
    self,
    *,
    client: Any,
    model: str = "gpt-4o-audio-preview",
    tools: list[Any] | None = None,
    system_prompt: str | None = None,
    extra_create_kwargs: dict[str, Any] | None = None,
) -> None:
    """Configure a Chat Completions agent that accepts audio input.

    Args:
        client: An ``openai.AsyncOpenAI`` instance.
        model: Name of a model that supports audio input.
        tools: Tool definitions in OpenAI function-calling format.
        system_prompt: Optional system message placed before the user turn.
        extra_create_kwargs: Extra keyword arguments passed through to
            ``client.chat.completions.create()``. ``None`` or empty becomes
            ``{}``.
    """
    self.client = client
    self.model = model
    self.tools = tools
    self.system_prompt = system_prompt
    self.extra_create_kwargs = extra_create_kwargs if extra_create_kwargs else {}
    # Decodes chat completion responses (tool calls) in run().
    self._parser = OpenAIResponseParser()

run async

run(audio: Audio) -> AgentResponse

Send audio via Chat Completions and parse the tool-call response.

Source code in src/russo/adapters/openai.py
async def run(self, audio: Audio) -> AgentResponse:
    """Send *audio* to Chat Completions and parse any tool calls in the reply.

    The audio is base64-encoded into one ``input_audio`` content part of a
    user message. A system message from ``system_prompt`` goes before it when
    the prompt is set. ``extra_create_kwargs`` are merged after ``model`` and
    ``messages``, so they can override those keys. ``tools`` is set last,
    and only when non-empty.
    """
    encoded_audio = base64.b64encode(audio.data).decode("ascii")

    user_turn: dict[str, Any] = {
        "role": "user",
        "content": [
            {
                "type": "input_audio",
                "input_audio": {"data": encoded_audio, "format": audio.format},
            }
        ],
    }
    messages: list[dict[str, Any]] = (
        [{"role": "system", "content": self.system_prompt}, user_turn]
        if self.system_prompt
        else [user_turn]
    )

    request: dict[str, Any] = {"model": self.model, "messages": messages}
    request.update(self.extra_create_kwargs)
    if self.tools:
        request["tools"] = self.tools

    logger.debug(
        "Sending %d bytes of %s audio to %s",
        len(audio.data),
        audio.format,
        self.model,
    )

    completion = await self.client.chat.completions.create(**request)
    return self._parser.parse(completion)

OpenAIRealtimeAgent

OpenAIRealtimeAgent(*, client: Any | None = None, connection: Any | None = None, model: str = 'gpt-4o-realtime-preview', tools: list[Any] | None = None, response_timeout: float = 30.0)

Agent adapter for OpenAI's Realtime API.

Connects via the SDK's client.beta.realtime.connect() interface, sends audio, and collects response.function_call_arguments.done events.

Accepts either:

  • An AsyncOpenAI client (creates a new connection per run())
  • A pre-existing realtime connection (reuses it, no session config sent)

Usage with client::

from openai import AsyncOpenAI

client = AsyncOpenAI()
agent = OpenAIRealtimeAgent(
    client=client,
    model="gpt-4o-realtime-preview",
    tools=[{
        "type": "function",
        "name": "book_flight",
        "description": "Book a flight",
        "parameters": {"type": "object", "properties": {...}},
    }],
)
response = await agent.run(audio)

Usage with pre-existing connection::

async with client.beta.realtime.connect(model="gpt-4o-realtime-preview") as conn:
    agent = OpenAIRealtimeAgent(connection=conn)
    response = await agent.run(audio)
Note

The Realtime API expects pcm16 audio at 24 kHz mono by default. If you pass WAV audio, the adapter automatically strips the WAV header to extract raw PCM frames.

PARAMETER DESCRIPTION
client

An `openai.AsyncOpenAI` instance. Either this or `connection` must be provided; if both are given, `connection` is used.

TYPE: Any | None DEFAULT: None

connection

A pre-existing realtime connection (from client.beta.realtime.connect()).

TYPE: Any | None DEFAULT: None

model

Realtime model name.

TYPE: str DEFAULT: 'gpt-4o-realtime-preview'

tools

Tool definitions sent during session configuration.

TYPE: list[Any] | None DEFAULT: None

response_timeout

Max seconds to wait for a complete response.

TYPE: float DEFAULT: 30.0

Source code in src/russo/adapters/openai.py
def __init__(
    self,
    *,
    client: Any | None = None,
    connection: Any | None = None,
    model: str = "gpt-4o-realtime-preview",
    tools: list[Any] | None = None,
    response_timeout: float = 30.0,
) -> None:
    """Configure an OpenAI Realtime API agent.

    Args:
        client: An ``openai.AsyncOpenAI`` instance used to open a fresh
            realtime connection for each ``run``.
        connection: An already-open realtime connection (from
            ``client.beta.realtime.connect()``). When both *client* and
            *connection* are given, ``run`` uses *connection*.
        model: Realtime model name used when connecting through *client*.
        tools: Tool definitions sent during session configuration.
        response_timeout: Maximum seconds to wait for a complete response.

    Raises:
        ValueError: If neither *client* nor *connection* is provided.
    """
    if connection is None and client is None:
        raise ValueError(
            "Provide either 'client' (AsyncOpenAI) or 'connection' (realtime connection)"
        )
    self.client = client
    self.connection = connection
    self.model = model
    self.tools = tools
    self.response_timeout = response_timeout

run async

run(audio: Audio) -> AgentResponse

Send audio via the Realtime API and collect function calls.

Source code in src/russo/adapters/openai.py
async def run(self, audio: Audio) -> AgentResponse:
    """Send *audio* over the Realtime API and collect function calls.

    A pre-existing connection is reused as-is, with no session configuration
    sent. Otherwise a new connection is opened from the client for this call,
    with session configuration enabled, and closed afterwards.
    """
    conn = self.connection
    if conn is None:
        async with self.client.beta.realtime.connect(model=self.model) as fresh:
            return await self._run_on(fresh, audio, configure=True)
    return await self._run_on(conn, audio, configure=False)

WebSocketAgent

WebSocketAgent(*, url: str, parser: ResponseParser | None = None, headers: dict[str, str] | None = None, send_bytes: bool = False, audio_field: str = 'audio', format_field: str = 'format', response_timeout: float = 30.0, max_messages: int = 100, on_connect: Callable[[WebSocketSession], Awaitable[None]] | None = None, on_send: Callable[[Audio], str | bytes] | None = None, is_complete: Callable[[list[Any]], bool] | None = None, aggregate: Callable[[list[Any]], Any] | None = None, open_timeout: float = 10.0, close_timeout: float = 5.0, extra_ws_kwargs: dict[str, Any] | None = None)

Agent adapter that communicates over WebSocket.

Supports two usage modes:

High-level — call `run()` with a `russo.Audio` object. The adapter connects, runs the on_connect hook (if set), sends the audio, collects responses until a completion condition is met, then parses tool calls.

Session-based — use `session()` as an async context manager to obtain a `WebSocketSession` and drive the protocol yourself:

async with agent.session() as sess:
    # Wait for server-ready before sending anything
    await sess.wait_for(lambda m: m.get("ready"))

    await sess.send_json({"audio": base64_data, "format": "wav"})

    async for msg in sess:
        if is_done(msg):
            break

Send modes

  • json (default): {"audio": "<base64>", "format": "wav"}
  • bytes: raw audio bytes on the wire (send_bytes=True)

Hooks (apply to both :meth:run and :meth:session)

  • on_connect: called with the :class:WebSocketSession right after the connection opens, before anything else — use it to wait for a server-ready message
  • on_send: transform an :class:~russo.Audio into the wire format your server expects (bypasses send_bytes / audio_field)
  • is_complete: return True when :meth:run should stop collecting response messages
  • aggregate: fold multiple response messages into a single object before parsing

Examples::

# Simple JSON protocol
agent = WebSocketAgent(url="ws://localhost:8000/ws/agent")

# Raw bytes (e.g. streaming PCM)
agent = WebSocketAgent(url="ws://localhost:8000/ws/stream", send_bytes=True)

# Wait for setupComplete, then send a custom format
async def wait_ready(session: WebSocketSession) -> None:
    await session.wait_for(lambda m: isinstance(m, dict) and m.get("setupComplete"))

agent = WebSocketAgent(
    url="ws://localhost:8000/ws/agent",
    on_connect=wait_ready,
    on_send=lambda audio: json.dumps({"pcm": base64.b64encode(audio.data).decode()}),
    is_complete=lambda msgs: any(isinstance(m, dict) and m.get("done") for m in msgs),
)
Source code in src/russo/adapters/websocket.py
def __init__(
    self,
    *,
    url: str,
    parser: ResponseParser | None = None,
    headers: dict[str, str] | None = None,
    # Send options
    send_bytes: bool = False,
    audio_field: str = "audio",
    format_field: str = "format",
    # Response collection
    response_timeout: float = 30.0,
    max_messages: int = 100,
    # Hooks
    on_connect: Callable[[WebSocketSession], Awaitable[None]] | None = None,
    on_send: Callable[[Audio], str | bytes] | None = None,
    is_complete: Callable[[list[Any]], bool] | None = None,
    aggregate: Callable[[list[Any]], Any] | None = None,
    # Connection
    open_timeout: float = 10.0,
    close_timeout: float = 5.0,
    extra_ws_kwargs: dict[str, Any] | None = None,
) -> None:
    """Configure a WebSocket agent.

    Options fall into the groups shown in the signature:

    * Send options: raw-bytes vs. JSON mode and the JSON field names.
    * Response collection: timeout and message cap for ``run``.
    * Hooks: ``on_connect``, ``on_send``, ``is_complete``, ``aggregate``.
    * Connection: timeouts and extra kwargs for ``websockets.connect``.

    ``headers`` and ``extra_ws_kwargs`` default to empty dicts when ``None``
    or empty.

    Raises:
        ImportError: If the optional ``websockets`` package is not installed.
    """
    if not _HAS_WEBSOCKETS:
        raise ImportError(
            "WebSocketAgent requires the 'websockets' package. Install with: pip install russo[ws]"
        )

    # Endpoint and parsing
    self.url = url
    self.parser = parser
    self.headers = headers if headers else {}

    # How audio goes out
    self.send_bytes = send_bytes
    self.audio_field = audio_field
    self.format_field = format_field

    # How replies are gathered
    self.response_timeout = response_timeout
    self.max_messages = max_messages

    # User-supplied hooks
    self.on_connect = on_connect
    self.on_send = on_send
    self.is_complete = is_complete
    self.aggregate = aggregate

    # Connection tuning
    self.open_timeout = open_timeout
    self.close_timeout = close_timeout
    self.extra_ws_kwargs = extra_ws_kwargs if extra_ws_kwargs else {}

session async

session() -> AsyncIterator[WebSocketSession]

Open a WebSocket connection and yield a :class:WebSocketSession.

The connection is kept alive for the duration of the async with block and closed cleanly on exit. If an on_connect hook is configured it is invoked before the session is yielded, so the caller always receives a fully-initialised session.

Example::

async with agent.session() as sess:
    await sess.send_after(
        lambda m: isinstance(m, dict) and m.get("ready"),
        json_data={"audio": base64_audio},
    )
    result = await sess.receive(timeout=30.0)
Source code in src/russo/adapters/websocket.py
@contextlib.asynccontextmanager
async def session(self) -> AsyncIterator[WebSocketSession]:
    """Open a WebSocket connection and yield a :class:`WebSocketSession`.

    The connection stays open for the body of the ``async with`` block and
    is closed when the block exits. A configured ``on_connect`` hook runs
    before the session is handed out, so callers always get a session that
    has already completed any handshake the hook performs.

    Example::

        async with agent.session() as sess:
            await sess.send_after(
                lambda m: isinstance(m, dict) and m.get("ready"),
                json_data={"audio": base64_audio},
            )
            result = await sess.receive(timeout=30.0)
    """
    import websockets

    connection = websockets.connect(
        self.url,
        additional_headers=self.headers or None,
        open_timeout=self.open_timeout,
        close_timeout=self.close_timeout,
        **self.extra_ws_kwargs,
    )
    async with connection as ws:
        live = WebSocketSession(ws, self._parse_incoming)
        hook = self.on_connect
        if hook:
            await hook(live)
        yield live

run async

run(audio: Audio) -> AgentResponse

Send audio over WebSocket and collect the response.

Source code in src/russo/adapters/websocket.py
async def run(self, audio: Audio) -> AgentResponse:
    """Run one request/response exchange over a fresh WebSocket session.

    1. Open a session via ``session()``, which also runs any ``on_connect``
       hook.
    2. Send the message built by ``_prepare_message``: a binary frame if it
       is ``bytes``, a text frame otherwise.
    3. Gather replies with ``_collect_from_session``.
    4. Close the session, fold the replies with ``_aggregate``, and parse
       them with the configured parser, or ``_default_parse`` when none is
       set.
    """
    async with self.session() as live:
        outgoing = self._prepare_message(audio)
        binary = isinstance(outgoing, bytes)
        sender = live.send_bytes if binary else live.send_text
        await sender(outgoing)
        logger.debug(
            "Sent %s message (%d bytes)",
            "binary" if binary else "text",
            len(outgoing),
        )
        replies = await self._collect_from_session(live)
        logger.debug("Collected %d response messages", len(replies))

    combined = self._aggregate(replies)
    parser = self.parser
    if parser:
        return parser.parse(combined)
    return self._default_parse(combined)

WebSocketSession

WebSocketSession(ws: Any, parse_incoming: Callable[[str | bytes], Any])

Active WebSocket session with full-duplex communication.

Obtained via `WebSocketAgent.session()`. Wraps a live websockets connection and adds structured send/receive primitives plus event-triggered sending.

Sending

  • :meth:send_text — send a raw text frame
  • :meth:send_bytes — send a raw binary frame
  • :meth:send_json — JSON-encode an object and send as a text frame

Receiving

  • :meth:receive — pull the next message (drains internal buffer first)
  • :meth:wait_for — block until a matching message arrives; non-matching messages are pushed into an internal buffer so they are not lost
  • :meth:send_after — convenience: wait for a trigger event then immediately send, in a single call

Iteration

The session is async-iterable::

async for msg in session:
    ...  # stops when the connection closes

Internal buffer

:meth:wait_for may read ahead to find a matching message. Any messages that arrive before the match are kept in an internal FIFO buffer. Subsequent calls to :meth:receive or async iteration drain that buffer before touching the wire again.

Source code in src/russo/adapters/websocket.py
def __init__(
    self,
    ws: Any,
    parse_incoming: Callable[[str | bytes], Any],
) -> None:
    """Wrap an open connection *ws* for structured send/receive.

    Args:
        ws: The live connection object; its ``send``/``close`` methods are
            used by the send helpers and ``close``.
        parse_incoming: Decoder for raw incoming frames (presumably applied
            by ``_next_message``, which is not visible here).
    """
    # FIFO of messages read ahead by wait_for() and not yet consumed.
    self._buffer: collections.deque[Any] = collections.deque()
    self._parse = parse_incoming
    self._ws = ws

send_text async

send_text(text: str) -> None

Send a text frame.

Source code in src/russo/adapters/websocket.py
async def send_text(self, text: str) -> None:
    """Transmit *text* unchanged as one text frame."""
    ws = self._ws
    await ws.send(text)

send_bytes async

send_bytes(data: bytes) -> None

Send a binary frame.

Source code in src/russo/adapters/websocket.py
async def send_bytes(self, data: bytes) -> None:
    """Transmit *data* unchanged as one binary frame."""
    ws = self._ws
    await ws.send(data)

send_json async

send_json(obj: Any) -> None

JSON-encode obj and send as a text frame.

Source code in src/russo/adapters/websocket.py
async def send_json(self, obj: Any) -> None:
    """Serialize *obj* with ``json.dumps`` and transmit it as a text frame."""
    encoded = json.dumps(obj)
    await self._ws.send(encoded)

receive async

receive(timeout: float | None = None) -> Any

Return the next incoming message.

Drains the internal buffer before reading from the wire.

PARAMETER DESCRIPTION
timeout

Seconds to wait. None (default) blocks indefinitely.

TYPE: float | None DEFAULT: None

RAISES DESCRIPTION
TimeoutError

If timeout elapses with no message available.

Source code in src/russo/adapters/websocket.py
async def receive(self, timeout: float | None = None) -> Any:
    """Return the next incoming message.

    Drains the internal buffer before reading from the wire.

    Args:
        timeout: Seconds to wait. ``None`` (default) blocks indefinitely.

    Raises:
        TimeoutError: If *timeout* elapses with no message available.
    """
    if timeout is not None:
        async with asyncio.timeout(timeout):
            return await self._next_message()
    return await self._next_message()

wait_for async

wait_for(predicate: Callable[[Any], bool], timeout: float = 30.0) -> Any

Wait until a message satisfying predicate arrives.

Messages that do not match are pushed into the internal buffer so they are not lost; subsequent :meth:receive calls or async iteration will return them in order.

PARAMETER DESCRIPTION
predicate

Callable that receives a parsed message and returns True to stop waiting, False to keep reading.

TYPE: Callable[[Any], bool]

timeout

Maximum seconds to wait (default 30).

TYPE: float DEFAULT: 30.0

RETURNS DESCRIPTION
Any

The first message for which predicate returned True.

RAISES DESCRIPTION
TimeoutError

If no matching message arrives within timeout.

Example::

# Block until the server signals readiness
await session.wait_for(
    lambda m: isinstance(m, dict) and m.get("ready")
)
Source code in src/russo/adapters/websocket.py
async def wait_for(
    self,
    predicate: Callable[[Any], bool],
    timeout: float = 30.0,
) -> Any:
    """Wait until a message satisfying *predicate* arrives.

    Messages that do not match are pushed into the internal buffer so they
    are not lost; subsequent :meth:`receive` calls or async iteration will
    return them in order.

    Args:
        predicate: Callable that receives a parsed message and returns
            ``True`` to stop waiting, ``False`` to keep reading.
        timeout: Maximum seconds to wait (default 30).

    Returns:
        The first message for which *predicate* returned ``True``.

    Raises:
        TimeoutError: If no matching message arrives within *timeout*.

    Example::

        # Block until the server signals readiness
        await session.wait_for(
            lambda m: isinstance(m, dict) and m.get("ready")
        )
    """
    async with asyncio.timeout(timeout):
        while True:
            msg = await self._next_message()
            if predicate(msg):
                return msg
            self._buffer.append(msg)

send_after async

send_after(predicate: Callable[[Any], bool], *, text: str | None = None, data: bytes | None = None, json_data: Any = None, timeout: float = 30.0) -> Any

Wait for a matching event, then send — in a single call.

Exactly one of text, data, or json_data should be provided. Non-matching messages received while waiting are buffered (see :meth:wait_for).

PARAMETER DESCRIPTION
predicate

Same semantics as :meth:wait_for.

TYPE: Callable[[Any], bool]

text

Text frame to send after the trigger.

TYPE: str | None DEFAULT: None

data

Binary frame to send after the trigger.

TYPE: bytes | None DEFAULT: None

json_data

Object to JSON-encode and send after the trigger.

TYPE: Any DEFAULT: None

timeout

Forwarded to :meth:wait_for.

TYPE: float DEFAULT: 30.0

RETURNS DESCRIPTION
Any

The triggering message (the one matched by predicate).

Example::

# Send audio only after the server signals setup is complete
await session.send_after(
    lambda m: isinstance(m, dict) and m.get("setupComplete"),
    json_data={"audio": base64_audio, "format": "wav"},
)
Source code in src/russo/adapters/websocket.py
async def send_after(
    self,
    predicate: Callable[[Any], bool],
    *,
    text: str | None = None,
    data: bytes | None = None,
    json_data: Any = None,
    timeout: float = 30.0,
) -> Any:
    """Wait for a matching event, then send, in a single call.

    Exactly one of *text*, *data*, or *json_data* must be provided (not
    ``None``). This is checked before waiting, so a bad call fails fast and
    does not consume the trigger message. Non-matching messages received
    while waiting are buffered (see :meth:`wait_for`).

    Args:
        predicate: Same semantics as :meth:`wait_for`.
        text: Text frame to send after the trigger.
        data: Binary frame to send after the trigger.
        json_data: Object to JSON-encode and send after the trigger.
        timeout: Forwarded to :meth:`wait_for`.

    Returns:
        The triggering message (the one matched by *predicate*).

    Raises:
        ValueError: If zero or more than one payload argument is given.
        TimeoutError: Propagated from :meth:`wait_for`.

    Example::

        # Send audio only after the server signals setup is complete
        await session.send_after(
            lambda m: isinstance(m, dict) and m.get("setupComplete"),
            json_data={"audio": base64_audio, "format": "wav"},
        )
    """
    # Enforce the documented contract. Without this check, a missing payload
    # silently sends nothing after eating the trigger, and extra payloads are
    # silently dropped.
    provided = sum(value is not None for value in (text, data, json_data))
    if provided != 1:
        msg = "send_after() requires exactly one of 'text', 'data', or 'json_data'"
        raise ValueError(msg)

    trigger = await self.wait_for(predicate, timeout=timeout)
    if json_data is not None:
        await self.send_json(json_data)
    elif data is not None:
        await self.send_bytes(data)
    else:
        await self.send_text(text)
    return trigger

close async

close() -> None

Close the underlying WebSocket connection.

Source code in src/russo/adapters/websocket.py
async def close(self) -> None:
    """Close the WebSocket connection wrapped by this session."""
    ws = self._ws
    await ws.close()

callable

Callable agent adapter — wraps async functions as Agents.

Re-exports from _helpers for organizational clarity. The actual implementation lives in russo._helpers to keep the public API flat (russo.agent decorator).

agent

agent(fn: Callable[[Audio], Coroutine[Any, Any, AgentResponse]]) -> _CallableAgent

Decorator to turn an async function into an Agent.

Usage

@russo.agent
async def my_agent(audio: russo.Audio) -> russo.AgentResponse:
    result = await call_my_api(audio.data)
    return russo.AgentResponse(tool_calls=[...])

Source code in src/russo/_helpers.py
def agent(fn: Callable[[Audio], Coroutine[Any, Any, AgentResponse]]) -> _CallableAgent:
    """Wrap an async ``Audio -> AgentResponse`` function as an Agent.

    *fn* is only stored inside a ``_CallableAgent``; it is not called here.

    Usage:
        @russo.agent
        async def my_agent(audio: russo.Audio) -> russo.AgentResponse:
            result = await call_my_api(audio.data)
            return russo.AgentResponse(tool_calls=[...])
    """
    wrapped = _CallableAgent(fn)
    return wrapped

gemini

Gemini SDK agent adapters — wraps a google-genai Client as an Agent.

Provides two adapters:

  • GeminiAgent: standard generate_content (request/response)
  • GeminiLiveAgent: Live API over WebSocket (streaming/real-time)

Send audio directly to a Gemini model via the SDK, no HTTP endpoint needed. Requires the google-genai package (already a core dependency).

GeminiAgent

GeminiAgent(*, client: Any | None = None, model: str = 'gemini-2.0-flash', tools: list[Any] | None = None, system_instruction: str | None = None, config: Any | None = None)

Agent adapter that wraps a google.genai.Client object directly.

Sends audio to Gemini via `client.aio.models.generate_content()` and automatically parses function-call responses with `GeminiResponseParser`.

Usage::

from google import genai

client = genai.Client(api_key="...")
agent = GeminiAgent(
    client=client,
    model="gemini-2.0-flash",
    tools=[book_flight_declaration],
)
response = await agent.run(audio)

For Vertex AI::

client = genai.Client(vertexai=True, project="my-project", location="us-central1")
agent = GeminiAgent(client=client, model="gemini-2.0-flash", tools=[...])
Source code in src/russo/adapters/gemini.py
def __init__(
    self,
    *,
    client: Any | None = None,
    model: str = "gemini-2.0-flash",
    tools: list[Any] | None = None,
    system_instruction: str | None = None,
    config: Any | None = None,
) -> None:
    """Configure a request/response Gemini agent.

    Args:
        client: A ``google.genai.Client``. When omitted, a default client is
            obtained from ``_make_client()``.
        model: Model name sent with every ``generate_content`` call.
        tools: Tool (function) declarations for the model.
        system_instruction: Optional system instruction for the model.
        config: Optional extra generation config object.
    """
    if client is None:
        client = _make_client()
    self.client = client
    self.model = model
    self.tools = tools
    self.system_instruction = system_instruction
    self.config = config
    # Responses from generate_content are decoded by this parser in run().
    self._parser = GeminiResponseParser()
run async
run(audio: Audio) -> AgentResponse

Send audio to Gemini and parse the tool-call response.

Source code in src/russo/adapters/gemini.py
async def run(self, audio: Audio) -> AgentResponse:
    """Send *audio* to Gemini with ``generate_content`` and parse tool calls.

    ``AudioManager.prepare_for_generate_content`` turns the audio into raw
    bytes plus a MIME type. These are wrapped in a single ``Part`` and sent
    with the config returned by ``_build_config``. The SDK response is then
    decoded by the instance's ``GeminiResponseParser``.
    """
    from google.genai import types

    payload, mime = AudioManager.prepare_for_generate_content(audio)
    audio_part = types.Part.from_bytes(data=payload, mime_type=mime)
    request_config = self._build_config(types)

    logger.debug("Sending %d bytes of %s audio to %s", len(payload), mime, self.model)

    models_api = self.client.aio.models
    reply = await models_api.generate_content(
        model=self.model,
        contents=[audio_part],
        config=request_config,
    )
    return self._parser.parse(reply)

GeminiLiveAgent

GeminiLiveAgent(*, client: Any | None = None, session: Any | None = None, model: str = 'gemini-live-2.5-flash-native-audio', tools: list[Any] | None = None, system_instruction: str | None = None, config: Any | None = None, response_timeout: float = 30.0)

Agent adapter for Gemini's Live API (streaming/real-time).

Connects via client.aio.live.connect(), sends audio via send_realtime_input, and collects function-call responses.

Accepts either a google.genai.Client (new session per run) or a pre-existing Live session.

Source code in src/russo/adapters/gemini.py
def __init__(
    self,
    *,
    client: Any | None = None,
    session: Any | None = None,
    model: str = "gemini-live-2.5-flash-native-audio",
    tools: list[Any] | None = None,
    system_instruction: str | None = None,
    config: Any | None = None,
    response_timeout: float = 30.0,
) -> None:
    """Configure a Gemini Live API agent.

    Args:
        client: A ``google.genai.Client`` used to open a new Live connection
            per ``run``. If neither *client* nor *session* is supplied, a
            default client is created with ``_make_client()``.
        session: An already-open Live session. When set, ``run`` uses it
            directly instead of connecting.
        model: Live model name used when connecting.
        tools: Tool declarations stored for building the connection config.
        system_instruction: Optional system instruction, stored likewise.
        config: Optional extra config object, stored likewise.
        response_timeout: Seconds to wait for a response. It is stored here
            and consumed by the response-collection code, which is not
            visible in this excerpt.
    """
    # Only fall back to a default client when the caller gave us nothing
    # to talk to at all.
    needs_default_client = session is None and client is None
    self.client = _make_client() if needs_default_client else client
    self.session = session
    self.model = model
    self.tools = tools
    self.system_instruction = system_instruction
    self.config = config
    self.response_timeout = response_timeout
run async
run(audio: Audio) -> AgentResponse

Send audio to a Live session and collect function calls.

Source code in src/russo/adapters/gemini.py
async def run(self, audio: Audio) -> AgentResponse:
    """Stream *audio* through a Live session and collect function calls.

    If a session was supplied at construction it is reused. Otherwise a new
    connection is opened with ``client.aio.live.connect`` for this call only
    and closed when the call finishes.
    """
    existing = self.session
    if existing is not None:
        return await self._run_on(existing, audio)

    from google.genai import types

    assert self.client is not None
    live_config = self._build_config(types)
    connector = self.client.aio.live.connect(model=self.model, config=live_config)
    async with connector as live_session:
        return await self._run_on(live_session, audio)

http

HTTP agent adapter — sends audio to an API endpoint.

HttpAgent

HttpAgent(*, url: str, parser: ResponseParser | None = None, method: str = 'POST', headers: dict[str, str] | None = None, audio_field: str = 'audio', format_field: str = 'format', timeout: float = 60.0)

Agent adapter that sends audio to an HTTP endpoint.

Sends audio as base64-encoded JSON and parses the response using an optional ResponseParser.

Usage

agent = HttpAgent(
    url="http://localhost:8000/voice-agent",
    parser=russo.parsers.GeminiResponseParser(),
)
response = await agent.run(audio)

Source code in src/russo/adapters/http.py
def __init__(
    self,
    *,
    url: str,
    parser: ResponseParser | None = None,
    method: str = "POST",
    headers: dict[str, str] | None = None,
    audio_field: str = "audio",
    format_field: str = "format",
    timeout: float = 60.0,
) -> None:
    """Configure the endpoint and the JSON payload layout.

    Args:
        url: Endpoint the audio is sent to.
        parser: Optional parser for the endpoint's reply; when unset,
            :meth:`run` falls back to ``_default_parse``.
        method: HTTP method (presumably used by ``_send``).
        headers: Extra request headers; ``None`` or empty becomes ``{}``.
        audio_field: JSON key that carries the base64-encoded audio.
        format_field: JSON key that carries ``audio.format``.
        timeout: Request timeout in seconds (presumably applied by ``_send``).
    """
    self.url, self.parser = url, parser
    self.method = method
    self.headers = headers if headers else {}
    self.audio_field, self.format_field = audio_field, format_field
    self.timeout = timeout
run async
run(audio: Audio) -> AgentResponse

Send audio to the HTTP endpoint and parse the response.

Source code in src/russo/adapters/http.py
async def run(self, audio: Audio) -> AgentResponse:
    """Send *audio* to the configured endpoint and parse the reply.

    The JSON payload maps ``self.audio_field`` to the base64 (ASCII) encoding
    of ``audio.data`` and ``self.format_field`` to ``audio.format``. The raw
    reply goes to ``self.parser.parse`` when a parser is set (truthy), and to
    ``_default_parse`` otherwise.
    """
    encoded = base64.b64encode(audio.data).decode("ascii")
    body = {self.audio_field: encoded, self.format_field: audio.format}

    reply = await self._send(body)

    parse = self.parser.parse if self.parser else self._default_parse
    return parse(reply)

openai

OpenAI SDK agent adapters — wraps OpenAI clients as Agents.

Provides two adapters:

  • OpenAIAgent: standard Chat Completions with audio input (gpt-4o-audio-preview and similar models)
  • OpenAIRealtimeAgent: Realtime API over WebSocket (gpt-4o-realtime-preview)

Requires the openai package: pip install russo[openai]

OpenAIAgent

OpenAIAgent(*, client: Any, model: str = 'gpt-4o-audio-preview', tools: list[Any] | None = None, system_prompt: str | None = None, extra_create_kwargs: dict[str, Any] | None = None)

Agent adapter that wraps an AsyncOpenAI client for Chat Completions.

Sends audio via the Chat Completions API and auto-parses tool-call responses with :class:OpenAIResponseParser.

Usage::

from openai import AsyncOpenAI

client = AsyncOpenAI()
agent = OpenAIAgent(
    client=client,
    model="gpt-4o-audio-preview",
    tools=[{
        "type": "function",
        "function": {
            "name": "book_flight",
            "parameters": {"type": "object", "properties": {...}},
        },
    }],
)
response = await agent.run(audio)
PARAMETER DESCRIPTION
client

An openai.AsyncOpenAI instance.

TYPE: Any

model

Model name supporting audio input.

TYPE: str DEFAULT: 'gpt-4o-audio-preview'

tools

OpenAI tool definitions (function-calling format).

TYPE: list[Any] | None DEFAULT: None

system_prompt

Optional system message prepended to the conversation.

TYPE: str | None DEFAULT: None

extra_create_kwargs

Additional kwargs forwarded to client.chat.completions.create().

TYPE: dict[str, Any] | None DEFAULT: None

Source code in src/russo/adapters/openai.py
def __init__(
    self,
    *,
    client: Any,
    model: str = "gpt-4o-audio-preview",
    tools: list[Any] | None = None,
    system_prompt: str | None = None,
    extra_create_kwargs: dict[str, Any] | None = None,
) -> None:
    """
    Args:
        client: An ``openai.AsyncOpenAI`` instance whose
            ``chat.completions.create`` is awaited by :meth:`run`.
        model: Name of a model that accepts audio input.
        tools: Function-calling tool definitions; only sent when non-empty.
        system_prompt: Optional system message placed before the audio.
        extra_create_kwargs: Extra keyword arguments merged into the
            ``client.chat.completions.create()`` call (``None`` or empty
            becomes ``{}``).
    """
    self.client, self.model = client, model
    self.tools = tools
    self.system_prompt = system_prompt
    self.extra_create_kwargs = extra_create_kwargs if extra_create_kwargs else {}
    # One parser instance is reused for every response this agent handles.
    self._parser = OpenAIResponseParser()
run async
run(audio: Audio) -> AgentResponse

Send audio via Chat Completions and parse the tool-call response.

Source code in src/russo/adapters/openai.py
async def run(self, audio: Audio) -> AgentResponse:
    """Send *audio* to Chat Completions and parse tool calls from the reply.

    The conversation is an optional system message (``self.system_prompt``)
    followed by one user message whose single content part is an
    ``input_audio`` item holding the base64 data and ``audio.format``.
    ``extra_create_kwargs`` are merged over ``model``/``messages``;
    ``self.tools``, when non-empty, is applied last and so overrides any
    ``tools`` entry from ``extra_create_kwargs``.
    """
    audio_part = {
        "type": "input_audio",
        "input_audio": {
            "data": base64.b64encode(audio.data).decode("ascii"),
            "format": audio.format,
        },
    }
    preamble = [{"role": "system", "content": self.system_prompt}] if self.system_prompt else []
    messages: list[dict[str, Any]] = [*preamble, {"role": "user", "content": [audio_part]}]

    request: dict[str, Any] = dict(model=self.model, messages=messages)
    request.update(self.extra_create_kwargs)
    if self.tools:
        request["tools"] = self.tools

    logger.debug(
        "Sending %d bytes of %s audio to %s",
        len(audio.data),
        audio.format,
        self.model,
    )

    completion = await self.client.chat.completions.create(**request)
    return self._parser.parse(completion)

OpenAIRealtimeAgent

OpenAIRealtimeAgent(*, client: Any | None = None, connection: Any | None = None, model: str = 'gpt-4o-realtime-preview', tools: list[Any] | None = None, response_timeout: float = 30.0)

Agent adapter for OpenAI's Realtime API.

Connects via the SDK's client.beta.realtime.connect() interface, sends audio, and collects response.function_call_arguments.done events.

Accepts either:

  • An AsyncOpenAI client (creates a new connection per run())
  • A pre-existing realtime connection (reuses it, no session config sent)

Usage with client::

from openai import AsyncOpenAI

client = AsyncOpenAI()
agent = OpenAIRealtimeAgent(
    client=client,
    model="gpt-4o-realtime-preview",
    tools=[{
        "type": "function",
        "name": "book_flight",
        "description": "Book a flight",
        "parameters": {"type": "object", "properties": {...}},
    }],
)
response = await agent.run(audio)

Usage with pre-existing connection::

async with client.beta.realtime.connect(model="gpt-4o-realtime-preview") as conn:
    agent = OpenAIRealtimeAgent(connection=conn)
    response = await agent.run(audio)
Note

The Realtime API expects pcm16 audio at 24 kHz mono by default. If you pass WAV audio, the adapter automatically strips the WAV header to extract raw PCM frames.

PARAMETER DESCRIPTION
client

An openai.AsyncOpenAI instance. Either client or connection must be provided; if both are given, connection is used.

TYPE: Any | None DEFAULT: None

connection

A pre-existing realtime connection (from client.beta.realtime.connect()).

TYPE: Any | None DEFAULT: None

model

Realtime model name.

TYPE: str DEFAULT: 'gpt-4o-realtime-preview'

tools

Tool definitions sent during session configuration.

TYPE: list[Any] | None DEFAULT: None

response_timeout

Max seconds to wait for a complete response.

TYPE: float DEFAULT: 30.0

Source code in src/russo/adapters/openai.py
def __init__(
    self,
    *,
    client: Any | None = None,
    connection: Any | None = None,
    model: str = "gpt-4o-realtime-preview",
    tools: list[Any] | None = None,
    response_timeout: float = 30.0,
) -> None:
    """
    Args:
        client: An ``openai.AsyncOpenAI`` instance; :meth:`run` opens a new
            realtime connection with it on every call. Either *client* or
            *connection* must be given; if both are, *connection* is used.
        connection: An already-open realtime connection (e.g. from
            ``client.beta.realtime.connect()``), reused without sending
            session configuration.
        model: Realtime model name passed to ``connect()``.
        tools: Tool definitions sent during session configuration.
        response_timeout: Max seconds to wait for a complete response.

    Raises:
        ValueError: If neither *client* nor *connection* is provided.
    """
    if connection is None and client is None:
        msg = "Provide either 'client' (AsyncOpenAI) or 'connection' (realtime connection)"
        raise ValueError(msg)
    self.client, self.connection = client, connection
    self.model, self.tools = model, tools
    self.response_timeout = response_timeout
run async
run(audio: Audio) -> AgentResponse

Send audio via the Realtime API and collect function calls.

Source code in src/russo/adapters/openai.py
async def run(self, audio: Audio) -> AgentResponse:
    """Push *audio* through the Realtime API and collect function calls.

    A stored ``connection`` is reused and left unconfigured
    (``configure=False``). Otherwise a fresh connection is opened with
    ``client.beta.realtime.connect`` for this call only and configured
    (``configure=True``).
    """
    existing = self.connection
    if existing is None:
        async with self.client.beta.realtime.connect(model=self.model) as opened:
            return await self._run_on(opened, audio, configure=True)
    return await self._run_on(existing, audio, configure=False)

websocket

WebSocket agent adapter — sends audio to a WebSocket endpoint.

Requires the websockets package: pip install russo[ws]

WebSocketSession

WebSocketSession(ws: Any, parse_incoming: Callable[[str | bytes], Any])

Active WebSocket session with full-duplex communication.

Obtained via :meth:WebSocketAgent.session. Wraps a live websockets connection and adds structured send/receive primitives plus event-triggered sending.

Sending

  • :meth:send_text — send a raw text frame
  • :meth:send_bytes — send a raw binary frame
  • :meth:send_json — JSON-encode an object and send as a text frame

Receiving

  • :meth:receive — pull the next message (drains internal buffer first)
  • :meth:wait_for — block until a matching message arrives; non-matching messages are pushed into an internal buffer so they are not lost
  • :meth:send_after — convenience: wait for a trigger event then immediately send, in a single call

Iteration

The session is async-iterable::

async for msg in session:
    ...  # stops when the connection closes

Internal buffer

:meth:wait_for may read ahead to find a matching message. Any messages that arrive before the match are kept in an internal FIFO buffer. Subsequent calls to :meth:receive or async iteration drain that buffer before touching the wire again.

Source code in src/russo/adapters/websocket.py
def __init__(
    self,
    ws: Any,
    parse_incoming: Callable[[str | bytes], Any],
) -> None:
    """Wrap an open WebSocket connection.

    Args:
        ws: The live connection; frames are sent with its ``send`` method
            and it is closed via its ``close`` method.
        parse_incoming: Converts a raw text/binary frame into a message
            object (presumably applied inside ``_next_message``).
    """
    # FIFO of messages read ahead by ``wait_for`` but not yet handed out.
    self._buffer: collections.deque[Any] = collections.deque()
    self._parse = parse_incoming
    self._ws = ws
send_text async
send_text(text: str) -> None

Send a text frame.

Source code in src/russo/adapters/websocket.py
async def send_text(self, text: str) -> None:
    """Transmit *text* unchanged as a single text frame."""
    transmit = self._ws.send
    await transmit(text)
send_bytes async
send_bytes(data: bytes) -> None

Send a binary frame.

Source code in src/russo/adapters/websocket.py
async def send_bytes(self, data: bytes) -> None:
    """Transmit *data* unchanged as a single binary frame."""
    transmit = self._ws.send
    await transmit(data)
send_json async
send_json(obj: Any) -> None

JSON-encode obj and send as a text frame.

Source code in src/russo/adapters/websocket.py
async def send_json(self, obj: Any) -> None:
    """Serialize *obj* with :func:`json.dumps` and send it as a text frame."""
    encoded = json.dumps(obj)
    await self._ws.send(encoded)
receive async
receive(timeout: float | None = None) -> Any

Return the next incoming message.

Drains the internal buffer before reading from the wire.

PARAMETER DESCRIPTION
timeout

Seconds to wait. None (default) blocks indefinitely.

TYPE: float | None DEFAULT: None

RAISES DESCRIPTION
TimeoutError

If timeout elapses with no message available.

Source code in src/russo/adapters/websocket.py
async def receive(self, timeout: float | None = None) -> Any:
    """Return the next incoming message.

    Drains the internal buffer before reading from the wire.

    Args:
        timeout: Seconds to wait. ``None`` (default) blocks indefinitely.

    Raises:
        TimeoutError: If *timeout* elapses with no message available.
    """
    if timeout is not None:
        async with asyncio.timeout(timeout):
            return await self._next_message()
    return await self._next_message()
wait_for async
wait_for(predicate: Callable[[Any], bool], timeout: float = 30.0) -> Any

Wait until a message satisfying predicate arrives.

Messages that do not match are pushed into the internal buffer so they are not lost; subsequent :meth:receive calls or async iteration will return them in order.

PARAMETER DESCRIPTION
predicate

Callable that receives a parsed message and returns True to stop waiting, False to keep reading.

TYPE: Callable[[Any], bool]

timeout

Maximum seconds to wait (default 30).

TYPE: float DEFAULT: 30.0

RETURNS DESCRIPTION
Any

The first message for which predicate returned True.

RAISES DESCRIPTION
TimeoutError

If no matching message arrives within timeout.

Example::

# Block until the server signals readiness
await session.wait_for(
    lambda m: isinstance(m, dict) and m.get("ready")
)
Source code in src/russo/adapters/websocket.py
async def wait_for(
    self,
    predicate: Callable[[Any], bool],
    timeout: float = 30.0,
) -> Any:
    """Wait until a message satisfying *predicate* arrives.

    Messages that do not match are pushed into the internal buffer so they
    are not lost; subsequent :meth:`receive` calls or async iteration will
    return them in order.

    Args:
        predicate: Callable that receives a parsed message and returns
            ``True`` to stop waiting, ``False`` to keep reading.
        timeout: Maximum seconds to wait (default 30).

    Returns:
        The first message for which *predicate* returned ``True``.

    Raises:
        TimeoutError: If no matching message arrives within *timeout*.

    Example::

        # Block until the server signals readiness
        await session.wait_for(
            lambda m: isinstance(m, dict) and m.get("ready")
        )
    """
    async with asyncio.timeout(timeout):
        while True:
            msg = await self._next_message()
            if predicate(msg):
                return msg
            self._buffer.append(msg)
send_after async
send_after(predicate: Callable[[Any], bool], *, text: str | None = None, data: bytes | None = None, json_data: Any = None, timeout: float = 30.0) -> Any

Wait for a matching event, then send — in a single call.

Exactly one of text, data, or json_data should be provided. Non-matching messages received while waiting are buffered (see :meth:wait_for).

PARAMETER DESCRIPTION
predicate

Same semantics as :meth:wait_for.

TYPE: Callable[[Any], bool]

text

Text frame to send after the trigger.

TYPE: str | None DEFAULT: None

data

Binary frame to send after the trigger.

TYPE: bytes | None DEFAULT: None

json_data

Object to JSON-encode and send after the trigger.

TYPE: Any DEFAULT: None

timeout

Forwarded to :meth:wait_for.

TYPE: float DEFAULT: 30.0

RETURNS DESCRIPTION
Any

The triggering message (the one matched by predicate).

Example::

# Send audio only after the server signals setup is complete
await session.send_after(
    lambda m: isinstance(m, dict) and m.get("setupComplete"),
    json_data={"audio": base64_audio, "format": "wav"},
)
Source code in src/russo/adapters/websocket.py
async def send_after(
    self,
    predicate: Callable[[Any], bool],
    *,
    text: str | None = None,
    data: bytes | None = None,
    json_data: Any = None,
    timeout: float = 30.0,
) -> Any:
    """Wait for a matching event, then send — in a single call.

    Exactly one of *text*, *data*, or *json_data* must be provided. This is
    checked before waiting, so a malformed call consumes no messages.
    Non-matching messages received while waiting are buffered (see
    :meth:`wait_for`).

    Args:
        predicate: Same semantics as :meth:`wait_for`.
        text: Text frame to send after the trigger.
        data: Binary frame to send after the trigger.
        json_data: Object to JSON-encode and send after the trigger.
        timeout: Forwarded to :meth:`wait_for`.

    Returns:
        The triggering message (the one matched by *predicate*).

    Raises:
        ValueError: If zero or more than one payload argument is given.
        TimeoutError: Propagated from :meth:`wait_for` when no matching
            message arrives within *timeout*.

    Example::

        # Send audio only after the server signals setup is complete
        await session.send_after(
            lambda m: isinstance(m, dict) and m.get("setupComplete"),
            json_data={"audio": base64_audio, "format": "wav"},
        )
    """
    # Reject ambiguous calls up front: with several payloads only one would be
    # sent (the rest silently dropped), and with none nothing would be sent.
    given = [
        name
        for name, value in (("text", text), ("data", data), ("json_data", json_data))
        if value is not None
    ]
    if len(given) != 1:
        msg = f"send_after() needs exactly one of text, data or json_data; got {given or 'none'}"
        raise ValueError(msg)

    trigger = await self.wait_for(predicate, timeout=timeout)
    if json_data is not None:
        await self.send_json(json_data)
    elif data is not None:
        await self.send_bytes(data)
    else:
        await self.send_text(text)
    return trigger
close async
close() -> None

Close the underlying WebSocket connection.

Source code in src/russo/adapters/websocket.py
async def close(self) -> None:
    """Shut down the wrapped WebSocket connection."""
    connection = self._ws
    await connection.close()

WebSocketAgent

WebSocketAgent(*, url: str, parser: ResponseParser | None = None, headers: dict[str, str] | None = None, send_bytes: bool = False, audio_field: str = 'audio', format_field: str = 'format', response_timeout: float = 30.0, max_messages: int = 100, on_connect: Callable[[WebSocketSession], Awaitable[None]] | None = None, on_send: Callable[[Audio], str | bytes] | None = None, is_complete: Callable[[list[Any]], bool] | None = None, aggregate: Callable[[list[Any]], Any] | None = None, open_timeout: float = 10.0, close_timeout: float = 5.0, extra_ws_kwargs: dict[str, Any] | None = None)

Agent adapter that communicates over WebSocket.

Supports two usage modes:

High-level — call :meth:run with an :class:~russo.Audio object. The adapter connects, runs the on_connect hook (if set), sends the audio, collects responses until a completion condition is met, then parses tool calls.

Session-based — use :meth:session as an async context manager to obtain a :class:WebSocketSession and drive the protocol yourself::

async with agent.session() as sess:
    # Wait for server-ready before sending anything
    await sess.wait_for(lambda m: m.get("ready"))

    await sess.send_json({"audio": base64_data, "format": "wav"})

    async for msg in sess:
        if is_done(msg):
            break

Send modes

  • json (default): {"audio": "<base64>", "format": "wav"}
  • bytes: raw audio bytes on the wire (send_bytes=True)

Hooks (on_connect runs in both :meth:run and :meth:session; on_send, is_complete and aggregate are used by :meth:run)

  • on_connect: called with the :class:WebSocketSession right after the connection opens, before anything else — use it to wait for a server-ready message
  • on_send: transform an :class:~russo.Audio into the wire format your server expects (bypasses send_bytes / audio_field)
  • is_complete: return True when :meth:run should stop collecting response messages
  • aggregate: fold multiple response messages into a single object before parsing

Examples::

# Simple JSON protocol
agent = WebSocketAgent(url="ws://localhost:8000/ws/agent")

# Raw bytes (e.g. streaming PCM)
agent = WebSocketAgent(url="ws://localhost:8000/ws/stream", send_bytes=True)

# Wait for setupComplete, then send a custom format
async def wait_ready(session: WebSocketSession) -> None:
    await session.wait_for(lambda m: isinstance(m, dict) and m.get("setupComplete"))

agent = WebSocketAgent(
    url="ws://localhost:8000/ws/agent",
    on_connect=wait_ready,
    on_send=lambda audio: json.dumps({"pcm": base64.b64encode(audio.data).decode()}),
    is_complete=lambda msgs: any(isinstance(m, dict) and m.get("done") for m in msgs),
)
Source code in src/russo/adapters/websocket.py
def __init__(
    self,
    *,
    url: str,
    parser: ResponseParser | None = None,
    headers: dict[str, str] | None = None,
    # Send options
    send_bytes: bool = False,
    audio_field: str = "audio",
    format_field: str = "format",
    # Response collection
    response_timeout: float = 30.0,
    max_messages: int = 100,
    # Hooks
    on_connect: Callable[[WebSocketSession], Awaitable[None]] | None = None,
    on_send: Callable[[Audio], str | bytes] | None = None,
    is_complete: Callable[[list[Any]], bool] | None = None,
    aggregate: Callable[[list[Any]], Any] | None = None,
    # Connection
    open_timeout: float = 10.0,
    close_timeout: float = 5.0,
    extra_ws_kwargs: dict[str, Any] | None = None,
) -> None:
    """Store connection, payload, collection and hook settings.

    Raises:
        ImportError: If the optional ``websockets`` dependency is unavailable
            (``_HAS_WEBSOCKETS`` is false).
    """
    if not _HAS_WEBSOCKETS:
        msg = "WebSocketAgent requires the 'websockets' package. Install with: pip install russo[ws]"
        raise ImportError(msg)

    # Endpoint and reply parsing; empty/None headers normalise to {}.
    self.url = url
    self.parser = parser
    self.headers = headers if headers else {}
    # Outgoing payload shape.
    self.send_bytes = send_bytes
    self.audio_field, self.format_field = audio_field, format_field
    # Reply-collection limits (presumably read by ``_collect_from_session``).
    self.response_timeout, self.max_messages = response_timeout, max_messages
    # User hooks.
    self.on_connect, self.on_send = on_connect, on_send
    self.is_complete, self.aggregate = is_complete, aggregate
    # Connection tuning forwarded to ``websockets.connect`` by ``session()``.
    self.open_timeout, self.close_timeout = open_timeout, close_timeout
    self.extra_ws_kwargs = extra_ws_kwargs if extra_ws_kwargs else {}
session async
session() -> AsyncIterator[WebSocketSession]

Open a WebSocket connection and yield a :class:WebSocketSession.

The connection is kept alive for the duration of the async with block and closed cleanly on exit. If an on_connect hook is configured it is invoked before the session is yielded, so the caller always receives a fully-initialised session.

Example::

async with agent.session() as sess:
    await sess.send_after(
        lambda m: isinstance(m, dict) and m.get("ready"),
        json_data={"audio": base64_audio},
    )
    result = await sess.receive(timeout=30.0)
Source code in src/russo/adapters/websocket.py
@contextlib.asynccontextmanager
async def session(self) -> AsyncIterator[WebSocketSession]:
    """Open a WebSocket connection and yield a :class:`WebSocketSession`.

    The connection stays open for the body of the ``async with`` block and
    is closed by ``websockets.connect``'s context manager on exit. A
    configured ``on_connect`` hook is awaited before the session is yielded,
    so callers always get a session the hook has already prepared.

    Example::

        async with agent.session() as sess:
            await sess.send_after(
                lambda m: isinstance(m, dict) and m.get("ready"),
                json_data={"audio": base64_audio},
            )
            result = await sess.receive(timeout=30.0)
    """
    import websockets

    # Empty headers are passed as None rather than an empty mapping.
    connect_options: dict[str, Any] = {
        "additional_headers": self.headers or None,
        "open_timeout": self.open_timeout,
        "close_timeout": self.close_timeout,
    }
    async with websockets.connect(self.url, **connect_options, **self.extra_ws_kwargs) as ws:
        live = WebSocketSession(ws, self._parse_incoming)
        hook = self.on_connect
        if hook:
            await hook(live)
        yield live
run async
run(audio: Audio) -> AgentResponse

Send audio over WebSocket and collect the response.

Source code in src/russo/adapters/websocket.py
async def run(self, audio: Audio) -> AgentResponse:
    """Send *audio* over a fresh WebSocket session and parse the reply.

    Inside one :meth:`session`, the audio is converted with
    ``_prepare_message`` and sent as a binary frame (if ``bytes``) or a text
    frame. The reply messages are then gathered with
    ``_collect_from_session``. After the connection closes, they are folded
    by ``_aggregate`` and parsed by ``self.parser`` when set (truthy), or by
    ``_default_parse`` otherwise.
    """
    async with self.session() as sess:
        outgoing = self._prepare_message(audio)
        is_binary = isinstance(outgoing, bytes)
        sender = sess.send_bytes if is_binary else sess.send_text
        await sender(outgoing)
        logger.debug(
            "Sent %s message (%d bytes)",
            "binary" if is_binary else "text",
            len(outgoing),
        )
        replies = await self._collect_from_session(sess)
        logger.debug("Collected %d response messages", len(replies))

    combined = self._aggregate(replies)
    return self.parser.parse(combined) if self.parser else self._default_parse(combined)