Files
slideshow/db.py
2026-05-06 05:11:12 +02:00

89 lines
3.2 KiB
Python

import os
import sqlite3
from datetime import datetime, timezone
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'booru.db')
def get_conn():
return sqlite3.connect(DB_PATH)
def init_db():
with get_conn() as c:
c.execute("""CREATE TABLE IF NOT EXISTS images (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id TEXT NOT NULL,
site TEXT NOT NULL,
filename TEXT NOT NULL UNIQUE,
tags TEXT NOT NULL DEFAULT '',
file_url TEXT NOT NULL,
post_url TEXT NOT NULL DEFAULT '',
preview_filename TEXT NOT NULL DEFAULT '',
downloaded_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)""")
c.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_site_post ON images (site, post_id)"
)
# migrate existing DBs that predate the post_url column
try:
c.execute("ALTER TABLE images ADD COLUMN post_url TEXT NOT NULL DEFAULT ''")
except sqlite3.OperationalError:
pass
# migrate existing DBs that predate the preview_filename column
try:
c.execute("ALTER TABLE images ADD COLUMN preview_filename TEXT NOT NULL DEFAULT ''")
except sqlite3.OperationalError:
pass
# migrate existing DBs that predate the created_at column
try:
c.execute("ALTER TABLE images ADD COLUMN created_at INTEGER NOT NULL DEFAULT 0")
except sqlite3.OperationalError:
pass
def image_exists(site, post_id):
with get_conn() as c:
return (
c.execute(
"SELECT 1 FROM images WHERE site=? AND post_id=?", (site, post_id)
).fetchone()
is not None
)
def insert_image(post_id, site, filename, tags, file_url, post_url, preview_filename='', created_at=0):
with get_conn() as c:
c.execute(
"INSERT OR IGNORE INTO images (post_id, site, filename, tags, file_url, post_url, preview_filename, created_at) VALUES (?,?,?,?,?,?,?,?)",
(post_id, site, filename, tags, file_url, post_url, preview_filename, created_at),
)
def reset_db():
with get_conn() as c:
c.execute('DELETE FROM images')
def get_image_count():
with get_conn() as c:
return c.execute('SELECT COUNT(*) FROM images').fetchone()[0]
def search_images(tag_query, sort=None):
terms = tag_query.split() if tag_query.strip() else []
order = 'created_at DESC' if sort == 'newest' else 'created_at ASC' if sort == 'oldest' else 'id ASC'
with get_conn() as c:
if not terms:
rows = c.execute(
f"SELECT filename, preview_filename, post_url, tags, created_at FROM images ORDER BY {order}"
).fetchall()
else:
where = ' AND '.join(['tags LIKE ?'] * len(terms))
params = [f'%{t}%' for t in terms]
rows = c.execute(
f"SELECT filename, preview_filename, post_url, tags, created_at FROM images WHERE {where} ORDER BY {order}",
params,
).fetchall()
return [{'filename': r[0], 'preview_filename': r[1], 'post_url': r[2], 'tags': r[3], 'created_at': r[4]} for r in rows]