Python API
Dinobase can be used as a Python library in addition to the CLI and MCP server.
DinobaseDB
Section titled “DinobaseDB”The core database class. Wraps DuckDB with metadata management.
from dinobase.db import DinobaseDB
db = DinobaseDB() # uses ~/.dinobase/dinobase.duckdb# ordb = DinobaseDB("/path/to/custom.duckdb")Methods
Section titled “Methods”db.conn
Section titled “db.conn”Property. Returns the DuckDB connection, creating it and initializing metadata tables on first access.
conn = db.conn # duckdb.DuckDBPyConnectiondb.query(sql)
Section titled “db.query(sql)”Execute SQL, return results as a list of dicts.
rows = db.query("SELECT * FROM stripe.customers LIMIT 5")# [{"id": "cus_123", "email": "alice@example.com", ...}, ...]db.query_raw(sql)
Section titled “db.query_raw(sql)”Execute SQL, return (column_names, rows) tuple.
columns, rows = db.query_raw("SELECT * FROM stripe.customers LIMIT 5")# (["id", "email", ...], [("cus_123", "alice@example.com", ...), ...])db.get_schemas()
Section titled “db.get_schemas()”List all user schemas (excludes internal ones).
schemas = db.get_schemas()# ["stripe", "hubspot"]db.get_tables(schema)
Section titled “db.get_tables(schema)”List all tables in a schema.
tables = db.get_tables("stripe")# ["customers", "subscriptions", "charges", "invoices"]db.get_columns(schema, table)
Section titled “db.get_columns(schema, table)”Get column info for a table.
columns = db.get_columns("stripe", "customers")# [{"column_name": "id", "data_type": "VARCHAR", "is_nullable": "YES"}, ...]db.get_row_count(schema, table)
Section titled “db.get_row_count(schema, table)”count = db.get_row_count("stripe", "customers")# 180db.log_sync_start(source_name, source_type)
Section titled “db.log_sync_start(source_name, source_type)”Record the start of a sync. Returns the sync log ID.
sync_id = db.log_sync_start("stripe", "stripe")db.log_sync_end(sync_id, status, ...)
Section titled “db.log_sync_end(sync_id, status, ...)”Record the end of a sync.
db.log_sync_end(sync_id, "success", tables_synced=4, rows_synced=1255)db.log_sync_end(sync_id, "error", error_message="API rate limited")db.update_table_metadata(source_name, schema_name, annotations=None)
Section titled “db.update_table_metadata(source_name, schema_name, annotations=None)”Refresh _dinobase.tables and _dinobase.columns from actual schema. Optionally include annotations.
annotations = { "customers": { "email": {"description": "Customer email", "note": "Join key"} }}db.update_table_metadata("stripe", "stripe", annotations=annotations)db.get_column_annotations(schema, table)
Section titled “db.get_column_annotations(schema, table)”Get stored annotations for a table’s columns.
anns = db.get_column_annotations("stripe", "customers")# {"email": {"description": "The customer's email address.", "note": "Can be null"}}db.close()
Section titled “db.close()”Close the database connection.
QueryEngine
Section titled “QueryEngine”High-level query interface with formatted results.
from dinobase.db import DinobaseDBfrom dinobase.query.engine import QueryEngine
db = DinobaseDB()engine = QueryEngine(db)Methods
Section titled “Methods”engine.execute(sql, max_rows=200)
Section titled “engine.execute(sql, max_rows=200)”Execute SQL, return formatted results.
result = engine.execute("SELECT * FROM stripe.customers LIMIT 5")# {"columns": [...], "rows": [...], "row_count": 5, "total_rows": 5}
# Check for errorsif "error" in result: print(result["error"])engine.list_sources()
Section titled “engine.list_sources()”List all sources with tables and stats.
info = engine.list_sources()# {"sources": [{"name": "stripe", "tables": [...], "table_count": 4, ...}]}engine.describe_table(table_ref)
Section titled “engine.describe_table(table_ref)”Describe a table. Accepts "schema.table" or just "table".
desc = engine.describe_table("stripe.customers")# {"schema": "stripe", "table": "customers", "columns": [...], "sample_rows": [...]}MutationEngine
Section titled “MutationEngine”Handles writes back to source systems via SQL.
from dinobase.db import DinobaseDBfrom dinobase.query.mutations import MutationEngine
db = DinobaseDB()engine = MutationEngine(db)Methods
Section titled “Methods”engine.handle_sql(sql, max_affected_rows=50)
Section titled “engine.handle_sql(sql, max_affected_rows=50)”Parse and preview one or more mutation statements. Returns a preview with mutation_id.
result = engine.handle_sql("UPDATE stripe.customers SET name = 'Acme' WHERE id = 'cus_123'")# {"mutation_id": "mut_...", "status": "pending_confirmation", "preview": {...}}Multi-statement SQL is supported:
result = engine.handle_sql(""" UPDATE stripe.customers SET name = 'Acme' WHERE id = 'cus_123'; INSERT INTO linear.issues (title) VALUES ('Follow up');""")# {"batch_id": "batch_...", "mutations": [...]}engine.confirm(mutation_id)
Section titled “engine.confirm(mutation_id)”Execute a pending mutation.
result = engine.confirm("mut_abc123def456")# {"status": "executed", "api_write_back": {...}, "local_update": {...}}engine.confirm_batch(mutation_ids)
Section titled “engine.confirm_batch(mutation_ids)”Execute multiple pending mutations.
result = engine.confirm_batch(["mut_abc123", "mut_def456"])# {"status": "batch_executed", "succeeded": 2, "failed": 0}engine.cancel(mutation_id)
Section titled “engine.cancel(mutation_id)”Cancel a pending mutation.
result = engine.cancel("mut_abc123def456")# {"status": "cancelled", "mutation_id": "mut_abc123def456"}engine.list_pending()
Section titled “engine.list_pending()”List all pending mutations.
pending = engine.list_pending()# [{"mutation_id": "mut_...", "operation": "UPDATE", ...}]SyncEngine
Section titled “SyncEngine”Syncs a single source using dlt.
from dinobase.db import DinobaseDBfrom dinobase.sync.engine import SyncEngine
db = DinobaseDB()engine = SyncEngine(db)
result = engine.sync("stripe", {"type": "stripe", "credentials": {"api_key": "sk_..."}})# SyncResult(source_name="stripe", status="success", tables_synced=4, rows_synced=1255)SyncResult
Section titled “SyncResult”@dataclassclass SyncResult: source_name: str source_type: str tables_synced: int rows_synced: int status: str # "success" or "error" error: str | NoneSyncScheduler
Section titled “SyncScheduler”Scheduled syncing with concurrency.
from dinobase.db import DinobaseDBfrom dinobase.sync.scheduler import SyncScheduler
db = DinobaseDB()scheduler = SyncScheduler(db, default_interval="1h", max_workers=10)Methods
Section titled “Methods”scheduler.sync_all_due()
Section titled “scheduler.sync_all_due()”Sync all sources that are due. Returns list of result dicts.
results = scheduler.sync_all_due()scheduler.run_loop(check_interval=60)
Section titled “scheduler.run_loop(check_interval=60)”Run the sync loop in the foreground. Blocks until interrupted.
try: scheduler.run_loop()except KeyboardInterrupt: passscheduler.start_background(check_interval=60)
Section titled “scheduler.start_background(check_interval=60)”Start sync loop in a background daemon thread.
scheduler.start_background()# ... do other work ...scheduler.stop()scheduler.stop()
Section titled “scheduler.stop()”Stop the background sync loop.