حول المحتوى:
دليل شامل لشرح JWT وRefresh Tokens وكيفية إنشاء نظام مصادقة آمن يدعم انتهاء الجلسات والتجديد وإبطال التوكن.
في أنظمة الـ API الحديثة، الاعتماد على JWT فقط كـ Access Token بدون نظام Refresh Tokens يعتبر مخاطرة أمنية كبيرة، خصوصًا مع تطور طرق الهجمات، وسهولة سرقة التوكن من المتصفح أو من جهاز المستخدم. في هذا الدليل سنشرح بالتفصيل كيفية تطبيق FastAPI JWT Refresh Tokens لبناء نظام مصادقة آمن يدعم:
إذا لم تكن لديك خبرة سابقة مع FastAPI أو بناء RESTful APIs، يمكنك مراجعة: بناء RESTful APIs باستخدام FastAPI و أفضل ممارسات تصميم RESTful APIs آمن مع أمثلة قبل الغوص في هذا الدليل.
JWT أو JSON Web Token هو معيار لتمثيل بيانات الوصول (Claims) بشكل مشفر (موقّع رقمياً) في شكل سلسلة نصية. يتكون من ثلاثة أجزاء:
في FastAPI، غالباً نستخدم JWT كـ Access Token لإثبات هوية المستخدم والسماح له بالوصول لنقاط API محمية.
إذا جعلت صلاحية الـ Access Token طويلة (مثلاً 7 أيام)، فأي اختراق أو سرقة للتوكن تعني وصول مستمر للـ API لفترة طويلة. وإذا جعلتها قصيرة جداً (15 دقيقة مثلاً)، سيُطلب من المستخدم تسجيل الدخول كثيراً.
هنا يأتي دور Refresh Tokens:
الفكرة: عند انتهاء الـ Access Token، يرسل العميل (Client) الـ Refresh Token للحصول على Access Token جديد دون إعادة إدخال بيانات الدخول، مع إمكانية إبطال الـ Refresh Token من السيرفر في أي وقت.
قبل كتابة الكود، نحتاج لتصميم بسيط يوضح مكونات النظام:
هذا التصميم يعطي مرونة كبيرة، ويتيح:
نفترض أنك تعمل على بيئة Python 3.10+، سنبدأ بتثبيت الحزم الأساسية:
pip install fastapi uvicorn[standard] python-jose[cryptography] passlib[bcrypt] pydantic[email] sqlalchemy سنستخدم:
أنشئ ملف config.py:
from datetime import timedelta
class Settings:
SECRET_KEY = "CHANGE_ME_TO_A_LONG_RANDOM_SECRET"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
settings = Settings() سننشئ نموذجين رئيسيين: User و RefreshToken.
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship, declarative_base
from datetime import datetime, timedelta
import uuid
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
refresh_tokens = relationship("RefreshToken", back_populates="user")
class RefreshToken(Base):
__tablename__ = "refresh_tokens"
id = Column(Integer, primary_key=True, index=True)
token = Column(String, unique=True, index=True, nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
expires_at = Column(DateTime, nullable=False)
revoked = Column(Boolean, default=False)
user_agent = Column(String, nullable=True)
ip_address = Column(String, nullable=True)
user = relationship("User", back_populates="refresh_tokens")
@staticmethod
def create_token(user_id: int, days: int, user_agent: str = None, ip: str = None):
now = datetime.utcnow()
return RefreshToken(
token=str(uuid.uuid4()),
user_id=user_id,
created_at=now,
expires_at=now + timedelta(days=days),
user_agent=user_agent,
ip_address=ip,
) بهذا الشكل، كل Refresh Token يتم تخزينه في قاعدة البيانات مع حالة الإبطال (revoked) وتاريخ الانتهاء.
سننشئ ملف auth_utils.py يحتوي على دوال التشفير والتوليد:
from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError
from passlib.context import CryptContext
from config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> dict:
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
raise ValueError("Invalid or expired token") هنا نُنشئ فقط Access Token باستخدام JWT، بينما يتم توليد Refresh Token كـ UUID وتخزينه في قاعدة البيانات من خلال نموذج RefreshToken الذي أنشأناه.
أنشئ ملف schemas.py:
from pydantic import BaseModel, EmailStr
class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class TokenRefreshRequest(BaseModel):
refresh_token: str
class UserCreate(BaseModel):
email: EmailStr
password: str
class UserLogin(BaseModel):
email: EmailStr
password: str
class UserOut(BaseModel):
id: int
email: EmailStr
class Config:
from_attributes = True في ملف main.py (أو auth_routes.py ثم تضمينه):
from fastapi import FastAPI, Depends, HTTPException, status, Request
from sqlalchemy.orm import Session
from datetime import timedelta
from config import settings
from database import SessionLocal, engine
from models import Base, User, RefreshToken
from schemas import UserCreate, UserLogin, TokenPair, TokenRefreshRequest, UserOut
from auth_utils import hash_password, verify_password, create_access_token
from datetime import datetime
Base.metadata.create_all(bind=engine)
app = FastAPI(title="FastAPI JWT Refresh Tokens Example")
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/auth/register", response_model=UserOut)
def register(user_in: UserCreate, db: Session = Depends(get_db)):
existing = db.query(User).filter(User.email == user_in.email).first()
if existing:
raise HTTPException(status_code=400, detail="Email already registered")
user = User(email=user_in.email, hashed_password=hash_password(user_in.password))
db.add(user)
db.commit()
db.refresh(user)
return user
@app.post("/auth/login", response_model=TokenPair)
def login(request: Request, creds: UserLogin, db: Session = Depends(get_db)):
user = db.query(User).filter(User.email == creds.email).first()
if not user or not verify_password(creds.password, user.hashed_password):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
access_token = create_access_token({"sub": str(user.id)})
refresh = RefreshToken.create_token(
user_id=user.id,
days=settings.REFRESH_TOKEN_EXPIRE_DAYS,
user_agent=request.headers.get("user-agent"),
ip=request.client.host if request.client else None,
)
db.add(refresh)
db.commit()
return TokenPair(access_token=access_token, refresh_token=refresh.token) بهذا الشكل، عند تسجيل الدخول:
sub يحمل user_id.@app.post("/auth/refresh", response_model=TokenPair)
def refresh_tokens(body: TokenRefreshRequest, db: Session = Depends(get_db)):
stored = (
db.query(RefreshToken)
.filter(
RefreshToken.token == body.refresh_token,
RefreshToken.revoked == False,
RefreshToken.expires_at > datetime.utcnow(),
)
.first()
)
if not stored:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired refresh token")
user = db.query(User).filter(User.id == stored.user_id, User.is_active == True).first()
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive")
access_token = create_access_token({"sub": str(user.id)})
# خيار: إنشاء Refresh Token جديد وإبطال القديم (Rotation)
new_refresh = RefreshToken.create_token(
user_id=user.id,
days=settings.REFRESH_TOKEN_EXPIRE_DAYS,
user_agent=stored.user_agent,
ip=stored.ip_address,
)
stored.revoked = True # إبطال القديم (أفضل من إبقائه فعال)
db.add(new_refresh)
db.commit()
return TokenPair(access_token=access_token, refresh_token=new_refresh.token) لاحظ هنا أننا طبقنا مفهوم Refresh Token Rotation:
@app.post("/auth/logout")
def logout(body: TokenRefreshRequest, db: Session = Depends(get_db)):
stored = db.query(RefreshToken).filter(RefreshToken.token == body.refresh_token).first()
if stored:
stored.revoked = True
db.commit()
return {"detail": "Logged out"} يمكنك أيضاً إضافة مسار لإبطال جميع Refresh Tokens المرتبطة بمستخدم معيّن (تسجيل خروج من كل الأجهزة).
الآن نحتاج إلى Dependency في FastAPI يقوم بما يلي:
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") # استخدمناه فقط كتعريف
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
from auth_utils import decode_token
try:
payload = decode_token(token)
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired access token",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload")
user = db.query(User).filter(User.id == int(user_id), User.is_active == True).first()
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found or inactive")
return user
@app.get("/me", response_model=UserOut)
def read_me(current_user: User = Depends(get_current_user)):
return current_user بهذا الشكل، أي مسار يحتاج مصادقة يمكنه استخدام Depends(get_current_user) بسهولة.
expires_at < now) بشكل دوري عبر Background Task أو Cron Job.يمكنك الاستفادة من: التعامل مع Background Tasks في Django وFastAPI لتنفيذ مهمة دورية لتنظيف التوكنات المنتهية.
قمنا بتخزين user_agent و ip_address في جدول RefreshToken. يمكنك:
مسارات مثل /auth/login و /auth/refresh معرضة لمحاولات brute-force، لذلك من المهم إضافة Rate Limiting:
يمكنك مراجعة المقال: دليل التعامل مع Rate Limiting في API: حماية بدون خنق المستخدمين لتطبيق Rate Limiting مع FastAPI.
في بعض الأنظمة، قد تحتاج للانتقال إلى OAuth2 أو دمجه مع JWT. في FastAPI، يمكن بناء أنظمة OAuth2 مع Password Flow أو Authorization Code Flow بسهولة. إذا كنت مهتماً ببناء نظام أكثر تعقيداً مثل تسجيل الدخول عبر Google أو GitHub، راجع: تنفيذ OAuth2 في Django و FastAPI: دليل عملي كامل.
لكن حتى مع OAuth2، يظل مفهوم Access Token + Refresh Token هو الأساس لإدارة الجلسات بشكل آمن.
بتطبيق هذا التصميم في مشروعك، ستحصل على طبقة مصادقة قوية مبنية على FastAPI JWT Refresh Tokens، قابلة للتوسع والتطوير، وملتزمة بأفضل الممارسات الأمنية المستخدمة في الأنظمة الاحترافية.
دليل شامل لشرح JWT وRefresh Tokens وكيفية إنشاء نظام مصادقة آمن يدعم انتهاء الجلسات والتجديد وإبطال التوكن.
مساحة اعلانية