6 changed files with 166 additions and 0 deletions
@ -0,0 +1,40 @@ |
|||||
|
import cv2 |
||||
|
import numpy as np |
||||
|
from IDataStreamModule import IDataStreamModule |
||||
|
|
||||
|
|
||||
|
class DataStreamModule(IDataStreamModule): |
||||
|
|
||||
|
# Method to find available cameras and return their IDs |
||||
|
def findCameras(self): |
||||
|
# List to store the IDs of the found cameras |
||||
|
cameras = [] |
||||
|
# Iterating through all possible camera IDs |
||||
|
i = 0 |
||||
|
while True: |
||||
|
cap = cv2.VideoCapture(i) |
||||
|
if not cap.isOpened(): |
||||
|
# No further cameras found, break the loop |
||||
|
break |
||||
|
# Camera found, store ID in the list |
||||
|
cameras.append(i) |
||||
|
cap.release() |
||||
|
i += 1 |
||||
|
# Return the IDs of the found cameras |
||||
|
return cameras |
||||
|
|
||||
|
# Method to get the camera stream from a given camera |
||||
|
def get_camera_stream(self, camera_name): |
||||
|
cap = cv2.VideoCapture(camera_name) |
||||
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
||||
|
current_frame = 0 |
||||
|
while cap.isOpened(): |
||||
|
ret, frame = cap.read() |
||||
|
# Yield each frame as a numpy array |
||||
|
yield np.array(frame) |
||||
|
current_frame += 1 |
||||
|
if current_frame == total_frames - 1: |
||||
|
# If the current frame is the second-to-last frame, reset to the beginning of the file |
||||
|
cap.set(cv2.CAP_PROP_POS_FRAMES, 0) |
||||
|
current_frame = 0 |
||||
|
cap.release() |
||||
@ -0,0 +1,60 @@ |
|||||
|
import cv2 |
||||
|
import mediapipe as mp |
||||
|
from DataStreamModule import DataStreamModule |
||||
|
from IHPEModule import IHPEModule |
||||
|
|
||||
|
# initialize mediapipe drawing utilities and pose models |
||||
|
mp_drawing = mp.solutions.drawing_utils |
||||
|
mp_pose = mp.solutions.pose |
||||
|
|
||||
|
|
||||
|
# This class implements Human Pose Estimation (HPE) using the mediapipe library |
||||
|
class HPEModule(IHPEModule): |
||||
|
|
||||
|
# This method starts HPE using a camera specified by its name |
||||
|
def startHPEwithCamera(self, camera_name): |
||||
|
|
||||
|
# initialize data stream and camera object |
||||
|
data_stream = DataStreamModule() |
||||
|
cap = data_stream.get_camera_stream(camera_name) |
||||
|
|
||||
|
# set the window name using the camera name |
||||
|
window_name = f"Pose Estimation on Camera {camera_name}" |
||||
|
|
||||
|
# start pose detection |
||||
|
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose: |
||||
|
while True: |
||||
|
# get the next frame from the camera |
||||
|
frame = next(cap) |
||||
|
|
||||
|
# Recolor image to RGB |
||||
|
image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
||||
|
image.flags.writeable = False |
||||
|
|
||||
|
# Make detection |
||||
|
results = pose.process(image) |
||||
|
|
||||
|
# Recolor back to BGR |
||||
|
image.flags.writeable = True |
||||
|
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) |
||||
|
|
||||
|
# Extract landmarks |
||||
|
try: |
||||
|
landmarks = results.pose_landmarks.landmark |
||||
|
print(landmarks) |
||||
|
except: |
||||
|
pass |
||||
|
|
||||
|
# Render detections |
||||
|
mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS, |
||||
|
mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2), |
||||
|
mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2) |
||||
|
) |
||||
|
|
||||
|
cv2.imshow(window_name, image) |
||||
|
keyCode = cv2.waitKey(1) |
||||
|
if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) < 1: |
||||
|
break |
||||
|
|
||||
|
# destroy the window after exiting the loop |
||||
|
#cv2.destroyWindow(window_name) |
||||
@ -0,0 +1,14 @@ |
|||||
|
from abc import ABC, abstractmethod |
||||
|
|
||||
|
|
||||
|
class IDataStreamModule(ABC): |
||||
|
|
||||
|
# Method to find available cameras and return their IDs |
||||
|
@abstractmethod |
||||
|
def findCameras(self): |
||||
|
pass |
||||
|
|
||||
|
# Method to get the camera stream from a given camera |
||||
|
@abstractmethod |
||||
|
def get_camera_stream(self, camera_name): |
||||
|
pass |
||||
@ -0,0 +1,9 @@ |
|||||
|
from abc import ABC, abstractmethod |
||||
|
|
||||
|
|
||||
|
class IHPEModule(ABC): |
||||
|
|
||||
|
# This method starts HPE using a camera specified by its name |
||||
|
@abstractmethod |
||||
|
def startHPEwithCamera(self, camera_name): |
||||
|
pass |
||||
@ -0,0 +1,38 @@ |
|||||
|
import tkinter as tk |
||||
|
from DataStreamModule import DataStreamModule |
||||
|
from HPEModule import HPEModule |
||||
|
import threading |
||||
|
|
||||
|
|
||||
|
class PoseEstimationGUI: |
||||
|
|
||||
|
def __init__(self): |
||||
|
self.root = tk.Tk() |
||||
|
self.root.title("Pose Estimation GUI") |
||||
|
|
||||
|
self.camera_options = [] # Initialize camera options list |
||||
|
|
||||
|
# Create a drop-down menu with available cameras |
||||
|
self.camera_options = DataStreamModule().findCameras() |
||||
|
self.selected_camera = tk.StringVar() |
||||
|
self.selected_camera.set(self.camera_options[0]) # default value |
||||
|
camera_menu = tk.OptionMenu(self.root, self.selected_camera, *self.camera_options) |
||||
|
camera_menu.pack() |
||||
|
|
||||
|
# Create a button to start pose estimation |
||||
|
self.start_button = tk.Button(self.root, text="Start Pose Estimation", command=self.start_pose_estimation) |
||||
|
self.start_button.pack() |
||||
|
|
||||
|
self.root.mainloop() |
||||
|
|
||||
|
def start_pose_estimation(self): |
||||
|
# Start pose estimation on the selected camera |
||||
|
# print(self.selected_camera.get()) |
||||
|
pose_estimator = HPEModule() |
||||
|
camera = int(self.selected_camera.get()) |
||||
|
pose_thread = threading.Thread(target=pose_estimator.startHPEwithCamera, args=(camera,)) |
||||
|
pose_thread.start() |
||||
|
|
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
PoseEstimationGUI() |
||||
@ -0,0 +1,5 @@ |
|||||
|
from HPEModule import HPEModule |
||||
|
|
||||
|
if __name__ == '__main__': |
||||
|
pe = HPEModule() # use camera 0 |
||||
|
pe.startHPEwithCamera(0) |
||||
Loading…
Reference in new issue