
Streaming Speech Transcription

accounts/fireworks/models/streaming-speech

Serverless · Audio

Fireworks Streaming Speech Transcription provides real-time transcription over WebSockets.

Serverless API

Streaming Speech Transcription is available via Fireworks' Streaming Speech-to-Text API; you are billed based on the duration of the audio transcribed.


API Examples

The example below generates a transcription using the streaming-transcription endpoint of streaming-speech; see the API reference for full details.


import os
from typing import Iterable, Tuple

import torch
import torchaudio

SAMPLE_RATE = 16_000

# For demonstration, stream a local audio file (replace with your own path).
FILE_PATH = "/home/3.5m.flac"

def _audio_tensor_to_bytes(value: torch.Tensor) -> bytes:
    """
    Convert a float waveform tensor in [-1, 1] to 16-bit PCM bytes for streaming.
    """
    # Clamp before casting so full-scale samples don't wrap (int16 max is 32767).
    return (value * 32768.0).clamp(-32768, 32767).to(torch.int16).numpy().tobytes()

def _audio_path_to_tensor(path: str) -> torch.Tensor:
    """
    Load and optionally resample an audio file into a Torch tensor.
    """
    waveform, original_sr = torchaudio.load(path)

    if original_sr != SAMPLE_RATE:
        resampler = torchaudio.transforms.Resample(
            orig_freq=original_sr, new_freq=SAMPLE_RATE
        )
        waveform = resampler(waveform)

    # Convert to mono if multiple channels
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)

    return waveform


# Example chunk size in seconds:
chunk_seconds = 0.2
audio_tensor = _audio_path_to_tensor(FILE_PATH).squeeze()
chunk_size = int(chunk_seconds * SAMPLE_RATE)

audio_chunks = []
for i in range(0, len(audio_tensor), chunk_size):
    chunk = audio_tensor[i : i + chunk_size].unsqueeze(0)
    audio_chunk = _audio_tensor_to_bytes(chunk)
    audio_chunks.append((audio_chunk, chunk_seconds))

print(f"Loaded {len(audio_chunks)} chunks")


# WebSocket client for streaming audio transcription
import json
import threading
import time
import websocket
import urllib.parse


# Build the streaming endpoint (model path + any query parameters).
# We'll pass at least the language. You might add model, etc., as query params if needed.
ENDPOINT_URL_BASE = "wss://audio-streaming.us-virginia-1.direct.fireworks.ai"
ENDPOINT_PATH = "/v1/audio/transcriptions/streaming"
url_params = urllib.parse.urlencode({"language": "en"})


ENDPOINT_URL = f"{ENDPOINT_URL_BASE}{ENDPOINT_PATH}?{url_params}"
print(f"Connecting to: {ENDPOINT_URL}")


def run_websocket_client(audio_stream: Iterable[Tuple[bytes, float]]):
    """
    Send audio chunks over WebSocket for streaming transcription.
    """
    lock = threading.Lock()
    segments = {}

    def on_open(ws):
        def stream_audio(ws):
            # Stream each chunk, then sleep for chunk duration
            for audio_chunk, duration in audio_stream:
                ws.send(audio_chunk, opcode=websocket.ABNF.OPCODE_BINARY)
                time.sleep(duration)

            # Give the server some time to finalize any last transcription segments
            time.sleep(10)
            ws.close()

        threading.Thread(target=stream_audio, args=(ws,)).start()

    def on_error(ws, error):
        print(f"Error: {error}")

    def on_message(ws, message):
        response = json.loads(message)
        if "error" in response:
            print(response["error"])
        else:
            # The server may resend a segment with refined text, so keep only
            # the latest text per segment id and reprint the running transcript.
            with lock:
                for segment in response.get("segments", []):
                    segments[segment["id"]] = segment["text"]
                print("\n".join(f" - {k}: {v}" for k, v in segments.items()))

    ws = websocket.WebSocketApp(
        ENDPOINT_URL,
        # Authenticate with your Fireworks API key, here assumed to be
        # supplied via the FIREWORKS_API_KEY environment variable.
        header={"Authorization": os.environ.get("FIREWORKS_API_KEY", "")},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
    )
    ws.run_forever()


# Start streaming audio chunks for transcription
run_websocket_client(audio_chunks)
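
# For live transcription, the same client can be fed from a microphone instead
# of a file. A minimal sketch assuming the third-party sounddevice package and
# a default input device; mic_chunks is hypothetical, not part of the
# Fireworks example:
def mic_chunks(chunk_seconds: float = 0.2) -> Iterable[Tuple[bytes, float]]:
    import sounddevice as sd  # pip install sounddevice

    frames = int(chunk_seconds * SAMPLE_RATE)
    with sd.RawInputStream(samplerate=SAMPLE_RATE, channels=1, dtype="int16") as stream:
        while True:
            data, _overflowed = stream.read(frames)
            # stream.read already blocks for ~chunk_seconds, so report a sleep
            # duration of 0 and let the recorder pace the stream.
            yield bytes(data), 0.0

# Example usage: run_websocket_client(mic_chunks())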