FastAPI is a modern, high-performance web framework for building APIs with Python 3.7+ based on standard Python type hints. It uses Starlette for the web layer and Pydantic for data validation, and is designed to be easy to use while delivering excellent performance.
Celery is an asynchronous task queue/job queue based on distributed message passing. It allows you to run tasks asynchronously, which means that these tasks can be executed in the background without blocking the main application thread. Celery uses a message broker (such as RabbitMQ or Redis) to send and receive messages between the main application and the worker processes.
An asynchronous task queue is a mechanism that allows you to enqueue tasks for later execution. When a task is added to the queue, it doesn’t need to be executed immediately. Instead, worker processes pick up tasks from the queue and execute them in the background. This is especially useful for handling long-running tasks in a web application, as it prevents the application from becoming unresponsive.
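To make the idea concrete before bringing in Celery, here is a minimal in-process sketch of the pattern using only the standard library: the caller enqueues work and returns immediately, while a background worker thread drains the queue. Celery does the same thing across processes and machines, with a message broker in between.

```python
import queue
import threading

# A toy task queue: callers enqueue work and move on; a background
# worker thread drains the queue and records results.
task_queue = queue.Queue()
results = []

def worker():
    while True:
        task = task_queue.get()
        if task is None:  # sentinel value: shut the worker down
            break
        results.append(task())  # execute the task in the background

thread = threading.Thread(target=worker)
thread.start()

task_queue.put(lambda: "Task completed")  # enqueue; does not block the caller
task_queue.put(None)                      # tell the worker to stop
thread.join()
print(results)  # ['Task completed']
```

Celery replaces the in-memory queue with a broker such as Redis, so the workers can live in separate processes or on separate machines.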
First, you need to install FastAPI, Celery, and a message broker. In this example, we will use Redis as the message broker.
pip install fastapi uvicorn celery redis
Create a celery_app.py file to configure Celery:
import time

from celery import Celery

# The backend stores task state and results so clients can query them later.
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

@celery_app.task
def example_task():
    time.sleep(5)  # simulate a long-running job
    return 'Task completed'
Create a main.py file for the FastAPI application:
from fastapi import FastAPI

from celery_app import celery_app, example_task

app = FastAPI()

@app.get("/trigger_task")
def trigger_task():
    task = example_task.delay()
    return {"task_id": task.id}

@app.get("/task_status/{task_id}")
def task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    return {"status": task.status, "result": task.result}
Make sure Redis is running on your local machine. You can start Redis using the following command:
redis-server
Start the Celery worker in a separate terminal:
celery -A celery_app worker --loglevel=info
Start the FastAPI application using Uvicorn:
uvicorn main:app --reload
In the FastAPI application, you enqueue a task by calling the delay() method on the Celery task. For example:
from celery_app import example_task

@app.get("/trigger_task")
def trigger_task():
    task = example_task.delay()
    return {"task_id": task.id}
You can check the status of a task using the AsyncResult class provided by Celery.
@app.get("/task_status/{task_id}")
def task_status(task_id: str):
    from celery_app import celery_app

    task = celery_app.AsyncResult(task_id)
    return {"status": task.status, "result": task.result}
In Celery tasks, it’s important to handle errors properly. You can use try-except blocks in your tasks to catch and handle exceptions.
import time

@celery_app.task
def example_task():
    try:
        time.sleep(5)
        return 'Task completed'
    except Exception as e:
        return f'Error: {str(e)}'
For tasks that may fail due to transient errors (such as network issues), you can configure Celery to retry the task a certain number of times.
import time

@celery_app.task(bind=True, default_retry_delay=300, max_retries=5)
def example_task(self):
    try:
        time.sleep(5)
        return 'Task completed'
    except Exception as exc:
        raise self.retry(exc=exc)
It’s a good practice to run Celery workers in a separate environment or container. This allows you to scale the number of workers independently of the FastAPI application.
Use monitoring tools such as Flower to monitor the performance and status of Celery workers. You can install Flower with pip install flower and start it with celery -A celery_app flower.
Design your tasks to be as independent and atomic as possible. This makes it easier to scale and manage the task queue.
Integrating Celery with FastAPI is a powerful way to handle asynchronous tasks in your web application. By offloading long-running tasks to separate worker processes, you improve the performance and responsiveness of your application. In this blog post, we have covered the fundamental concepts, installation and setup, usage, common practices, and best practices of using FastAPI with Celery to build an asynchronous task queue. With this knowledge, you can start building more efficient and robust web applications.