AI Object Detection
It's fairly simple to add some basic Python computer vision code to any project on a Raspberry Pi.
YOLOv8
Ultralytics has a guide covering the basics of running YOLO on a Pi, so let's add basic object detection to the app. Start by installing the extra dependencies:
pip install ultralytics opencv-python-headless numpy
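Before wiring YOLO into the streaming app, it's worth a quick sanity check that the model runs on its own. Here's a minimal sketch, assuming you have some test image on the Pi (the test.jpg path is just a placeholder):

from ultralytics import YOLO
import cv2

# Downloads yolov8n.pt the first time it runs
model = YOLO("yolov8n.pt")

# Run a single prediction on a test image (placeholder path)
results = model("test.jpg")

# Print each detected class and its confidence
for box in results[0].boxes:
    print(model.names[int(box.cls)], float(box.conf))

# Save a copy of the image with the boxes drawn on it
cv2.imwrite("annotated.jpg", results[0].plot())

Once that works, here's the app with detection added to the frame loop: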
import io
import asyncio
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput
from fastapi import FastAPI, WebSocket
from threading import Condition
from contextlib import asynccontextmanager
from ultralytics import YOLO
import numpy as np
import cv2


class StreamingOutput(io.BufferedIOBase):
    """File-like object that the MJPEG encoder writes each JPEG frame into."""

    def __init__(self):
        super().__init__()
        self.frame = None
        self.condition = Condition()

    def write(self, buf):
        with self.condition:
            self.frame = buf
            self.condition.notify_all()

    def read(self):
        # Block until the encoder hands us the next frame
        with self.condition:
            self.condition.wait()
            return self.frame


class JpegStream:
    def __init__(self):
        self.active = False
        self.connections = set()
        self.picam2 = None
        self.task = None
        self.model = YOLO("yolov8n.pt")  # Load the YOLOv8 model

    async def stream_jpeg(self):
        self.picam2 = Picamera2()
        video_config = self.picam2.create_video_configuration(
            main={"size": (1920, 1080)}
        )
        self.picam2.configure(video_config)
        output = StreamingOutput()
        self.picam2.start_recording(
            MJPEGEncoder(), FileOutput(output), quality=Quality.MEDIUM
        )

        loop = asyncio.get_running_loop()
        try:
            while self.active:
                # Wait for the next frame in a thread so we don't block the event loop
                jpeg_data = await loop.run_in_executor(None, output.read)

                # Convert JPEG data to OpenCV format
                np_arr = np.frombuffer(jpeg_data, np.uint8)
                img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)

                # Perform object detection
                results = self.model(img)
                annotated_frame = results[0].plot()

                # Encode the annotated image back to JPEG
                _, annotated_frame_jpeg = cv2.imencode(".jpg", annotated_frame)

                # Send the annotated frame to all connected clients
                tasks = [
                    websocket.send_bytes(annotated_frame_jpeg.tobytes())
                    for websocket in self.connections.copy()
                ]
                await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            self.picam2.stop_recording()
            self.picam2.close()
            self.picam2 = None

    async def start(self):
        if not self.active:
            self.active = True
            self.task = asyncio.create_task(self.stream_jpeg())

    async def stop(self):
        if self.active:
            self.active = False
            if self.task:
                await self.task
                self.task = None


jpeg_stream = JpegStream()


@asynccontextmanager
async def lifespan(app: FastAPI):
    yield
    print("done")
    await jpeg_stream.stop()


app = FastAPI(lifespan=lifespan)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    jpeg_stream.connections.add(websocket)
    try:
        while True:
            await websocket.receive_text()
    except Exception:
        pass
    finally:
        jpeg_stream.connections.remove(websocket)
        if not jpeg_stream.connections:
            await jpeg_stream.stop()


@app.post("/start")
async def start_stream():
    await jpeg_stream.start()
    return {"message": "Stream started"}


@app.post("/stop")
async def stop_stream():
    await jpeg_stream.stop()
    return {"message": "Stream stopped"}
If you run the server now (fastapi dev --host 0.0.0.0) you should see the object detection results in the browser. But it's probably running at about 1 frame per second, super janky. That's because the Raspberry Pi isn't powerful enough to do this efficiently; we're already using the smallest (nano) version of the YOLO model, but the Pi has no dedicated AI hardware to help out.
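If you want to put a number on it, Ultralytics records per-frame timings on every result, so you can measure the model's cost directly. A rough sketch, reusing the same placeholder test image:

from ultralytics import YOLO
import cv2

model = YOLO("yolov8n.pt")
img = cv2.imread("test.jpg")  # placeholder frame, any image will do

# Run a few predictions and print the timing breakdown Ultralytics records
for _ in range(5):
    results = model(img, verbose=False)
    speed = results[0].speed  # preprocess/inference/postprocess times in ms
    print(speed, f"~{1000 / sum(speed.values()):.1f} fps")

Keep in mind this only covers the model itself; the JPEG decode, annotation, re-encode, and WebSocket send in the app add more time on top of that.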
That being said, we can improve the performance by converting the model to the NCNN format.
Convert to NCNN
This will optimize the model for the Pi's hardware and make it run much faster.
from ultralytics import YOLO
# Load a YOLOv8n PyTorch model
model = YOLO("yolov8n.pt")
# Export the model to NCNN format
model.export(format="ncnn") # creates 'yolov8n_ncnn_model'
Then update the app to load the exported model instead of the .pt file:
self.model = YOLO("yolov8n_ncnn_model")  # Load the NCNN-exported YOLOv8 model
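If you want to confirm the export actually helped before restarting the server, here's a quick comparison sketch (same placeholder test image as before):

from ultralytics import YOLO
import cv2

img = cv2.imread("test.jpg")  # placeholder frame

# Time a frame through the PyTorch model and the NCNN export
for name in ("yolov8n.pt", "yolov8n_ncnn_model"):
    model = YOLO(name)
    model(img, verbose=False)  # warm-up run
    results = model(img, verbose=False)
    print(name, results[0].speed)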
Now if you run the server again, the object detection should be much faster, but it's still slow and janky. The Pi just isn't powerful enough to do this efficiently on its own. However, we can upgrade the Pi's hardware to make it run in real time.
AI Kit
The Raspberry Pi AI Kit is a Hailo Neural Processing Unit (NPU) that plugs straight into the PCIe port on the Pi. You can find all the tech specs and details on the Raspberry Pi website, but it's basically a small AI chip that can run AI models much faster than the Pi's CPU.
The docs on how to integrate the AI Kit with the Pi are not great, but apparently they have more docs and examples "coming soon".
I could not figure out how to get this board working with Python, so I tweaked one of their C++ examples, and the speeds are incredible. My C++ code is kind of 💩, but I'll leave a link to it in case you want to check it out.
View at your own risk: https://github.com/meech-ward/cpp-raspberry-pi-camera-ai