python.fastapi.missing_request_timeout
Stability
Medium
Detects FastAPI applications without request timeout middleware configured.
Why It Matters
Missing request timeouts lead to:
- Worker exhaustion — Slow downstream services tie up workers indefinitely
- Cascading failures — One slow service can bring down your entire API
- 503 storms — All workers become occupied, causing unavailability
- Poor user experience — Users wait indefinitely with no feedback
What Unfault Detects
- FastAPI apps without timeout middleware
- Missing `TimeoutMiddleware` or similar timeout handling
- Applications vulnerable to slow downstream services
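To make the first two bullets concrete, here is a minimal sketch of what such a check could look like using Python's standard `ast` module. It is purely illustrative and is not Unfault's actual detection logic; the function name `has_timeout_middleware` and the "class name contains `timeout`" heuristic are assumptions made for this example.

```python
import ast


def has_timeout_middleware(source: str) -> bool:
    """Return True if `source` calls add_middleware(...) with a class
    whose name contains "timeout" (hypothetical heuristic)."""
    tree = ast.parse(source)
    for node in ast.walk(tree):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Only consider calls of the form <something>.add_middleware(...)
        if not (isinstance(func, ast.Attribute) and func.attr == "add_middleware"):
            continue
        if not node.args:
            continue
        first = node.args[0]
        # Handle both `TimeoutMiddleware` and `module.TimeoutMiddleware`
        name = first.id if isinstance(first, ast.Name) else getattr(first, "attr", "")
        if "timeout" in name.lower():
            return True
    return False


if __name__ == "__main__":
    print(has_timeout_middleware("app.add_middleware(TimeoutMiddleware, timeout=30.0)"))
    print(has_timeout_middleware("app = FastAPI()"))
```

A real analyzer would also need to recognize other forms of timeout handling, which this sketch does not attempt.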
Auto-Fix
Unfault generates a production-ready `TimeoutMiddleware` that:
- Excludes health check endpoints by default
- Supports per-route timeout configuration via `request.state`
- Uses proper Python type annotations
- Is fully configurable via constructor parameters
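The timeout enforcement in a middleware like this rests on `asyncio.wait_for`, which cancels the awaited coroutine once the deadline passes. The standalone sketch below shows that behavior without FastAPI; `slow_call` and `call_with_timeout` are hypothetical helpers introduced only for illustration.

```python
import asyncio


async def slow_call(delay: float) -> str:
    """Stand-in for a slow downstream service (hypothetical helper)."""
    await asyncio.sleep(delay)
    return "done"


async def call_with_timeout(delay: float, timeout: float) -> str:
    """Return the call's result, or a marker string if the deadline passes."""
    try:
        # wait_for cancels slow_call() when `timeout` seconds elapse.
        return await asyncio.wait_for(slow_call(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    # Fast call finishes well inside its deadline.
    print(asyncio.run(call_with_timeout(delay=0.01, timeout=1.0)))
    # Slow call exceeds its deadline and is cancelled.
    print(asyncio.run(call_with_timeout(delay=1.0, timeout=0.01)))
```

In the middleware, the same `try`/`except` wraps `call_next(request)` and turns the timeout into a 504 response instead of a marker string.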
Example
```python
# ❌ Before (no timeout middleware)
from fastapi import FastAPI

app = FastAPI()

@app.get("/api/data")
async def get_data():
    return await slow_external_api()  # Can hang forever!

# ✅ After (with Unfault's auto-fix)
import asyncio
from typing import Callable

from starlette.middleware.base import (
    BaseHTTPMiddleware,
    RequestResponseEndpoint,
)
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from fastapi import FastAPI

DEFAULT_TIMEOUT: float = 30.0  # seconds
DEFAULT_EXEMPT_PATHS: set[str] = {
    "/health",
    "/healthz",
    "/ready",
    "/readiness",
    "/live",
    "/liveness",
    "/metrics",
}

class TimeoutMiddleware(BaseHTTPMiddleware):
    """Request timeout middleware with per-route configuration.

    Args:
        app: The ASGI application.
        timeout: Default timeout in seconds for all requests.
        exempt_paths: Paths that bypass timeout enforcement.
    """

    timeout: float
    exempt_paths: set[str]

    def __init__(
        self,
        app: Callable,
        timeout: float = DEFAULT_TIMEOUT,
        exempt_paths: set[str] | None = None,
    ) -> None:
        super().__init__(app)
        self.timeout = timeout
        self.exempt_paths = (
            exempt_paths if exempt_paths is not None else DEFAULT_EXEMPT_PATHS
        )

    async def dispatch(
        self,
        request: Request,
        call_next: RequestResponseEndpoint,
    ) -> Response:
        # Skip timeout for exempt paths (health checks, etc.)
        if request.url.path in self.exempt_paths:
            return await call_next(request)

        # Check for per-route timeout override
        route_timeout: float | None = getattr(
            request.state, "timeout", self.timeout
        )

        # If timeout is None, skip timeout for this route
        if route_timeout is None:
            return await call_next(request)

        try:
            return await asyncio.wait_for(
                call_next(request), timeout=route_timeout
            )
        except asyncio.TimeoutError:
            return JSONResponse(
                status_code=504,
                content={
                    "detail": f"Request timeout after {route_timeout}s"
                },
            )

app = FastAPI()
app.add_middleware(TimeoutMiddleware, timeout=30.0)
```
Configuration Options
Custom Default Timeout
```python
# Set a 60-second default timeout
app.add_middleware(TimeoutMiddleware, timeout=60.0)
```
Custom Exempt Paths
```python
# Add custom paths that should bypass timeout
app.add_middleware(
    TimeoutMiddleware,
    timeout=30.0,
    exempt_paths={"/health", "/metrics", "/long-job"},
)
```
Per-Route Timeout Override
For routes that need more time (file uploads, report generation):
```python
from fastapi import Depends, Request, UploadFile

def timeout_120s(request: Request) -> None:
    """Set 120 second timeout for this route."""
    request.state.timeout = 120.0

@app.post("/upload", dependencies=[Depends(timeout_120s)])
async def upload_file(file: UploadFile):
    return await process_large_file(file)
```
Disable Timeout for Specific Routes
For streaming or WebSocket-like endpoints:
```python
import asyncio

from fastapi import Depends, Request
from fastapi.responses import StreamingResponse

def no_timeout(request: Request) -> None:
    """Disable timeout for this route."""
    request.state.timeout = None

@app.get("/stream", dependencies=[Depends(no_timeout)])
async def stream_data():
    async def generate():
        for i in range(100):
            yield f"data: {i}\n\n"
            await asyncio.sleep(1)
    return StreamingResponse(generate())
```
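Taken together, the configuration options resolve to one effective timeout per request. The pure-function sketch below mirrors the branches of `dispatch` in the Example, assuming the override value is the one the middleware reads from `request.state`. The names `effective_timeout` and `_UNSET` are hypothetical and are not part of the generated middleware.

```python
from typing import Any, Optional, Set

# Sentinel meaning "request.state.timeout was never set" (hypothetical name).
_UNSET: Any = object()


def effective_timeout(
    path: str,
    default: float,
    exempt_paths: Set[str],
    route_override: Any = _UNSET,
) -> Optional[float]:
    """Return the timeout in seconds, or None when no timeout applies."""
    # Exempt paths (health checks, metrics) bypass the timeout entirely.
    if path in exempt_paths:
        return None
    # No override present: fall back to the middleware default.
    if route_override is _UNSET:
        return default
    # An explicit override wins; None disables the timeout for this request.
    return route_override


if __name__ == "__main__":
    exempt = {"/health", "/metrics"}
    print(effective_timeout("/health", 30.0, exempt))
    print(effective_timeout("/api/data", 30.0, exempt))
    print(effective_timeout("/upload", 30.0, exempt, route_override=120.0))
    print(effective_timeout("/stream", 30.0, exempt, route_override=None))
```

Using a sentinel instead of `None` for the "not set" case keeps "no override" distinct from "timeout disabled", which is the same distinction `getattr(request.state, "timeout", self.timeout)` makes.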