Usage Examples

This page demonstrates real-world usage patterns for LightODM.

FastAPI Integration

Complete example of using LightODM with FastAPI for a REST API:

from fastapi import FastAPI, HTTPException
from lightodm import connect, MongoBaseModel
from typing import Optional, List
from pydantic import EmailStr

# Create the FastAPI application
app = FastAPI()

# Initialize the LightODM connection when the application starts.
# NOTE(review): recent FastAPI releases recommend lifespan handlers over
# on_event — confirm against the FastAPI version in use.
@app.on_event("startup")
async def startup():
    """Call ``connect()`` for the "myapp" database at application startup."""
    connect(db_name="myapp")

# Define model
class User(MongoBaseModel):
    # LightODM settings for this model.
    class Settings:
        name = "users"  # presumably the MongoDB collection name — confirm in LightODM docs

    name: str
    email: EmailStr  # validated as an e-mail address by Pydantic
    age: Optional[int] = None  # optional; defaults to None when omitted

# Create user endpoint
@app.post("/users/", response_model=User)
async def create_user(user: User):
    """Save the user parsed from the request body and return it."""
    await user.asave()
    return user

# Get user endpoint
@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: str):
    """Return the user with the given ID, or respond with 404 if none is found."""
    found = await User.aget(user_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")

# List users endpoint
@app.get("/users/", response_model=List[User])
async def list_users(skip: int = 0, limit: int = 10):
    """Return one page of users, selected by ``skip`` and ``limit``."""
    return await User.afind({}, skip=skip, limit=limit)

# Update user endpoint
@app.put("/users/{user_id}", response_model=User)
async def update_user(user_id: str, user: User):
    """Save the request body under ``user_id``; respond with 404 if no such user exists."""
    if not await User.aget(user_id):
        raise HTTPException(status_code=404, detail="User not found")

    # The ID from the URL wins over whatever ID the payload carried
    user.id = user_id
    await user.asave()
    return user

# Delete user endpoint
@app.delete("/users/{user_id}")
async def delete_user(user_id: str):
    """Delete the user with the given ID; respond with 404 if it does not exist."""
    target = await User.aget(user_id)
    if target:
        await target.adelete()
        return {"message": "User deleted"}

    raise HTTPException(status_code=404, detail="User not found")

Custom Connection Management

For advanced scenarios, implement custom connection logic:

from lightodm import MongoBaseModel
from pymongo import MongoClient
from motor.motor_asyncio import AsyncIOMotorClient
import os

# Global connection instances. Both start as None and are filled in on first
# use by get_sync_client() / get_async_client().
SYNC_CLIENT = None
ASYNC_CLIENT = None
# Database name, read from the MONGO_DB_NAME environment variable
# (falls back to "myapp" when the variable is not set)
DB_NAME = os.getenv("MONGO_DB_NAME", "myapp")

def get_sync_client():
    """Return the shared ``MongoClient``, creating it on the first call.

    The client is built from the ``MONGO_URL`` environment variable and cached
    in the module-level ``SYNC_CLIENT``.
    """
    global SYNC_CLIENT
    if SYNC_CLIENT is not None:
        return SYNC_CLIENT

    SYNC_CLIENT = MongoClient(os.getenv("MONGO_URL"))
    return SYNC_CLIENT

async def get_async_client():
    """Return the shared ``AsyncIOMotorClient``, creating it on the first call.

    The client is built from the ``MONGO_URL`` environment variable and cached
    in the module-level ``ASYNC_CLIENT``.
    """
    global ASYNC_CLIENT
    if ASYNC_CLIENT is not None:
        return ASYNC_CLIENT

    ASYNC_CLIENT = AsyncIOMotorClient(os.getenv("MONGO_URL"))
    return ASYNC_CLIENT

# Base model with custom connection
class MyBaseModel(MongoBaseModel):
    """Base model whose collections come from the module-level cached clients."""

    @classmethod
    def get_collection(cls):
        """Return the synchronous collection named by ``cls.Settings.name``."""
        database = get_sync_client()[DB_NAME]
        return database[cls.Settings.name]

    @classmethod
    async def get_async_collection(cls):
        """Return the asynchronous collection named by ``cls.Settings.name``."""
        database = (await get_async_client())[DB_NAME]
        return database[cls.Settings.name]

# Use in models: inheriting from MyBaseModel makes this model use the
# get_collection()/get_async_collection() overrides defined there
class User(MyBaseModel):
    class Settings:
        name = "users"  # collection name looked up by MyBaseModel

    name: str
    email: str

Multi-Tenant Application

Implement multi-tenancy by selecting the database dynamically, based on the current tenant:

from lightodm import MongoBaseModel
from pymongo import MongoClient
from motor.motor_asyncio import AsyncIOMotorClient
from contextvars import ContextVar

# Context variable to track current tenant. It has no default, so calling
# current_tenant.get() before a tenant has been set raises LookupError.
current_tenant: ContextVar[str] = ContextVar("current_tenant")

# One synchronous and one asynchronous client, shared by all tenants
client = MongoClient("mongodb://localhost:27017")
async_client = AsyncIOMotorClient("mongodb://localhost:27017")

def _tenant_db_name() -> str:
    """Return the database name for the tenant bound to the current context.

    Raises:
        LookupError: If no tenant has been set on ``current_tenant``.
        ValueError: If the tenant identifier is empty, longer than 48
            characters, or contains anything other than ASCII letters,
            digits, ``_`` or ``-``.
    """
    import re  # function-scope import keeps this example self-contained

    tenant = current_tenant.get()
    # The tenant identifier usually originates from a request header, i.e.
    # untrusted input. Restrict it to a conservative character set so it can
    # never produce an invalid or unintended MongoDB database name.
    if not re.fullmatch(r"[A-Za-z0-9_-]{1,48}", tenant):
        raise ValueError(f"Invalid tenant identifier: {tenant!r}")
    return f"tenant_{tenant}"


class TenantModel(MongoBaseModel):
    """Base model for multi-tenant collections"""

    @classmethod
    def get_collection(cls):
        """Return the synchronous collection in the current tenant's database."""
        return client[_tenant_db_name()][cls.Settings.name]

    @classmethod
    async def get_async_collection(cls):
        """Return the asynchronous collection in the current tenant's database."""
        return async_client[_tenant_db_name()][cls.Settings.name]

# Tenant-scoped model: collection lookups go through TenantModel's
# get_collection()/get_async_collection() overrides
class User(TenantModel):
    class Settings:
        name = "users"  # collection name inside each tenant database

    name: str
    email: str

# Usage with FastAPI
from fastapi import FastAPI, Header

app = FastAPI()

@app.get("/users/")
async def list_users(x_tenant_id: str = Header(...)):
    """Return all users of the tenant named by the required tenant header."""
    # Bind the tenant to the current context before touching the database
    current_tenant.set(x_tenant_id)

    # Query uses tenant-specific database
    return await User.afind({})

Custom ID Generation

Generate custom IDs based on business logic:

from lightodm import MongoBaseModel, generate_id
from pydantic import Field

from datetime import datetime
from typing import List


class Order(MongoBaseModel):
    class Settings:
        name = "orders"

    # Default ID: generate_id() is called with a fixed "ORDER" prefix and the
    # current timestamp. Because the timestamp changes on every call, the
    # resulting ID differs per instance (it is not deterministic).
    id: str = Field(
        default_factory=lambda: generate_id(
            prefix="ORDER",
            timestamp=datetime.now().timestamp()
        )
    )

    user_id: str
    total: float
    items: List[str]

# Or override __init__
class Product(MongoBaseModel):
    class Settings:
        name = "products"

    sku: str
    name: str
    price: float

    def __init__(self, **data):
        """Build a product, deriving its ID from the SKU when no ID is given.

        An explicitly supplied ``id`` is always kept as-is.
        """
        # Only derive the ID when a SKU was actually supplied. If "sku" is
        # missing, fall through so that model validation reports the missing
        # field instead of this method failing with a bare KeyError.
        if "id" not in data and "sku" in data:
            # Use SKU as ID
            data["id"] = generate_id(sku=data["sku"])
        super().__init__(**data)

Bulk Operations

Efficiently process large datasets:

from lightodm import MongoBaseModel
from typing import List

# Model used by the bulk helpers below
class User(MongoBaseModel):
    class Settings:
        name = "users"  # presumably the MongoDB collection name — confirm in LightODM docs

    name: str
    email: str

# Bulk insert
async def bulk_create_users(users_data: List[dict]):
    """Create one ``User`` per dict and insert them all in a single call.

    Args:
        users_data: Field dictionaries, one per user to create.

    Returns:
        Whatever ``User.ainsert_many`` returns (the inserted IDs), or an
        empty list when ``users_data`` is empty.
    """
    if not users_data:
        # Nothing to insert: skip the database call entirely. PyMongo's
        # insert_many rejects an empty document list.
        # NOTE(review): confirm whether ainsert_many already guards this.
        return []

    users = [User(**data) for data in users_data]
    return await User.ainsert_many(users)

# Batch processing with iteration
async def process_all_users():
    """Stream every user and pass them to ``process_batch`` in groups of 100."""
    chunk_size = 100
    pending = []

    async for user in User.afind_iter({}):
        pending.append(user)
        if len(pending) < chunk_size:
            continue

        # A full chunk has accumulated: hand it off and start a new one
        await process_batch(pending)
        pending = []

    # Flush the final, partially filled chunk (if any)
    if pending:
        await process_batch(pending)

async def process_batch(users: List[User]):
    """Placeholder batch handler: visits every user and does nothing."""
    for _user in users:
        ...  # replace with the real per-user work

Aggregation Pipeline

Complex analytics with MongoDB aggregation:

from lightodm import MongoBaseModel
from datetime import datetime, timedelta

# Order document queried by the aggregation pipelines below
class Order(MongoBaseModel):
    class Settings:
        name = "orders"

    user_id: str  # grouped on by get_top_customers()
    total: float  # summed as revenue / total spent
    created_at: datetime  # used for the date range filter and daily grouping
    status: str  # get_daily_revenue() only counts orders with status "completed"

# Revenue by date
async def get_daily_revenue(start_date: datetime, end_date: datetime):
    """Aggregate completed orders into per-day revenue and order counts.

    Args:
        start_date: Inclusive lower bound on ``created_at``.
        end_date: Inclusive upper bound on ``created_at``.

    Returns:
        The result of ``Order.aaggregate``: one entry per day, sorted by
        ascending date, with ``_id`` (``YYYY-MM-DD``), ``revenue`` and
        ``count``.
    """
    completed_in_range = {
        "$match": {
            "created_at": {"$gte": start_date, "$lte": end_date},
            "status": "completed",
        }
    }
    # Group key: the calendar day of created_at rendered as a string
    day_key = {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}}
    totals_per_day = {
        "$group": {
            "_id": day_key,
            "revenue": {"$sum": "$total"},
            "count": {"$sum": 1},
        }
    }
    chronological = {"$sort": {"_id": 1}}

    return await Order.aaggregate(
        [completed_in_range, totals_per_day, chronological]
    )

# Top customers
async def get_top_customers(limit: int = 10):
    """Return the ``limit`` users with the highest total order value.

    Each result entry carries ``_id`` (the user ID), ``total_spent`` and
    ``order_count``, ordered by ``total_spent`` descending.
    """
    spend_per_user = {
        "$group": {
            "_id": "$user_id",
            "total_spent": {"$sum": "$total"},
            "order_count": {"$sum": 1},
        }
    }
    biggest_first = {"$sort": {"total_spent": -1}}
    top_n = {"$limit": limit}

    return await Order.aaggregate([spend_per_user, biggest_first, top_n])

Indexing

Create indexes for query optimization:

from lightodm import MongoBaseModel
from pymongo import ASCENDING, DESCENDING

from datetime import datetime


class User(MongoBaseModel):
    class Settings:
        name = "users"

    email: str  # covered by the unique index
    name: str  # part of the compound index and the text index
    created_at: datetime  # leading key of the compound sort index

# Create indexes on startup
async def create_indexes():
    """Create the unique, compound and text indexes on the users collection."""
    users = await User.get_async_collection()

    # Unique email index
    email_key = [("email", ASCENDING)]
    await users.create_index(email_key, unique=True)

    # Compound index for sorting: newest first, then by name
    newest_then_name = [("created_at", DESCENDING), ("name", ASCENDING)]
    await users.create_index(newest_then_name)

    # Text search index
    name_text_key = [("name", "text")]
    await users.create_index(name_text_key)

# Use in FastAPI startup.
# `app` is the FastAPI application instance; it is not defined in this
# snippet and must already exist in the surrounding module.
@app.on_event("startup")
async def startup():
    """Create the indexes when the application starts."""
    await create_indexes()

Soft Delete Pattern

Implement soft deletes instead of permanent deletion:

from lightodm import MongoBaseModel
from datetime import datetime
from typing import Optional

class SoftDeleteModel(MongoBaseModel):
    """Base model with soft delete support"""
    # None means the document is active; a timestamp means it was soft-deleted
    deleted_at: Optional[datetime] = None

    async def soft_delete(self):
        """Mark as deleted instead of removing"""
        self.deleted_at = datetime.now()
        await self.asave()

    @classmethod
    async def find_active(cls, filter: Optional[dict] = None, **kwargs):
        """Find only non-deleted documents

        Args:
            filter: Optional extra query conditions. The dict is not
                modified; any ``deleted_at`` condition in it is overridden.
            **kwargs: Passed through unchanged to ``afind``.
        """
        # Build a new query dict rather than writing into the caller's one
        query = {**(filter or {}), "deleted_at": None}
        return await cls.afind(query, **kwargs)

    @classmethod
    async def find_deleted(cls, filter: Optional[dict] = None, **kwargs):
        """Find only deleted documents

        Args:
            filter: Optional extra query conditions. The dict is not
                modified; any ``deleted_at`` condition in it is overridden.
            **kwargs: Passed through unchanged to ``afind``.
        """
        # Build a new query dict rather than writing into the caller's one
        query = {**(filter or {}), "deleted_at": {"$ne": None}}
        return await cls.afind(query, **kwargs)

# Inherits deleted_at, soft_delete(), find_active() and find_deleted()
# from SoftDeleteModel
class User(SoftDeleteModel):
    class Settings:
        name = "users"

    name: str
    email: str

# Usage
# NOTE: `await` is only valid inside an async function (or an async REPL),
# and `user_id` is assumed to be provided by the surrounding code.
user = await User.aget(user_id)
await user.soft_delete()  # Soft delete: sets deleted_at and saves

# Find only active users (deleted_at is None)
active_users = await User.find_active()

# Find deleted users (deleted_at is not None)
deleted_users = await User.find_deleted()

Validation and Hooks

Add custom validation with Pydantic validators, and implement pre-/post-save hooks by overriding asave():

import hashlib
import os
from datetime import datetime

from lightodm import MongoBaseModel
from pydantic import Field, field_validator, model_validator

class User(MongoBaseModel):
    class Settings:
        name = "users"

    email: str
    # Salted PBKDF2 hash, formatted as "pbkdf2_sha256$<iterations>$<salt>$<hash>"
    password_hash: str
    name: str
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

    @field_validator("email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        """Ensure email is lowercase"""
        return v.lower()

    @model_validator(mode="before")
    @classmethod
    def hash_password(cls, values):
        """Hash password if plain password provided

        Replaces a plain-text ``password`` entry in the raw input with a
        salted ``password_hash`` entry. Input that is not a dict, or that has
        no ``password`` key, is returned unchanged.
        """
        # "before" validators receive the raw input, which is not guaranteed
        # to be a dict
        if not isinstance(values, dict) or "password" not in values:
            return values

        # Work on a copy so the caller's dict is left untouched
        values = dict(values)
        password = values.pop("password")

        # A random per-user salt plus a deliberately slow key-derivation
        # function; a single unsalted SHA-256 round is not suitable for
        # password storage.
        iterations = 600_000
        salt = os.urandom(16)
        digest = hashlib.pbkdf2_hmac(
            "sha256", password.encode(), salt, iterations
        )
        values["password_hash"] = (
            f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"
        )
        return values

    async def asave(self, **kwargs):
        """Update timestamp before saving"""
        self.updated_at = datetime.now()
        return await super().asave(**kwargs)

# Usage (run inside an async function, since asave() is awaited)
user = User(
    email="USER@EXAMPLE.COM",  # Will be lowercased
    password="secret123",  # Will be hashed and stored as password_hash
    name="John"
)
await user.asave()  # also refreshes updated_at

Error Handling

Proper error handling for database operations:

from lightodm import MongoBaseModel
from pymongo.errors import DuplicateKeyError, ConnectionFailure
from fastapi import HTTPException

# Model used by create_user_safe() below
class User(MongoBaseModel):
    class Settings:
        name = "users"

    email: str
    name: str

async def create_user_safe(email: str, name: str):
    """Create user with error handling

    Args:
        email: E-mail address of the new user.
        name: Display name of the new user.

    Returns:
        The saved ``User``.

    Raises:
        HTTPException: 400 on a duplicate key, 503 when the database is
            unreachable, 500 for any other failure. The original exception
            is attached as ``__cause__`` in every case.
    """
    import logging  # function-scope import keeps this example self-contained

    try:
        user = User(email=email, name=name)
        await user.asave()
    except DuplicateKeyError as exc:
        raise HTTPException(
            status_code=400,
            detail="User with this email already exists"
        ) from exc
    except ConnectionFailure as exc:
        raise HTTPException(
            status_code=503,
            detail="Database connection failed"
        ) from exc
    except Exception as exc:
        # Keep the details in the server log; the raw exception text may
        # reveal internals and must not be sent to the client.
        logging.getLogger(__name__).exception(
            "Unexpected error while creating user"
        )
        raise HTTPException(
            status_code=500,
            detail="Unexpected error"
        ) from exc
    return user

Testing with Mock

Test your models using mongomock:

import pytest
import mongomock
from lightodm import MongoBaseModel

# Model under test
class User(MongoBaseModel):
    class Settings:
        name = "users"

    name: str
    email: str

@pytest.fixture
def mock_db():
    """Return the ``test_db`` database of a freshly created mongomock client."""
    return mongomock.MongoClient().test_db

@pytest.fixture
def user_model(mock_db, monkeypatch):
    """Override User model to use mock collection

    ``monkeypatch`` restores the original ``get_collection`` after the test.
    """
    # Wrap the replacement in classmethod() so it works whether
    # get_collection is called on the class or on an instance; a
    # zero-argument function would fail when called through an instance.
    # The collection is chosen from Settings.name instead of being hard-coded.
    monkeypatch.setattr(
        User,
        "get_collection",
        classmethod(lambda cls: mock_db[cls.Settings.name])
    )
    return User

def test_create_user(user_model):
    """Saving a user assigns an ID and the user can be read back by that ID."""
    created = user_model(name="John", email="john@example.com")
    created.save()

    # Saving must have produced an ID
    assert created.id is not None

    # Round trip: fetch by ID and compare the stored fields
    fetched = user_model.get(created.id)
    assert fetched.name == "John"
    assert fetched.email == "john@example.com"

See the examples/ directory in the repository for complete working examples.