Files

112 lines
3.0 KiB
Python

from sqlalchemy.orm import Session
from app import models, schemas
from app.utils import hash_password, verify_password
from typing import List, Optional
def get_blog(db: Session, blog_id: int) -> Optional[models.Blog]:
return db.query(models.Blog).filter(models.Blog.id == blog_id).first()
def get_blogs(
db: Session,
skip: int = 0,
limit: int = 10,
author_id: Optional[int] = None,
visibility: Optional[str] = None,
) -> List[models.Blog]:
q = db.query(models.Blog)
if author_id is not None:
q = q.filter(models.Blog.author_id == author_id)
if visibility is not None:
q = q.filter(models.Blog.visibility == visibility)
return q.offset(skip).limit(limit).all()
def create_blog(db: Session, blog_in: schemas.BlogCreate) -> models.Blog:
db_blog = models.Blog(**blog_in.model_dump())
db.add(db_blog)
db.commit()
db.refresh(db_blog)
return db_blog
def update_blog(
db: Session, blog_id: int, blog_in: schemas.BlogUpdate
) -> Optional[models.Blog]:
db_blog = get_blog(db, blog_id)
if not db_blog:
return None
update_data = blog_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_blog, field, value)
db.commit()
db.refresh(db_blog)
return db_blog
def delete_blog(db: Session, blog_id: int) -> Optional[models.Blog]:
db_blog = get_blog(db, blog_id)
if not db_blog:
return None
db.delete(db_blog)
db.commit()
return db_blog
def increment_view_count(db: Session, blog_id: int) -> None:
db.query(models.Blog).filter(models.Blog.id == blog_id).update(
{"view_count": models.Blog.view_count + 1}
)
db.commit()
def add_like(db: Session, blog_id: int) -> None:
db.query(models.Blog).filter(models.Blog.id == blog_id).update(
{"like_count": models.Blog.like_count + 1}
)
db.commit()
def delete_blogs(db: Session, item_id: int):
item = db.query(models.Blog).filter(models.Blog.id == item_id).first()
if item:
db.delete(item)
db.commit()
return item
# User / Auth
def authenticate_user(db: Session, username: str, password: str):
user = get_user_by_username(db, username) or get_user_by_email(db, username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def get_user_by_username(db: Session, username: str):
return db.query(models.User).filter(models.User.username == username).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def create_user(db: Session, user: schemas.UserCreate):
hashed_pw = hash_password(user.password)
db_user = models.User(
username=user.username,
email=user.email,
hashed_password=hashed_pw,
permissions=user.permissions,
subscriber=user.subscriber,
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user