Files
botbot/db_service/main.py

183 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import logging
import os
from typing import AsyncGenerator
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy import BigInteger, Column, MetaData, Table, Text, text
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
# ---------------------------------------------------------------------------
# Environment & logging setup
# ---------------------------------------------------------------------------
load_dotenv()
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=LOG_LEVEL, format="%(levelname)s | %(name)s | %(message)s")
log = logging.getLogger("db_service")
LOG_TOKEN = os.getenv("LOG_TOKEN", "changeme")
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "postgres")
POSTGRES_DB = os.getenv("POSTGRES_DB", "botbot")
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "postgres")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432")
DATABASE_URL = (
f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}" f"@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
)
ADMIN_URL = (
f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}" f"@{POSTGRES_HOST}:{POSTGRES_PORT}/postgres"
)
# ---------------------------------------------------------------------------
# SQLAlchemy table definition (metadata)
# ---------------------------------------------------------------------------
metadata = MetaData()
messages = Table(
"messages",
metadata,
Column("event_id", Text, primary_key=True),
Column("room_id", Text, nullable=False),
Column("user_id", Text, nullable=False),
Column("ts_ms", BigInteger, nullable=False),
Column("body", Text, nullable=False),
)
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(title="Botbot Logging Service", version="1.1.0")
class MessageIn(BaseModel):
"""Payload received from matrix_service."""
event_id: str = Field(..., example="$14327358242610PhrSn:matrix.org")
room_id: str = Field(..., example="!someroomid:matrix.org")
user_id: str = Field(..., example="@alice:matrix.org")
ts_ms: int = Field(..., example=1713866689000, description="Matrix server_timestamp in ms since epoch")
body: str = Field(..., example="Hello, world!")
# ---------------------------------------------------------------------------
# Database engine/session factories (populated on startup)
# ---------------------------------------------------------------------------
engine: AsyncEngine | None = None
SessionLocal: async_sessionmaker[AsyncSession] | None = None
async def ensure_database_exists() -> None:
"""Connect to the admin DB and create `POSTGRES_DB` if it is missing."""
log.info("Checking whether database %s exists", POSTGRES_DB)
admin_engine = create_async_engine(ADMIN_URL, pool_pre_ping=True)
try:
async with admin_engine.begin() as conn:
db_exists = await conn.scalar(
text("SELECT 1 FROM pg_database WHERE datname = :db"),
{"db": POSTGRES_DB},
)
if not db_exists:
log.warning("Database %s not found creating it", POSTGRES_DB)
await conn.execute(text(f'CREATE DATABASE "{POSTGRES_DB}"'))
log.info("Database %s created", POSTGRES_DB)
finally:
await admin_engine.dispose()
async def create_engine_and_tables() -> None:
"""Initialise SQLAlchemy engine and create the `messages` table if needed."""
global engine, SessionLocal # noqa: PLW0603
engine = create_async_engine(DATABASE_URL, pool_pre_ping=True)
async with engine.begin() as conn:
await conn.run_sync(metadata.create_all)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
log.info("Database initialised and tables ensured.")
async def get_session() -> AsyncGenerator[AsyncSession, None]:
async with SessionLocal() as session: # type: ignore[arg-type]
yield session
# ---------------------------------------------------------------------------
# Lifespan events
# ---------------------------------------------------------------------------
@app.on_event("startup")
async def on_startup() -> None: # noqa: D401 (imperative mood)
"""Ensure the database *and* table exist before serving traffic."""
log.info("Starting up")
try:
await create_engine_and_tables()
except OperationalError as err:
# Common case: database itself does not yet exist.
if "does not exist" in str(err):
log.warning("Primary database missing attempting to create it")
await ensure_database_exists()
# Retry now that DB exists
await create_engine_and_tables()
else:
log.error("Database connection failed: %s", err)
raise
@app.on_event("shutdown")
async def on_shutdown() -> None: # noqa: D401
if engine:
await engine.dispose()
# ---------------------------------------------------------------------------
# API endpoints
# ---------------------------------------------------------------------------
@app.get("/healthz", tags=["health"])
async def healthz() -> dict[str, str]:
return {"status": "ok"}
@app.post("/api/v1/log", status_code=status.HTTP_202_ACCEPTED, tags=["log"])
async def log_message(
payload: MessageIn,
x_log_token: str = Header(alias="X-Log-Token"),
session: AsyncSession = Depends(get_session),
) -> dict[str, str]:
"""Persist one Matrix message to Postgres.
Requires header `X-Log-Token` matching the `LOG_TOKEN` envvar.
"""
if x_log_token != LOG_TOKEN:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token")
stmt = (
messages.insert()
.values(**payload.model_dump())
.on_conflict_do_nothing(index_elements=[messages.c.event_id])
)
await session.execute(stmt)
await session.commit()
return {"status": "accepted"}