Skip to content

Python API

Dinobase can be used as a Python library in addition to the CLI and MCP server.

`DinobaseDB` is the core database class. It wraps DuckDB with metadata management.

from dinobase.db import DinobaseDB
db = DinobaseDB() # uses ~/.dinobase/dinobase.duckdb
# or
db = DinobaseDB("/path/to/custom.duckdb")

`db.conn` is a property that returns the DuckDB connection, creating it and initializing the metadata tables on first access.

conn = db.conn # duckdb.DuckDBPyConnection

Execute SQL, return results as a list of dicts.

rows = db.query("SELECT * FROM stripe.customers LIMIT 5")
# [{"id": "cus_123", "email": "alice@example.com", ...}, ...]

Execute SQL, return (column_names, rows) tuple.

columns, rows = db.query_raw("SELECT * FROM stripe.customers LIMIT 5")
# (["id", "email", ...], [("cus_123", "alice@example.com", ...), ...])

List all user schemas (excludes internal ones).

schemas = db.get_schemas()
# ["stripe", "hubspot"]

List all tables in a schema.

tables = db.get_tables("stripe")
# ["customers", "subscriptions", "charges", "invoices"]

Get column info for a table.

columns = db.get_columns("stripe", "customers")
# [{"column_name": "id", "data_type": "VARCHAR", "is_nullable": "YES"}, ...]

Get the number of rows in a table.

count = db.get_row_count("stripe", "customers")
# 180

db.log_sync_start(source_name, source_type)

Section titled “db.log_sync_start(source_name, source_type)”

Record the start of a sync. Returns the sync log ID.

sync_id = db.log_sync_start("stripe", "stripe")

Record the end of a sync.

db.log_sync_end(sync_id, "success", tables_synced=4, rows_synced=1255)
db.log_sync_end(sync_id, "error", error_message="API rate limited")

db.update_table_metadata(source_name, schema_name, annotations=None)

Section titled “db.update_table_metadata(source_name, schema_name, annotations=None)”

Refresh _dinobase.tables and _dinobase.columns from actual schema. Optionally include annotations.

annotations = {
    "customers": {
        "email": {"description": "Customer email", "note": "Join key"}
    }
}
db.update_table_metadata("stripe", "stripe", annotations=annotations)

Get stored annotations for a table’s columns.

anns = db.get_column_annotations("stripe", "customers")
# {"email": {"description": "The customer's email address.", "note": "Can be null"}}

Close the database connection.


`QueryEngine` is a high-level query interface that returns formatted results.

from dinobase.db import DinobaseDB
from dinobase.query.engine import QueryEngine
db = DinobaseDB()
engine = QueryEngine(db)

Execute SQL, return formatted results.

result = engine.execute("SELECT * FROM stripe.customers LIMIT 5")
# {"columns": [...], "rows": [...], "row_count": 5, "total_rows": 5}
# Check for errors
if "error" in result:
    print(result["error"])

List all sources with tables and stats.

info = engine.list_sources()
# {"sources": [{"name": "stripe", "tables": [...], "table_count": 4, ...}]}

Describe a table. Accepts "schema.table" or just "table".

desc = engine.describe_table("stripe.customers")
# {"schema": "stripe", "table": "customers", "columns": [...], "sample_rows": [...]}

`MutationEngine` handles writes back to source systems via SQL.

from dinobase.db import DinobaseDB
from dinobase.query.mutations import MutationEngine
db = DinobaseDB()
engine = MutationEngine(db)

engine.handle_sql(sql, max_affected_rows=50)

Section titled “engine.handle_sql(sql, max_affected_rows=50)”

Parse and preview one or more mutation statements. Returns a preview with mutation_id.

result = engine.handle_sql("UPDATE stripe.customers SET name = 'Acme' WHERE id = 'cus_123'")
# {"mutation_id": "mut_...", "status": "pending_confirmation", "preview": {...}}

Multi-statement SQL is supported:

result = engine.handle_sql("""
UPDATE stripe.customers SET name = 'Acme' WHERE id = 'cus_123';
INSERT INTO linear.issues (title) VALUES ('Follow up');
""")
# {"batch_id": "batch_...", "mutations": [...]}

Execute a pending mutation.

result = engine.confirm("mut_abc123def456")
# {"status": "executed", "api_write_back": {...}, "local_update": {...}}

Execute multiple pending mutations.

result = engine.confirm_batch(["mut_abc123", "mut_def456"])
# {"status": "batch_executed", "succeeded": 2, "failed": 0}

Cancel a pending mutation.

result = engine.cancel("mut_abc123def456")
# {"status": "cancelled", "mutation_id": "mut_abc123def456"}

List all pending mutations.

pending = engine.list_pending()
# [{"mutation_id": "mut_...", "operation": "UPDATE", ...}]

`SyncEngine` syncs a single source using dlt.

from dinobase.db import DinobaseDB
from dinobase.sync.engine import SyncEngine
db = DinobaseDB()
engine = SyncEngine(db)
result = engine.sync("stripe", {"type": "stripe", "credentials": {"api_key": "sk_..."}})
# SyncResult(source_name="stripe", status="success", tables_synced=4, rows_synced=1255)

`engine.sync()` returns a `SyncResult`:

@dataclass
class SyncResult:
    source_name: str
    source_type: str
    tables_synced: int
    rows_synced: int
    status: str  # "success" or "error"
    error: str | None

`SyncScheduler` provides scheduled syncing with concurrency.

from dinobase.db import DinobaseDB
from dinobase.sync.scheduler import SyncScheduler
db = DinobaseDB()
scheduler = SyncScheduler(db, default_interval="1h", max_workers=10)

Sync all sources that are due. Returns a list of result dicts.

results = scheduler.sync_all_due()

Run the sync loop in the foreground. Blocks until interrupted.

try:
    scheduler.run_loop()
except KeyboardInterrupt:
    pass

scheduler.start_background(check_interval=60)

Section titled “scheduler.start_background(check_interval=60)”

Start the sync loop in a background daemon thread.

scheduler.start_background()
# ... do other work ...
scheduler.stop()

Stop the background sync loop.