Skip to main content
TopMiniSite

Back to all posts

How to Cancel Previous Request In Fastapi?

Published on
4 min read
How to Cancel Previous Request In Fastapi? image

In FastAPI, you can cancel a previous request by using the standard Python asyncio module to create a task and then cancelling that task before it completes.

Here is an example of how you can cancel a previous request in FastAPI:

from fastapi import FastAPI import asyncio

app = FastAPI()

Store the task object in a global variable

current_task = None

@app.get("/") async def cancel_previous_request(): global current_task

# Cancel the previous request if it is still processing
if current\_task and not current\_task.done():
    current\_task.cancel()

# Create a new task
current\_task = asyncio.create\_task(delayed\_response())

return {"message": "New request started"}

async def delayed_response(): await asyncio.sleep(5) # Simulate a delay

return {"message": "Delayed response"}

In this example, we create a global variable current_task to store the current running asyncio task. When a new request is received, we check if there is a previous task running and cancel it if it is not completed. Then we create a new task to handle the current request.

How to efficiently cancel requests in FastAPI?

To efficiently cancel requests in FastAPI, you can introduce a mechanism to periodically check if a request should be cancelled using Python's asyncio module. Here is an example of how you can implement this:

  1. Create a function that checks if a request should be cancelled:

import asyncio

async def check_cancelled(req_id: int, cancellation_token: asyncio.Event): try: await cancellation_token.wait() # Raise an exception to cancel the request raise asyncio.CancelledError() except asyncio.CancelledError: print(f"Request with ID {req_id} was cancelled.")

  1. Use the check_cancelled function inside your route function, passing the request ID and a cancellation token:

from fastapi import FastAPI import asyncio

app = FastAPI()

async def long_running_task(req_id: int, cancellation_token: asyncio.Event): await check_cancelled(req_id, cancellation_token) # Perform some long-running task here

@app.get("/cancel/{req_id}") async def cancel_request(req_id: int): # Set cancellation token for the request ID cancellation_token = asyncio.Event() cancellation_tokens[req_id] = cancellation_token # Cancel the request cancellation_token.set() return {"message": "Request cancelled"}

Store cancellation tokens for each request ID

cancellation_tokens = {}

@app.get("/process/{req_id}") async def process_request(req_id: int): # Start a long running task asyncio.create_task(long_running_task(req_id, cancellation_tokens.get(req_id))) return {"message": "Processing request"}

In this example, the check_cancelled function periodically checks if a cancellation token has been set for a given request ID. When the cancellation token is set, an asyncio.CancelledError exception is raised to cancel the request. The cancel_request route is used to set the cancellation token for a specific request ID, while the process_request route starts a long-running task using asyncio.create_task.

By implementing this mechanism, you can efficiently cancel requests in FastAPI by checking for cancellation requests at regular intervals.

How to cancel a previous request in FastAPI?

To cancel a previous request in FastAPI, you can use the Task.cancel() method in Python.

Here is an example of how you can cancel a request by raising an exception:

import asyncio

async def long_running_task(): print("Starting long running task...") await asyncio.sleep(10) # Simulating a long running task print("Task complete")

async def cancel_task(): task = asyncio.create_task(long_running_task())

# Wait for a while before cancelling the task
await asyncio.sleep(5)

# Cancel the task
task.cancel()

try: asyncio.run(cancel_task()) except asyncio.CancelledError: print("Task cancelled")

In this example, a long running task is started using asyncio.create_task(). After waiting for 5 seconds, the task is cancelled using the Task.cancel() method. The asyncio.CancelledError exception is caught to handle the cancellation of the task.

You can incorporate this approach into your FastAPI application to cancel previous requests as needed.

How can I terminate a request that is in progress in FastAPI?

One way to terminate a request that is in progress in FastAPI is by raising an exception. You can create a custom exception like RequestTerminated and raise it when you want to terminate the request.

Here is an example code snippet demonstrating how to terminate a request in FastAPI:

from fastapi import FastAPI, HTTPException

app = FastAPI()

class RequestTerminated(Exception): pass

@app.get("/") async def read_root(): raise RequestTerminated("Request terminated")

@app.exception_handler(RequestTerminated) async def request_terminated_exception_handler(request, exc): return JSONResponse(status_code=400, content={"message": "Request terminated"})

In this example, when the root route is accessed, it will raise the RequestTerminated exception which will be handled by the request_terminated_exception_handler and return a JSON response with the message "Request terminated".