Raspi Course

AI Object Detection

It's fairly simple to just add some basic python computer vision code any project on a raspberry pi.

YOLOv8

Ultralytics has a guide on the basics of using yolo on a pi. So let's add basic object detection to the app.

step 1:

Install the ultralytics, numpy, and opencv

pip install ultralytics opencv-python-headless numpy
pip install ultralytics opencv-python-headless numpy
pip install ultralytics opencv-python-headless numpy
pip install ultralytics opencv-python-headless numpy
step 2:

Update the main.py server file from before to use the YOLOv8n model

main.py
import io
import asyncio
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput
from fastapi import FastAPI, WebSocket
from threading import Condition
from contextlib import asynccontextmanager
from ultralytics import YOLO
import numpy as np
import cv2


class StreamingOutput(io.BufferedIOBase):
def __init__(self):
super().__init__()
self.frame = None
self.condition = Condition()

def write(self, buf):
with self.condition:
self.frame = buf
self.condition.notify_all()

async def read(self):
with self.condition:
self.condition.wait()
return self.frame


class JpegStream:
def __init__(self):
self.active = False
self.connections = set()
self.picam2 = None
self.task = None
self.model = YOLO("yolov8n.pt") # Load the YOLOv8 model

async def stream_jpeg(self):
self.picam2 = Picamera2()
video_config = self.picam2.create_video_configuration(
main={"size": (1920, 1080)}
)
self.picam2.configure(video_config)
output = StreamingOutput()
self.picam2.start_recording(MJPEGEncoder(), FileOutput(output), Quality.MEDIUM)

try:
while self.active:
jpeg_data = await output.read()

# Convert JPEG data to OpenCV format
np_arr = np.frombuffer(jpeg_data, np.uint8)
img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)

# # Perform object detection
results = self.model(img)
annotated_frame = results[0].plot()

# Encode image back to JPEG
_, annotated_frame_jpeg = cv2.imencode('.jpg', annotated_frame)

# Send the annotated frame to all connected clients
tasks = [
websocket.send_bytes(annotated_frame_jpeg.tobytes())
for websocket in self.connections.copy()
]
await asyncio.gather(*tasks, return_exceptions=True)
finally:
self.picam2.stop_recording()
self.picam2.close()
self.picam2 = None

async def start(self):
if not self.active:
self.active = True
self.task = asyncio.create_task(self.stream_jpeg())

async def stop(self):
if self.active:
self.active = False
if self.task:
await self.task
self.task = None


jpeg_stream = JpegStream()


@asynccontextmanager
async def lifespan(app: FastAPI):
yield
print("done")
await jpeg_stream.stop()


app = FastAPI(lifespan=lifespan)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
jpeg_stream.connections.add(websocket)
try:
while True:
await websocket.receive_text()
except Exception:
pass
finally:
jpeg_stream.connections.remove(websocket)
if not jpeg_stream.connections:
await jpeg_stream.stop()


@app.post("/start")
async def start_stream():
await jpeg_stream.start()
return {"message": "Stream started"}


@app.post("/stop")
async def stop_stream():
await jpeg_stream.stop()
return {"message": "Stream stopped"}
main.py
import io
import asyncio
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput
from fastapi import FastAPI, WebSocket
from threading import Condition
from contextlib import asynccontextmanager
from ultralytics import YOLO
import numpy as np
import cv2


class StreamingOutput(io.BufferedIOBase):
def __init__(self):
super().__init__()
self.frame = None
self.condition = Condition()

def write(self, buf):
with self.condition:
self.frame = buf
self.condition.notify_all()

async def read(self):
with self.condition:
self.condition.wait()
return self.frame


class JpegStream:
def __init__(self):
self.active = False
self.connections = set()
self.picam2 = None
self.task = None
self.model = YOLO("yolov8n.pt") # Load the YOLOv8 model

async def stream_jpeg(self):
self.picam2 = Picamera2()
video_config = self.picam2.create_video_configuration(
main={"size": (1920, 1080)}
)
self.picam2.configure(video_config)
output = StreamingOutput()
self.picam2.start_recording(MJPEGEncoder(), FileOutput(output), Quality.MEDIUM)

try:
while self.active:
jpeg_data = await output.read()

# Convert JPEG data to OpenCV format
np_arr = np.frombuffer(jpeg_data, np.uint8)
img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)

# # Perform object detection
results = self.model(img)
annotated_frame = results[0].plot()

# Encode image back to JPEG
_, annotated_frame_jpeg = cv2.imencode('.jpg', annotated_frame)

# Send the annotated frame to all connected clients
tasks = [
websocket.send_bytes(annotated_frame_jpeg.tobytes())
for websocket in self.connections.copy()
]
await asyncio.gather(*tasks, return_exceptions=True)
finally:
self.picam2.stop_recording()
self.picam2.close()
self.picam2 = None

async def start(self):
if not self.active:
self.active = True
self.task = asyncio.create_task(self.stream_jpeg())

async def stop(self):
if self.active:
self.active = False
if self.task:
await self.task
self.task = None


jpeg_stream = JpegStream()


@asynccontextmanager
async def lifespan(app: FastAPI):
yield
print("done")
await jpeg_stream.stop()


app = FastAPI(lifespan=lifespan)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
jpeg_stream.connections.add(websocket)
try:
while True:
await websocket.receive_text()
except Exception:
pass
finally:
jpeg_stream.connections.remove(websocket)
if not jpeg_stream.connections:
await jpeg_stream.stop()


@app.post("/start")
async def start_stream():
await jpeg_stream.start()
return {"message": "Stream started"}


@app.post("/stop")
async def stop_stream():
await jpeg_stream.stop()
return {"message": "Stream stopped"}
main.py
import io
import asyncio
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput
from fastapi import FastAPI, WebSocket
from threading import Condition
from contextlib import asynccontextmanager
from ultralytics import YOLO
import numpy as np
import cv2


class StreamingOutput(io.BufferedIOBase):
def __init__(self):
super().__init__()
self.frame = None
self.condition = Condition()

def write(self, buf):
with self.condition:
self.frame = buf
self.condition.notify_all()

async def read(self):
with self.condition:
self.condition.wait()
return self.frame


class JpegStream:
def __init__(self):
self.active = False
self.connections = set()
self.picam2 = None
self.task = None
self.model = YOLO("yolov8n.pt") # Load the YOLOv8 model

async def stream_jpeg(self):
self.picam2 = Picamera2()
video_config = self.picam2.create_video_configuration(
main={"size": (1920, 1080)}
)
self.picam2.configure(video_config)
output = StreamingOutput()
self.picam2.start_recording(MJPEGEncoder(), FileOutput(output), Quality.MEDIUM)

try:
while self.active:
jpeg_data = await output.read()

# Convert JPEG data to OpenCV format
np_arr = np.frombuffer(jpeg_data, np.uint8)
img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)

# # Perform object detection
results = self.model(img)
annotated_frame = results[0].plot()

# Encode image back to JPEG
_, annotated_frame_jpeg = cv2.imencode('.jpg', annotated_frame)

# Send the annotated frame to all connected clients
tasks = [
websocket.send_bytes(annotated_frame_jpeg.tobytes())
for websocket in self.connections.copy()
]
await asyncio.gather(*tasks, return_exceptions=True)
finally:
self.picam2.stop_recording()
self.picam2.close()
self.picam2 = None

async def start(self):
if not self.active:
self.active = True
self.task = asyncio.create_task(self.stream_jpeg())

async def stop(self):
if self.active:
self.active = False
if self.task:
await self.task
self.task = None


jpeg_stream = JpegStream()


@asynccontextmanager
async def lifespan(app: FastAPI):
yield
print("done")
await jpeg_stream.stop()


app = FastAPI(lifespan=lifespan)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
jpeg_stream.connections.add(websocket)
try:
while True:
await websocket.receive_text()
except Exception:
pass
finally:
jpeg_stream.connections.remove(websocket)
if not jpeg_stream.connections:
await jpeg_stream.stop()


@app.post("/start")
async def start_stream():
await jpeg_stream.start()
return {"message": "Stream started"}


@app.post("/stop")
async def stop_stream():
await jpeg_stream.stop()
return {"message": "Stream stopped"}
main.py
import io
import asyncio
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput
from fastapi import FastAPI, WebSocket
from threading import Condition
from contextlib import asynccontextmanager
from ultralytics import YOLO
import numpy as np
import cv2


class StreamingOutput(io.BufferedIOBase):
def __init__(self):
super().__init__()
self.frame = None
self.condition = Condition()

def write(self, buf):
with self.condition:
self.frame = buf
self.condition.notify_all()

async def read(self):
with self.condition:
self.condition.wait()
return self.frame


class JpegStream:
def __init__(self):
self.active = False
self.connections = set()
self.picam2 = None
self.task = None
self.model = YOLO("yolov8n.pt") # Load the YOLOv8 model

async def stream_jpeg(self):
self.picam2 = Picamera2()
video_config = self.picam2.create_video_configuration(
main={"size": (1920, 1080)}
)
self.picam2.configure(video_config)
output = StreamingOutput()
self.picam2.start_recording(MJPEGEncoder(), FileOutput(output), Quality.MEDIUM)

try:
while self.active:
jpeg_data = await output.read()

# Convert JPEG data to OpenCV format
np_arr = np.frombuffer(jpeg_data, np.uint8)
img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)

# # Perform object detection
results = self.model(img)
annotated_frame = results[0].plot()

# Encode image back to JPEG
_, annotated_frame_jpeg = cv2.imencode('.jpg', annotated_frame)

# Send the annotated frame to all connected clients
tasks = [
websocket.send_bytes(annotated_frame_jpeg.tobytes())
for websocket in self.connections.copy()
]
await asyncio.gather(*tasks, return_exceptions=True)
finally:
self.picam2.stop_recording()
self.picam2.close()
self.picam2 = None

async def start(self):
if not self.active:
self.active = True
self.task = asyncio.create_task(self.stream_jpeg())

async def stop(self):
if self.active:
self.active = False
if self.task:
await self.task
self.task = None


jpeg_stream = JpegStream()


@asynccontextmanager
async def lifespan(app: FastAPI):
yield
print("done")
await jpeg_stream.stop()


app = FastAPI(lifespan=lifespan)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
jpeg_stream.connections.add(websocket)
try:
while True:
await websocket.receive_text()
except Exception:
pass
finally:
jpeg_stream.connections.remove(websocket)
if not jpeg_stream.connections:
await jpeg_stream.stop()


@app.post("/start")
async def start_stream():
await jpeg_stream.start()
return {"message": "Stream started"}


@app.post("/stop")
async def stop_stream():
await jpeg_stream.stop()
return {"message": "Stream stopped"}

If you run the server now (fastapi dev --host 0.0.0.0) you should see the object detection results in the browser. But it's probably running at like 1 frame per second, super janky. That's because the raspberry pi isn't powerful enough to do this effeciently, we're already using the smallest (nano) version of the yolo model, but the pi has 0 AI hardware to help out.

That beind said, we can improve the performance by converting the model to the NCNN format.

Convert to NCNN

This will optomize the model for the pi's hardware and make it run much faster.

step 3:

Create a new file called convert.py and add the following code to convert the model to NCNN format

Run the script python convert.py to convert the model

convert.py
from ultralytics import YOLO

# Load a YOLOv8n PyTorch model
model = YOLO("yolov8n.pt")

# Export the model to NCNN format
model.export(format="ncnn") # creates 'yolov8n_ncnn_model'
convert.py
from ultralytics import YOLO

# Load a YOLOv8n PyTorch model
model = YOLO("yolov8n.pt")

# Export the model to NCNN format
model.export(format="ncnn") # creates 'yolov8n_ncnn_model'
convert.py
from ultralytics import YOLO

# Load a YOLOv8n PyTorch model
model = YOLO("yolov8n.pt")

# Export the model to NCNN format
model.export(format="ncnn") # creates 'yolov8n_ncnn_model'
convert.py
from ultralytics import YOLO

# Load a YOLOv8n PyTorch model
model = YOLO("yolov8n.pt")

# Export the model to NCNN format
model.export(format="ncnn") # creates 'yolov8n_ncnn_model'
step 4:

Update the main.py file to use the new NCNN model

main.py
self.model = YOLO("yolov8n_ncnn_model") # Load the YOLOv8 model
main.py
self.model = YOLO("yolov8n_ncnn_model") # Load the YOLOv8 model
main.py
self.model = YOLO("yolov8n_ncnn_model") # Load the YOLOv8 model
main.py
self.model = YOLO("yolov8n_ncnn_model") # Load the YOLOv8 model

Now if you run the server again, the object detection should be much faster, but still super slow and janky. The pi just isn't powerful enough to do this effeciently. However, we can upgrade the pi hardware to make it run in real time.

AI Kit

The Raspberry Pi AI Kit is a Hailo Neural Processing Unit (NPU) that plugs straight into the PCIe port on the pi. You can find all the tech specs and details on the raspberry pi website, but it's basically a small AI chip that can run AI models much faster than the pi's CPU.

The docs on how to integrate the AI Kit with the pi are not great, but aparently they have more docs and examples "coming soon".

I could not figure out how to get this board working with python, so I tweaked one of their C++ examples and the speads are incredible. My C++ code is kind of 💩, but i'll leave a link to it incase you want to check it out.

View at your own risk: https://github.com/meech-ward/cpp-raspberry-pi-camera-ai