Compare commits: c6c6eb1034...b3572fb730
2 Commits
| Author | SHA1 | Date |
|---|---|---|
| | b3572fb730 | 3 years ago |
| | 1794682da5 | 3 years ago |
6 changed files with 224 additions and 0 deletions
DataStreamModule.py
@@ -0,0 +1,40 @@
import cv2
import numpy as np
from IDataStreamModule import IDataStreamModule


class DataStreamModule(IDataStreamModule):

    # Method to find available cameras and return their IDs
    def findCameras(self):
        # List to store the IDs of the found cameras
        cameras = []
        # Iterate through all possible camera IDs
        i = 0
        while True:
            cap = cv2.VideoCapture(i)
            if not cap.isOpened():
                # No further cameras found, break the loop
                break
            # Camera found, store its ID in the list
            cameras.append(i)
            cap.release()
            i += 1
        # Return the IDs of the found cameras
        return cameras

    # Method to get the camera stream from a given camera (or video file) as a frame generator
    def get_camera_stream(self, camera_name):
        cap = cv2.VideoCapture(camera_name)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        current_frame = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # Stop the stream if no frame could be read (e.g. camera disconnected)
                break
            # Yield each frame as a numpy array
            yield np.array(frame)
            current_frame += 1
            if current_frame == total_frames - 1:
                # On the second-to-last frame of a video file, loop back to the beginning
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                current_frame = 0
        cap.release()
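Review note: `get_camera_stream` returns a generator, so callers iterate over it instead of calling `cap.read()` themselves. A minimal consumption sketch, assuming a camera with ID 0 is attached (variable names here are only for illustration):

```python
from DataStreamModule import DataStreamModule

# Minimal sketch (assumes a camera with ID 0 is attached): pull a few frames
# from the generator and print their shapes.
stream = DataStreamModule().get_camera_stream(0)
for _ in range(5):
    frame = next(stream)  # each item is a numpy array of shape (height, width, 3)
    print(frame.shape)
```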
HPEModule.py
@@ -0,0 +1,88 @@
import os

import cv2
import mediapipe as mp
from DataStreamModule import DataStreamModule
from IHPEModule import IHPEModule

# Initialize mediapipe drawing utilities and pose models
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose


# This class implements Human Pose Estimation (HPE) using the mediapipe library
class HPEModule(IHPEModule):

    # This method starts HPE using a camera ID or a video file path
    def startHPEwithCamera(self, camera_name):
        out = None
        # Check whether camera_name is a path to a video file (camera IDs are passed as ints)
        if isinstance(camera_name, str) and os.path.isfile(camera_name):
            # Open the video file using cv2.VideoCapture
            cap = cv2.VideoCapture(camera_name)
            # Set the output video file path
            output_path = os.path.splitext(camera_name)[0] + "_output.mp4"
            # Get the frame rate and size of the input video
            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Initialize a video writer to save the annotated output video
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        else:
            # Initialize the data stream module and get a frame generator for the camera
            data_stream = DataStreamModule()
            cap = data_stream.get_camera_stream(camera_name)

        # Set the window name using the camera name
        window_name = f"Pose Estimation on Camera {camera_name}"

        # Start pose detection
        with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
            while True:
                # Get the next frame from the video file or the camera generator
                if out is not None:
                    ret, frame = cap.read()
                    if not ret:
                        break
                else:
                    frame = next(cap)

                # Recolor the image to RGB for mediapipe
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Make the detection
                results = pose.process(image)

                # Recolor back to BGR for OpenCV
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                # Extract landmarks (pose_landmarks is None if no person is detected)
                try:
                    landmarks = results.pose_landmarks.landmark
                    print(landmarks)
                except AttributeError:
                    pass

                # Render the detections
                mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                          mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
                                          mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2))

                cv2.imshow(window_name, image)

                if out is not None:
                    out.write(image)

                cv2.waitKey(1)
                # Exit when the user closes the window
                if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) < 1:
                    break

        # Release the writer and capture after exiting the loop
        if out is not None:
            out.release()
            cap.release()
        # cv2.destroyWindow(window_name)
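Review note: when `startHPEwithCamera` is given an existing file path, it also writes the annotated video to `<name>_output.mp4` next to the input. A minimal sketch of that mode, using a hypothetical local file `demo.mp4`:

```python
from HPEModule import HPEModule

# Minimal sketch ("demo.mp4" is a hypothetical file): run pose estimation on a
# video file; the annotated result is written to "demo_output.mp4" next to it.
HPEModule().startHPEwithCamera("demo.mp4")
```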
IDataStreamModule.py
@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod


class IDataStreamModule(ABC):

    # Method to find available cameras and return their IDs
    @abstractmethod
    def findCameras(self):
        pass

    # Method to get the camera stream from a given camera
    @abstractmethod
    def get_camera_stream(self, camera_name):
        pass
IHPEModule.py
@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod


class IHPEModule(ABC):

    # This method starts HPE using a camera specified by its name
    @abstractmethod
    def startHPEwithCamera(self, camera_name):
        pass
@@ -0,0 +1,68 @@
import tkinter as tk
from tkinter import filedialog
from tkinter import Label
from DataStreamModule import DataStreamModule
from HPEModule import HPEModule
import threading


class PoseEstimationGUI:

    def __init__(self):
        self.mp4_file_path = None
        self.root = tk.Tk()
        self.root.title("Pose Estimation GUI")
        self.root.geometry("400x200")

        self.camera_options = []  # Initialize camera options list

        # Create a drop-down menu with the available cameras
        self.camera_options = DataStreamModule().findCameras()
        self.selected_camera = tk.StringVar()
        self.selected_camera.set(self.camera_options[0])  # default value
        camera_menu = tk.OptionMenu(self.root, self.selected_camera, *self.camera_options)
        camera_menu.pack()

        # Create a button to start pose estimation
        self.start_button = tk.Button(self.root, text="Start Pose Estimation", command=self.start_pose_estimation)
        self.start_button.pack()

        # Create a button to select an mp4 file
        self.select_file_button = tk.Button(self.root, text="Select MP4 File to Start HPE on", command=self.select_file)
        self.select_file_button.pack()

        # Create a label widget to indicate the file selection status
        self.file_selected_label = Label(self.root, text="No file selected", fg="red")
        self.file_selected_label.pack(pady=10)

        self.root.mainloop()

    def start_pose_estimation(self):
        if self.mp4_file_path is not None:
            # Start pose estimation on the selected mp4 file
            pose_estimator = HPEModule()
            pose_thread_file = threading.Thread(target=pose_estimator.startHPEwithCamera, args=(self.mp4_file_path,))
            self.mp4_file_path = None
            self.file_selected_label.config(text="No file selected", fg="red")
            pose_thread_file.start()
        else:
            # Start pose estimation on the selected camera
            pose_estimator = HPEModule()
            camera = int(self.selected_camera.get())
            pose_thread_camera = threading.Thread(target=pose_estimator.startHPEwithCamera, args=(camera,))
            pose_thread_camera.start()

    def select_file(self):
        self.mp4_file_path = filedialog.askopenfilename(initialdir="/", title="Select MP4 File to Start HPE on",
                                                        filetypes=(("mp4 files", "*.mp4"), ("all files", "*.*")))
        # Update the file selection status label
        if self.mp4_file_path:
            self.file_selected_label.config(text="File selected", fg="green")
        else:
            self.file_selected_label.config(text="No file selected", fg="red")
            self.mp4_file_path = None


if __name__ == '__main__':
    PoseEstimationGUI()
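Review note: the GUI keeps `root.mainloop()` on the main thread and runs `startHPEwithCamera` in a `threading.Thread`, so the OpenCV display loop does not block the tkinter event loop. A minimal sketch of the same pattern outside the GUI, assuming a camera with ID 0 is attached:

```python
import threading

from HPEModule import HPEModule

# Minimal sketch (assumes camera 0 exists): run the pose-estimation loop in a
# background thread, mirroring PoseEstimationGUI.start_pose_estimation.
worker = threading.Thread(target=HPEModule().startHPEwithCamera, args=(0,), daemon=True)
worker.start()
worker.join()  # this script simply waits; the GUI instead returns to the tk mainloop
```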
@@ -0,0 +1,5 @@
from HPEModule import HPEModule

if __name__ == '__main__':
    pe = HPEModule()  # use camera 0
    pe.startHPEwithCamera(0)