#!/usr/bin/env python3 """ Flask app to visualize a large SQLite database of speed tests. - Plots time series for down_90th and up_90th - Serves a lazily loaded table (server-side pagination) - Designed for ~1GB DB: efficient SQLite pragmas + timestamp cursor pagination Run: export DB_PATH="/path/to/your/speedtests.sqlite3" python3 app.py Then open http://127.0.0.1:5000 Optional: create an index (speeds up range scans by timestamp): sqlite3 "$DB_PATH" "CREATE INDEX IF NOT EXISTS idx_speed_tests_ts ON speed_tests(timestamp);" """ from __future__ import annotations import os import math import sqlite3 from datetime import datetime import pytz from flask import Flask, jsonify, request, render_template DB_PATH = os.environ.get("DB_PATH", "./speed_tests.sqlite3") PAGE_SIZE_DEFAULT = 200 PAGE_SIZE_MAX = 1000 SERIES_MAX_POINTS_DEFAULT = 5000 SERIES_MAX_POINTS_HARD = 20000 app = Flask(__name__, template_folder="templates") # ---------------------- SQLite Helpers ---------------------- def get_conn() -> sqlite3.Connection: uri = f"file:{os.path.abspath(DB_PATH)}?cache=shared" conn = sqlite3.connect(uri, uri=True, check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL;") conn.execute("PRAGMA synchronous=NORMAL;") conn.execute("PRAGMA temp_store=MEMORY;") conn.execute("PRAGMA cache_size=-20000;") # ~20MB cache return conn CONN = get_conn() # ---------------------- Utilities ---------------------- def ts_to_iso(ts: float | int | str) -> str: try: t = float(ts) except Exception: return str(ts) return datetime.fromtimestamp(t, tz=pytz.timezone("Europe/Berlin")).isoformat() # ---------------------- API Endpoints ---------------------- @app.get("/api/series") def api_series(): q_from = request.args.get("from", type=float) q_to = request.args.get("to", type=float) max_points = request.args.get("max_points", type=int) or SERIES_MAX_POINTS_DEFAULT max_points = max(100, min(max_points, SERIES_MAX_POINTS_HARD)) params = [] where = [] if q_from is not None: where.append("timestamp >= ?") params.append(q_from) if q_to is not None: where.append("timestamp <= ?") params.append(q_to) where_sql = ("WHERE " + " AND ".join(where)) if where else "" cnt_sql = f"SELECT COUNT(*) AS n FROM speed_tests {where_sql};" n_rows = CONN.execute(cnt_sql, params).fetchone()[0] stride = 1 if n_rows <= max_points else math.ceil(n_rows / max_points) sql = ( f"SELECT timestamp, down_90th, up_90th FROM speed_tests {where_sql} " "ORDER BY timestamp ASC;" ) rows = [] kept = 0 for i, r in enumerate(CONN.execute(sql, params)): if (i % stride) == 0: rows.append({ "t": float(r["timestamp"]), "t_iso": ts_to_iso(r["timestamp"]), "down_90th": None if r["down_90th"] is None else float(r["down_90th"]), "up_90th": None if r["up_90th"] is None else float(r["up_90th"]), }) kept += 1 if kept >= max_points: break return jsonify({ "count_total": n_rows, "stride": stride, "returned": len(rows), "points": rows, }) @app.get("/api/table") def api_table(): limit = request.args.get("limit", type=int) or PAGE_SIZE_DEFAULT limit = max(1, min(limit, PAGE_SIZE_MAX)) order = request.args.get("order", default="desc") order = "ASC" if str(order).lower().startswith("asc") else "DESC" cursor = request.args.get("cursor", type=float) params = [] where = [] if cursor is not None: if order == "ASC": where.append("timestamp > ?") else: where.append("timestamp < ?") params.append(cursor) where_sql = ("WHERE " + " AND ".join(where)) if where else "" sql = ( "SELECT id, timestamp, failed, isp, ip, location_code, location_city, location_region, " "latency, jitter, down_100kB, down_1MB, down_10MB, down_25MB, down_90th, " "up_100kB, up_1MB, up_10MB, up_90th " f"FROM speed_tests {where_sql} ORDER BY timestamp {order} LIMIT ?;" ) params2 = params + [limit] rows = [dict(r) for r in CONN.execute(sql, params2).fetchall()] next_cursor = None if rows: last_ts = rows[-1]["timestamp"] try: next_cursor = float(last_ts) except Exception: next_cursor = last_ts for r in rows: r["timestamp_iso"] = ts_to_iso(r["timestamp"]) return jsonify({ "limit": limit, "order": order.lower(), "count": len(rows), "next_cursor": next_cursor, "rows": rows, }) @app.get("/") def index(): return render_template( "index.html", db_name=os.path.basename(DB_PATH), SERIES_MAX_POINTS_DEFAULT=SERIES_MAX_POINTS_DEFAULT, SERIES_MAX_POINTS_HARD=SERIES_MAX_POINTS_HARD, PAGE_SIZE_DEFAULT=PAGE_SIZE_DEFAULT, PAGE_SIZE_MAX=PAGE_SIZE_MAX, ) if __name__ == "__main__": port = int(os.environ.get("PORT", 5000)) app.run(host="0.0.0.0", port=port, debug=True)