"""Authentication utilities.""" from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from fastapi import HTTPException, status, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from ..utils.config import settings from ..utils.logging import get_logger logger = get_logger(__name__) # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # HTTP Bearer for JWT security = HTTPBearer() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash. Args: plain_password: Plain text password hashed_password: Hashed password Returns: True if password matches """ return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hash a password. Args: password: Plain text password Returns: Hashed password """ return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token. Args: data: Data to encode in token expires_delta: Token expiration time Returns: JWT token string """ to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(hours=settings.JWT_EXPIRATION_HOURS) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm="HS256") return encoded_jwt def verify_token(token: str) -> dict: """Verify and decode JWT token. Args: token: JWT token string Returns: Decoded token payload Raises: HTTPException: If token is invalid """ try: payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=["HS256"]) return payload except JWTError as e: logger.error(f"Token verification failed: {e}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) def authenticate_user(email: str, password: str) -> Optional[dict]: """Authenticate user with email and password. Args: email: User email password: User password Returns: User data if authenticated, None otherwise """ # Check against admin credentials from environment if email == settings.ADMIN_EMAIL and password == settings.ADMIN_PASSWORD: return { "email": email, "role": "admin" } return None async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict: """Get current authenticated user from JWT token. Args: credentials: HTTP Bearer credentials Returns: User data from token Raises: HTTPException: If authentication fails """ token = credentials.credentials payload = verify_token(token) email: str = payload.get("sub") if email is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) return { "email": email, "role": payload.get("role", "user") } async def require_auth(current_user: dict = Depends(get_current_user)) -> dict: """Dependency to require authentication. Args: current_user: Current user from get_current_user Returns: Current user data """ return current_user