""" SQLite database for tracking daily usage """ import sqlite3 from datetime import datetime, timezone, timedelta from config import settings import threading _local = threading.local() def get_db(): """Get thread-local database connection""" if not hasattr(_local, "connection"): _local.connection = sqlite3.connect( settings.database_path, check_same_thread=False ) _local.connection.row_factory = sqlite3.Row return _local.connection def init_db(): """Initialize database schema""" conn = get_db() cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS usage ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, date TEXT NOT NULL, count INTEGER DEFAULT 0, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP, UNIQUE(user_id, date) ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_usage_user_date ON usage(user_id, date) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS extraction_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL, tier TEXT, timestamp TEXT DEFAULT CURRENT_TIMESTAMP, success INTEGER DEFAULT 1, processing_time_ms INTEGER ) """) conn.commit() cleanup_old_records() def get_today_utc() -> str: """Get today's date in UTC as string""" return datetime.now(timezone.utc).strftime("%Y-%m-%d") def get_usage_today(user_id: str) -> int: """Get number of extractions used today for a user""" conn = get_db() cursor = conn.cursor() today = get_today_utc() cursor.execute( "SELECT count FROM usage WHERE user_id = ? AND date = ?", (user_id, today) ) row = cursor.fetchone() return row["count"] if row else 0 def increment_usage(user_id: str) -> int: """Increment usage count for today. Returns new count.""" conn = get_db() cursor = conn.cursor() today = get_today_utc() now = datetime.now(timezone.utc).isoformat() cursor.execute(""" INSERT INTO usage (user_id, date, count, updated_at) VALUES (?, ?, 1, ?) ON CONFLICT(user_id, date) DO UPDATE SET count = count + 1, updated_at = ? """, (user_id, today, now, now)) conn.commit() return get_usage_today(user_id) def cleanup_old_records(days_to_keep: int = 30): """Remove usage records older than specified days""" conn = get_db() cursor = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(days=days_to_keep)).strftime("%Y-%m-%d") cursor.execute("DELETE FROM usage WHERE date < ?", (cutoff,)) cursor.execute("DELETE FROM extraction_log WHERE date(timestamp) < ?", (cutoff,)) conn.commit() def get_usage_stats(user_id: str, days: int = 7) -> list: """Get usage history for a user""" conn = get_db() cursor = conn.cursor() cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d") cursor.execute(""" SELECT date, count FROM usage WHERE user_id = ? AND date >= ? ORDER BY date DESC """, (user_id, cutoff)) return [dict(row) for row in cursor.fetchall()]