تنفيذ Refresh Tokens مع JWT في FastAPI: حماية كاملة للـ API

تنفيذ Refresh Tokens مع JWT في FastAPI: حماية كاملة للـ API

في أنظمة الـ API الحديثة، الاعتماد على JWT فقط كـ Access Token بدون نظام Refresh Tokens يعتبر مخاطرة أمنية كبيرة، خصوصًا مع تطور طرق الهجمات، وسهولة سرقة التوكن من المتصفح أو من جهاز المستخدم. في هذا الدليل سنشرح بالتفصيل كيفية تطبيق FastAPI JWT Refresh Tokens لبناء نظام مصادقة آمن يدعم:

  • انتهاء صلاحية الجلسة (Access Token Expiry)
  • تجديد التوكن (Token Refresh)
  • إبطال التوكن (Token Revocation)
  • حماية كاملة للـ API مع أفضل الممارسات

إذا لم تكن لديك خبرة سابقة مع FastAPI أو بناء RESTful APIs، يمكنك مراجعة: بناء RESTful APIs باستخدام FastAPI و أفضل ممارسات تصميم RESTful APIs آمن مع أمثلة قبل الغوص في هذا الدليل.

1. فهم JWT و Refresh Tokens في FastAPI

1.1 ما هو JWT؟

JWT أو JSON Web Token هو معيار لتمثيل بيانات الوصول (Claims) بشكل مشفر (موقّع رقمياً) في شكل سلسلة نصية. يتكون من ثلاثة أجزاء:

  • Header: نوع التوكن وخوارزمية التوقيع (مثلاً HS256).
  • Payload: البيانات (claims) مثل user_id، role، exp.
  • Signature: توقيع مبني على header + payload + secret.

في FastAPI، غالباً نستخدم JWT كـ Access Token لإثبات هوية المستخدم والسماح له بالوصول لنقاط API محمية.

1.2 لماذا نحتاج Refresh Tokens؟

إذا جعلت صلاحية الـ Access Token طويلة (مثلاً 7 أيام)، فأي اختراق أو سرقة للتوكن تعني وصول مستمر للـ API لفترة طويلة. وإذا جعلتها قصيرة جداً (15 دقيقة مثلاً)، سيُطلب من المستخدم تسجيل الدخول كثيراً.

هنا يأتي دور Refresh Tokens:

  • Access Token: قصير العمر (من 5 إلى 30 دقيقة).
  • Refresh Token: أطول عمراً (أيام أو أسابيع)، يستخدم فقط لتجديد Access Token.

الفكرة: عند انتهاء الـ Access Token، يرسل العميل (Client) الـ Refresh Token للحصول على Access Token جديد دون إعادة إدخال بيانات الدخول، مع إمكانية إبطال الـ Refresh Token من السيرفر في أي وقت.

2. تصميم نظام FastAPI JWT Refresh Tokens

قبل كتابة الكود، نحتاج لتصميم بسيط يوضح مكونات النظام:

  1. نستخدم JWT Access Token قصير المدة (مثلاً 15 دقيقة).
  2. نستخدم JWT Refresh Token أو UUID يُخزن في قاعدة البيانات.
  3. إنشاء جداول في قاعدة البيانات لتخزين:
    • المستخدمين Users
    • الـ Refresh Tokens النشطة (مع معلومات مثل user_id، device، ip، تاريخ الانتهاء).
  4. نقطة تسجيل الدخول /auth/login تُرجع Access + Refresh.
  5. نقطة تجديد التوكن /auth/refresh تأخذ Refresh Token وتعيد Access جديد.
  6. نقطة تسجيل الخروج /auth/logout لإبطال Refresh Token (أو كل الجلسات).

هذا التصميم يعطي مرونة كبيرة، ويتيح:

  • إبطال جلسة معينة (جهاز واحد) أو جميع الجلسات للمستخدم.
  • متابعة نشاط الجلسات من لوحة إدارة.
  • تتبع محاولات الاختراق أو الاستخدام غير الطبيعي.

3. تجهيز مشروع FastAPI

نفترض أنك تعمل على بيئة Python 3.10+، سنبدأ بتثبيت الحزم الأساسية:

pip install fastapi uvicorn[standard] python-jose[cryptography] passlib[bcrypt] pydantic[email] sqlalchemy

سنستخدم:

  • python-jose لتوليد والتحقق من JWT
  • passlib[bcrypt] لتشفير كلمات المرور
  • SQLAlchemy للتعامل مع قاعدة البيانات

3.1 إعداد الإعدادات الأساسية وملف التهيئة

أنشئ ملف config.py:

from datetime import timedelta

class Settings:
    SECRET_KEY = "CHANGE_ME_TO_A_LONG_RANDOM_SECRET"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 15
    REFRESH_TOKEN_EXPIRE_DAYS = 7

settings = Settings()

4. نماذج المستخدمين و Refresh Tokens في قاعدة البيانات

سننشئ نموذجين رئيسيين: User و RefreshToken.

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship, declarative_base
from datetime import datetime, timedelta
import uuid

Base = declarative_base()

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)

    refresh_tokens = relationship("RefreshToken", back_populates="user")


class RefreshToken(Base):
    __tablename__ = "refresh_tokens"

    id = Column(Integer, primary_key=True, index=True)
    token = Column(String, unique=True, index=True, nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    expires_at = Column(DateTime, nullable=False)
    revoked = Column(Boolean, default=False)
    user_agent = Column(String, nullable=True)
    ip_address = Column(String, nullable=True)

    user = relationship("User", back_populates="refresh_tokens")

    @staticmethod
    def create_token(user_id: int, days: int, user_agent: str = None, ip: str = None):
        now = datetime.utcnow()
        return RefreshToken(
            token=str(uuid.uuid4()),
            user_id=user_id,
            created_at=now,
            expires_at=now + timedelta(days=days),
            user_agent=user_agent,
            ip_address=ip,
        )

بهذا الشكل، كل Refresh Token يتم تخزينه في قاعدة البيانات مع حالة الإبطال (revoked) وتاريخ الانتهاء.

5. إنشاء JWT Access Token و Refresh Token

سننشئ ملف auth_utils.py يحتوي على دوال التشفير والتوليد:

from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError
from passlib.context import CryptContext

from config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
    return encoded_jwt


def decode_token(token: str) -> dict:
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        return payload
    except JWTError:
        raise ValueError("Invalid or expired token")

هنا نُنشئ فقط Access Token باستخدام JWT، بينما يتم توليد Refresh Token كـ UUID وتخزينه في قاعدة البيانات من خلال نموذج RefreshToken الذي أنشأناه.

6. نقاط الـ API الأساسية للمصادقة

6.1 نماذج (Schemas) باستخدام Pydantic

أنشئ ملف schemas.py:

from pydantic import BaseModel, EmailStr

class TokenPair(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"


class TokenRefreshRequest(BaseModel):
    refresh_token: str


class UserCreate(BaseModel):
    email: EmailStr
    password: str


class UserLogin(BaseModel):
    email: EmailStr
    password: str


class UserOut(BaseModel):
    id: int
    email: EmailStr

    class Config:
        from_attributes = True

6.2 مسارات التسجيل وتسجيل الدخول

في ملف main.py (أو auth_routes.py ثم تضمينه):

from fastapi import FastAPI, Depends, HTTPException, status, Request
from sqlalchemy.orm import Session
from datetime import timedelta

from config import settings
from database import SessionLocal, engine
from models import Base, User, RefreshToken
from schemas import UserCreate, UserLogin, TokenPair, TokenRefreshRequest, UserOut
from auth_utils import hash_password, verify_password, create_access_token
from datetime import datetime

Base.metadata.create_all(bind=engine)

app = FastAPI(title="FastAPI JWT Refresh Tokens Example")


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/auth/register", response_model=UserOut)
def register(user_in: UserCreate, db: Session = Depends(get_db)):
    existing = db.query(User).filter(User.email == user_in.email).first()
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")
    user = User(email=user_in.email, hashed_password=hash_password(user_in.password))
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


@app.post("/auth/login", response_model=TokenPair)
def login(request: Request, creds: UserLogin, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.email == creds.email).first()
    if not user or not verify_password(creds.password, user.hashed_password):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")

    access_token = create_access_token({"sub": str(user.id)})
    refresh = RefreshToken.create_token(
        user_id=user.id,
        days=settings.REFRESH_TOKEN_EXPIRE_DAYS,
        user_agent=request.headers.get("user-agent"),
        ip=request.client.host if request.client else None,
    )
    db.add(refresh)
    db.commit()
    return TokenPair(access_token=access_token, refresh_token=refresh.token)

بهذا الشكل، عند تسجيل الدخول:

  • نولّد Access Token مع claim sub يحمل user_id.
  • نولّد Refresh Token ونخزنه في قاعدة البيانات.
  • نعيد الزوج (access_token, refresh_token) للعميل.

6.3 مسار تجديد التوكن /auth/refresh

@app.post("/auth/refresh", response_model=TokenPair)
def refresh_tokens(body: TokenRefreshRequest, db: Session = Depends(get_db)):
    stored = (
        db.query(RefreshToken)
        .filter(
            RefreshToken.token == body.refresh_token,
            RefreshToken.revoked == False,
            RefreshToken.expires_at > datetime.utcnow(),
        )
        .first()
    )

    if not stored:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired refresh token")

    user = db.query(User).filter(User.id == stored.user_id, User.is_active == True).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive")

    access_token = create_access_token({"sub": str(user.id)})
    # خيار: إنشاء Refresh Token جديد وإبطال القديم (Rotation)
    new_refresh = RefreshToken.create_token(
        user_id=user.id,
        days=settings.REFRESH_TOKEN_EXPIRE_DAYS,
        user_agent=stored.user_agent,
        ip=stored.ip_address,
    )

    stored.revoked = True  # إبطال القديم (أفضل من إبقائه فعال)
    db.add(new_refresh)
    db.commit()

    return TokenPair(access_token=access_token, refresh_token=new_refresh.token)

لاحظ هنا أننا طبقنا مفهوم Refresh Token Rotation:

  • كل عملية تجديد تنتج Refresh Token جديد.
  • نقوم بإبطال القديم فوراً.
  • هذا يقلل من خطر إعادة استخدام Refresh Token مسروق.

6.4 مسار تسجيل الخروج /auth/logout

@app.post("/auth/logout")
def logout(body: TokenRefreshRequest, db: Session = Depends(get_db)):
    stored = db.query(RefreshToken).filter(RefreshToken.token == body.refresh_token).first()
    if stored:
        stored.revoked = True
        db.commit()
    return {"detail": "Logged out"}

يمكنك أيضاً إضافة مسار لإبطال جميع Refresh Tokens المرتبطة بمستخدم معيّن (تسجيل خروج من كل الأجهزة).

7. حماية المسارات باستخدام Access Token

الآن نحتاج إلى Dependency في FastAPI يقوم بما يلي:

  • قراءة Access Token من الـ Header (Authorization: Bearer <token>).
  • التحقق من صحته وصلاحيته.
  • استخراج user_id وجلب المستخدم من قاعدة البيانات.
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")  # استخدمناه فقط كتعريف

def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
    from auth_utils import decode_token
    try:
        payload = decode_token(token)
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired access token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user_id: str = payload.get("sub")
    if user_id is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload")

    user = db.query(User).filter(User.id == int(user_id), User.is_active == True).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive")
    return user


@app.get("/me", response_model=UserOut)
def read_me(current_user: User = Depends(get_current_user)):
    return current_user

بهذا الشكل، أي مسار يحتاج مصادقة يمكنه استخدام Depends(get_current_user) بسهولة.

8. أفضل الممارسات مع FastAPI JWT Refresh Tokens

8.1 الأمن على مستوى المتصفح (Front-end)

  • يفضل تخزين Access Token في الذاكرة (In-memory state) أو في HttpOnly Cookie وليس في localStorage لتقليل خطر XSS.
  • يمكن تخزين Refresh Token في HttpOnly Cookie فقط، وعدم كشفه لـ JavaScript.
  • استخدام HTTPS دائماً لتجنب تسرب التوكن عبر الشبكة.

8.2 مدة صلاحية التوكن

  • اجعل Access Token قصير المدة (10–30 دقيقة) قدر الإمكان.
  • اجعل Refresh Token من أيام إلى أسابيع حسب حساسية النظام، مع إمكانية إبطال الجلسات من لوحة الإدارة.

8.3 إدارة الـ Refresh Tokens في قاعدة البيانات

  • احذف التوكنات المنتهية (expires_at < now) بشكل دوري عبر Background Task أو Cron Job.
  • احتفظ بسجل (Log) لمحاولات استخدام Refresh Token منتهي أو موقوف لتتبع محاولات الاختراق.

يمكنك الاستفادة من: التعامل مع Background Tasks في Django وFastAPI لتنفيذ مهمة دورية لتنظيف التوكنات المنتهية.

8.4 حماية إضافية: ربط التوكن بـ User-Agent و IP

قمنا بتخزين user_agent و ip_address في جدول RefreshToken. يمكنك:

  • رفض استخدام Refresh Token من متصفح أو IP مختلف فجأة.
  • تنبيه المستخدم أو طلب إعادة تسجيل الدخول في هذه الحالة.

8.5 التعامل مع Rate Limiting

مسارات مثل /auth/login و /auth/refresh معرضة لمحاولات brute-force، لذلك من المهم إضافة Rate Limiting:

  • تحديد عدد معين من الطلبات لكل IP لكل دقيقة/ساعة.
  • حظر مؤقت لـ IP عند تجاوز الحد.

يمكنك مراجعة المقال: دليل التعامل مع Rate Limiting في API: حماية بدون خنق المستخدمين لتطبيق Rate Limiting مع FastAPI.

9. مقارنة سريعة مع OAuth2 في FastAPI

في بعض الأنظمة، قد تحتاج للانتقال إلى OAuth2 أو دمجه مع JWT. في FastAPI، يمكن بناء أنظمة OAuth2 مع Password Flow أو Authorization Code Flow بسهولة. إذا كنت مهتماً ببناء نظام أكثر تعقيداً مثل تسجيل الدخول عبر Google أو GitHub، راجع: تنفيذ OAuth2 في Django و FastAPI: دليل عملي كامل.

لكن حتى مع OAuth2، يظل مفهوم Access Token + Refresh Token هو الأساس لإدارة الجلسات بشكل آمن.

10. خلاصة: ما الذي يوفره FastAPI JWT Refresh Tokens؟

  • حماية أفضل من اختراق التوكن مقارنة باستخدام Access Token طويل المدى فقط.
  • تجربة مستخدم أفضل عبر تجديد تلقائي للتوكن دون إعادة تسجيل الدخول بشكل متكرر.
  • مرونة عالية في إبطال الجلسات وإدارة الأجهزة النشطة.
  • قابلية توسع مع إضافة طبقات أمنية أخرى مثل Rate Limiting، Web Application Firewall، ومراقبة النشاط.

بتطبيق هذا التصميم في مشروعك، ستحصل على طبقة مصادقة قوية مبنية على FastAPI JWT Refresh Tokens، قابلة للتوسع والتطوير، وملتزمة بأفضل الممارسات الأمنية المستخدمة في الأنظمة الاحترافية.

حول المحتوى:

دليل شامل لشرح JWT وRefresh Tokens وكيفية إنشاء نظام مصادقة آمن يدعم انتهاء الجلسات والتجديد وإبطال التوكن.

هل كان هذا مفيدًا لك؟

أضف تعليقك