FastAPI is a Python web framework designed for building APIs. It uses Python type hints for request validation, response serialization, and automatic API documentation generation. FastAPI is built on top of Starlette for the web handling parts and Pydantic for data validation. It is known for its high performance, thanks to its asynchronous capabilities and the use of modern Python features.
Apache Kafka is an open-source distributed streaming platform developed by the Apache Software Foundation. It is designed to handle high-volume, real-time data streams. Kafka uses a publish-subscribe model in which producers send messages to topics and consumers read messages from those topics. Kafka stores messages in a fault-tolerant and scalable manner across multiple servers called brokers.
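To make the publish-subscribe model concrete, here is a toy in-memory sketch. It is not Kafka and uses no Kafka library: `ToyBroker`, `publish`, and `poll` are hypothetical names invented for illustration, and partitions, replication, and persistence are left out. It only shows the idea that a topic is an ordered log and that each group of consumers tracks its own read position.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker: each topic is an append-only list."""

    def __init__(self):
        self.topics = defaultdict(list)   # topic name -> ordered messages
        self.offsets = defaultdict(int)   # (group, topic) -> next index to read

    def publish(self, topic, message):
        """Append a message to the topic and return its offset."""
        self.topics[topic].append(message)
        return len(self.topics[topic]) - 1

    def poll(self, group, topic, max_messages=10):
        """Return unread messages for this group and advance its offset."""
        start = self.offsets[(group, topic)]
        batch = self.topics[topic][start:start + max_messages]
        self.offsets[(group, topic)] = start + len(batch)
        return batch


broker = ToyBroker()
broker.publish("orders", {"id": 1})
broker.publish("orders", {"id": 2})

first_read = broker.poll("billing", "orders")     # billing reads both messages
second_read = broker.poll("billing", "orders")    # nothing new for billing
other_group = broker.poll("analytics", "orders")  # analytics reads independently
```

Reading a message does not remove it from the topic, which is why the second group still sees both messages.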
Combining FastAPI and Kafka allows developers to build web applications that handle real-time data. For example, a FastAPI application can receive user requests, process them, and then send the relevant data to Kafka topics for further processing by other services. Conversely, a FastAPI application can consume data from Kafka topics to provide real-time updates to users.
You can install FastAPI using pip, the Python package manager. Open your terminal and run the following command:
pip install fastapi uvicorn
Here, uvicorn is an ASGI server that can run FastAPI applications.
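Kafka must be running before the examples that follow will work. From the Kafka installation directory, start ZooKeeper and then the Kafka broker, each in its own terminal:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties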
Install the kafka-python library to interact with Kafka from Python:
pip install kafka-python
Here is a simple example of a FastAPI application that sends messages to a Kafka topic:
from fastapi import FastAPI
from kafka import KafkaProducer
import json

app = FastAPI()

# Serialize each message dict to UTF-8 encoded JSON before sending.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Declared with plain def so FastAPI runs this blocking code in a worker thread.
@app.post("/send-message/")
def send_message(message: dict):
    producer.send('test_topic', value=message)
    producer.flush()  # block until buffered messages have been sent
    return {"status": "Message sent successfully"}
To run this application, save the code in a file (e.g., main.py) and run the following command in the terminal:
uvicorn main:app --reload
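The value_serializer passed to KafkaProducer above decides what bytes reach the topic. The sketch below runs without Kafka and shows that step in isolation, along with the inverse step a consumer would apply. The `serialize` and `deserialize` helper names and the sample payload are assumptions made for illustration.

```python
import json


def serialize(value):
    """Same logic as the producer's value_serializer: object -> UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Inverse step for a consumer: UTF-8 JSON bytes -> Python object."""
    return json.loads(raw.decode("utf-8"))


payload = {"user": "alice", "action": "login"}  # hypothetical request body
wire_bytes = serialize(payload)
restored = deserialize(wire_bytes)
```

Because both sides agree on JSON encoded as UTF-8, the consumer gets back a dict equal to the one the producer sent.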
Here is an example of a FastAPI application that consumes messages from a Kafka topic:
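from fastapi import FastAPI
from kafka import KafkaConsumer
import json

app = FastAPI()

consumer = KafkaConsumer(
    'test_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    # Stop iterating after 1 second without new messages so a request cannot hang.
    consumer_timeout_ms=1000
)

@app.get("/consume-messages/")
async def consume_messages():
    messages = []
    # Collect up to 10 messages; iteration ends early if the timeout above expires.
    # Note: this loop blocks the event loop while it waits for messages.
    for message in consumer:
        messages.append(message.value)
        if len(messages) >= 10:
            break
    return {"messages": messages}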
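Iterating over the consumer inside a request handler ties every request to the pace of the topic. One alternative, sketched here under stated assumptions, is to read messages in a background thread and let the endpoint return whatever is already buffered. `MessageBuffer` is a hypothetical helper, not part of kafka-python or FastAPI, and a generator stands in for the Kafka consumer so the sketch runs without a broker. With a real consumer you would likely store message.value rather than the whole record.

```python
import threading
from collections import deque


class MessageBuffer:
    """Keeps the most recent messages read from any iterable source."""

    def __init__(self, source, max_size=100):
        self._source = source                    # e.g. a consumer; any iterable works
        self._messages = deque(maxlen=max_size)  # oldest entries are dropped when full
        self._lock = threading.Lock()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def join(self, timeout=None):
        self._thread.join(timeout)

    def _run(self):
        # Blocks in this background thread, not in a request handler.
        for message in self._source:
            with self._lock:
                self._messages.append(message)

    def latest(self, count=10):
        """Return up to `count` of the newest buffered messages, oldest first."""
        with self._lock:
            items = list(self._messages)
        return items[-count:] if count > 0 else []


# Stand-in for a Kafka topic so the sketch runs without a broker.
fake_topic = ({"n": i} for i in range(25))
buffer = MessageBuffer(fake_topic, max_size=20)
buffer.start()
buffer.join(timeout=5)  # the fake source is finite, so the thread ends quickly
recent = buffer.latest(3)
```

An endpoint could then return buffer.latest(10) immediately instead of waiting on the topic.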
When working with Kafka in a FastAPI application, it’s important to handle errors properly. For example, when producing messages, the send method of the KafkaProducer can raise exceptions. Here is an updated version of the message-sending code with error handling:
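from fastapi import FastAPI, HTTPException
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/send-message/")
def send_message(message: dict):
    try:
        future = producer.send('test_topic', value=message)
        # send() is asynchronous; get() waits for the broker's acknowledgment
        # and raises if delivery failed.
        future.get(timeout=10)
    except KafkaError as e:
        # Report the failure with an error status code instead of a 200 response.
        raise HTTPException(status_code=500, detail=f"Error sending message: {e}")
    return {"status": "Message sent successfully"}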
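Some send failures are transient, for instance a broker that is briefly unreachable, so retrying before reporting an error can help. The following is a minimal sketch of retrying with exponential backoff. It is independent of any Kafka client: `send_with_retry` and `flaky_send` are hypothetical names, and the flaky sender only simulates a failing broker.

```python
import time


def send_with_retry(send, message, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Call send(message), retrying with exponential backoff on failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return send(message)
        except Exception as exc:  # narrow this to the client's error type in real code
            last_error = exc
            if attempt < attempts - 1:
                # Wait base_delay, then 2x, then 4x, ... between attempts.
                sleep(base_delay * 2 ** attempt)
    raise last_error


# Hypothetical flaky sender: fails twice, then succeeds.
calls = {"count": 0}


def flaky_send(message):
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("broker unavailable")
    return {"delivered": message}


delays = []  # record the requested delays instead of actually sleeping
result = send_with_retry(flaky_send, {"id": 1}, sleep=delays.append)
```

Passing sleep as a parameter keeps the helper easy to test. kafka-python's producer also accepts a retries setting, which may be enough on its own for simple cases.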
FastAPI is designed for asynchronous processing, but the kafka-python client is synchronous, so its calls block while they wait on the broker. You can use the asyncio library to move those calls off the event loop, for example by running them in a worker thread, so the application keeps handling other requests while it produces or consumes messages.
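As a rough sketch of that idea, the example below uses asyncio.to_thread (Python 3.9+) to run a blocking call in a worker thread. `blocking_send` is a hypothetical stand-in for a synchronous client call; it only sleeps to simulate network latency and does not talk to Kafka.

```python
import asyncio
import time


def blocking_send(message):
    """Stand-in for a synchronous client call that waits on the network."""
    time.sleep(0.05)  # simulate network latency
    return {"sent": message}


async def send_async(message):
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(blocking_send, message)


async def main():
    # The three sends overlap instead of running back to back.
    return await asyncio.gather(*(send_async({"id": i}) for i in range(3)))


results = asyncio.run(main())
```

Inside a FastAPI handler declared with async def, the same await asyncio.to_thread(...) call would apply. An asyncio-native client such as aiokafka is another option, though it is not covered here.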
Combining FastAPI and Kafka provides a powerful solution for building real-time, data-driven applications. FastAPI offers a high-performance web framework for handling user requests, while Kafka provides a reliable and scalable data streaming platform. By applying the concepts, usage patterns, and practices outlined in this post, developers can build robust applications that handle real-time data efficiently.