import os
import sqlite3
import time
from datetime import datetime

import pytz

from config import DB_PATH

# Time zone used when rendering timestamps; looked up once instead of on
# every ts_to_iso() call.
_LOCAL_TZ = pytz.timezone("Europe/Berlin")


def get_conn() -> sqlite3.Connection:
    """Open the SQLite database at ``DB_PATH`` and apply the PRAGMA settings.

    Returns:
        A connection whose rows are ``sqlite3.Row`` objects (addressable by
        column name), opened through a ``file:`` URI with ``cache=shared``.
    """
    # NOTE(review): the absolute path is interpolated into the URI unescaped;
    # a path containing '?', '#' or '%' would be misparsed -- confirm DB_PATH
    # never contains such characters.
    uri = f"file:{os.path.abspath(DB_PATH)}?cache=shared"
    # check_same_thread=False lets threads other than the creating one use
    # this connection.
    conn = sqlite3.connect(uri, uri=True, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("PRAGMA synchronous=NORMAL;")
    conn.execute("PRAGMA temp_store=MEMORY;")
    conn.execute("PRAGMA cache_size=-20000;")  # ~20MB cache
    return conn


# Single module-wide connection, opened at import time.
CONN = get_conn()


# ---------------------- Utilities ----------------------
def ts_to_iso(ts: float | int | str) -> str:
    """Render a POSIX timestamp as an ISO-8601 string in Europe/Berlin time.

    Args:
        ts: Seconds since the epoch, as a number or a numeric string.

    Returns:
        The ISO-8601 representation, or ``str(ts)`` unchanged when ``ts``
        cannot be converted to a float.
    """
    try:
        t = float(ts)
    except (TypeError, ValueError, OverflowError):
        # Best effort: hand back the unparseable value as text.
        return str(ts)
    return datetime.fromtimestamp(t, tz=_LOCAL_TZ).isoformat()


# ---------------------- API Endpoints ----------------------
def api_series(q_from: float | None, q_to: float | None):
    """Fetch ``speed_tests`` rows within an optional timestamp range.

    Args:
        q_from: Inclusive lower bound on ``timestamp``; ``None`` for no
            lower bound.
        q_to: Inclusive upper bound on ``timestamp``; ``None`` for no
            upper bound.

    Returns:
        A list of dicts ordered by ascending timestamp, each with the keys
        ``"t"`` (float timestamp), ``"t_iso"`` (ISO string from
        ``ts_to_iso``), ``"down_90th"`` and ``"up_90th"`` (float, or
        ``None`` when the column is NULL).
    """
    params = []
    where = []
    if q_from is not None:
        where.append("timestamp >= ?")
        params.append(q_from)
    if q_to is not None:
        where.append("timestamp <= ?")
        params.append(q_to)
    # Only fixed clause text is spliced into the SQL; the bounds themselves
    # are passed as bound parameters.
    where_sql = ("WHERE " + " AND ".join(where)) if where else ""
    sql = (
        f"SELECT timestamp, down_90th, up_90th FROM speed_tests {where_sql} "
        "ORDER BY timestamp ASC;"
    )
    return [
        {
            "t": float(r["timestamp"]),
            "t_iso": ts_to_iso(r["timestamp"]),
            "down_90th": None if r["down_90th"] is None else float(r["down_90th"]),
            "up_90th": None if r["up_90th"] is None else float(r["up_90th"]),
        }
        for r in CONN.execute(sql, params)
    ]


# Report the share of rows, from a fixed start timestamp until now, that have
# both measurements present.
rows = api_series(1756220400, time.time())
does_not_work = sum(
    1 for r in rows if r["down_90th"] is None or r["up_90th"] is None
)
if rows:
    print(f"Working percentage: {100 - (does_not_work / len(rows))*100}% ({does_not_work}/{len(rows)})")
else:
    # An empty result would otherwise raise ZeroDivisionError.
    print("Working percentage: n/a (no rows in range)")