112 lines
3.0 KiB
Python
112 lines
3.0 KiB
Python
from sqlalchemy.orm import Session
|
|
from app import models, schemas
|
|
from app.utils import hash_password, verify_password
|
|
from typing import List, Optional
|
|
|
|
|
|
def get_blog(db: Session, blog_id: int) -> Optional[models.Blog]:
    """Look up a single blog by its id.

    Args:
        db: Session the query is issued on.
        blog_id: Value compared against ``models.Blog.id``.

    Returns:
        The first ``models.Blog`` whose id equals ``blog_id``, or ``None``
        when the query yields no row.
    """
    id_matches = models.Blog.id == blog_id
    return db.query(models.Blog).filter(id_matches).first()
|
|
|
|
|
|
def get_blogs(
    db: Session,
    skip: int = 0,
    limit: int = 10,
    author_id: Optional[int] = None,
    visibility: Optional[str] = None,
) -> List[models.Blog]:
    """Return one page of blogs, optionally narrowed by author and visibility.

    Args:
        db: Session the query is issued on.
        skip: Number of rows to skip before the page starts (OFFSET).
        limit: Maximum number of rows in the page (LIMIT).
        author_id: When not ``None``, keep only blogs whose ``author_id``
            equals this value.
        visibility: When not ``None``, keep only blogs whose ``visibility``
            equals this value.

    Returns:
        The matching ``models.Blog`` rows for the requested page, ordered
        by ascending id.
    """
    query = db.query(models.Blog)
    if author_id is not None:
        query = query.filter(models.Blog.author_id == author_id)
    if visibility is not None:
        query = query.filter(models.Blog.visibility == visibility)

    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could overlap or skip rows. Ordering by id pins the
    # sequence and makes pagination repeatable.
    return query.order_by(models.Blog.id).offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_blog(db: Session, blog_in: schemas.BlogCreate) -> models.Blog:
    """Persist a new blog built from the fields of ``blog_in``.

    Args:
        db: Session used to add and commit the new row.
        blog_in: Creation schema; every field from ``model_dump()`` is
            passed to ``models.Blog`` as a keyword argument.

    Returns:
        The newly committed ``models.Blog`` instance, refreshed from the
        database.
    """
    field_values = blog_in.model_dump()
    new_blog = models.Blog(**field_values)

    db.add(new_blog)
    db.commit()
    # Re-read the row so the returned object reflects what was stored.
    db.refresh(new_blog)
    return new_blog
|
|
|
|
|
|
def update_blog(
    db: Session, blog_id: int, blog_in: schemas.BlogUpdate
) -> Optional[models.Blog]:
    """Apply a partial update to an existing blog.

    Only fields explicitly set on ``blog_in`` are written
    (``model_dump(exclude_unset=True)``); all other attributes of the blog
    are left untouched.

    Args:
        db: Session used to load, commit and refresh the blog.
        blog_id: Id of the blog to update.
        blog_in: Update schema carrying the new field values.

    Returns:
        The updated ``models.Blog`` refreshed from the database, or
        ``None`` when ``get_blog`` finds no blog with that id.
    """
    target = get_blog(db, blog_id)
    if target:
        changes = blog_in.model_dump(exclude_unset=True)
        for attr_name, new_value in changes.items():
            setattr(target, attr_name, new_value)
        db.commit()
        db.refresh(target)
        return target
    return None
|
|
|
|
|
|
def delete_blog(db: Session, blog_id: int) -> Optional[models.Blog]:
    """Delete the blog with the given id, if it exists.

    Args:
        db: Session used to load and delete the blog.
        blog_id: Id of the blog to remove.

    Returns:
        The ``models.Blog`` instance that was deleted, or ``None`` when
        ``get_blog`` finds no blog with that id (nothing is committed in
        that case).
    """
    doomed = get_blog(db, blog_id)
    if doomed:
        db.delete(doomed)
        db.commit()
        return doomed
    return None
|
|
|
|
|
|
def increment_view_count(db: Session, blog_id: int) -> None:
    """Add one to ``view_count`` for the blog with the given id and commit.

    The new value is written as the expression ``Blog.view_count + 1``
    inside a bulk ``update()`` rather than by loading the row and assigning
    to it. The number of rows matched is not checked, so an unknown
    ``blog_id`` is not reported to the caller.

    Args:
        db: Session the update is issued and committed on.
        blog_id: Id of the blog whose counter is bumped.
    """
    matching_blogs = db.query(models.Blog).filter(models.Blog.id == blog_id)
    bumped = {"view_count": models.Blog.view_count + 1}
    matching_blogs.update(bumped)
    db.commit()
|
|
|
|
|
|
def add_like(db: Session, blog_id: int) -> None:
    """Add one to ``like_count`` for the blog with the given id and commit.

    The new value is written as the expression ``Blog.like_count + 1``
    inside a bulk ``update()`` rather than by loading the row and assigning
    to it. The number of rows matched is not checked, so an unknown
    ``blog_id`` is not reported to the caller.

    Args:
        db: Session the update is issued and committed on.
        blog_id: Id of the blog receiving the like.
    """
    matching_blogs = db.query(models.Blog).filter(models.Blog.id == blog_id)
    bumped = {"like_count": models.Blog.like_count + 1}
    matching_blogs.update(bumped)
    db.commit()
|
|
|
|
|
|
def delete_blogs(db: Session, item_id: int) -> Optional[models.Blog]:
    """Delete the blog whose id equals ``item_id``.

    Despite the plural name, this removes at most one blog. It performs
    exactly the same lookup, delete and commit as ``delete_blog``, so it
    now delegates to that function instead of repeating the logic; the
    name and parameters are kept for existing callers.

    Args:
        db: Session used to load and delete the blog.
        item_id: Id of the blog to remove.

    Returns:
        The ``models.Blog`` instance that was deleted, or ``None`` when no
        blog with that id exists.
    """
    return delete_blog(db, item_id)
|
|
|
|
|
|
# User / Auth
|
|
|
|
|
|
def authenticate_user(db: Session, username: str, password: str):
    """Check a login identifier and password against the stored users.

    The identifier is tried as a username first and, if that finds nothing,
    as an email address.

    NOTE(review): when no user matches, the function returns before
    ``verify_password`` is called, so unknown accounts and wrong passwords
    may take measurably different time — confirm whether that matters for
    this deployment.

    Args:
        db: Session used for the user lookups.
        username: Login identifier; may be a username or an email address.
        password: Plain-text password to check against the user's
            ``hashed_password``.

    Returns:
        The matching user when the password verifies, otherwise ``None``.
    """
    candidate = get_user_by_username(db, username)
    if not candidate:
        candidate = get_user_by_email(db, username)

    # Short-circuit keeps verify_password from running without a user.
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return None
|
|
|
|
|
|
def get_user_by_username(db: Session, username: str):
    """Look up a user by exact username.

    Args:
        db: Session the query is issued on.
        username: Value compared for equality with ``models.User.username``.

    Returns:
        The first matching ``models.User``, or ``None`` when the query
        yields no row.
    """
    username_matches = models.User.username == username
    return db.query(models.User).filter(username_matches).first()
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str):
    """Look up a user by exact email address.

    Args:
        db: Session the query is issued on.
        email: Value compared for equality with ``models.User.email``.

    Returns:
        The first matching ``models.User``, or ``None`` when the query
        yields no row.
    """
    email_matches = models.User.email == email
    return db.query(models.User).filter(email_matches).first()
|
|
|
|
|
|
def create_user(db: Session, user: schemas.UserCreate):
    """Create and commit a new user from the given creation schema.

    The plain-text ``user.password`` is passed through ``hash_password``
    and only the resulting hash is handed to the model.

    NOTE(review): ``permissions`` and ``subscriber`` are copied straight
    from the incoming schema — confirm that callers restrict who may set
    them.

    Args:
        db: Session used to add and commit the new row.
        user: Creation schema providing ``username``, ``email``,
            ``password``, ``permissions`` and ``subscriber``.

    Returns:
        The newly committed ``models.User`` instance, refreshed from the
        database.
    """
    password_hash = hash_password(user.password)
    column_values = {
        "username": user.username,
        "email": user.email,
        "hashed_password": password_hash,
        "permissions": user.permissions,
        "subscriber": user.subscriber,
    }
    new_user = models.User(**column_values)

    db.add(new_user)
    db.commit()
    # Re-read the row so the returned object reflects what was stored.
    db.refresh(new_user)
    return new_user
|