Files
scrapAPI/app/security.py

68 lines
2.5 KiB
Python

import bcrypt
from jose import jwt, JWTError
from datetime import datetime, timedelta, timezone
from fastapi import HTTPException, status, Request
import os
from pydantic import BaseModel
from typing import Optional
# Secret and algorithm for JWT
# Signing key for tokens, read from the SECRET_KEY environment variable.
# NOTE(review): when that variable is unset, the placeholder string below is
# used as the key -- make sure the variable is set in every real deployment.
SECRET_KEY = os.environ.get('SECRET_KEY', 'your_jwt_secret_key')
# Signature algorithm passed to the JWT encode/decode calls.
ALGORITHM = "HS256"
# Default token lifetime in minutes: 14 days * 24 hours * 60 minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 14 * 24 * 60
class TokenData(BaseModel):
    # Pydantic model with a single optional field.
    # `user_id` is an integer when supplied and defaults to None otherwise.
    user_id: Optional[int] = None
# Hash password using bcrypt directly
def get_password_hash(password: str) -> str:
    """Return the bcrypt hash of ``password`` as a text string.

    The password is UTF-8 encoded before hashing, a fresh salt is obtained
    from ``bcrypt.gensalt()`` on every call, and the resulting hash bytes
    are decoded back to ``str`` for the caller.
    """
    password_bytes = password.encode('utf-8')
    digest = bcrypt.hashpw(password_bytes, bcrypt.gensalt())
    return digest.decode('utf-8')
# Verify password using bcrypt directly
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Both arguments are UTF-8 encoded and handed to ``bcrypt.checkpw``,
    whose result is returned unchanged.
    """
    candidate = plain_password.encode('utf-8')
    stored_hash = hashed_password.encode('utf-8')
    return bcrypt.checkpw(candidate, stored_hash)
# Create JWT token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is copied into a
            new dict, so the caller's object is not modified.
        expires_delta: Token lifetime, measured from the current UTC time.
            When ``None``, ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used.

    Returns:
        The encoded token produced by ``jwt.encode`` with ``SECRET_KEY``
        and ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, and an explicit
    # zero lifetime must not silently fall back to the multi-day default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Any "exp" key already present in `data` is overridden.
    claims = {**data, "exp": datetime.now(timezone.utc) + expires_delta}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def verify_access_token(request: Request) -> int:
    """Authenticate a request by its JWT and return the ``user_id`` claim.

    The token is taken from an ``Authorization: Bearer <token>`` header when
    one is present; otherwise from the ``access_token`` cookie.

    Args:
        request: Incoming request whose headers and cookies are inspected.

    Returns:
        The value of the ``user_id`` claim from the decoded token. It is
        annotated as ``int`` but its type is not validated here.

    Raises:
        HTTPException: 401 when no token is found, when ``jwt.decode``
            raises ``JWTError``, or when the ``user_id`` claim is missing.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Prefer the Authorization header. The auth scheme is case-insensitive
    # (RFC 9110, section 11.1) and may be followed by more than one space, so
    # split on whitespace instead of matching the literal prefix "Bearer ".
    token = None
    header_parts = (request.headers.get("Authorization") or "").split()
    if len(header_parts) >= 2 and header_parts[0].lower() == "bearer":
        token = header_parts[1]

    # No usable header token: fall back to the cookie.
    if not token:
        token = request.cookies.get("access_token")
    if not token:
        raise credentials_exception

    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise credentials_exception from exc

    user_id = payload.get("user_id")
    if user_id is None:
        raise credentials_exception
    return user_id