"""Database access helpers for blog posts and user accounts."""

from typing import List, Optional

from sqlalchemy.orm import Session

from app import models, schemas
from app.utils import hash_password, verify_password


def _commit_or_rollback(db: Session) -> None:
    """Commit the current transaction, rolling back if the commit fails.

    The original exception is re-raised unchanged, so callers see the same
    error types as with a bare ``db.commit()``. The rollback only makes sure
    the session is usable again afterwards.
    """
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise


# Blogs


def get_blog(db: Session, blog_id: int) -> Optional[models.Blog]:
    """Return the blog whose id equals ``blog_id``, or ``None`` if absent."""
    return db.query(models.Blog).filter(models.Blog.id == blog_id).first()


def get_blogs(
    db: Session,
    skip: int = 0,
    limit: int = 10,
    author_id: Optional[int] = None,
    visibility: Optional[str] = None,
) -> List[models.Blog]:
    """Return one page of blogs, optionally filtered.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.
        author_id: When not ``None``, keep only blogs with this author id.
        visibility: When not ``None``, keep only blogs with this visibility.

    Returns:
        The matching blogs, ordered by id.
    """
    query = db.query(models.Blog)
    if author_id is not None:
        query = query.filter(models.Blog.author_id == author_id)
    if visibility is not None:
        query = query.filter(models.Blog.visibility == visibility)
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so successive
    # pages could repeat or skip rows; ordering by id keeps pages stable.
    return query.order_by(models.Blog.id).offset(skip).limit(limit).all()


def create_blog(db: Session, blog_in: schemas.BlogCreate) -> models.Blog:
    """Insert a blog built from ``blog_in`` and return the refreshed row."""
    db_blog = models.Blog(**blog_in.model_dump())
    db.add(db_blog)
    _commit_or_rollback(db)
    db.refresh(db_blog)
    return db_blog


def update_blog(
    db: Session, blog_id: int, blog_in: schemas.BlogUpdate
) -> Optional[models.Blog]:
    """Apply the fields of ``blog_in`` to an existing blog.

    Only the fields produced by ``blog_in.model_dump(exclude_unset=True)``
    are written.

    Returns:
        The refreshed blog, or ``None`` when no blog has id ``blog_id``.
    """
    db_blog = get_blog(db, blog_id)
    if db_blog is None:
        return None
    for field, value in blog_in.model_dump(exclude_unset=True).items():
        setattr(db_blog, field, value)
    _commit_or_rollback(db)
    db.refresh(db_blog)
    return db_blog


def delete_blog(db: Session, blog_id: int) -> Optional[models.Blog]:
    """Delete the blog with id ``blog_id``.

    Returns:
        The deleted blog object, or ``None`` when no such blog exists.
    """
    db_blog = get_blog(db, blog_id)
    if db_blog is None:
        return None
    db.delete(db_blog)
    _commit_or_rollback(db)
    return db_blog


def increment_view_count(db: Session, blog_id: int) -> None:
    """Add one to ``view_count`` of the blog with id ``blog_id``."""
    # The increment is expressed on the column so it is computed by the
    # UPDATE statement rather than from a value read earlier.
    db.query(models.Blog).filter(models.Blog.id == blog_id).update(
        {"view_count": models.Blog.view_count + 1}
    )
    _commit_or_rollback(db)


def add_like(db: Session, blog_id: int) -> None:
    """Add one to ``like_count`` of the blog with id ``blog_id``."""
    db.query(models.Blog).filter(models.Blog.id == blog_id).update(
        {"like_count": models.Blog.like_count + 1}
    )
    _commit_or_rollback(db)


def delete_blogs(db: Session, item_id: int) -> Optional[models.Blog]:
    """Delete the blog with id ``item_id``; alias of :func:`delete_blog`.

    Kept for callers that use this name. Returns the deleted blog, or
    ``None`` when no blog has that id.
    """
    return delete_blog(db, item_id)


# User / Auth


def get_user_by_username(db: Session, username: str) -> Optional[models.User]:
    """Return the user with this exact username, or ``None``."""
    return (
        db.query(models.User)
        .filter(models.User.username == username)
        .first()
    )


def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
    """Return the user with this exact email, or ``None``."""
    return db.query(models.User).filter(models.User.email == email).first()


def authenticate_user(
    db: Session, username: str, password: str
) -> Optional[models.User]:
    """Return the user matching the credentials, or ``None``.

    Args:
        db: Session used for the lookups.
        username: Login identifier; it is tried as a username first and,
            if that finds nothing, as an email address.
        password: Plain-text password to check against the stored hash.

    Returns:
        The user when one is found and the password verifies, else ``None``.
    """
    user = get_user_by_username(db, username) or get_user_by_email(
        db, username
    )
    if user is None:
        # NOTE(review): this path returns before any password hashing is
        # done, so it may respond faster than a wrong-password attempt.
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user


def create_user(db: Session, user: schemas.UserCreate) -> models.User:
    """Insert a new user and return the refreshed row.

    The password from ``user`` is passed through ``hash_password`` and only
    the result is stored, in ``hashed_password``.
    """
    db_user = models.User(
        username=user.username,
        email=user.email,
        hashed_password=hash_password(user.password),
        permissions=user.permissions,
        subscriber=user.subscriber,
    )
    db.add(db_user)
    _commit_or_rollback(db)
    db.refresh(db_user)
    return db_user