diff --git a/.env.example b/.env.example
index 7280ffe..2911e09 100644
--- a/.env.example
+++ b/.env.example
@@ -2,9 +2,13 @@
 LOG_LEVEL=INFO
 
 # Matrix Configuration
-HOMESERVER_URL = "https://matrix.org"
-USER_ID = "@botbot_user:matrix.org"
-PASSWORD = "botbot_password"
+MATRIX_HOMESERVER_URL="https://matrix.org"
+MATRIX_USER_ID="@botbot_user:matrix.org"
+MATRIX_PASSWORD="botbot_password"
+MATRIX_LOGIN_TRIES=5 # Number of login attempts before giving up, default is 5
+MATRIX_LOGIN_DELAY_INCREMENT=5 # Delay increment, in seconds, between login attempts, default is 5
+AI_HANDLER_URL="http://ai_service:8000"
 
 # OpenAI API Key
 OPENAI_API_KEY=your_openai_api_key_here
+AI_HANDLER_TOKEN=common_token_here
diff --git a/ai_service/Dockerfile b/ai_service/Dockerfile
new file mode 100644
index 0000000..1d3d01f
--- /dev/null
+++ b/ai_service/Dockerfile
@@ -0,0 +1,12 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+COPY requirements.txt ./
+
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY main.py ./
+
+CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
+
diff --git a/ai_service/main.py b/ai_service/main.py
new file mode 100644
index 0000000..c44fd2b
--- /dev/null
+++ b/ai_service/main.py
@@ -0,0 +1,66 @@
+import os
+from dotenv import load_dotenv
+from fastapi import FastAPI, Header, HTTPException
+from pydantic import BaseModel
+from openai import OpenAI
+import logging
+import redis
+
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+AI_TOKEN = os.environ["AI_HANDLER_TOKEN"]
+
+AIclient = OpenAI(
+    api_key=os.environ["OPENAI_API_KEY"],
+)
+
+r = redis.Redis.from_url(os.environ.get("REDIS_URL", "redis://redis:6379"))
+
+# --- Logging Setup ---
+numeric_level = getattr(logging, LOG_LEVEL, logging.INFO)
+logging.basicConfig(
+    level=numeric_level,
+    format="%(asctime)s %(levelname)s %(name)s: %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+class MessagePayload(BaseModel):
+    roomId: str
+    userId: str
+    eventId: str
+    serverTimestamp: int
+    content: str
+
+app = FastAPI()
+
+@app.post("/api/v1/message")
+async def message(
+    payload: MessagePayload,
+    authorization: str = Header(None)
+):
+    if authorization != f"Bearer {AI_TOKEN}":
+        raise HTTPException(status_code=401, detail="Unauthorized")
+
+    # Idempotency: ignore duplicates
+    if r.get(payload.eventId):
+        return {"reply": r.get(payload.eventId).decode()}
+
+    # Build prompt (very simple example)
+    prompt = f"User {payload.userId} said: {payload.content}\nBot:"
+    chat_response = AIclient.chat.completions.create(
+        model="gpt-3.5-turbo",
+        messages=[
+            {"role": "system", "content": "You are a helpful assistant."},
+            {"role": "user", "content": prompt}
+        ],
+        max_tokens=150,
+        n=1,
+        stop=None,
+        temperature=0.7,
+    )
+
+    reply = chat_response.choices[0].message.content.strip()
+
+    # Cache reply for idempotency
+    r.set(payload.eventId, reply, ex=3600)
+
+    return {"reply": reply}
diff --git a/ai_service/requirements.txt b/ai_service/requirements.txt
new file mode 100644
index 0000000..fe52b67
--- /dev/null
+++ b/ai_service/requirements.txt
@@ -0,0 +1,6 @@
+python-dotenv>=1.0.0
+openai>=1.0.0
+fastapi>=0.95
+uvicorn>=0.22
+redis>=4.5
+pydantic>=1.10
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 4a8fdf8..c7221a2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,11 +1,29 @@
 services:
-  botbot:
-    build: .
-    env_file:
-      - .env
+  matrix_service:
+    build: ./matrix_service
+    environment:
+      - MATRIX_HOMESERVER_URL
+      - MATRIX_USER_ID
+      - MATRIX_PASSWORD
+      - AI_HANDLER_URL
+      - AI_HANDLER_TOKEN
     volumes:
-      - ./:/app # Mount source for hot-reload
-      - matrix_data:/app/data # Persist Matrix client store and tokens
+      - ./matrix_service:/app
+      - matrix_data:/app/data
+    depends_on:
+      - ai_service
+
+  ai_service:
+    build: ./ai_service
+    environment:
+      - OPENAI_API_KEY=${OPENAI_API_KEY}
+      - AI_HANDLER_TOKEN=${AI_HANDLER_TOKEN}
+      - REDIS_URL=redis://redis:6379
+    depends_on:
+      - redis
+
+  redis:
+    image: redis:7
     restart: unless-stopped
 
 volumes:
diff --git a/main.py b/main.py
deleted file mode 100644
index d500ca2..0000000
--- a/main.py
+++ /dev/null
@@ -1,129 +0,0 @@
-import os
-import asyncio
-import logging
-from dotenv import load_dotenv
-from nio import AsyncClient, AsyncClientConfig, MatrixRoom, RoomMessageText, InviteMemberEvent
-from nio.responses import LoginResponse
-from openai import AsyncOpenAI
-
-# --- Load environment variables ---
-load_dotenv()
-HOMESERVER_URL = os.getenv("HOMESERVER_URL")
-USER_ID = os.getenv("USER_ID")
-PASSWORD = os.getenv("PASSWORD")
-LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
-OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
-
-if not OPENAI_API_KEY:
-    raise RuntimeError("OPENAI_API_KEY is not set in environment")
-
-# --- Initialize Async OpenAI client ---
-openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
-
-# --- Logging Setup ---
-numeric_level = getattr(logging, LOG_LEVEL, logging.INFO)
-logging.basicConfig(
-    level=numeric_level,
-    format="%(asctime)s %(levelname)s %(name)s: %(message)s"
-)
-logger = logging.getLogger(__name__)
-
-async def trust_all_devices(client) -> None:
-    """
-    Programmatically verify all devices to allow sharing encryption keys.
-    """
-    for room_id in client.rooms:
-        try:
-            devices = await client.room_devices(room_id)
-            if isinstance(devices, dict):
-                for user, dev_ids in devices.items():
-                    if user == USER_ID:
-                        continue
-                    for dev_id in dev_ids:
-                        device = client.crypto.device_store.get_device(user, dev_id)
-                        if device and not client.crypto.device_store.is_device_verified(device):
-                            logger.info(f"Trusting {dev_id} for {user}")
-                            client.verify_device(device)
-        except Exception:
-            logger.exception(f"Error trusting devices in {room_id}")
-
-async def message_callback(room: MatrixRoom, event: RoomMessageText):
-    """Handle incoming text messages."""
-    if event.sender == USER_ID:
-        return
-    body = event.body.strip()
-    lower = body.lower()
-    logger.info("Received '%s' from %s in %s", body, event.sender, room.display_name)
-
-    send_kwargs = {
-        "room_id": room.room_id,
-        "message_type": "m.room.message",
-        "ignore_unverified_devices": True
-    }
-
-    # Simple ping
-    if lower == "!ping":
-        await client.room_send(**send_kwargs, content={"msgtype": "m.text", "body": "Pong!"})
-        return
-
-    # Ask OpenAI via chat completion
-    if lower.startswith("!ask "):
-        question = body[5:].strip()
-        if not question:
-            await client.room_send(**send_kwargs, content={"msgtype": "m.text", "body": "Provide a question after !ask."})
-            return
-        logger.info("Querying OpenAI: %s", question)
-        try:
-            response = await openai_client.chat.completions.create(
-                model="gpt-3.5-turbo",
-                messages=[
-                    {"role": "system", "content": "You are a helpful assistant."},
-                    {"role": "user", "content": question}
-                ],
-                max_tokens=150,
-                timeout=300
-            )
-            answer = response.choices[0].message.content.strip()
-        except Exception:
-            logger.exception("OpenAI API error")
-            answer = "Sorry, I encountered an error contacting the AI service."
-        await client.room_send(**send_kwargs, content={"msgtype": "m.text", "body": answer})
-        return
-
-    # Greeting
-    if lower == "hello botbot":
-        await client.room_send(**send_kwargs, content={"msgtype": "m.text", "body": "Hello! How can I assist you today?"})
-
-async def main() -> None:
-    """Initialize and run the Matrix bot."""
-    global client
-    config = AsyncClientConfig(store_sync_tokens=True, encryption_enabled=True)
-    client = AsyncClient(HOMESERVER_URL, USER_ID, store_path="/app/data", config=config)
-
-    login_resp = await client.login(password=PASSWORD)
-    if isinstance(login_resp, LoginResponse):
-        logger.info("Logged in as %s", USER_ID)
-    else:
-        logger.error("Login failed: %s", login_resp)
-        return
-
-    await trust_all_devices(client)
-
-    # Auto-join and trust
-    async def on_invite(room, event):
-        if isinstance(event, InviteMemberEvent):
-            await client.join(room.room_id)
-            logger.info("Joined %s", room.room_id)
-            await trust_all_devices(client)
-    client.add_event_callback(on_invite, InviteMemberEvent)
-    client.add_event_callback(message_callback, RoomMessageText)
-
-    logger.info("Starting sync loop")
-    await client.sync_forever(timeout=30000)
-
-if __name__ == "__main__":
-    try:
-        asyncio.run(main())
-    except KeyboardInterrupt:
-        logger.info("Shutting down")
-        asyncio.run(client.close())
diff --git a/Dockerfile b/matrix_service/Dockerfile
similarity index 58%
rename from Dockerfile
rename to matrix_service/Dockerfile
index 8713cae..1c25f4e 100644
--- a/Dockerfile
+++ b/matrix_service/Dockerfile
@@ -4,10 +4,12 @@ FROM python:3.11-slim
 WORKDIR /app
 
 # Install system dependencies
-RUN apt-get update && \
-    apt-get install -y --no-install-recommends \
-    libolm-dev build-essential python3-dev && \
-    rm -rf /var/lib/apt/lists/*
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    libolm-dev \
+    build-essential \
+    python3-dev \
+    && \
+    apt-get clean && rm -rf /var/lib/apt/lists/*
 
 # Install dependencies
 COPY requirements.txt .
diff --git a/matrix_service/main.py b/matrix_service/main.py
new file mode 100644
index 0000000..008f6a5
--- /dev/null
+++ b/matrix_service/main.py
@@ -0,0 +1,174 @@
+import os
+import logging
+from dotenv import load_dotenv
+
+import asyncio
+import httpx
+
+from nio import AsyncClient, AsyncClientConfig, MatrixRoom, RoomMessageText, InviteMemberEvent
+from nio.responses import LoginResponse
+
+
+# --- Load environment variables ---
+load_dotenv()
+MATRIX_HOMESERVER_URL = os.getenv("MATRIX_HOMESERVER_URL")
+MATRIX_USER_ID = os.getenv("MATRIX_USER_ID")
+MATRIX_PASSWORD = os.getenv("MATRIX_PASSWORD")
+MATRIX_LOGIN_TRIES = int(os.getenv("MATRIX_LOGIN_TRIES", 5))
+MATRIX_LOGIN_DELAY_INCREMENT = int(os.getenv("MATRIX_LOGIN_DELAY_INCREMENT", 5))
+
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+
+AI_HANDLER_URL = os.getenv("AI_HANDLER_URL")
+AI_HANDLER_TOKEN = os.getenv("AI_HANDLER_TOKEN")
+
+
+
+# --- Logging Setup ---
+numeric_level = getattr(logging, LOG_LEVEL, logging.INFO)
+logging.basicConfig(
+    level=numeric_level,
+    format="%(asctime)s %(levelname)s %(name)s: %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
+async def main() -> None:
+
+    async def trust_all_devices(client) -> None:
+        """
+        Mark every other user's device as verified so we can receive their
+        future Megolm keys without being blocked.
+        """
+        for room_id in client.rooms:
+            try:
+                devices_in_room = client.room_devices(room_id)
+                for user_id, user_devices in devices_in_room.items():
+                    if user_id == client.user_id:
+                        continue
+                    for dev_id, device in user_devices.items():
+                        if not client.device_store.is_device_verified(device):
+                            logger.info(f"Trusting {dev_id} for {user_id}")
+                            client.verify_device(device)
+            except Exception:
+                logger.exception(f"Error trusting devices in {room_id}")
+
+    async def on_invite(room, event):
+        """
+        Handle an invite event by joining the room and trusting all devices.
+        """
+        if isinstance(event, InviteMemberEvent):
+            await client.join(room.room_id)
+            logger.info("Joined %s", room.room_id)
+            await trust_all_devices(client)
+
+    async def send_message(room: MatrixRoom, message: str) -> None:
+        """
+        Send a message to `room`.
+        """
+        try:
+            await trust_all_devices(client)
+
+            await client.share_group_session(
+                room.room_id,
+                ignore_unverified_devices=True
+            )
+
+            await client.room_send(
+                room_id=room.room_id,
+                message_type="m.room.message",
+                content={
+                    "msgtype": "m.text",
+                    "body": message
+                },
+                ignore_unverified_devices=True
+            )
+            logger.info("Sent message to %s: %s", room.room_id, message)
+        except Exception as e:
+            logger.error(f"Error sending message: {e}")
+
+
+    async def on_message(room: MatrixRoom, event: RoomMessageText) -> None:
+        """
+        Handle incoming messages.
+        """
+
+        # Ignore messages sent by the bot itself
+        if event.sender == client.user_id:
+            return
+
+        logger.info("Received '%s' from %s in %s", event.body.strip(), event.sender, room.display_name)
+
+
+        if isinstance(event, RoomMessageText):
+            logger.info(f"Received message in {room.room_id}: {event.body}")
+            payload = {
+                "roomId": room.room_id,
+                "userId": event.sender,
+                "eventId": event.event_id,
+                "serverTimestamp": event.server_timestamp,
+                "content": event.body
+            }
+            headers = {"Authorization": f"Bearer {AI_HANDLER_TOKEN}"}
+            async with httpx.AsyncClient() as http:
+                try:
+                    resp = await http.post(f"{AI_HANDLER_URL}/api/v1/message", json=payload, headers=headers)
+                    resp.raise_for_status()
+                    data = resp.json()
+                    if data.get("reply"):
+                        await trust_all_devices(client)
+                        await send_message(room, data["reply"])
+                        logger.info("Reply sent: %s", data["reply"])
+                except httpx.HTTPStatusError as e:
+                    logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}")
+                except Exception:
+                    logger.exception("Error while calling AI handler")
+
+    # --- Initialize the client ---
+    # Create the data directory if it doesn't exist
+    try:
+        os.makedirs("/app/data", exist_ok=True)
+    except Exception as e:
+        logger.error(f"Error creating data directory: {e}")
+        return
+
+    # Initialize the client
+    config = AsyncClientConfig(store_sync_tokens=True, encryption_enabled=True)
+    client = AsyncClient(MATRIX_HOMESERVER_URL, MATRIX_USER_ID, store_path="/app/data", config=config)
+
+    for i in range(MATRIX_LOGIN_TRIES):
+        try:
+            login_response = await client.login(password=MATRIX_PASSWORD)
+            break
+        except Exception as e:
+            # login_response is not set when login() raises, so log only the exception
+            logger.error(f"Login attempt {i+1} failed: {e}")
+            if i == MATRIX_LOGIN_TRIES - 1:
+                return
+            await asyncio.sleep(MATRIX_LOGIN_DELAY_INCREMENT * (i + 1))
+    logger.info("Logged in successfully")
+
+    logger.debug("Uploading one-time Olm keys")
+    try:
+        await client.keys_upload()
+    except Exception as e:
+        logger.error(f"Error uploading keys: {e}")
+        return
+
+    await trust_all_devices(client)
+
+    client.add_event_callback(on_invite, InviteMemberEvent)
+    client.add_event_callback(on_message, RoomMessageText)
+
+    logger.info("Starting sync loop")
+    await client.sync_forever(timeout=30000)  # TODO: make the sync timeout configurable
+
+
+
+if __name__ == "__main__":
+    try:
+        asyncio.run(main())
+    except KeyboardInterrupt:
+        logger.info("Shutting down")
+        # NOTE: client is local to main(), so it cannot be closed from here
+
diff --git a/requirements.txt b/matrix_service/requirements.txt
similarity index 61%
rename from requirements.txt
rename to matrix_service/requirements.txt
index b634f8a..170594a 100644
--- a/requirements.txt
+++ b/matrix_service/requirements.txt
@@ -1,3 +1,4 @@
 matrix-nio[e2e]>=0.25.0
 python-dotenv>=1.0.0
-openai
\ No newline at end of file
+httpx>=0.23.0
+pydantic>=1.10
\ No newline at end of file