Todo
- Class diagram
- Functionality examples
- Couple to given sql tables
SQLite Class
We have a SQLite class that handles requests to the SQLite database.
from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional
# ---- Small utility: dict rows ------------------------------------------------
def _dict_factory(cursor: sqlite3.Cursor, row: tuple) -> dict:
return {col[0]: row[idx] for idx, col in enumerate(cursor.description or [])}
# ---- Main DB class -----------------------------------------------------------
@dataclass
class UsageDB:
path: str = ":memory:"
timeout: float = 30.0
def __post_init__(self) -> None:
self.conn = sqlite3.connect(self.path, timeout=self.timeout, isolation_level=None)
self.conn.row_factory = _dict_factory
self._configure()
self.create_all()
# -- connection/pragma -----------------------------------------------------
def _configure(self) -> None:
cur = self.conn.cursor()
cur.execute("PRAGMA foreign_keys = ON;")
cur.execute("PRAGMA journal_mode = WAL;")
cur.execute("PRAGMA synchronous = NORMAL;")
cur.execute("PRAGMA temp_store = MEMORY;")
def close(self) -> None:
try:
self.conn.close()
except Exception:
pass
@contextmanager
def transaction(self):
cur = self.conn.cursor()
try:
cur.execute("BEGIN;")
yield
cur.execute("COMMIT;")
except Exception:
cur.execute("ROLLBACK;")
raise
# -- schema ----------------------------------------------------------------
def create_all(self) -> None:
cur = self.conn.cursor()
# Dimension tables
cur.executescript("""
CREATE TABLE IF NOT EXISTS dim_group (
group_id INTEGER PRIMARY KEY,
group_name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS dim_user (
user_id TEXT PRIMARY KEY,
display_name TEXT,
group_id INTEGER REFERENCES dim_group(group_id) ON UPDATE CASCADE ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS dim_project (
project_id INTEGER PRIMARY KEY,
cloud_project_name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS dim_machine (
machine_id INTEGER PRIMARY KEY,
machine_name TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS map_user_project (
user_id TEXT NOT NULL REFERENCES dim_user(user_id) ON DELETE CASCADE,
project_id INTEGER NOT NULL REFERENCES dim_project(project_id) ON DELETE CASCADE,
PRIMARY KEY (user_id, project_id)
);
CREATE TABLE IF NOT EXISTS map_project_machine (
project_id INTEGER NOT NULL REFERENCES dim_project(project_id) ON DELETE CASCADE,
machine_id INTEGER NOT NULL REFERENCES dim_machine(machine_id) ON DELETE CASCADE,
PRIMARY KEY (project_id, machine_id)
);
CREATE TABLE IF NOT EXISTS dim_instance (
instance_id INTEGER PRIMARY KEY,
host TEXT NOT NULL,
port INTEGER,
raw_label TEXT
);
""")
# Fact tables
cur.executescript("""
CREATE TABLE IF NOT EXISTS fact_usage (
usage_id INTEGER PRIMARY KEY,
ts TEXT NOT NULL,
scope TEXT NOT NULL CHECK (scope IN ('ada','project','machine','user')),
project_id INTEGER REFERENCES dim_project(project_id),
machine_id INTEGER REFERENCES dim_machine(machine_id),
user_id TEXT REFERENCES dim_user(user_id),
busy_cpu_seconds_total REAL NOT NULL DEFAULT 0.0,
idle_cpu_seconds_total REAL NOT NULL DEFAULT 0.0,
busy_kwh REAL NOT NULL DEFAULT 0.0,
idle_kwh REAL NOT NULL DEFAULT 0.0,
busy_gCo2eq REAL NOT NULL DEFAULT 0.0,
idle_gCo2eq REAL NOT NULL DEFAULT 0.0,
intensity_gCo2eq_kwh REAL,
CHECK ( (scope='ada' AND project_id IS NULL AND machine_id IS NULL AND user_id IS NULL)
OR (scope='project' AND project_id IS NOT NULL AND machine_id IS NULL AND user_id IS NULL)
OR (scope='machine' AND machine_id IS NOT NULL AND project_id IS NULL AND user_id IS NULL)
OR (scope='user' AND user_id IS NOT NULL AND project_id IS NULL AND machine_id IS NULL)
),
UNIQUE (scope, ts, COALESCE(project_id, -1), COALESCE(machine_id, -1), COALESCE(user_id, ''))
);
CREATE INDEX IF NOT EXISTS idx_fact_usage_ts ON fact_usage(ts);
CREATE INDEX IF NOT EXISTS idx_fact_usage_scope ON fact_usage(scope);
CREATE INDEX IF NOT EXISTS idx_fact_usage_project ON fact_usage(project_id) WHERE project_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_fact_usage_machine ON fact_usage(machine_id) WHERE machine_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_fact_usage_user ON fact_usage(user_id) WHERE user_id IS NOT NULL;
CREATE TABLE IF NOT EXISTS active_workspace (
workspace_id INTEGER PRIMARY KEY,
instance_id INTEGER NOT NULL REFERENCES dim_instance(instance_id),
machine_id INTEGER NOT NULL REFERENCES dim_machine(machine_id),
user_id TEXT REFERENCES dim_user(user_id),
project_id INTEGER REFERENCES dim_project(project_id),
started_at TEXT NOT NULL
);
""")
# Views
cur.executescript("""
CREATE VIEW IF NOT EXISTS v_ada_timeseries AS
SELECT
ts,
busy_cpu_seconds_total,
idle_cpu_seconds_total,
busy_kwh,
idle_kwh,
busy_gCo2eq,
idle_gCo2eq,
CASE
WHEN (busy_kwh + idle_kwh) > 0
THEN (busy_gCo2eq + idle_gCo2eq) / (busy_kwh + idle_kwh)
ELSE NULL
END AS intensity_gCo2eq_kwh
FROM fact_usage
WHERE scope='ada';
CREATE VIEW IF NOT EXISTS v_project_timeseries AS
SELECT
p.cloud_project_name,
f.ts,
f.busy_cpu_seconds_total,
f.idle_cpu_seconds_total,
f.busy_kwh,
f.idle_kwh,
f.busy_gCo2eq,
f.idle_gCo2eq,
COALESCE(f.intensity_gCo2eq_kwh,
CASE WHEN (f.busy_kwh + f.idle_kwh) > 0
THEN (f.busy_gCo2eq + f.idle_gCo2eq)/(f.busy_kwh + f.idle_kwh)
END) AS intensity_gCo2eq_kwh
FROM fact_usage f
JOIN dim_project p ON p.project_id = f.project_id
WHERE f.scope='project';
CREATE VIEW IF NOT EXISTS v_machine_timeseries AS
SELECT
m.machine_name,
f.ts,
f.busy_cpu_seconds_total,
f.idle_cpu_seconds_total,
f.busy_kwh,
f.idle_kwh,
f.busy_gCo2eq,
f.idle_gCo2eq,
COALESCE(f.intensity_gCo2eq_kwh,
CASE WHEN (f.busy_kwh + f.idle_kwh) > 0
THEN (f.busy_gCo2eq + f.idle_gCo2eq)/(f.busy_kwh + f.idle_kwh)
END) AS intensity_gCo2eq_kwh
FROM fact_usage f
JOIN dim_machine m ON m.machine_id = f.machine_id
WHERE f.scope='machine';
CREATE VIEW IF NOT EXISTS v_user_timeseries AS
SELECT
u.user_id,
f.ts,
f.busy_cpu_seconds_total,
f.idle_cpu_seconds_total,
f.busy_kwh,
f.idle_kwh,
f.busy_gCo2eq,
f.idle_gCo2eq,
COALESCE(f.intensity_gCo2eq_kwh,
CASE WHEN (f.busy_kwh + f.idle_kwh) > 0
THEN (f.busy_gCo2eq + f.idle_gCo2eq)/(f.busy_kwh + f.idle_kwh)
END) AS intensity_gCo2eq_kwh
FROM fact_usage f
JOIN dim_user u ON u.user_id = f.user_id
WHERE f.scope='user';
CREATE VIEW IF NOT EXISTS v_project_totals AS
SELECT
p.cloud_project_name,
SUM(busy_cpu_seconds_total) AS busy_cpu_seconds_total,
SUM(idle_cpu_seconds_total) AS idle_cpu_seconds_total,
SUM(busy_kwh) AS busy_kwh,
SUM(idle_kwh) AS idle_kwh,
SUM(busy_gCo2eq) AS busy_gCo2eq,
SUM(idle_gCo2eq) AS idle_gCo2eq
FROM fact_usage f
JOIN dim_project p ON p.project_id = f.project_id
WHERE f.scope='project'
GROUP BY p.cloud_project_name;
CREATE VIEW IF NOT EXISTS v_machine_totals AS
SELECT
m.machine_name,
SUM(busy_cpu_seconds_total) AS busy_cpu_seconds_total,
SUM(idle_cpu_seconds_total) AS idle_cpu_seconds_total,
SUM(busy_kwh) AS busy_kwh,
SUM(idle_kwh) AS idle_kwh,
SUM(busy_gCo2eq) AS busy_gCo2eq,
SUM(idle_gCo2eq) AS idle_gCo2eq
FROM fact_usage f
JOIN dim_machine m ON m.machine_id = f.machine_id
WHERE f.scope='machine'
GROUP BY m.machine_name;
CREATE VIEW IF NOT EXISTS v_group_totals AS
SELECT
g.group_name,
SUM(f.busy_cpu_seconds_total) AS busy_cpu_seconds_total,
SUM(f.idle_cpu_seconds_total) AS idle_cpu_seconds_total,
SUM(f.busy_kwh) AS busy_kwh,
SUM(f.idle_kwh) AS idle_kwh,
SUM(f.busy_gCo2eq) AS busy_gCo2eq,
SUM(f.idle_gCo2eq) AS idle_gCo2eq
FROM fact_usage f
JOIN dim_user u ON u.user_id = f.user_id
JOIN dim_group g ON g.group_id = u.group_id
WHERE f.scope='user'
GROUP BY g.group_name;
CREATE VIEW IF NOT EXISTS v_user_totals AS
SELECT
u.user_id,
SUM(busy_cpu_seconds_total) AS busy_cpu_seconds_total,
SUM(idle_cpu_seconds_total) AS idle_cpu_seconds_total,
SUM(busy_kwh) AS busy_kwh,
SUM(idle_kwh) AS idle_kwh,
SUM(busy_gCo2eq) AS busy_gCo2eq,
SUM(idle_gCo2eq) AS idle_gCo2eq
FROM fact_usage f
JOIN dim_user u ON u.user_id = f.user_id
WHERE f.scope='user'
GROUP BY u.user_id;
CREATE VIEW IF NOT EXISTS v_project_averages AS
SELECT
p.cloud_project_name,
AVG(busy_kwh) AS avg_busy_energy_kwh,
AVG(idle_kwh) AS avg_idle_energy_kwh,
AVG(busy_gCo2eq) AS avg_busy_carbon_gCo2eq,
AVG(idle_gCo2eq) AS avg_idle_carbon_gCo2eq,
AVG(
CASE WHEN (busy_kwh + idle_kwh) > 0
THEN (busy_gCo2eq + idle_gCo2eq)/(busy_kwh + idle_kwh)
END
) AS avg_intensity_gCo2eq_kwh
FROM fact_usage f
JOIN dim_project p ON p.project_id = f.project_id
WHERE f.scope='project'
GROUP BY p.cloud_project_name;
CREATE VIEW IF NOT EXISTS v_machine_averages AS
SELECT
m.machine_name,
AVG(busy_kwh) AS avg_busy_energy_kwh,
AVG(idle_kwh) AS avg_idle_energy_kwh,
AVG(busy_gCo2eq) AS avg_busy_carbon_gCo2eq,
AVG(idle_gCo2eq) AS avg_idle_carbon_gCo2eq,
AVG(
CASE WHEN (busy_kwh + idle_kwh) > 0
THEN (busy_gCo2eq + idle_gCo2eq)/(busy_kwh + idle_kwh)
END
) AS avg_intensity_gCo2eq_kwh
FROM fact_usage f
JOIN dim_machine m ON m.machine_id = f.machine_id
WHERE f.scope='machine'
GROUP BY m.machine_name;
CREATE VIEW IF NOT EXISTS v_group_averages AS
SELECT
g.group_name,
AVG(f.busy_kwh) AS avg_busy_energy_kwh,
AVG(f.idle_kwh) AS avg_idle_energy_kwh,
AVG(f.busy_gCo2eq) AS avg_busy_carbon_gCo2eq,
AVG(f.idle_gCo2eq) AS avg_idle_carbon_gCo2eq,
AVG(
CASE WHEN (f.busy_kwh + f.idle_kwh) > 0
THEN (f.busy_gCo2eq + f.idle_gCo2eq)/(f.busy_kwh + f.idle_kwh)
END
) AS avg_intensity_gCo2eq_kwh
FROM fact_usage f
JOIN dim_user u ON u.user_id = f.user_id
JOIN dim_group g ON g.group_id = u.group_id
WHERE f.scope='user'
GROUP BY g.group_name;
CREATE VIEW IF NOT EXISTS v_user_averages AS
SELECT
u.user_id,
AVG(busy_kwh) AS avg_busy_energy_kwh,
AVG(idle_kwh) AS avg_idle_energy_kwh,
AVG(busy_gCo2eq) AS avg_busy_carbon_gCo2eq,
AVG(idle_gCo2eq) AS avg_idle_carbon_gCo2eq,
AVG(
CASE WHEN (busy_kwh + idle_kwh) > 0
THEN (busy_gCo2eq + idle_gCo2eq)/(busy_kwh + idle_kwh)
END
) AS avg_intensity_gCo2eq_kwh
FROM fact_usage f
JOIN dim_user u ON u.user_id = f.user_id
WHERE f.scope='user'
GROUP BY u.user_id;
""")
# -- get-or-create helpers -------------------------------------------------
def get_or_create_group(self, group_name: str) -> int:
cur = self.conn.cursor()
cur.execute("INSERT OR IGNORE INTO dim_group(group_name) VALUES (?)", (group_name,))
cur.execute("SELECT group_id FROM dim_group WHERE group_name=?", (group_name,))
return cur.fetchone()["group_id"]
def get_or_create_user(self, user_id: str, display_name: Optional[str] = None,
group_name: Optional[str] = None) -> str:
group_id = None
if group_name:
group_id = self.get_or_create_group(group_name)
cur = self.conn.cursor()
cur.execute("""
INSERT INTO dim_user(user_id, display_name, group_id)
VALUES (?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
display_name=COALESCE(excluded.display_name, dim_user.display_name),
group_id=COALESCE(excluded.group_id, dim_user.group_id)
""", (user_id, display_name, group_id))
return user_id
def get_or_create_project(self, cloud_project_name: str) -> int:
cur = self.conn.cursor()
cur.execute("INSERT OR IGNORE INTO dim_project(cloud_project_name) VALUES (?)", (cloud_project_name,))
cur.execute("SELECT project_id FROM dim_project WHERE cloud_project_name=?", (cloud_project_name,))
return cur.fetchone()["project_id"]
def get_or_create_machine(self, machine_name: str) -> int:
cur = self.conn.cursor()
cur.execute("INSERT OR IGNORE INTO dim_machine(machine_name) VALUES (?)", (machine_name,))
cur.execute("SELECT machine_id FROM dim_machine WHERE machine_name=?", (machine_name,))
return cur.fetchone()["machine_id"]
def get_or_create_instance(self, host: str, port: Optional[int] = None,
raw_label: Optional[str] = None) -> int:
cur = self.conn.cursor()
# no natural unique key → create if exact tuple not present
cur.execute("""
INSERT INTO dim_instance(host, port, raw_label) VALUES (?, ?, ?)
""", (host, port, raw_label))
return cur.lastrowid
# -- mapping helpers -------------------------------------------------------
def map_user_project(self, user_id: str, project_id: int | None = None,
cloud_project_name: str | None = None) -> None:
if project_id is None:
if not cloud_project_name:
raise ValueError("Provide project_id or cloud_project_name")
project_id = self.get_or_create_project(cloud_project_name)
self.get_or_create_user(user_id) # ensure user exists
self.conn.execute(
"INSERT OR IGNORE INTO map_user_project(user_id, project_id) VALUES (?, ?)",
(user_id, project_id)
)
def map_project_machine(self, project_id: int | None = None, machine_id: int | None = None,
cloud_project_name: str | None = None, machine_name: str | None = None) -> None:
if project_id is None:
if not cloud_project_name:
raise ValueError("Provide project_id or cloud_project_name")
project_id = self.get_or_create_project(cloud_project_name)
if machine_id is None:
if not machine_name:
raise ValueError("Provide machine_id or machine_name")
machine_id = self.get_or_create_machine(machine_name)
self.conn.execute(
"INSERT OR IGNORE INTO map_project_machine(project_id, machine_id) VALUES (?, ?)",
(project_id, machine_id)
)
# -- active workspaces -----------------------------------------------------
def start_workspace(self, instance_id: int, machine_id: int,
started_at_iso_utc: str, user_id: Optional[str] = None,
project_id: Optional[int] = None) -> int:
cur = self.conn.cursor()
cur.execute("""
INSERT INTO active_workspace(instance_id, machine_id, user_id, project_id, started_at)
VALUES (?, ?, ?, ?, ?)
""", (instance_id, machine_id, user_id, project_id, started_at_iso_utc))
return cur.lastrowid
# -- fact_usage inserts ----------------------------------------------------
@staticmethod
def _validate_scope(scope: str,
project_id: Optional[int],
machine_id: Optional[int],
user_id: Optional[str]) -> None:
if scope not in {"ada", "project", "machine", "user"}:
raise ValueError("scope must be one of {'ada','project','machine','user'}")
want = {
"ada": (None, None, None),
"project": ("req", None, None),
"machine": (None, "req", None),
"user": (None, None, "req"),
}[scope]
checks = [
(want[0], project_id, "project_id"),
(want[1], machine_id, "machine_id"),
(want[2], user_id, "user_id"),
]
for expected, provided, name in checks:
if expected == "req" and provided is None:
raise ValueError(f"{name} is required for scope='{scope}'")
if expected is None and provided is not None and scope != name.split("_")[0]:
# ensure *only* the required key is set
# (the SQL CHECK also enforces this)
raise ValueError(f"{name} must be NULL for scope='{scope}'")
def insert_fact_usage(self, *, scope: str, ts_iso_utc: str,
project_id: Optional[int] = None,
machine_id: Optional[int] = None,
user_id: Optional[str] = None,
busy_cpu_seconds_total: float = 0.0,
idle_cpu_seconds_total: float = 0.0,
busy_kwh: float = 0.0,
idle_kwh: float = 0.0,
busy_gCo2eq: float = 0.0,
idle_gCo2eq: float = 0.0,
intensity_gCo2eq_kwh: Optional[float] = None) -> int:
self._validate_scope(scope, project_id, machine_id, user_id)
cur = self.conn.cursor()
cur.execute("""
INSERT INTO fact_usage(
ts, scope, project_id, machine_id, user_id,
busy_cpu_seconds_total, idle_cpu_seconds_total,
busy_kwh, idle_kwh, busy_gCo2eq, idle_gCo2eq, intensity_gCo2eq_kwh
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(scope, ts, COALESCE(project_id, -1), COALESCE(machine_id, -1), COALESCE(user_id, ''))
DO UPDATE SET
busy_cpu_seconds_total = excluded.busy_cpu_seconds_total,
idle_cpu_seconds_total = excluded.idle_cpu_seconds_total,
busy_kwh = excluded.busy_kwh,
idle_kwh = excluded.idle_kwh,
busy_gCo2eq = excluded.busy_gCo2eq,
idle_gCo2eq = excluded.idle_gCo2eq,
intensity_gCo2eq_kwh = excluded.intensity_gCo2eq_kwh
""", (
ts_iso_utc, scope, project_id, machine_id, user_id,
busy_cpu_seconds_total, idle_cpu_seconds_total,
busy_kwh, idle_kwh, busy_gCo2eq, idle_gCo2eq, intensity_gCo2eq_kwh
))
return cur.lastrowid
def bulk_insert_fact_usage(self, rows: Iterable[Mapping[str, Any]]) -> None:
with self.transaction():
for r in rows:
self.insert_fact_usage(**r)
# -- query helpers (views) -------------------------------------------------
def q(self, sql: str, params: tuple | dict = ()) -> list[dict]:
return list(self.conn.execute(sql, params))
# Timeseries
def ada_timeseries(self) -> list[dict]:
return self.q("SELECT * FROM v_ada_timeseries ORDER BY ts")
def project_timeseries(self, cloud_project_name: str) -> list[dict]:
return self.q("""
SELECT * FROM v_project_timeseries
WHERE cloud_project_name=? ORDER BY ts
""", (cloud_project_name,))
def machine_timeseries(self, machine_name: str) -> list[dict]:
return self.q("""
SELECT * FROM v_machine_timeseries
WHERE machine_name=? ORDER BY ts
""", (machine_name,))
def user_timeseries(self, user_id: str) -> list[dict]:
return self.q("""
SELECT * FROM v_user_timeseries
WHERE user_id=? ORDER BY ts
""", (user_id,))
# Totals
def project_totals(self) -> list[dict]:
return self.q("SELECT * FROM v_project_totals ORDER BY cloud_project_name")
def machine_totals(self) -> list[dict]:
return self.q("SELECT * FROM v_machine_totals ORDER BY machine_name")
def group_totals(self) -> list[dict]:
return self.q("SELECT * FROM v_group_totals ORDER BY group_name")
def user_totals(self) -> list[dict]:
return self.q("SELECT * FROM v_user_totals ORDER BY user_id")
# Averages
def project_averages(self) -> list[dict]:
return self.q("SELECT * FROM v_project_averages ORDER BY cloud_project_name")
def machine_averages(self) -> list[dict]:
return self.q("SELECT * FROM v_machine_averages ORDER BY machine_name")
def group_averages(self) -> list[dict]:
return self.q("SELECT * FROM v_group_averages ORDER BY group_name")
def user_averages(self) -> list[dict]:
return self.q("SELECT * FROM v_user_averages ORDER BY user_id")