feat: auth backend implementation
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
from sqlalchemy.orm import Session
|
||||
from app import models, schemas
|
||||
from app.utils import hash_password, verify_password
|
||||
|
||||
|
||||
def get_item(db: Session, item_id: int):
|
||||
@@ -24,3 +25,34 @@ def delete_item(db: Session, item_id: int):
|
||||
db.delete(item)
|
||||
db.commit()
|
||||
return item
|
||||
|
||||
|
||||
# User / Auth
|
||||
|
||||
|
||||
def authenticate_user(db: Session, username: str, password: str):
|
||||
user = get_user_by_username(db, username)
|
||||
if not user:
|
||||
return None
|
||||
if not verify_password(password, str(user.hashed_password)):
|
||||
return None
|
||||
return user
|
||||
|
||||
|
||||
def get_user_by_username(db: Session, username: str):
|
||||
return db.query(models.User).filter(models.User.username == username).first()
|
||||
|
||||
|
||||
def get_user_by_email(db: Session, email: str):
|
||||
return db.query(models.User).filter(models.User.email == email).first()
|
||||
|
||||
|
||||
def create_user(db: Session, user: schemas.UserCreate):
|
||||
hashed_pw = hash_password(user.password)
|
||||
db_user = models.User(
|
||||
username=user.username, email=user.email, hashed_password=hashed_pw
|
||||
)
|
||||
db.add(db_user)
|
||||
db.commit()
|
||||
db.refresh(db_user)
|
||||
return db_user
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
|
||||
class Logger:
|
||||
def __init__(self) -> None:
|
||||
self.logger = logging.getLogger(os.getenv("LOGGER", "backend-logger"))
|
||||
self.logger.setLevel(logging.INFO)
|
||||
+37
-1
@@ -1,5 +1,8 @@
|
||||
from fastapi import FastAPI, Depends, HTTPException
|
||||
from fastapi import FastAPI, Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.utils import create_access_token
|
||||
from . import schemas, crud
|
||||
from .database import SessionLocal, engine, Base
|
||||
|
||||
@@ -46,3 +49,36 @@ def delete_item(item_id: int, db: Session = Depends(get_db)):
|
||||
if item is None:
|
||||
raise HTTPException(status_code=404, detail="Item not found")
|
||||
return item
|
||||
|
||||
|
||||
# Users
|
||||
|
||||
|
||||
@app.post("/login", response_model=schemas.Token)
|
||||
def user_login(
|
||||
form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)
|
||||
):
|
||||
user = crud.authenticate_user(db, form_data.username, form_data.password)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorrect username or password",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
access_token = create_access_token(data={"sub": user.username})
|
||||
return {"access_token": access_token, "token_type": "bearer"}
|
||||
|
||||
|
||||
@app.post("/register", response_model=schemas.UserOut)
|
||||
def register_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
|
||||
if crud.get_user_by_username(db, user.username):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Username already registered",
|
||||
)
|
||||
if crud.get_user_by_email(db, user.email):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Account with that email already registered",
|
||||
)
|
||||
return crud.create_user(db, user)
|
||||
|
||||
+12
-1
@@ -1,4 +1,4 @@
|
||||
from sqlalchemy import JSON, Column, Integer, String
|
||||
from sqlalchemy import JSON, Boolean, Column, Integer, String
|
||||
from .database import Base
|
||||
|
||||
|
||||
@@ -9,3 +9,14 @@ class Item(Base):
|
||||
name = Column(String, index=True)
|
||||
description = Column(String, nullable=True)
|
||||
body = Column(JSON, nullable=False)
|
||||
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = "users"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
username = Column(String, unique=True, nullable=False, index=True)
|
||||
email = Column(String, unique=True, nullable=False, index=True)
|
||||
hashed_password = Column(String, nullable=False)
|
||||
permissions = Column(JSON, nullable=False)
|
||||
subscriber = Column(Boolean, nullable=False)
|
||||
|
||||
+37
-1
@@ -1,4 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, EmailStr
|
||||
|
||||
|
||||
# DB Schemas
|
||||
|
||||
|
||||
class ItemBase(BaseModel):
|
||||
@@ -12,3 +15,36 @@ class Item(ItemBase):
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class UserBase(BaseModel):
|
||||
username: str
|
||||
email: EmailStr
|
||||
|
||||
|
||||
class UserCreate(UserBase):
|
||||
password: str
|
||||
|
||||
|
||||
class UserOut(UserBase):
|
||||
id: int
|
||||
|
||||
class Config:
|
||||
orm_mode = True
|
||||
|
||||
|
||||
# Other Schemas
|
||||
|
||||
|
||||
class Token(BaseModel):
|
||||
access_token: str
|
||||
token_type: str
|
||||
|
||||
|
||||
class TokenData(BaseModel):
|
||||
username: str | None = None
|
||||
|
||||
|
||||
class UserLogin(BaseModel):
|
||||
username: str
|
||||
password: str
|
||||
|
||||
@@ -0,0 +1,41 @@
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Mapping
|
||||
from passlib.context import CryptContext
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from jose import JWTError, jwt
|
||||
from app.logger_config import Logger
|
||||
|
||||
pwd_context = CryptContext(schemas=["bcrypt"], deprecated="auto")
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
return pwd_context.hash(password)
|
||||
|
||||
|
||||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||||
return pwd_context.verify(plain_password, hashed_password)
|
||||
|
||||
|
||||
# FIXME: Remove hard coded vars
|
||||
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "")
|
||||
ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256")
|
||||
EXPIRATION_MINS = os.getenv("JWT_EXPIRATION_MINS", "10")
|
||||
|
||||
|
||||
def create_access_token(
|
||||
data: dict, expires_delta: timedelta = timedelta(minutes=int(EXPIRATION_MINS))
|
||||
):
|
||||
to_encode = data.copy()
|
||||
expire = datetime.now(UTC) + expires_delta
|
||||
to_encode.update({"exp": expire})
|
||||
|
||||
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||
|
||||
|
||||
def decode_access_token(token: str) -> Mapping[Any, Any] | None:
|
||||
try:
|
||||
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
except JWTError:
|
||||
logging.exception(msg="Failed to Decode JWT", extra={"TOKEN": token})
|
||||
return None
|
||||
Reference in New Issue
Block a user