# FastAPI with Celery: Building an Asynchronous Task Queue
In modern web development, handling time-consuming tasks efficiently is crucial. When building web applications with FastAPI, a high-performance Python web framework, you may encounter tasks such as sending emails, processing large files, or making external API calls that can block the main thread and slow down the application. Celery, a powerful asynchronous task queue library for Python, comes to the rescue. By integrating Celery with FastAPI, you can offload these time-consuming tasks to separate worker processes, allowing your FastAPI application to handle more requests concurrently and provide a better user experience. In this blog post, we will explore the fundamental concepts of using Celery with FastAPI to build an asynchronous task queue, along with usage methods, common practices, and best practices.
## Table of Contents
- [Fundamental Concepts](#fundamental-concepts)
- [Installation and Setup](#installation-and-setup)
- [Usage Methods](#usage-methods)
- [Common Practices](#common-practices)
- [Best Practices](#best-practices)
- [Conclusion](#conclusion)
## Fundamental Concepts
### FastAPI
FastAPI is a modern, fast (high-performance) web framework for building APIs with Python 3.7+ based on standard Python type hints. It uses Starlette for the web parts and Pydantic for data validation. FastAPI is designed to be easy to use and efficient, and it delivers excellent performance.
### Celery
Celery is an asynchronous task queue/job queue based on distributed message passing. It allows you to run tasks asynchronously, which means that these tasks can be executed in the background without blocking the main application thread. Celery uses a message broker (such as RabbitMQ or Redis) to send and receive messages between the main application and the worker processes.
### Asynchronous Task Queue
An asynchronous task queue is a mechanism that allows you to enqueue tasks for later execution. When a task is added to the queue, it doesn’t need to be executed immediately. Instead, worker processes pick up tasks from the queue and execute them in the background. This is especially useful for handling long-running tasks in a web application, as it prevents the application from becoming unresponsive.
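To make the idea concrete, here is a minimal, self-contained sketch of the producer/worker pattern using only Python's standard library (a plain `queue.Queue` and a thread, not Celery); it illustrates the concept of enqueuing work and processing it in the background:

```python
import queue
import threading
import time

task_queue = queue.Queue()

def worker():
    # Pull tasks off the queue and run them in the background.
    while True:
        job = task_queue.get()
        if job is None:  # sentinel to stop the worker
            break
        job()
        task_queue.task_done()

threading.Thread(target=worker, daemon=True).start()

# The "main application" enqueues work and moves on immediately.
task_queue.put(lambda: time.sleep(1))
task_queue.put(lambda: print("task executed in the background"))

task_queue.join()  # wait for all queued tasks to finish
```

Celery follows the same pattern, but the queue lives in an external broker and the workers run as separate processes, possibly on other machines.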
## Installation and Setup
### Install Dependencies
First, install FastAPI, Uvicorn, Celery, and the Redis client library (the Redis server itself is installed separately). In this example, we will use Redis as the message broker.

```bash
pip install fastapi uvicorn celery redis
```
### Configure Celery
Create a `celery_app.py` file to configure Celery. Note that we also configure a result backend, which stores task states and return values so they can be queried later:

```python
from celery import Celery

celery_app = Celery('tasks',
                    broker='redis://localhost:6379/0',
                    backend='redis://localhost:6379/0')

@celery_app.task
def example_task():
    import time
    time.sleep(5)  # simulate a long-running job
    return 'Task completed'
```
### Create a FastAPI Application
Create a `main.py` file for the FastAPI application:

```python
from fastapi import FastAPI
from celery_app import celery_app, example_task

app = FastAPI()

@app.get("/trigger_task")
def trigger_task():
    # Enqueue the task and return its id immediately.
    task = example_task.delay()
    return {"task_id": task.id}

@app.get("/task_status/{task_id}")
def task_status(task_id: str):
    # Look up the task's state and result in the result backend.
    task = celery_app.AsyncResult(task_id)
    return {"status": task.status, "result": task.result}
```
### Start Redis
Make sure Redis is running on your local machine. You can start it with:

```bash
redis-server
```
### Start Celery Worker
Start the Celery worker in a separate terminal:

```bash
celery -A celery_app worker --loglevel=info
```
### Start FastAPI Application
Start the FastAPI application using Uvicorn:

```bash
uvicorn main:app --reload
```
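With Redis, the worker, and the API all running, you can exercise the endpoints from another terminal (the task id shown below is illustrative; Celery generates a UUID per task):

```bash
curl http://localhost:8000/trigger_task
# => {"task_id": "7f2c9b1e-..."}

curl http://localhost:8000/task_status/7f2c9b1e-...
# => {"status": "PENDING", "result": null} while the task is still running,
#    then {"status": "SUCCESS", "result": "Task completed"}
```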
## Usage Methods
### Enqueuing a Task
In the FastAPI application, you can enqueue a task by calling the `delay()` method on the Celery task. For example:

```python
from celery_app import example_task

@app.get("/trigger_task")
def trigger_task():
    task = example_task.delay()
    return {"task_id": task.id}
```
### Checking Task Status
You can check the status of a task using the `AsyncResult` class provided by Celery:

```python
from celery_app import celery_app

@app.get("/task_status/{task_id}")
def task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    return {"status": task.status, "result": task.result}
```
## Common Practices
### Error Handling
In Celery tasks, it’s important to handle errors properly. You can use try-except blocks in your tasks to catch and handle exceptions:

```python
@celery_app.task
def example_task():
    try:
        import time
        time.sleep(5)
        return 'Task completed'
    except Exception as e:
        return f'Error: {str(e)}'
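Note that returning an error string marks the task as SUCCESS, which hides failures from Celery's state tracking. An alternative sketch that logs the exception with Celery's task logger and lets the task be recorded as FAILURE:

```python
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@celery_app.task
def example_task():
    try:
        import time
        time.sleep(5)
        return 'Task completed'
    except Exception:
        logger.exception('example_task failed')
        raise  # let Celery record the task as FAILURE
```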
### Task Retries
For tasks that may fail due to transient errors (such as network issues), you can configure Celery to retry the task a certain number of times:

```python
@celery_app.task(bind=True, default_retry_delay=300, max_retries=5)
def example_task(self):
    try:
        import time
        time.sleep(5)
        return 'Task completed'
    except Exception as exc:
        # Re-enqueue the task; after max_retries it is marked FAILURE.
        raise self.retry(exc=exc)
```
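Newer Celery versions (4.x and later) can also retry declaratively, without an explicit try-except, via `autoretry_for`. A brief sketch (`fetch_remote_data` is a hypothetical task):

```python
@celery_app.task(
    autoretry_for=(ConnectionError,),  # exceptions that trigger a retry
    retry_backoff=True,                # exponential backoff between attempts
    max_retries=5,
)
def fetch_remote_data():
    ...  # e.g. an HTTP call that may hit transient network errors
```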
## Best Practices
### Use a Separate Environment for Celery Workers
It’s a good practice to run Celery workers in a separate environment or container. This allows you to scale the number of workers independently of the FastAPI application.
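For example, once workers run separately, you can add capacity by starting more worker processes (on the same host or on other machines pointed at the same broker) without redeploying the API; `%h` expands to the current host name:

```bash
celery -A celery_app worker --loglevel=info --concurrency=4 -n worker1@%h
celery -A celery_app worker --loglevel=info --concurrency=4 -n worker2@%h
```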
### Monitor Celery Workers
Use monitoring tools such as Flower to monitor the performance and status of Celery workers. You can install and start it as follows:

```bash
pip install flower
celery -A celery_app flower
```
### Optimize Task Design
Design your tasks to be as independent and atomic as possible: each task should do one unit of work and not depend on state left behind by other tasks. This makes the task queue easier to scale, retry, and manage, as the sketch below shows.
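A minimal sketch of the idea (the `send_welcome_email` task and both helpers are hypothetical): pass small serializable identifiers instead of whole objects, and have each task handle exactly one item so a failure can be retried in isolation:

```python
from celery_app import celery_app

def fetch_user_email(user_id: int) -> str:
    # Hypothetical data-access helper; replace with your own lookup.
    return f'user{user_id}@example.com'

def send_email(address: str, subject: str) -> None:
    # Hypothetical email helper; replace with your mail integration.
    print(f'Sending "{subject}" to {address}')

@celery_app.task(bind=True, max_retries=3)
def send_welcome_email(self, user_id: int):
    # Atomic: one user per task, identified by a small serializable id,
    # so a failed task can be retried without redoing other users.
    send_email(fetch_user_email(user_id), 'Welcome!')

# Fan out: enqueue one task per user instead of one giant batch task.
for user_id in (1, 2, 3):
    send_welcome_email.delay(user_id)
```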
## Conclusion
Integrating Celery with FastAPI is a powerful way to handle asynchronous tasks in your web application. By offloading long-running tasks to separate worker processes, you can improve the performance and responsiveness of your application. In this blog post, we have covered the fundamental concepts, installation and setup, usage methods, common practices, and best practices of using FastAPI with Celery to build an asynchronous task queue. With this knowledge, you can start building more efficient and robust web applications.