Real-time analytics is the process of analyzing data as soon as it is generated or received. Unlike traditional batch analytics, which processes data in large batches at regular intervals, real-time analytics provides immediate insights. This is useful in scenarios such as financial trading, IoT monitoring, and social media sentiment analysis.
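To make the contrast with batch processing concrete, here is a minimal sketch that updates a mean incrementally as each value arrives instead of recomputing it over a stored batch. The class name RunningMean and the sample readings are illustrative assumptions, not part of any library.

```python
class RunningMean:
    """Incrementally maintained mean: constant work and memory per new value."""

    def __init__(self) -> None:
        self.count = 0
        self.mean = 0.0

    def update(self, value: float) -> float:
        # Fold the new value into the current mean without storing history.
        self.count += 1
        self.mean += (value - self.mean) / self.count
        return self.mean


readings = [10.0, 20.0, 30.0, 40.0]  # hypothetical sensor values

# Batch style: wait for all the data, then compute once.
batch_mean = sum(readings) / len(readings)

# Real-time style: an up-to-date answer is available after every value.
stream = RunningMean()
running_means = [stream.update(r) for r in readings]
```

Both approaches agree once all values have been seen; the difference is that the streaming version has a usable answer after every single reading.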
FastAPI is a modern, high-performance web framework for building APIs in Python. It uses Python type hints to validate, serialize, and deserialize data, which makes the code more robust and easier to understand. FastAPI is built on top of Starlette for the web parts and Pydantic for the data parts, and it offers features like automatic API documentation generation and asynchronous processing.
WebSockets provide a full-duplex communication channel over a single TCP connection. Unlike HTTP, which is a request-response protocol, a WebSocket connection stays open and allows continuous data exchange between the client and the server. This makes WebSockets ideal for real-time applications such as chat applications, live dashboards, and real-time analytics.
First, install the necessary libraries. You can use pip to install fastapi, uvicorn (a server for running FastAPI applications), and websockets (for handling WebSocket connections):

    pip install fastapi uvicorn websockets

Here is a simple example of a FastAPI application with a WebSocket endpoint:

    from fastapi import FastAPI, WebSocket
    import uvicorn

    app = FastAPI()


    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")


    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8000)

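In this code, the WebSocket endpoint is registered at /ws. The handler accepts the connection and then loops: it waits for a text message and echoes it back to the client.

To run the application, save the code in a file (e.g., main.py) and run the following command:

    uvicorn main:app --reload
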
Let’s assume we are receiving data from a client in JSON format. We can modify our previous code to handle JSON data.

    from fastapi import FastAPI, WebSocket
    import uvicorn
    import json

    app = FastAPI()


    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        while True:
            try:
                data = await websocket.receive_text()
                json_data = json.loads(data)
                # Here we have successfully ingested the data
                print("Received data:", json_data)
            except json.JSONDecodeError:
                await websocket.send_text("Invalid JSON data received.")


    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8000)

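Parsing the JSON is only the first half of ingestion; in practice the message usually also has to match an expected shape. The following is a minimal sketch under the assumption that each message is a JSON object with a string sensor_id and a numeric value. Both field names, the Reading class, and parse_reading are hypothetical and chosen only for illustration.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Reading:
    sensor_id: str  # hypothetical field name
    value: float    # hypothetical field name


def parse_reading(raw: str) -> Reading:
    """Parse one JSON text message into a Reading, or raise ValueError."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("message is not valid JSON") from exc
    if not isinstance(payload, dict):
        raise ValueError("message must be a JSON object")
    sensor_id = payload.get("sensor_id")
    value = payload.get("value")
    if not isinstance(sensor_id, str):
        raise ValueError("'sensor_id' must be a string")
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise ValueError("'value' must be a number")
    return Reading(sensor_id=sensor_id, value=float(value))
```

Inside the WebSocket handler, the call to json.loads could be swapped for parse_reading, with the except clause catching ValueError and sending its message back to the client.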
Once we have ingested the data, we can perform some real-time analytics. For example, if we are receiving a list of numbers, we can calculate their sum.

    from fastapi import FastAPI, WebSocket
    import uvicorn
    import json

    app = FastAPI()


    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        while True:
            try:
                data = await websocket.receive_text()
                json_data = json.loads(data)
                if isinstance(json_data, list) and all(isinstance(i, (int, float)) for i in json_data):
                    result = sum(json_data)
                    await websocket.send_text(f"The sum of the numbers is: {result}")
                else:
                    await websocket.send_text("Invalid data format. Expected a list of numbers.")
            except json.JSONDecodeError:
                await websocket.send_text("Invalid JSON data received.")


    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8000)

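Summing one message at a time is stateless. Many real-time metrics instead track the most recent values across messages. As a hedged sketch of that idea, the class below keeps a sliding-window mean; the name SlidingWindowMean and the window size are assumptions made for this example.

```python
from collections import deque


class SlidingWindowMean:
    """Mean of the most recent `size` values seen so far."""

    def __init__(self, size: int) -> None:
        if size <= 0:
            raise ValueError("size must be positive")
        self._window = deque(maxlen=size)  # oldest value drops out automatically
        self._total = 0.0

    def add(self, value: float) -> float:
        if len(self._window) == self._window.maxlen:
            # The append below evicts the oldest value; remove it from the total.
            self._total -= self._window[0]
        self._window.append(value)
        self._total += value
        return self._total / len(self._window)


window = SlidingWindowMean(size=3)
means = [window.add(v) for v in [1, 2, 3, 4, 5]]
```

In the WebSocket handler, one instance could be created right after accept() so that each connection keeps its own window.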
After processing the data, we send the results back to the client using the send_text method of the WebSocket object. In the above example, we send the sum of the numbers back to the client.
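Free-form strings are hard for a client to parse reliably. One option, sketched here as an assumption rather than something the examples above do, is to send every reply as JSON with a type field. The helper names and the envelope keys (type, metric, value, detail) are hypothetical.

```python
import json
from typing import Any


def result_message(metric: str, value: Any) -> str:
    """Serialize a successful result as a JSON text message."""
    return json.dumps({"type": "result", "metric": metric, "value": value})


def error_message(detail: str) -> str:
    """Serialize an error in the same envelope so clients can branch on 'type'."""
    return json.dumps({"type": "error", "detail": detail})
```

A handler could then call `await websocket.send_text(result_message("sum", result))`. The WebSocket object also offers a send_json method that serializes a dict directly.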
In real-world applications, errors can occur at various stages, such as when receiving data, parsing JSON, or performing calculations. It is important to handle these errors gracefully. In our examples, we used a try-except block to catch JSONDecodeError and send an appropriate error message to the client.
As the number of clients and the volume of data increase, the application needs to be scalable. FastAPI supports asynchronous processing, which allows the application to handle multiple requests concurrently. You can also use techniques like load balancing and horizontal scaling to handle a large number of connections.
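To illustrate why asynchronous handlers help with concurrency, the following standard-library sketch simulates several clients whose handlers all wait on I/O at the same time. Here asyncio.sleep stands in for awaiting a network call such as receive_text, and the function names are hypothetical.

```python
import asyncio
import time


async def handle_client(client_id: int, delay: float) -> str:
    # asyncio.sleep stands in for awaiting network I/O.
    await asyncio.sleep(delay)
    return f"client {client_id} done"


async def serve_all(n_clients: int, delay: float) -> tuple[list[str], float]:
    start = time.perf_counter()
    # All handlers wait concurrently on a single event loop.
    results = await asyncio.gather(
        *(handle_client(i, delay) for i in range(n_clients))
    )
    return list(results), time.perf_counter() - start


results, elapsed = asyncio.run(serve_all(n_clients=5, delay=0.1))
```

The total time stays close to a single delay rather than five of them. This benefit applies to waiting on I/O only: CPU-heavy work inside a handler still blocks the event loop for every connection in that process.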
Both FastAPI and websockets support asynchronous programming.

Real-time analytics with Python, FastAPI, and WebSockets is a powerful combination for building modern, data-driven applications. By understanding the fundamental concepts and applying the usage methods, common practices, and best practices covered here, you can build robust and scalable real-time analytics applications. The examples provided in this blog are a starting point, and you can extend them to meet the specific requirements of your application.