Source code for lightodm.connection

"""
MongoDB Connection Manager

Thread-safe singleton connection manager for MongoDB supporting both sync (pymongo)
and async (motor) clients with automatic cleanup.
"""

import atexit
import os
import threading
from typing import Optional

from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.database import Database


[docs] class MongoConnection: """ Singleton MongoDB connection manager supporting both sync (pymongo) and async (motor) clients with thread-safety. The connection is configured via environment variables: - MONGO_URL: MongoDB connection URL - MONGO_USER: MongoDB username - MONGO_PASSWORD: MongoDB password - MONGO_DB_NAME: Database name Example: # Sync usage conn = MongoConnection() db = conn.database collection = conn.get_collection("users") # Async usage conn = MongoConnection() client = await conn.get_async_client() db = await conn.get_async_database() """ _instance: Optional["MongoConnection"] = None _lock = threading.Lock() # Sync client _client: Optional[MongoClient] = None _db: Optional[Database] = None # Async client (motor) _async_client: Optional[AsyncIOMotorClient] = None def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): # Initialize sync client eagerly (to keep existing behavior) if self._client is None: self._initialize_connection() def _initialize_connection(self): """Initialize synchronous MongoDB connection""" mongo_url = os.environ.get("MONGO_URL") mongo_user = os.environ.get("MONGO_USER") mongo_password = os.environ.get("MONGO_PASSWORD") mongo_db_name = os.environ.get("MONGO_DB_NAME") if not all([mongo_url, mongo_user, mongo_password]): raise ValueError( "MongoDB connection parameters are not set. " "Please set MONGO_URL, MONGO_USER, and MONGO_PASSWORD environment variables." ) try: self._client = MongoClient( mongo_url, username=mongo_user, password=mongo_password, maxPoolSize=50, minPoolSize=5, maxIdleTimeMS=30000, serverSelectionTimeoutMS=5000, socketTimeoutMS=20000, connectTimeoutMS=20000, heartbeatFrequencyMS=10000, retryWrites=True, retryReads=True, maxConnecting=2, waitQueueTimeoutMS=10000, ) self._db = ( self._client[mongo_db_name] if mongo_db_name else self._client.get_default_database() ) # Test the connection self._client.admin.command("ping") atexit.register(self.close_connection) except Exception as e: raise ConnectionError(f"Failed to initialize MongoDB (sync) connection: {e}") from e @property def client(self) -> MongoClient: """Get the synchronous MongoDB client""" if self._client is None: self._initialize_connection() return self._client @property def database(self) -> Database: """Get the synchronous MongoDB database""" if self._db is None: self._initialize_connection() return self._db
[docs] def get_collection(self, collection_name: str) -> Collection: """ Get a synchronous collection. Args: collection_name: Name of the collection Returns: PyMongo Collection instance """ if self._db is None: self._initialize_connection() return self._db[collection_name]
[docs] async def get_async_client(self) -> AsyncIOMotorClient: """ Get or create the AsyncIOMotorClient and verify connectivity asynchronously. Returns: Motor AsyncIOMotorClient instance """ if self._async_client is None: mongo_url = os.environ.get("MONGO_URL") mongo_user = os.environ.get("MONGO_USER") mongo_password = os.environ.get("MONGO_PASSWORD") if not all([mongo_url, mongo_user, mongo_password]): raise ValueError( "MongoDB connection parameters are not set. " "Please set MONGO_URL, MONGO_USER, and MONGO_PASSWORD environment variables." ) try: # Create motor client lazily self._async_client = AsyncIOMotorClient( mongo_url, username=mongo_user, password=mongo_password, ) # Perform an async ping to ensure connectivity await self._async_client.admin.command("ping") except Exception as e: # Ensure no half-initialized client remains if self._async_client is not None: self._async_client.close() self._async_client = None raise ConnectionError( f"Failed to initialize MongoDB (async) connection: {e}" ) from e return self._async_client
[docs] async def get_async_database(self, db_name: Optional[str] = None) -> AsyncIOMotorDatabase: """ Get the asynchronous MongoDB database. Args: db_name: Optional database name override Returns: Motor AsyncIOMotorDatabase instance """ client = await self.get_async_client() if db_name: return client[db_name] mongo_db_name = os.environ.get("MONGO_DB_NAME") if not mongo_db_name: return client.get_default_database() return client[mongo_db_name]
[docs] def close_connection(self): """Close both sync and async clients if present.""" # Close sync client if self._client: try: self._client.close() except Exception: pass finally: self._client = None self._db = None # Close async client if self._async_client: try: # Motor's close is synchronous method self._async_client.close() except Exception: pass finally: self._async_client = None
# Global connection instance _mongo_conn: Optional[MongoConnection] = None def get_mongo_connection() -> MongoConnection: """ Get the singleton MongoConnection instance. Returns: MongoConnection singleton """ global _mongo_conn if _mongo_conn is None: _mongo_conn = MongoConnection() return _mongo_conn def get_collection(collection_name: str) -> Collection: """ Get a MongoDB collection by name using the singleton connection (sync). Args: collection_name: Name of the collection Returns: PyMongo Collection instance """ conn = get_mongo_connection() return conn.get_collection(collection_name) async def get_async_database(db_name: Optional[str] = None) -> AsyncIOMotorDatabase: """ Get asynchronous MongoDB database using the singleton connection. Args: db_name: Optional database name override Returns: Motor AsyncIOMotorDatabase instance """ conn = get_mongo_connection() return await conn.get_async_database(db_name) def get_database() -> Database: """ Get synchronous MongoDB database using the singleton connection. Returns: PyMongo Database instance """ conn = get_mongo_connection() return conn.database def get_client() -> MongoClient: """ Get synchronous MongoDB client using the singleton connection. Returns: PyMongo MongoClient instance """ conn = get_mongo_connection() return conn.client async def get_async_client() -> AsyncIOMotorClient: """ Get asynchronous MongoDB client using the singleton connection. Returns: Motor AsyncIOMotorClient instance """ conn = get_mongo_connection() return await conn.get_async_client()
[docs] def connect( url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, db_name: Optional[str] = None, ) -> Database: """ Initialize MongoDB connection with optional explicit parameters. If parameters are not provided, they will be read from environment variables: - MONGO_URL - MONGO_USER - MONGO_PASSWORD - MONGO_DB_NAME Args: url: MongoDB connection URL (optional) username: MongoDB username (optional) password: MongoDB password (optional) db_name: Database name (optional) Returns: PyMongo Database instance Example:: # Connect with explicit parameters db = connect( url="mongodb://localhost:27017", username="myuser", password="mypass", db_name="mydb" ) # Or use environment variables db = connect() """ # Set environment variables if provided if url: os.environ["MONGO_URL"] = url if username: os.environ["MONGO_USER"] = username if password: os.environ["MONGO_PASSWORD"] = password if db_name: os.environ["MONGO_DB_NAME"] = db_name # Initialize connection and return database return get_database()