diff --git a/.env.example b/.env.example index 2911e09..1c9211c 100644 --- a/.env.example +++ b/.env.example @@ -12,3 +12,13 @@ AI_HANDLER_URL="http://ai_service:8000" # OpenAI API Key OPENAI_API_KEY=your_openai_api_key_here AI_HANDLER_TOKEN=common_token_here + +# db_service +LOG_TOKEN=your_log_token_here + + +# PostgreSQL Configuration +POSTGRES_USER=database_user +POSTGRES_PASSWORD=database_password +POSTGRES_DB=database_name +POSTGRES_HOST=postgres \ No newline at end of file diff --git a/db_service/Dockerfile b/db_service/Dockerfile new file mode 100644 index 0000000..30c68f8 --- /dev/null +++ b/db_service/Dockerfile @@ -0,0 +1,20 @@ +FROM python:3.11-slim + +# Prevent Python from buffering stdout/stderr so logs appear immediately +ENV PYTHONUNBUFFERED=1 + +# Install system dependencies (none needed for asyncpg on slim) + +WORKDIR /app + +# Install Python dependencies first to leverage Docker layer caching +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy service source code only after dependencies +COPY . /app + +EXPOSE 8000 + +# Use Uvicorn as the ASGI server +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/db_service/main.py b/db_service/main.py new file mode 100644 index 0000000..39affc4 --- /dev/null +++ b/db_service/main.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +import logging +import os +from typing import AsyncGenerator + +from dotenv import load_dotenv +from fastapi import Depends, FastAPI, Header, HTTPException, status +from pydantic import BaseModel, Field +from sqlalchemy import BigInteger, Column, MetaData, Table, Text, text +from sqlalchemy.exc import OperationalError +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +# --------------------------------------------------------------------------- +# Environment & logging setup +# --------------------------------------------------------------------------- + +load_dotenv() +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +logging.basicConfig(level=LOG_LEVEL, format="%(levelname)s | %(name)s | %(message)s") +log = logging.getLogger("db_service") + +LOG_TOKEN = os.getenv("LOG_TOKEN", "changeme") +POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres") +POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "postgres") +POSTGRES_DB = os.getenv("POSTGRES_DB", "botbot") +POSTGRES_HOST = os.getenv("POSTGRES_HOST", "postgres") +POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5432") + +DATABASE_URL = ( + f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}" f"@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}" +) +ADMIN_URL = ( + f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}" f"@{POSTGRES_HOST}:{POSTGRES_PORT}/postgres" +) + +# --------------------------------------------------------------------------- +# SQLAlchemy table definition (metadata) +# --------------------------------------------------------------------------- + +metadata = MetaData() + +messages = Table( + "messages", + metadata, + Column("event_id", Text, primary_key=True), + Column("room_id", Text, nullable=False), + Column("user_id", Text, nullable=False), + Column("ts_ms", BigInteger, nullable=False), + Column("body", Text, nullable=False), +) + +# --------------------------------------------------------------------------- +# FastAPI app +# --------------------------------------------------------------------------- + +app = FastAPI(title="Botbot Logging Service", version="1.1.0") + + +class MessageIn(BaseModel): + """Payload received from matrix_service.""" + + event_id: str = Field(..., example="$14327358242610PhrSn:matrix.org") + room_id: str = Field(..., example="!someroomid:matrix.org") + user_id: str = Field(..., example="@alice:matrix.org") + ts_ms: int = Field(..., example=1713866689000, description="Matrix server_timestamp in ms since epoch") + body: str = Field(..., example="Hello, world!") + + +# --------------------------------------------------------------------------- +# Database engine/session factories (populated on startup) +# --------------------------------------------------------------------------- + +engine: AsyncEngine | None = None +SessionLocal: async_sessionmaker[AsyncSession] | None = None + + +async def ensure_database_exists() -> None: + """Connect to the admin DB and create `POSTGRES_DB` if it is missing.""" + + log.info("Checking whether database %s exists", POSTGRES_DB) + + admin_engine = create_async_engine(ADMIN_URL, pool_pre_ping=True) + try: + async with admin_engine.begin() as conn: + db_exists = await conn.scalar( + text("SELECT 1 FROM pg_database WHERE datname = :db"), + {"db": POSTGRES_DB}, + ) + if not db_exists: + log.warning("Database %s not found – creating it", POSTGRES_DB) + await conn.execute(text(f'CREATE DATABASE "{POSTGRES_DB}"')) + log.info("Database %s created", POSTGRES_DB) + finally: + await admin_engine.dispose() + + +async def create_engine_and_tables() -> None: + """Initialise SQLAlchemy engine and create the `messages` table if needed.""" + + global engine, SessionLocal # noqa: PLW0603 + + engine = create_async_engine(DATABASE_URL, pool_pre_ping=True) + + async with engine.begin() as conn: + await conn.run_sync(metadata.create_all) + + SessionLocal = async_sessionmaker(engine, expire_on_commit=False) + log.info("Database initialised and tables ensured.") + + +async def get_session() -> AsyncGenerator[AsyncSession, None]: + async with SessionLocal() as session: # type: ignore[arg-type] + yield session + + +# --------------------------------------------------------------------------- +# Lifespan events +# --------------------------------------------------------------------------- + +@app.on_event("startup") +async def on_startup() -> None: # noqa: D401 (imperative mood) + """Ensure the database *and* table exist before serving traffic.""" + log.info("Starting up") + + try: + await create_engine_and_tables() + except OperationalError as err: + # Common case: database itself does not yet exist. + if "does not exist" in str(err): + log.warning("Primary database missing – attempting to create it") + await ensure_database_exists() + # Retry now that DB exists + await create_engine_and_tables() + else: + log.error("Database connection failed: %s", err) + raise + + +@app.on_event("shutdown") +async def on_shutdown() -> None: # noqa: D401 + if engine: + await engine.dispose() + + +# --------------------------------------------------------------------------- +# API endpoints +# --------------------------------------------------------------------------- + + +@app.get("/healthz", tags=["health"]) +async def healthz() -> dict[str, str]: + return {"status": "ok"} + + +@app.post("/api/v1/log", status_code=status.HTTP_202_ACCEPTED, tags=["log"]) +async def log_message( + payload: MessageIn, + x_log_token: str = Header(alias="X-Log-Token"), + session: AsyncSession = Depends(get_session), +) -> dict[str, str]: + """Persist one Matrix message to Postgres. + + Requires header `X-Log-Token` matching the `LOG_TOKEN` env‑var. + """ + + if x_log_token != LOG_TOKEN: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="invalid token") + + stmt = ( + messages.insert() + .values(**payload.model_dump()) + .on_conflict_do_nothing(index_elements=[messages.c.event_id]) + ) + await session.execute(stmt) + await session.commit() + return {"status": "accepted"} diff --git a/db_service/requirements.txt b/db_service/requirements.txt new file mode 100644 index 0000000..7a9715b --- /dev/null +++ b/db_service/requirements.txt @@ -0,0 +1,10 @@ +# Web framework & ASGI server +fastapi +uvicorn[standard] + +# Database access +sqlalchemy[asyncio]>=2.0 +asyncpg>=0.29 + +# Environment / configuration helpers +python-dotenv>=1.0 diff --git a/docker-compose.yml b/docker-compose.yml index c7221a2..9b9f52c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,42 @@ services: + # ----------------------- + # Database (PostgreSQL) + # ----------------------- + postgres: + image: postgres:16-alpine + restart: unless-stopped + environment: + - POSTGRES_USER + - POSTGRES_PASSWORD + - POSTGRES_DB + volumes: + - postgres-data:/var/lib/postgresql/data + healthcheck: + test: "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}" + interval: 10s + timeout: 5s + retries: 5 + + # ----------------------- + # Conversation‑logging microservice + # ----------------------- + db_service: + build: + context: ./db_service + restart: unless-stopped + environment: + - POSTGRES_USER + - POSTGRES_PASSWORD + - POSTGRES_DB + - POSTGRES_HOST + - LOG_TOKEN + depends_on: + postgres: + condition: service_healthy + ports: + - "8000:8000" # expose externally only if needed + + matrix_service: build: ./matrix_service environment: @@ -25,6 +63,10 @@ services: redis: image: redis:7 restart: unless-stopped + volumes: + - redis-data:/data volumes: - matrix_data: \ No newline at end of file + matrix_data: + redis-data: + postgres-data: \ No newline at end of file diff --git a/matrix_service/requirements.txt b/matrix_service/requirements.txt index 170594a..8f2bc1e 100644 --- a/matrix_service/requirements.txt +++ b/matrix_service/requirements.txt @@ -1,4 +1,4 @@ -matrix-nio[e2e]>=0.25.0 +matrix-nio[e2e]>=0.25.2 python-dotenv>=1.0.0 httpx>=0.23.0 pydantic>=1.10 \ No newline at end of file