Skip to content

Python API

Dinobase can be used as a Python library in addition to the CLI and MCP server.

The core database class. Wraps DuckDB with metadata management.

from dinobase.db import DinobaseDB
db = DinobaseDB() # uses ~/.dinobase/dinobase.duckdb
# or
db = DinobaseDB("/path/to/custom.duckdb")

Property. Returns the DuckDB connection, creating it and initializing metadata tables on first access.

conn = db.conn # duckdb.DuckDBPyConnection

Execute SQL, return results as a list of dicts.

rows = db.query("SELECT * FROM stripe.customers LIMIT 5")
# [{"id": "cus_123", "email": "alice@example.com", ...}, ...]

Execute SQL, return (column_names, rows) tuple.

columns, rows = db.query_raw("SELECT * FROM stripe.customers LIMIT 5")
# (["id", "email", ...], [("cus_123", "alice@example.com", ...), ...])

List all user schemas (excludes internal ones).

schemas = db.get_schemas()
# ["stripe", "hubspot"]

List all tables in a schema.

tables = db.get_tables("stripe")
# ["customers", "subscriptions", "charges", "invoices"]

Get column info for a table.

columns = db.get_columns("stripe", "customers")
# [{"column_name": "id", "data_type": "VARCHAR", "is_nullable": "YES"}, ...]
count = db.get_row_count("stripe", "customers")
# 180

db.log_sync_start(source_name, source_type)

Section titled “db.log_sync_start(source_name, source_type)”

Record the start of a sync. Returns the sync log ID.

sync_id = db.log_sync_start("stripe", "stripe")

Record the end of a sync.

db.log_sync_end(sync_id, "success", tables_synced=4, rows_synced=1255)
db.log_sync_end(sync_id, "error", error_message="API rate limited")

db.update_table_metadata(source_name, schema_name, annotations=None)

Section titled “db.update_table_metadata(source_name, schema_name, annotations=None)”

Refresh _dinobase.tables and _dinobase.columns from actual schema. Optionally include annotations.

annotations = {
"customers": {
"email": {"description": "Customer email", "note": "Join key"}
}
}
db.update_table_metadata("stripe", "stripe", annotations=annotations)

Get stored annotations for a table’s columns.

anns = db.get_column_annotations("stripe", "customers")
# {"email": {"description": "The customer's email address.", "note": "Can be null"}}

Close the database connection.


High-level query interface with formatted results.

from dinobase.db import DinobaseDB
from dinobase.query.engine import QueryEngine
db = DinobaseDB()
engine = QueryEngine(db)

Execute SQL, return formatted results.

result = engine.execute("SELECT * FROM stripe.customers LIMIT 5")
# {"columns": [...], "rows": [...], "row_count": 5, "total_rows": 5}
# Check for errors
if "error" in result:
print(result["error"])

List all connectors with tables and stats.

info = engine.list_connectors()
# {"connectors": [{"name": "stripe", "tables": [...], "table_count": 4, ...}]}

Describe a table. Accepts "schema.table" or just "table".

desc = engine.describe_table("stripe.customers")
# {"schema": "stripe", "table": "customers", "columns": [...], "sample_rows": [...]}

Handles writes back to upstream systems via SQL.

from dinobase.db import DinobaseDB
from dinobase.query.mutations import MutationEngine
db = DinobaseDB()
engine = MutationEngine(db)

engine.handle_sql(sql, max_affected_rows=50)

Section titled “engine.handle_sql(sql, max_affected_rows=50)”

Parse and preview one or more mutation statements. Returns a preview with mutation_id.

result = engine.handle_sql("UPDATE stripe.customers SET name = 'Acme' WHERE id = 'cus_123'")
# {"mutation_id": "mut_...", "status": "pending_confirmation", "preview": {...}}

Multi-statement SQL is supported:

result = engine.handle_sql("""
UPDATE stripe.customers SET name = 'Acme' WHERE id = 'cus_123';
INSERT INTO linear.issues (title) VALUES ('Follow up');
""")
# {"batch_id": "batch_...", "mutations": [...]}

Execute a pending mutation.

result = engine.confirm("mut_abc123def456")
# {"status": "executed", "api_write_back": {...}, "local_update": {...}}

Execute multiple pending mutations.

result = engine.confirm_batch(["mut_abc123", "mut_def456"])
# {"status": "batch_executed", "succeeded": 2, "failed": 0}

Cancel a pending mutation.

result = engine.cancel("mut_abc123def456")
# {"status": "cancelled", "mutation_id": "mut_abc123def456"}

List all pending mutations.

pending = engine.list_pending()
# [{"mutation_id": "mut_...", "operation": "UPDATE", ...}]

Syncs a single connector using dlt.

from dinobase.db import DinobaseDB
from dinobase.sync.engine import SyncEngine
db = DinobaseDB()
engine = SyncEngine(db)
result = engine.sync("stripe", {"type": "stripe", "credentials": {"api_key": "sk_..."}})
# SyncResult(source_name="stripe", status="success", tables_synced=4, rows_synced=1255)
@dataclass
class SyncResult:
source_name: str
source_type: str
tables_synced: int
rows_synced: int
status: str # "success" or "error"
error: str | None

Scheduled syncing with concurrency.

from dinobase.db import DinobaseDB
from dinobase.sync.scheduler import SyncScheduler
db = DinobaseDB()
scheduler = SyncScheduler(db, default_interval="1h", max_workers=10)

Sync all connectors that are due. Returns list of result dicts.

results = scheduler.sync_all_due()

Run the sync loop in the foreground. Blocks until interrupted.

try:
scheduler.run_loop()
except KeyboardInterrupt:
pass

scheduler.start_background(check_interval=60)

Section titled “scheduler.start_background(check_interval=60)”

Start sync loop in a background daemon thread.

scheduler.start_background()
# ... do other work ...
scheduler.stop()

Stop the background sync loop.


Sync Python API for calling tools on connected MCP servers. Uses the same connector configs as dinobase sync and dinobase mcp call.

from dinobase.mcp import call, tools, servers, search, instructions

Call an MCP tool by server.tool reference. Keyword arguments are passed as tool arguments.

from dinobase.mcp import call
# No arguments
result = call("posthog_mcp.dashboards-get-all")
# With arguments
result = call("posthog_mcp.dashboard-get", id=1118504)

Returns a dict with:

KeyDescription
contentList of content blocks ({"type": "text", "text": "..."})
structuredContentParsed structured output, if the server provides it
isErrorTrue if the tool returned an error

List all tools on an MCP server with full schemas.

from dinobase.mcp import tools
tool_list = tools("posthog_mcp")
# [{"name": "list_projects", "description": "...", "inputSchema": {...}}, ...]

List all connected MCP servers (those with a transport: in their connector YAML).

from dinobase.mcp import servers
svrs = servers()
# [{"name": "posthog_mcp", "description": "...", "transport": "stdio", "tools": 12, "tool_names": [...]}, ...]

Regex search tool names and descriptions across all connected MCP servers.

from dinobase.mcp import search
matches = search("dashboard")
# [{"server": "posthog_mcp", "tool": "dashboards-get-all", "description": "..."}, ...]
matches = search("list.*")

Get a server’s name, version, and usage instructions.

from dinobase.mcp import instructions
info = instructions("posthog_mcp")
# {"name": "PostHog MCP", "version": "1.0.0", "instructions": "..."}