Pi Camera
Since the Raspberry Pi is just a normal computer, you can plug any old USB camera into it. But there are also a couple of Camera Serial Interface (CSI) ports, where you can plug in cameras specially optimized for the Raspberry Pi.
I'm using the Raspberry Pi Camera Module 3, which has a bunch of tech specs I don't quite understand, but the quality seems pretty good to me.
Software
Technically you can use the camera from any language, but anything Raspberry Pi specific has much better support in Python or C++. So we'll switch to Python for a moment.
The software we're going to use is called libcamera (https://github.com/raspberrypi/libcamera), and it comes preinstalled on Raspberry Pi OS. It's a C++ library with Python bindings, and there's some OK documentation on how to use the preinstalled binaries and do some C++ programming. But more importantly, there's Picamera2, a libcamera-based, easy-to-use Python library for working with the camera.
In short, use:
- libcamera directly if you're programming in C++ or via FFI from another language.
- Picamera2 if you want a much easier time getting started and you enjoy coding with python.
Getting Started
sudo apt-get update
sudo apt-get install python3-picamera2
mkdir camera-app
cd camera-app
python3 -m venv --system-site-packages venv
source venv/bin/activate
touch main.py
To get started, there's the Picamera2 manual, a basic getting-started tutorial, and, probably the most helpful thing, the code examples. Use all three resources.
from picamera2 import Picamera2, Preview
picam2 = Picamera2()
picam2.start_preview(Preview.NULL)
picam2.start_and_capture_file("test.jpg")
We don't have a GUI on the Pi, so we'll just save a new image file on the Pi. You could then copy it over to another computer to view it, or if you're using VS Code over SSH, you can view it right there. At the very least, this tests that the camera is working and you can take pictures with it.
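If you want a quick sanity check that the capture actually produced a valid JPEG, without copying it anywhere, you can inspect the file's magic bytes. A minimal sketch (the is_jpeg helper is my own, not part of Picamera2):

```python
from pathlib import Path

def is_jpeg(path: str) -> bool:
    """Rough validity check: JPEG files start with the SOI marker
    (FF D8) and end with the EOI marker (FF D9)."""
    data = Path(path).read_bytes()
    return len(data) > 4 and data[:2] == b"\xff\xd8" and data[-2:] == b"\xff\xd9"

# After a successful capture, is_jpeg("test.jpg") should return True.
```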
FastAPI
Let's send this image over an HTTP request so we can see it in the browser. We can use the FastAPI framework to set up the server and send down the image data as a response.
pip install fastapi
import io

from picamera2 import Picamera2
from fastapi import FastAPI
from fastapi.responses import Response

app = FastAPI()

@app.get("/image")
def get_image():
    picam2 = Picamera2()
    capture_config = picam2.create_still_configuration(main={"size": (1920, 1080)})
    picam2.configure(capture_config)
    data = io.BytesIO()
    picam2.start()
    picam2.capture_file(data, format="jpeg")
    picam2.stop()
    picam2.close()
    return Response(content=data.getvalue(), media_type="image/jpeg")
Every time you refresh, you should see a new image from the camera.
We can also view this from the Next.js app in an img tag.
const nextConfig = {
  async rewrites() {
    return [
      {
        source: '/py/:path*',
        destination: 'http://127.0.0.1:8000/:path*',
      },
    ]
  },
};
We're using Next.js rewrites so we can call the Python server easily from the Next.js app. Any client-side request that starts with /py just gets forwarded to the Python server.
export default async function Home() {
  return (
    <main className="min-h-screen bg-background flex flex-col items-center justify-center p-6">
      <img
        src="/py/image"
        alt="Raspberry Pi"
        className="w-2/3"
      />
    </main>
  );
}
Now you should see an image from the camera on the /camera page of the next.js app.
Stream Video
Let's crank this up a notch and stream the camera feed to a web page. There's a really simple way of streaming video over HTTP called MJPEG. You essentially just keep sending JPEG frames over a single HTTP response, and the browser displays them like a video. There's an MJPEG server example in the Picamera2 GitHub repo, which we're going to modify slightly to use FastAPI.
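To make the wire format concrete: each frame travels as its own little multipart "part", separated by a boundary string. Here's a sketch of how a single part is framed (the boundary name frame matches the boundary declared in the media type used in this section):

```python
def mjpeg_part(jpeg_bytes: bytes, boundary: bytes = b"frame") -> bytes:
    """Wrap one JPEG frame as a single part of a
    multipart/x-mixed-replace HTTP response body."""
    return (
        b"--" + boundary + b"\r\n"
        + b"Content-Type: image/jpeg\r\n\r\n"
        + jpeg_bytes
        + b"\r\n"
    )

# The browser replaces the previous image with each new part,
# which is what makes the stream look like video.
```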
import io
import logging
from threading import Condition

from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput
from fastapi import FastAPI
from fastapi.responses import Response, StreamingResponse
from starlette.background import BackgroundTask

app = FastAPI()

@app.get("/image")
def get_image():
    picam2 = Picamera2()
    capture_config = picam2.create_still_configuration(main={"size": (1920, 1080)})
    picam2.configure(capture_config)
    data = io.BytesIO()
    picam2.start()
    picam2.capture_file(data, format="jpeg")
    picam2.stop()
    picam2.close()
    return Response(content=data.getvalue(), media_type="image/jpeg")

class StreamingOutput(io.BufferedIOBase):
    def __init__(self):
        self.frame = None
        self.condition = Condition()

    def write(self, buf):
        with self.condition:
            self.frame = buf
            self.condition.notify_all()

    def read(self):
        with self.condition:
            self.condition.wait()
            return self.frame

def generate_frames(output):
    while True:
        try:
            frame = output.read()
            yield (b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n" + frame + b"\r\n")
        except Exception as e:
            logging.error(f"Error in generate_frames: {str(e)}")
            break
    print("done")

@app.get("/mjpeg")
async def mjpeg():
    picam2 = Picamera2()
    video_config = picam2.create_video_configuration(main={"size": (1920, 1080)})
    picam2.configure(video_config)
    output = StreamingOutput()
    picam2.start_recording(MJPEGEncoder(), FileOutput(output), Quality.VERY_HIGH)

    def stop():
        print("Stopping recording")
        picam2.stop_recording()
        picam2.close()

    return StreamingResponse(
        generate_frames(output),
        media_type="multipart/x-mixed-replace; boundary=frame",
        background=BackgroundTask(stop),
    )
Now we can use <img src="/py/mjpeg" /> to view the video stream from the camera. There's a lot of room for improvement here: for example, only one person can view the stream at a time, and there's a race condition if you refresh the browser. But it's a good start.
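The StreamingOutput class is worth understanding on its own: it's a classic producer/consumer handoff built on threading.Condition. The encoder thread writes the latest frame and wakes everyone up; each reader blocks until the next frame arrives. Stripped of the camera, the pattern looks like this (a standalone sketch, not Picamera2 code):

```python
import time
from threading import Condition, Thread

class LatestFrame:
    """Hold only the most recent item; readers block until a new write."""

    def __init__(self):
        self.frame = None
        self.condition = Condition()

    def write(self, buf):
        with self.condition:
            self.frame = buf
            self.condition.notify_all()  # wake every waiting reader

    def read(self):
        with self.condition:
            self.condition.wait()  # block until the next write()
            return self.frame

latest = LatestFrame()
received = []

reader = Thread(target=lambda: received.append(latest.read()), daemon=True)
reader.start()
time.sleep(0.2)  # give the reader time to start waiting
latest.write(b"frame-1")
reader.join(timeout=2)
```

Note that a frame written while no reader is waiting is simply dropped. For a live feed that's exactly what you want: stale frames are worthless, so there's no point buffering them.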
WebSockets
To get a little more control, and fix some of these issues, we can convert this example to use WebSockets. I like this approach because it's pretty easy for a web dev to understand.
pip install websockets
import asyncio
import io
from contextlib import asynccontextmanager
from threading import Condition

from fastapi import FastAPI, WebSocket
from picamera2 import Picamera2
from picamera2.encoders import MJPEGEncoder, Quality
from picamera2.outputs import FileOutput

class StreamingOutput(io.BufferedIOBase):
    def __init__(self):
        super().__init__()
        self.frame = None
        self.condition = Condition()

    def write(self, buf):
        with self.condition:
            self.frame = buf
            self.condition.notify_all()

    def _wait_for_frame(self):
        with self.condition:
            self.condition.wait()
            return self.frame

    async def read(self):
        # condition.wait() blocks, so run it in a worker thread
        # instead of stalling the event loop
        return await asyncio.to_thread(self._wait_for_frame)

class JpegStream:
    def __init__(self):
        self.active = False
        self.connections = set()
        self.picam2 = None
        self.task = None

    async def stream_jpeg(self):
        self.picam2 = Picamera2()
        video_config = self.picam2.create_video_configuration(
            main={"size": (1920, 1080)}
        )
        self.picam2.configure(video_config)
        output = StreamingOutput()
        self.picam2.start_recording(MJPEGEncoder(), FileOutput(output), Quality.MEDIUM)
        try:
            while self.active:
                jpeg_data = await output.read()
                tasks = [
                    websocket.send_bytes(jpeg_data)
                    for websocket in self.connections.copy()
                ]
                await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            self.picam2.stop_recording()
            self.picam2.close()
            self.picam2 = None

    async def start(self):
        if not self.active:
            self.active = True
            self.task = asyncio.create_task(self.stream_jpeg())

    async def stop(self):
        if self.active:
            self.active = False
            if self.task:
                await self.task
                self.task = None

jpeg_stream = JpegStream()

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield
    print("done")
    await jpeg_stream.stop()

app = FastAPI(lifespan=lifespan)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    jpeg_stream.connections.add(websocket)
    try:
        while True:
            await websocket.receive_text()
    except Exception:
        pass
    finally:
        jpeg_stream.connections.remove(websocket)
        if not jpeg_stream.connections:
            await jpeg_stream.stop()

@app.post("/start")
async def start_stream():
    await jpeg_stream.start()
    return {"message": "Stream started"}

@app.post("/stop")
async def stop_stream():
    await jpeg_stream.stop()
    return {"message": "Stream stopped"}
Now we need to add a WebSocket client to the React app to view the stream.
"use client";

import { useState, useCallback, useRef, useEffect } from "react";
import useWebSocket, { ReadyState } from "react-use-websocket";
import { Button } from "@/components/ui/button";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Alert, AlertDescription } from "@/components/ui/alert";

export default function CameraStream() {
  const [isStreaming, setIsStreaming] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const imgRef = useRef<HTMLImageElement>(null);

  const { lastMessage, readyState } = useWebSocket("/py/ws", {
    shouldReconnect: () => true,
  });

  useEffect(() => {
    if (lastMessage?.data instanceof Blob && imgRef.current) {
      // Revoke the previous object URL so we don't leak memory
      if (imgRef.current.src) {
        URL.revokeObjectURL(imgRef.current.src);
      }
      imgRef.current.src = URL.createObjectURL(lastMessage.data);
    }
  }, [lastMessage]);

  const toggleStream = useCallback(async () => {
    const action = isStreaming ? "stop" : "start";
    await fetch(`/py/${action}`, { method: "POST" });
    setIsStreaming(!isStreaming);
  }, [isStreaming]);

  const connectionStatus = ReadyState[readyState];

  return (
    <Card className="w-full max-w-2xl mx-auto">
      <CardHeader>
        <CardTitle>JPEG Stream</CardTitle>
      </CardHeader>
      <CardContent className="space-y-4">
        <div className="flex items-center justify-between">
          <span className="text-sm text-gray-500">
            Status: {connectionStatus}
          </span>
          <Button
            onClick={toggleStream}
            variant={isStreaming ? "destructive" : "default"}
          >
            {isStreaming ? "Stop" : "Start"} Stream
          </Button>
        </div>
        {error && (
          <Alert variant="destructive">
            <AlertDescription>{error}</AlertDescription>
          </Alert>
        )}
        <div className="relative bg-gray-100 aspect-video">
          {readyState !== ReadyState.OPEN && (
            <div className="absolute inset-0 flex items-center justify-center">
              <span className="text-gray-500">{connectionStatus}...</span>
            </div>
          )}
          <img
            ref={imgRef}
            alt="JPEG Stream"
            className="w-full h-full object-contain"
          />
        </div>
      </CardContent>
    </Card>
  );
}
npm i react-use-websocket
npx shadcn-ui@latest add button card alert