Python API
Dinobase can be used as a Python library in addition to the CLI and MCP server.
DinobaseDB
Section titled “DinobaseDB”
The core database class. Wraps DuckDB with metadata management.
from dinobase.db import DinobaseDB
db = DinobaseDB()  # uses ~/.dinobase/dinobase.duckdb
# or
db = DinobaseDB("/path/to/custom.duckdb")
Methods
Section titled “Methods”
db.conn
Section titled “db.conn”
Property. Returns the DuckDB connection, creating it and initializing metadata tables on first access.
conn = db.conn  # duckdb.DuckDBPyConnection
db.query(sql)
Section titled “db.query(sql)”
Execute SQL, return results as a list of dicts.
rows = db.query("SELECT * FROM stripe.customers LIMIT 5")
# [{"id": "cus_123", "email": "alice@example.com", ...}, ...]
db.query_raw(sql)
Section titled “db.query_raw(sql)”
Execute SQL, return (column_names, rows) tuple.
columns, rows = db.query_raw("SELECT * FROM stripe.customers LIMIT 5")
# (["id", "email", ...], [("cus_123", "alice@example.com", ...), ...])
db.get_schemas()
Section titled “db.get_schemas()”
List all user schemas (excludes internal ones).
schemas = db.get_schemas()
# ["stripe", "hubspot"]
db.get_tables(schema)
Section titled “db.get_tables(schema)”
List all tables in a schema.
tables = db.get_tables("stripe")
# ["customers", "subscriptions", "charges", "invoices"]
db.get_columns(schema, table)
Section titled “db.get_columns(schema, table)”
Get column info for a table.
columns = db.get_columns("stripe", "customers")
# [{"column_name": "id", "data_type": "VARCHAR", "is_nullable": "YES"}, ...]
db.get_row_count(schema, table)
Section titled “db.get_row_count(schema, table)”
count = db.get_row_count("stripe", "customers")
# 180
db.log_sync_start(source_name, source_type)
Section titled “db.log_sync_start(source_name, source_type)”
Record the start of a sync. Returns the sync log ID.
sync_id = db.log_sync_start("stripe", "stripe")
db.log_sync_end(sync_id, status, ...)
Section titled “db.log_sync_end(sync_id, status, ...)”
Record the end of a sync.
db.log_sync_end(sync_id, "success", tables_synced=4, rows_synced=1255)
db.log_sync_end(sync_id, "error", error_message="API rate limited")
db.update_table_metadata(source_name, schema_name, annotations=None)
Section titled “db.update_table_metadata(source_name, schema_name, annotations=None)”
Refresh _dinobase.tables and _dinobase.columns from actual schema. Optionally include annotations.
annotations = {
    "customers": {
        "email": {"description": "Customer email", "note": "Join key"}
    }
}
db.update_table_metadata("stripe", "stripe", annotations=annotations)
db.get_column_annotations(schema, table)
Section titled “db.get_column_annotations(schema, table)”
Get stored annotations for a table’s columns.
anns = db.get_column_annotations("stripe", "customers")
# {"email": {"description": "The customer's email address.", "note": "Can be null"}}
db.close()
Section titled “db.close()”
Close the database connection.
QueryEngine
Section titled “QueryEngine”
High-level query interface with formatted results.
from dinobase.db import DinobaseDB
from dinobase.query.engine import QueryEngine

db = DinobaseDB()
engine = QueryEngine(db)
Methods
Section titled “Methods”
engine.execute(sql, max_rows=200)
Section titled “engine.execute(sql, max_rows=200)”
Execute SQL, return formatted results.
result = engine.execute("SELECT * FROM stripe.customers LIMIT 5")
# {"columns": [...], "rows": [...], "row_count": 5, "total_rows": 5}

# Check for errors
if "error" in result:
    print(result["error"])
engine.list_connectors()
Section titled “engine.list_connectors()”
List all connectors with tables and stats.
info = engine.list_connectors()
# {"connectors": [{"name": "stripe", "tables": [...], "table_count": 4, ...}]}
engine.describe_table(table_ref)
Section titled “engine.describe_table(table_ref)”
Describe a table. Accepts "schema.table" or just "table".
desc = engine.describe_table("stripe.customers")
# {"schema": "stripe", "table": "customers", "columns": [...], "sample_rows": [...]}
MutationEngine
Section titled “MutationEngine”
Handles writes back to upstream systems via SQL.
from dinobase.db import DinobaseDB
from dinobase.query.mutations import MutationEngine

db = DinobaseDB()
engine = MutationEngine(db)
Methods
Section titled “Methods”
engine.handle_sql(sql, max_affected_rows=50)
Section titled “engine.handle_sql(sql, max_affected_rows=50)”
Parse and preview one or more mutation statements. Returns a preview with mutation_id.
result = engine.handle_sql("UPDATE stripe.customers SET name = 'Acme' WHERE id = 'cus_123'")
# {"mutation_id": "mut_...", "status": "pending_confirmation", "preview": {...}}
Multi-statement SQL is supported:
result = engine.handle_sql("""
    UPDATE stripe.customers SET name = 'Acme' WHERE id = 'cus_123';
    INSERT INTO linear.issues (title) VALUES ('Follow up');
""")
# {"batch_id": "batch_...", "mutations": [...]}
engine.confirm(mutation_id)
Section titled “engine.confirm(mutation_id)”
Execute a pending mutation.
result = engine.confirm("mut_abc123def456")
# {"status": "executed", "api_write_back": {...}, "local_update": {...}}
engine.confirm_batch(mutation_ids)
Section titled “engine.confirm_batch(mutation_ids)”
Execute multiple pending mutations.
result = engine.confirm_batch(["mut_abc123", "mut_def456"])
# {"status": "batch_executed", "succeeded": 2, "failed": 0}
engine.cancel(mutation_id)
Section titled “engine.cancel(mutation_id)”
Cancel a pending mutation.
result = engine.cancel("mut_abc123def456")
# {"status": "cancelled", "mutation_id": "mut_abc123def456"}
engine.list_pending()
Section titled “engine.list_pending()”
List all pending mutations.
pending = engine.list_pending()
# [{"mutation_id": "mut_...", "operation": "UPDATE", ...}]
SyncEngine
Section titled “SyncEngine”
Syncs a single connector using dlt.
from dinobase.db import DinobaseDB
from dinobase.sync.engine import SyncEngine

db = DinobaseDB()
engine = SyncEngine(db)

result = engine.sync("stripe", {"type": "stripe", "credentials": {"api_key": "sk_..."}})
# SyncResult(source_name="stripe", status="success", tables_synced=4, rows_synced=1255)
SyncResult
Section titled “SyncResult”
@dataclass
class SyncResult:
    source_name: str
    source_type: str
    tables_synced: int
    rows_synced: int
    status: str  # "success" or "error"
    error: str | None
SyncScheduler
Section titled “SyncScheduler”
Scheduled syncing with concurrency.
from dinobase.db import DinobaseDB
from dinobase.sync.scheduler import SyncScheduler

db = DinobaseDB()
scheduler = SyncScheduler(db, default_interval="1h", max_workers=10)
Methods
Section titled “Methods”
scheduler.sync_all_due()
Section titled “scheduler.sync_all_due()”
Sync all connectors that are due. Returns a list of result dicts.
results = scheduler.sync_all_due()
scheduler.run_loop(check_interval=60)
Section titled “scheduler.run_loop(check_interval=60)”
Run the sync loop in the foreground. Blocks until interrupted.
try:
    scheduler.run_loop()
except KeyboardInterrupt:
    pass
scheduler.start_background(check_interval=60)
Section titled “scheduler.start_background(check_interval=60)”
Start sync loop in a background daemon thread.
scheduler.start_background()
# ... do other work ...
scheduler.stop()
scheduler.stop()
Section titled “scheduler.stop()”
Stop the background sync loop.
MCP Client
Section titled “MCP Client”
Sync Python API for calling tools on connected MCP servers. Uses the same connector configs as dinobase sync and dinobase mcp call.
from dinobase.mcp import call, tools, servers, search, instructions
call(ref, **kwargs)
Section titled “call(ref, **kwargs)”
Call an MCP tool by server.tool reference. Keyword arguments are passed as tool arguments.
from dinobase.mcp import call
# No arguments
result = call("posthog_mcp.dashboards-get-all")

# With arguments
result = call("posthog_mcp.dashboard-get", id=1118504)
Returns a dict with:
| Key | Description |
|---|---|
| content | List of content blocks ({"type": "text", "text": "..."}) |
| structuredContent | Parsed structured output, if the server provides it |
| isError | True if the tool returned an error |
tools(server)
Section titled “tools(server)”
List all tools on an MCP server with full schemas.
from dinobase.mcp import tools
tool_list = tools("posthog_mcp")
# [{"name": "list_projects", "description": "...", "inputSchema": {...}}, ...]
servers()
Section titled “servers()”
List all connected MCP servers (those with a transport: in their connector YAML).
from dinobase.mcp import servers
svrs = servers()
# [{"name": "posthog_mcp", "description": "...", "transport": "stdio", "tools": 12, "tool_names": [...]}, ...]
search(pattern)
Section titled “search(pattern)”
Regex search tool names and descriptions across all connected MCP servers.
from dinobase.mcp import search
matches = search("dashboard")
# [{"server": "posthog_mcp", "tool": "dashboards-get-all", "description": "..."}, ...]

matches = search("list.*")
instructions(server)
Section titled “instructions(server)”
Get a server’s name, version, and usage instructions.
from dinobase.mcp import instructions
info = instructions("posthog_mcp")
# {"name": "PostHog MCP", "version": "1.0.0", "instructions": "..."}