Python SDK
The roset package is the official Python client for the Roset file processing orchestration API. It provides methods for uploading files, managing processing jobs, retrieving extraction variants, and configuring connections and webhooks.
Requires Python 3.9 or later.
Installation
pip install roset
Quick Start
import os
from roset import Client
client = Client(api_key=os.getenv("ROSET_API_KEY"))
# Upload a file -- Roset routes it to the right extraction provider
file = client.files.upload(
    filename="report.pdf",
    content_type="application/pdf",
    size_bytes=1024,
)
# List all completed files
result = client.files.list(status="completed")
Configuration
client = Client(
    api_key="rsk_...",  # Required -- your Roset API key
    base_url="https://api.roset.dev",  # Optional -- defaults to production
    timeout=30,  # Optional -- request timeout in seconds (default: 30)
)
You can also set the API key via environment variable:
export ROSET_API_KEY=rsk_your_key_here
# Reads ROSET_API_KEY from environment automatically
client = Client()
Resources
Files
Files are documents tracked by Roset's metadata store. Each file has a processing status and zero or more variants.
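Listing files returns a `next_cursor` value when more results are available. The following is a minimal sketch of walking every page. It assumes `files.list` accepts a `cursor` keyword for requesting the next page; that parameter name is not confirmed on this page, so check the API reference before relying on it. `iter_files` is a hypothetical helper, not part of the SDK.

```python
from typing import Any, Dict, Iterator


def iter_files(client: Any, page_size: int = 20, **filters: Any) -> Iterator[Dict[str, Any]]:
    """Yield every file matching `filters`, following next_cursor page by page.

    ASSUMPTION: files.list accepts a `cursor` keyword. Only the `next_cursor`
    response field is documented; the request parameter name is a guess.
    """
    cursor = None
    while True:
        kwargs = dict(filters, limit=page_size)
        if cursor is not None:
            kwargs["cursor"] = cursor  # hypothetical parameter name
        page = client.files.list(**kwargs)
        yield from page["files"]
        cursor = page.get("next_cursor")
        if not cursor:
            break  # no cursor means this was the last page
```

Usage would look like `for f in iter_files(client, status="completed"): ...`. Because it is a generator, pages are fetched lazily as the loop consumes them.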
# List files with optional filters
result = client.files.list(status="completed", limit=20)
files = result["files"]
next_cursor = result.get("next_cursor")
# Get a file by ID (includes variant list)
file = client.files.get("file-abc123")
# Upload a file -- creates a file record and a processing job
uploaded = client.files.upload(
    filename="doc.pdf",
    content_type="application/pdf",
    size_bytes=45678,
)
# Delete a file and all its variants
client.files.delete("file-abc123")
# List extraction outputs (markdown, embeddings, etc.)
result = client.files.list_variants("file-abc123")
variants = result["variants"]
# Get a specific variant by type
markdown = client.files.get_variant("file-abc123", "markdown")
# Batch upload multiple files
batch = client.files.upload_batch([
    {"filename": "report-q1.pdf", "content_type": "application/pdf", "size_bytes": 45678},
    {"filename": "report-q2.pdf", "content_type": "application/pdf", "size_bytes": 56789},
])
# Reprocess a file with different settings
client.files.process(
    "file-abc123",
    provider="gemini",
    variants=["markdown", "embeddings"],
)
# Batch reprocess files
client.files.process_batch(
    file_ids=["file-1", "file-2"],
    variants=["markdown", "embeddings"],
)
Jobs
Jobs represent the processing pipeline for a file. Each job moves through a state machine: queued -> processing -> completed or failed.
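The state machine can be written down as a transition table, which also gives a simple way to wait for a job to finish. This is a sketch under stated assumptions: it assumes the job record is a dict with a `status` key holding the literal state names above, and it includes the retry edge (failed back to queued) described in this section. The status a cancelled job ends up in is not named on this page, so it is left out. `wait_for_job` is a hypothetical helper, not an SDK method.

```python
import time
from typing import Any, Callable, Dict

# queued -> processing -> completed | failed, plus retry: failed -> queued.
TRANSITIONS = {
    "queued": {"processing"},
    "processing": {"completed", "failed"},
    "failed": {"queued"},
    "completed": set(),
}
TERMINAL = {"completed", "failed"}


def can_transition(current: str, new: str) -> bool:
    """Return True if the table allows moving from `current` to `new`."""
    return new in TRANSITIONS.get(current, set())


def wait_for_job(
    get_job: Callable[[str], Dict[str, Any]],
    job_id: str,
    interval: float = 2.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll `get_job` until the job is completed or failed.

    ASSUMPTION: the job dict exposes its state under the "status" key.
    Raises the built-in TimeoutError (not the SDK's class) on timeout.
    """
    deadline = clock() + timeout
    while True:
        job = get_job(job_id)
        if job["status"] in TERMINAL:
            return job
        if clock() >= deadline:
            raise TimeoutError(f"job {job_id} still {job['status']!r} after {timeout}s")
        sleep(interval)
```

With a real client this would be called as `wait_for_job(client.jobs.get, "job-456")`. For production use, webhooks avoid polling altogether.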
# List jobs with optional status filter
result = client.jobs.list(status="failed")
jobs = result["jobs"]
# Get a single job's details (includes provider and timing)
job = client.jobs.get("job-456")
# Cancel a queued or in-progress job
client.jobs.cancel("job-456")
# Retry a failed job (resets to queued)
client.jobs.retry("job-456")
Connections
Connections link your cloud storage buckets (S3, GCS, Azure Blob Storage, MinIO) to Roset. Roset uses connections to issue signed URLs and sync file metadata -- it never copies or proxies file bytes.
# Link an S3 bucket to Roset
conn = client.connections.create(
    provider="s3",
    name="Production",
    bucket="my-bucket",
    region="us-east-1",
)
# List all connections for your organization
result = client.connections.list()
# Test that Roset can access the bucket
test = client.connections.test("conn-abc")
# Sync file metadata from the bucket (metadata only, no bytes transferred)
sync = client.connections.sync("conn-abc")
# Delete a connection (your bucket is unaffected)
client.connections.delete("conn-abc")
Nodes
Nodes are file and folder records discovered from synced storage connections. They mirror your bucket's directory structure.
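Because nodes mirror a directory tree, a folder can be walked recursively through its children. The sketch below is illustrative only: it assumes each node is a dict with `id`, `name`, and `type` fields, that folders carry `type == "folder"`, and that the caller supplies a function returning a folder's children as a plain list. None of those shapes are confirmed on this page, and pagination of children is ignored.

```python
from typing import Any, Callable, Dict, Iterator, List, Tuple

Node = Dict[str, Any]


def walk(
    list_children: Callable[[str], List[Node]],
    folder_id: str,
    prefix: str = "",
) -> Iterator[Tuple[str, Node]]:
    """Yield (path, node) for every non-folder node under `folder_id`.

    ASSUMPTION: nodes expose "id", "name", and "type", and folders use
    type == "folder". These field names are hypothetical.
    """
    stack = [(folder_id, prefix)]
    while stack:
        current, path = stack.pop()
        for node in list_children(current):
            node_path = f"{path}/{node['name']}" if path else node["name"]
            if node["type"] == "folder":
                stack.append((node["id"], node_path))  # visit later
            else:
                yield node_path, node
```

With the SDK, `list_children` would be a small wrapper around `client.nodes.list_children` that unwraps whatever list the response contains.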
# List nodes from a synced connection
result = client.nodes.list(connection_id="conn-abc")
# Get a node's metadata
node = client.nodes.get("node-xyz")
# Get a signed download URL (direct from your bucket, valid 1 hour)
download = client.nodes.download("node-xyz")
print(download["url"])
# Delete a node record (does not delete the file from your bucket)
client.nodes.delete("node-xyz")
# Upload a file to a connected bucket
upload = client.nodes.upload(
    "conn-abc",
    path="uploads/report.pdf",
    content_type="application/pdf",
)
# List children of a folder
children = client.nodes.list_children("folder-123", type="file", limit=50)
# Search nodes by name or content
found = client.nodes.search("quarterly report", connection_id="conn-abc")
Webhooks
Webhooks deliver HTTP callbacks when processing events occur (file completed, variant ready, job failed).
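Webhooks have a signing secret, so a receiving endpoint should verify each delivery before trusting it. This page does not document the signature scheme, so the sketch below assumes a common one: a hex-encoded HMAC-SHA256 of the raw request body, keyed with the webhook secret. Treat the algorithm, the encoding, and wherever the signature is transmitted as assumptions to confirm against the Webhooks reference.

```python
import hashlib
import hmac


def verify_signature(secret: str, body: bytes, signature_hex: str) -> bool:
    """Check a delivery's signature against the raw request body.

    ASSUMPTION: signature = hex(HMAC-SHA256(secret, raw_body)). The real
    scheme may differ (e.g. a timestamp prefix or base64 encoding).
    """
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # compare_digest runs in constant time to avoid leaking the match position
    return hmac.compare_digest(expected.encode("utf-8"), signature_hex.encode("utf-8"))
```

Verify against the exact bytes received, before any JSON parsing, since re-serializing the payload can change whitespace and break the digest.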
# Register a webhook for processing events
webhook = client.webhooks.create(
    url="https://example.com/webhook",
    events=["file.processing.completed", "file.processing.failed"],
)
# List all registered webhooks
result = client.webhooks.list()
# Update which events a webhook subscribes to
client.webhooks.update("wh-123", events=["file.processing.completed"])
# Send a test event to verify your endpoint
client.webhooks.test("wh-123")
# View delivery history for debugging
deliveries = client.webhooks.deliveries("wh-123")
# Delete a webhook
client.webhooks.delete("wh-123")
# Rotate the signing secret
secret = client.webhooks.rotate_secret("wh-123")
# Replay deliveries from a time range
client.webhooks.replay(
    "wh-123",
    since="2025-06-14T00:00:00Z",
    until="2025-06-15T00:00:00Z",
    event_types=["file.processing.completed"],
)
Spaces
Spaces provide optional namespace isolation for multi-tenant applications. If you are building a B2B SaaS product, assign each of your customers a space name to scope their files.
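One way to keep tenants separated in application code is to bind a helper to a single space so callers cannot query across customers by accident. The sketch below wraps the `search.query` call documented on this page; `TenantSearch` is a hypothetical class name, and using the customer identifier directly as the space name is an assumption.

```python
from typing import Any, Dict


class TenantSearch:
    """Search helper pinned to one customer's space (hypothetical wrapper)."""

    def __init__(self, client: Any, space: str) -> None:
        self._client = client
        self._space = space

    def query(self, text: str, **kwargs: Any) -> Dict[str, Any]:
        # Refuse an explicit space so one tenant cannot read another's files.
        if "space" in kwargs:
            raise ValueError("space is fixed for this tenant")
        return self._client.search.query(query=text, space=self._space, **kwargs)
```

A request handler would build `TenantSearch(client, customer_space)` once per request and pass only that object to downstream code.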
# List spaces with file counts
result = client.spaces.list()
for s in result["spaces"]:
    print(f"{s['space']}: {s['file_count']} files")
API Keys
Manage API keys programmatically. All Roset API keys use the rsk_ prefix.
# Create a new API key
result = client.api_keys.create(name="CI Pipeline")
# Save result["key"] immediately -- it is only shown once
# List existing API keys (key values are redacted)
result = client.api_keys.list()
# Revoke an API key
client.api_keys.delete("key-abc")
Provider Keys
Provider keys are optional BYOK credentials for the extraction and embedding services. Roset uses managed keys by default.
# Save a provider key (e.g., Reducto for document extraction)
client.provider_keys.set(provider="reducto", key="rdt_your_key")
# List configured providers (key values are redacted)
result = client.provider_keys.get()
# Remove a provider key
client.provider_keys.delete("reducto")
Analytics
Query processing metrics and usage data for your organization.
# Organization-wide stats
overview = client.analytics.overview()
# Processing latency percentiles by provider
processing = client.analytics.processing(days=30)
# File type distribution across all uploads
types = client.analytics.file_types()
# Per-space health scores and file counts
spaces = client.analytics.spaces()
# Recent processing failures with error details
failures = client.analytics.failures(limit=20)
# Daily upload and processing volume
volume = client.analytics.volume(days=14)
Search
Search files by content using full-text, vector similarity, or hybrid search.
# Hybrid search (default)
result = client.search.query(
    query="payment terms",
    mode="hybrid",
    space="contracts",
    limit=20,
)
for r in result["results"]:
    print(f"{r['fileId']} ({r['score']}): {r.get('snippet', '')}")
Q&A
Ask questions about your files using RAG (Retrieval Augmented Generation).
# Ask a question
result = client.qa.ask(
    question="What are the payment terms?",
    space="contracts",
    topK=5,
)
print(result["answer"])
for source in result["sources"]:
    print(f"  - {source['filename']} ({source['score']})")
Error Handling
All API errors are raised as exceptions with the HTTP status code, error message, and request ID.
from roset import Client
from roset.exceptions import NotFoundError, RateLimitError
client = Client()  # Reads ROSET_API_KEY from the environment
try:
    client.files.get("nonexistent")
except NotFoundError:
    print("File not found")
except RateLimitError as e:
    print(f"Rate limited, retry after {e.retry_after}")
Available exception classes:
| Class | HTTP Status | Description |
|---|---|---|
| ValidationError | 400 | Invalid request parameters |
| UnauthorizedError | 401 | Missing or invalid API key |
| ForbiddenError | 403 | Insufficient permissions |
| NotFoundError | 404 | Resource does not exist |
| ConflictError | 409 | Resource conflict |
| RateLimitError | 429 | Too many requests |
| QuotaExceededError | 402 | Usage quota exceeded |
| TimeoutError | 408 | Request timed out |
| ServerError | 500 | Internal server error |
| ServiceUnavailableError | 503 | Service temporarily unavailable |
| NetworkError | -- | Network connectivity issue |
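Transient errors such as RateLimitError or ServiceUnavailableError are usually worth retrying. The following is a minimal sketch of a retry wrapper, not an SDK feature. It takes the exception classes to retry as an argument, honors a `retry_after` attribute when the exception carries one, and otherwise falls back to exponential backoff. It assumes `retry_after` is a number of seconds, which this page does not state.

```python
import time
from typing import Callable, Tuple, Type, TypeVar, Union

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    retry_on: Union[Type[BaseException], Tuple[Type[BaseException], ...]],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `fn`, retrying when it raises one of the `retry_on` exceptions.

    ASSUMPTION: `retry_after`, when present, is a delay in seconds.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = getattr(exc, "retry_after", None)
            if delay is None:
                delay = base_delay * 2 ** (attempt - 1)  # 1s, 2s, 4s, ...
            sleep(delay)
    raise ValueError("max_attempts must be at least 1")
```

With the SDK this might be called as `call_with_retry(lambda: client.files.get("file-abc123"), (RateLimitError, ServiceUnavailableError))`. Errors such as ValidationError or NotFoundError should not be retried, since repeating the same request will fail the same way.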
Async Support
The Python SDK supports async/await via AsyncClient:
import asyncio
from roset import AsyncClient
async def main():
    client = AsyncClient(api_key="rsk_...")
    # All methods are async
    file = await client.files.upload(
        filename="report.pdf",
        content_type="application/pdf",
        size_bytes=45678,
    )
    result = await client.files.list(status="completed")
    print(f"Files: {len(result['files'])}")
asyncio.run(main())
Next Steps
- API Reference -- full REST API documentation.
- Quickstart -- upload your first file.
- Webhooks -- react to processing events in real time.