"""MJPEG web streaming server for the Raspberry Pi camera.

The script records MJPEG from a ``picamera.PiCamera`` into a
``StreamingOutput`` object and serves the frames over HTTP on port 8000.
Extra GET routes take a full-resolution snapshot, switch a software
fisheye undistortion pass on or off, and step the exposure compensation.
"""

import datetime as dt
import io
import logging
import socketserver
import time
import traceback
from datetime import datetime
from http import server
from threading import Condition
from time import sleep

import cv2
import numpy as np
import picamera
from picamera.array import PiRGBArray
from PIL import Image, ImageDraw, ImageFont

fcam0 = 250  # focal length in pixels
z1 = 100
xt = 0
flag = 0  # 1 -> undistort every streamed frame, 0 -> stream frames as recorded
ev = 0  # exposure compensation

# Range accepted by PiCamera.exposure_compensation; values outside it make
# the property setter raise.
EV_MIN = -25
EV_MAX = 25


def camz(z):
    """Recompute the global camera matrices for zoom value ``z``.

    Sets the module globals ``z1``, ``fcam``, ``K`` (camera matrix),
    ``D`` (distortion coefficients, all zero) and ``Knew`` (``K`` with both
    focal-length entries scaled by ``fsc``). These are the matrices passed to
    ``cv2.fisheye.undistortImage`` by the streaming handler.

    Args:
        z: Zoom value; anything ``float()`` accepts. 100 is the value used at
            start-up.

    Raises:
        ZeroDivisionError: If ``z`` is 0 or 250 (both appear as divisors).
    """
    global fcam, K, D, Knew, z1
    z1 = float(z)
    cx = 320 - 6 * xt * (1 - z1 / 100)
    cy = 240
    fcam = 100 * fcam0 / z1
    fsc = 0.85 + (100 - z1) / (500 - 2 * z1)
    K = np.array([[fcam, 0., cx],
                  [0., fcam, cy],
                  [0., 0., 1.]])
    D = np.array([0., 0., 0., 0.])
    Knew = K.copy()
    # Fancy index (0, 1), (0, 1) selects the two diagonal focal-length
    # entries K[0, 0] and K[1, 1].
    Knew[(0, 1), (0, 1)] = fsc * Knew[(0, 1), (0, 1)]
    # Disabled alternative kept from the original (zoom via camera.zoom):
    # x=(1-z1/100)/2 +xt*(1-z1/100)/100
    # y=(1-z1/100)/2
    # w=z1/100
    # h=z1/100
    # self.camera.zoom = (x,y,w,h)
    # finfo=cv2.calibrationMatrixValues(K,(640,480),36,24)
    # print(finfo)
    # print(z1,x,y,w,h)


# NOTE(review): this literal holds bare text only; nothing in it references
# /stream.mjpg or the control routes handled below. Presumably the HTML
# markup was lost somewhere upstream -- restore it from the original page.
PAGE = """\
Raspberry Pi

Raspberry Pi

Снимок

"""

# NOTE(review): empty as found; presumably this once held a redirect page.
Redirection = """"""


class StreamingOutput(object):
    """File-like sink for the camera's MJPEG recording.

    The camera calls ``write`` with encoded data. Each time a new JPEG
    starts, the previously buffered frame is published in ``self.frame`` and
    all threads waiting on ``self.condition`` are woken.
    """

    def __init__(self):
        self.frame = None
        self.buffer = io.BytesIO()
        self.condition = Condition()
        self.camera = picamera.PiCamera(resolution='640x480', framerate=12)

    def write(self, buf):
        """Buffer ``buf``; publish the finished frame when a new JPEG starts.

        Returns:
            The number of bytes written to the internal buffer.
        """
        if buf.startswith(b'\xff\xd8'):
            # New frame, copy the existing buffer's content and notify all
            # clients it's available
            self.buffer.truncate()
            with self.condition:
                self.frame = self.buffer.getvalue()
                self.condition.notify_all()
            self.buffer.seek(0)
        return self.buffer.write(buf)

    def shot_camera(self):
        """Save a 2592x1944 still under /home/pi/, then resume streaming.

        Recording is stopped while the still is taken. The stream resolution
        and the recording are restored in a ``finally`` block so a failed
        capture does not leave the stream dead.
        """
        self.camera.wait_recording(0.5)
        self.camera.stop_recording()
        try:
            sleep(0.5)
            self.camera.resolution = (2592, 1944)
            # self.camera.start_preview()
            sleep(0.5)
            self.camera.capture(
                "/home/pi/"
                + datetime.now().strftime("%d-%b-%Y.(%H_%M_%S_%f)")
                + ".jpg")
        finally:
            self.camera.resolution = (640, 480)
            self.camera.start_recording(self, format='mjpeg')

    def shot_camera3(self):
        """Turn the per-frame undistortion pass on (``flag = 1``)."""
        global flag
        flag = 1

    def shot_camera2(self):
        """Turn the per-frame undistortion pass off (``flag = 0``)."""
        global flag
        flag = 0

    def _shift_exposure(self, delta):
        """Add ``delta`` to the global ``ev`` (clamped) and apply it."""
        global ev
        # Clamp so repeated requests cannot push the value outside the range
        # the camera accepts.
        ev = max(EV_MIN, min(EV_MAX, ev + delta))
        self.camera.exposure_compensation = ev

    def shot_camera0(self):
        """Lower the exposure compensation by 2 steps."""
        self._shift_exposure(-2)

    def shot_camera4(self):
        """Raise the exposure compensation by 2 steps."""
        self._shift_exposure(2)

    def start_recording(self):
        """Start MJPEG recording into this object."""
        # Uncomment the next line to change your Pi's Camera rotation
        # (in degrees)
        # camera.rotation = 90
        self.camera.start_recording(self, format='mjpeg')

    def stop_camera(self):
        """Stop the current recording."""
        self.camera.stop_recording()


def _undistort_jpeg(jpeg_bytes):
    """Return ``jpeg_bytes`` re-encoded after fisheye undistortion.

    Uses the global ``K``, ``D`` and ``Knew`` matrices set by ``camz``. If
    the data cannot be decoded, the input bytes are returned unchanged.
    """
    # np.frombuffer replaces the deprecated binary mode of np.fromstring.
    encoded = np.frombuffer(jpeg_bytes, dtype=np.uint8)
    bgr = cv2.imdecode(encoded, 1)
    if bgr is None:
        return jpeg_bytes
    bgr = cv2.fisheye.undistortImage(bgr, K, D=D, Knew=Knew)
    # OpenCV decodes to BGR; PIL expects RGB.
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    out = io.BytesIO()
    Image.fromarray(rgb).save(out, format='JPEG')
    return out.getvalue()


class StreamingHandler(server.BaseHTTPRequestHandler):
    """HTTP handler serving the index page, the MJPEG stream and controls."""

    # Request path -> name of the StreamingOutput method to invoke.
    _ACTIONS = {
        '/capture/': 'shot_camera',
        '/index.html?hid1=': 'shot_camera2',
        '/index.html?hid=': 'shot_camera3',
        '/index.html?hid0=': 'shot_camera0',
        '/index.html?hid2=': 'shot_camera4',
    }

    def _send_html(self, text):
        """Send ``text`` as a complete 200 text/html response."""
        content = text.encode('utf-8')
        self.send_response(200)
        # The body is UTF-8 and may contain non-ASCII text; say so.
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.send_header('Content-Length', len(content))
        self.end_headers()
        self.wfile.write(content)

    def _stream_mjpeg(self):
        """Send frames as multipart/x-mixed-replace until the client fails."""
        self.send_response(200)
        self.send_header('Age', 0)
        self.send_header('Cache-Control', 'no-cache, private')
        self.send_header('Pragma', 'no-cache')
        self.send_header(
            'Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
        self.end_headers()
        try:
            while True:
                with output.condition:
                    output.condition.wait()
                    frame = output.frame
                if not frame:
                    # Nothing was buffered before the first JPEG start.
                    continue
                if flag == 1:
                    # Done outside the lock so the camera's write() is not
                    # blocked while the frame is processed.
                    frame = _undistort_jpeg(frame)
                self.wfile.write(b'--FRAME\r\n')
                self.send_header('Content-Type', 'image/jpeg')
                self.send_header('Content-Length', len(frame))
                self.end_headers()
                self.wfile.write(frame)
                self.wfile.write(b'\r\n')
        except Exception as e:
            # Boundary of a per-client loop: any failure (typically the
            # client disconnecting) just ends this stream.
            logging.warning(
                'Removed streaming client %s: %s',
                self.client_address, str(e))

    def do_GET(self):
        """Route a GET request by exact match on ``self.path``."""
        if self.path == '/':
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
            return
        if self.path == '/index.html':
            self._send_html(PAGE)
            return
        if self.path == '/stream.mjpg':
            self._stream_mjpeg()
            return
        action = self._ACTIONS.get(self.path)
        if action is not None:
            getattr(output, action)()
            self._send_html(Redirection)
            return
        # send_error() already terminates the headers and writes the body;
        # a further end_headers() would append a stray CRLF.
        self.send_error(404)


class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    """HTTP server that handles each request in its own daemon thread."""

    allow_reuse_address = True
    daemon_threads = True


camz(100)
output = StreamingOutput()
output.start_recording()
try:
    address = ('', 8000)
    serveur = StreamingServer(address, StreamingHandler)
    serveur.serve_forever()
finally:
    output.stop_camera()