Files
speed-logger/app.py
2026-02-15 17:31:33 +01:00

149 lines
4.6 KiB
Python

import os
import time
from datetime import datetime
from flask import Flask, render_template, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
app = Flask(__name__)
# connect to your existing database file
# ensure the path is correct relative to where you run this script
# NOTE(review): path is resolved against the *current working directory*, not
# this file's directory — launching from another dir opens/creates a different
# (likely empty) speedtest.db. Consider os.path.dirname(__file__) — TODO confirm intent.
db_path = os.path.join(os.getcwd(), 'speedtest.db')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}'
# Disable the modification-tracking event system (unused here; saves overhead).
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# Reflect the existing table explicitly to match your schema
# Reflect the existing table explicitly to match your schema
class SpeedTest(db.Model):
    """ORM mapping of one speed-test run stored in the existing `speed_tests` table.

    Column names deliberately mirror the pre-existing SQLite schema
    (including the non-snake_case `down_100kB` style) — do not rename.
    """
    __tablename__ = 'speed_tests'
    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.Float, nullable=False)  # unix epoch seconds (see /api/* handlers)
    failed = db.Column(db.Boolean, nullable=False)   # True when the run produced no speeds
    isp = db.Column(db.String)
    ip = db.Column(db.String)
    location_code = db.Column(db.String)
    location_city = db.Column(db.String)
    location_region = db.Column(db.String)
    latency = db.Column(db.Float)
    jitter = db.Column(db.Float)
    # Download throughput samples at several payload sizes.
    down_100kB = db.Column(db.Float)
    down_1MB = db.Column(db.Float)
    down_10MB = db.Column(db.Float)
    down_25MB = db.Column(db.Float)
    down_90th = db.Column(db.Float)  # We will graph this
    # Upload throughput samples at several payload sizes.
    up_100kB = db.Column(db.Float)
    up_1MB = db.Column(db.Float)
    up_10MB = db.Column(db.Float)
    up_90th = db.Column(db.Float)  # We will graph this
@app.route('/')
def index():
    """Serve the dashboard page from templates/index.html."""
    page = render_template('index.html')
    return page
@app.route('/api/hourly_stats')
def get_hourly_stats():
    """Return JSON averages of the 90th-percentile speeds bucketed by hour of day.

    Aggregation is done in SQLite itself: the unix-epoch timestamp is converted
    to local time and grouped by its two-digit hour, skipping failed runs.
    """
    # strftime('%H', datetime(timestamp, 'unixepoch', 'localtime')) extracts
    # the local hour ("00".."23") from the stored unix timestamp.
    hourly_sql = text("""
        SELECT
            strftime('%H', datetime(timestamp, 'unixepoch', 'localtime')) as hour,
            AVG(down_90th) as avg_down,
            AVG(up_90th) as avg_up,
            COUNT(*) as count
        FROM speed_tests
        WHERE failed = 0
        GROUP BY hour
        ORDER BY hour ASC
    """)
    rows = db.session.execute(hourly_sql)
    payload = [
        {'hour': hour, 'down': avg_down, 'up': avg_up, 'count': n}
        for hour, avg_down, avg_up, n in rows
    ]
    return jsonify(payload)
@app.route('/api/data')
def get_data():
    """Return speed-test rows in a date range plus summary statistics as JSON.

    Query params:
        start, end: 'YYYY-MM-DD' local dates; the end day is included in full.
                    When either is missing, defaults to the last 7 days.

    Returns:
        {'rows': [...], 'stats': {'uptime', 'avg_down', 'avg_up', 'total_tests'}}
        or a 400 JSON error when the dates are malformed (previously this
        raised ValueError and surfaced as an unhandled 500).
    """
    start_str = request.args.get('start')
    end_str = request.args.get('end')

    now = time.time()
    if start_str and end_str:
        try:
            start_ts = datetime.strptime(start_str, '%Y-%m-%d').timestamp()
            # +86400s so the entire end day falls inside the range.
            end_ts = datetime.strptime(end_str, '%Y-%m-%d').timestamp() + 86400
        except ValueError:
            # Bad input from the client, not a server fault: report it as 400.
            return jsonify({'error': "start/end must be 'YYYY-MM-DD' dates"}), 400
    else:
        start_ts = now - (7 * 24 * 60 * 60)
        end_ts = now

    results = SpeedTest.query.filter(
        SpeedTest.timestamp >= start_ts,
        SpeedTest.timestamp <= end_ts
    ).order_by(SpeedTest.timestamp.asc()).all()

    # Accumulate rows for the chart plus the summary counters in one pass.
    data = []
    total_tests = 0
    failed_tests = 0
    total_down = 0.0
    total_up = 0.0
    count_valid_speed = 0
    for r in results:
        total_tests += 1
        if r.failed:
            failed_tests += 1
        # Only successful tests with both percentile figures count toward averages.
        if not r.failed and r.down_90th is not None and r.up_90th is not None:
            total_down += r.down_90th
            total_up += r.up_90th
            count_valid_speed += 1
        data.append({
            'timestamp': r.timestamp * 1000,  # milliseconds for the JS Date API
            'readable_date': datetime.fromtimestamp(r.timestamp).strftime('%Y-%m-%d %H:%M'),
            'down': r.down_90th,
            'up': r.up_90th,
            'failed': r.failed,
            'latency': r.latency,
            'isp': r.isp
        })

    # Guard every division: an empty range must not raise ZeroDivisionError.
    uptime_pct = ((total_tests - failed_tests) / total_tests * 100) if total_tests else 0
    avg_down = (total_down / count_valid_speed) if count_valid_speed else 0
    avg_up = (total_up / count_valid_speed) if count_valid_speed else 0
    return jsonify({
        'rows': data,
        'stats': {
            'uptime': round(uptime_pct, 2),
            'avg_down': round(avg_down, 2),
            'avg_up': round(avg_up, 2),
            'total_tests': total_tests
        }
    })
if __name__ == '__main__':
    # Development server only; debug=True enables the reloader and the
    # interactive debugger — do not use in production.
    app.run(debug=True)