Files
peepr/python/pypeepr.py

129 lines
3.6 KiB
Python

import io
import logging
import socketserver
from http import server
from threading import Condition
from picamera2 import Picamera2
from picamera2.encoders import JpegEncoder
from picamera2.outputs import FileOutput
# HTML document sent for GET /index.html by StreamingHandler. It is a dark,
# centred single-page viewer whose <img> element points at the relative URL
# "stream.mjpg" (the MJPEG endpoint) and is sized 800x480 to match the camera
# configuration used further down in this file. The string is encoded as UTF-8
# and served verbatim, so every character below is part of the response body.
PAGE = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>peepr</title>
<style>
body {
margin: 0;
font-family: Arial, sans-serif;
background-color: #121212;
color: #b9f084;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
}
.container {
position: relative;
display: inline-block;
}
.title {
position: absolute;
top: 10px;
left: 10px;
background: rgba(0, 0, 0, 0.5);
padding: 5px 10px;
border-radius: 5px;
font-size: 18px;
color: #b9f084;
}
img {
border-radius: 10px;
display: block;
}
</style>
</head>
<body>
<div class="container">
<div class="title">peepr</div>
<img src="stream.mjpg" width="800" height="480" alt="Webcam Stream">
</div>
</body>
</html>
"""
class StreamingOutput(io.BufferedIOBase):
    """Write-only, file-like sink that remembers only the latest buffer.

    Each call to :meth:`write` replaces ``frame`` with the buffer it was
    given and wakes every thread waiting on ``condition``. Consumers wait on
    ``condition`` and then read ``frame`` while still holding it.

    Attributes:
        frame: The most recently written buffer, or ``None`` before the
            first write.
        condition: ``threading.Condition`` guarding ``frame`` and used to
            signal that a new buffer has arrived.
    """

    def __init__(self):
        # Let the io base class initialise its own state before adding ours.
        super().__init__()
        self.frame = None
        self.condition = Condition()

    def write(self, buf):
        """Store *buf* as the current frame and notify all waiters.

        Args:
            buf: Bytes-like object holding one complete frame.

        Returns:
            The number of bytes accepted (``len(buf)``), as required by the
            ``io.BufferedIOBase.write`` contract.
        """
        with self.condition:
            self.frame = buf
            self.condition.notify_all()
        return len(buf)
class StreamingHandler(server.BaseHTTPRequestHandler):
    """HTTP request handler for the viewer page and the MJPEG stream.

    Routes (matched against the exact request path):
        ``/``             301 redirect to ``/index.html``
        ``/index.html``   the module-level ``PAGE`` HTML document
        ``/stream.mjpg``  endless ``multipart/x-mixed-replace`` stream fed
                          from the module-level ``output`` object
        anything else     404 error page
    """

    def do_GET(self):
        """Dispatch a GET request according to ``self.path``."""
        if self.path == '/':
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
        elif self.path == '/index.html':
            self._send_index()
        elif self.path == '/stream.mjpg':
            self._send_stream()
        else:
            # send_error() emits the complete response (status line, headers
            # and body). Calling end_headers() afterwards would append a
            # stray CRLF after the body, beyond the advertised Content-Length.
            self.send_error(404)

    def _send_index(self):
        """Send ``PAGE`` as a UTF-8 encoded ``text/html`` response."""
        content = PAGE.encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'text/html')
        self.send_header('Content-Length', str(len(content)))
        self.end_headers()
        self.wfile.write(content)

    def _send_stream(self):
        """Stream frames from ``output`` as multipart parts until a write fails."""
        self.send_response(200)
        self.send_header('Age', '0')
        self.send_header('Cache-Control', 'no-cache, private')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
        self.end_headers()
        try:
            while True:
                # Sleep until the output object signals a new frame, and grab
                # the reference while still holding the condition's lock. The
                # socket writes below happen outside the lock.
                with output.condition:
                    output.condition.wait()
                    frame = output.frame
                self.wfile.write(b'--FRAME\r\n')
                self.send_header('Content-Type', 'image/jpeg')
                self.send_header('Content-Length', str(len(frame)))
                self.end_headers()
                self.wfile.write(frame)
                self.wfile.write(b'\r\n')
        except Exception as e:
            # Per-client boundary: any failure (typically the client going
            # away) ends this stream only and is logged, not propagated.
            logging.warning(
                'Removed streaming client %s: %s',
                self.client_address, str(e))
class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    """HTTP server that handles every request in its own thread.

    Threading comes from ``socketserver.ThreadingMixIn``; the two class
    attributes below tune the inherited behaviour.
    """

    # Run handler threads as daemons so they do not keep the process alive.
    daemon_threads = True

    # Allow rebinding the listening address right after a restart.
    allow_reuse_address = True
# Camera setup: configure an 800x480 video stream and start recording through
# the JPEG encoder, with each encoded buffer written into `output` (the
# object StreamingHandler reads frames from).
picam2 = Picamera2()
picam2.configure(picam2.create_video_configuration(main={"size": (800, 480)}))
output = StreamingOutput()
picam2.start_recording(JpegEncoder(), FileOutput(output))
try:
    # An empty host string binds to all interfaces; the port is fixed at 7123.
    address = ('', 7123)
    # The instance is named `httpd` so it does not rebind the imported
    # `http.server` module, and the `with` block closes the listening socket
    # when serve_forever() exits.
    with StreamingServer(address, StreamingHandler) as httpd:
        httpd.serve_forever()
finally:
    # Always stop the camera, including when the server could not be created
    # or was interrupted.
    picam2.stop_recording()