import numpy as np
import cv2
from ultralytics import YOLO

class KalmanFilter:
    def __init__(self):
        # State vector [x, y, vx, vy]
        self.x = np.zeros((4, 1))
        # State covariance matrix (large initial uncertainty)
        self.P = np.eye(4) * 1000
        # State transition matrix (constant velocity model)
        self.F = np.array([[1, 0, 1, 0],
                           [0, 1, 0, 1],
                           [0, 0, 1, 0],
                           [0, 0, 0, 1]], dtype=float)
        # Measurement matrix (we only observe position)
        self.H = np.array([[1, 0, 0, 0],
                           [0, 1, 0, 0]], dtype=float)
        # Process and measurement noise
        self.Q = np.eye(4) * 0.01  # Process noise covariance
        self.R = np.eye(2) * 1.0   # Measurement noise covariance
        # Identity matrix
        self.I = np.eye(4)
    def predict(self):
        # Propagate state and covariance through the constant-velocity model
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        return self.x[:2].flatten()

    def update(self, z):
        # Innovation covariance and Kalman gain
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        # Correct the prediction with the measurement residual
        innovation = z.reshape(2, 1) - (self.H @ self.x)
        self.x = self.x + K @ innovation
        self.P = (self.I - K @ self.H) @ self.P
        return self.x[:2].flatten()
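
# A minimal sanity check for the filter above (an illustrative addition, not
# part of the tracking pipeline): feed it a synthetic constant-velocity
# trajectory with Gaussian measurement noise and print the estimates. The
# function name, seed, and noise level are assumptions chosen for the demo.
def _demo_kalman_filter():
    rng = np.random.default_rng(0)
    demo_kf = KalmanFilter()
    for t in range(50):
        true_pos = np.array([2.0 * t, 1.0 * t])       # Ground-truth position
        z = true_pos + rng.normal(0.0, 1.0, size=2)   # Noisy measurement
        demo_kf.predict()
        est = demo_kf.update(z)
        if t % 10 == 0:
            print(f"t={t:2d}  true={true_pos}  est={np.round(est, 2)}")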

# Real-time tracking with camera motion compensation
def track_object():
    cap = cv2.VideoCapture(0)   # Open webcam
    model = YOLO("yolov8n.pt")  # Load YOLO model
    kf = KalmanFilter()
    prev_gray = None            # Previous grayscale frame for optical flow
    prev_pts = None             # Keypoints from previous frame
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Estimate inter-frame camera motion with sparse optical flow
        H = np.eye(3)  # Default: assume no camera motion (also covers the first frame)
        if prev_gray is not None and prev_pts is not None:
            lk_params = dict(winSize=(15, 15), maxLevel=2,
                             criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
            next_pts, status, _ = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_pts, None, **lk_params)
            if next_pts is not None:
                good_new = next_pts[status == 1]
                good_old = prev_pts[status == 1]
                if len(good_new) > 10:
                    # Map current-frame coordinates back to the previous frame,
                    # so the camera motion between the two frames is removed
                    # from the measurement (a frame-to-frame simplification)
                    H, _ = cv2.findHomography(good_new, good_old, cv2.RANSAC, 5.0)
                    if H is None:  # RANSAC can fail to find a model
                        H = np.eye(3)
        prev_gray = gray.copy()
        # Run YOLO detection
        results = model(frame)
        detections = results[0].boxes.xyxy.cpu().numpy()
        if len(detections) > 0:
            # Track only the first detection (a simplification)
            x1, y1, x2, y2 = detections[0]
            centroid = np.array([(x1 + x2) / 2, (y1 + y2) / 2, 1.0])  # Homogeneous coords
            # Transform the centroid with the homography to compensate
            # for frame-to-frame camera motion
            stabilized = H @ centroid
            stabilized /= stabilized[2]  # Normalize homogeneous coordinate
            stabilized = stabilized[:2]
            # Kalman cycle: predict, then correct with the measurement
            kf.predict()
            estimated = kf.update(stabilized)
            # Draw tracking results
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            cv2.circle(frame, (int(centroid[0]), int(centroid[1])), 5, (0, 0, 255), -1)    # Measured position
            cv2.circle(frame, (int(estimated[0]), int(estimated[1])), 5, (255, 0, 0), -1)  # Estimated position
        # Detect features for optical flow in the next frame
        prev_pts = cv2.goodFeaturesToTrack(gray, maxCorners=100, qualityLevel=0.3, minDistance=7)
        cv2.imshow('Tracking with Motion Compensation', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
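
# Isolated sketch of the motion-compensation step (an illustrative addition;
# the shift values are made up): a homography encoding a pure (10, 5) pixel
# translation from the current frame back to the previous one, applied to a
# current-frame point, recovers that point's previous-frame coordinates.
def _demo_motion_compensation():
    H_demo = np.array([[1.0, 0.0, -10.0],
                       [0.0, 1.0, -5.0],
                       [0.0, 0.0, 1.0]])
    pt_current = np.array([110.0, 55.0, 1.0])  # Point in the current frame
    pt_previous = H_demo @ pt_current
    pt_previous /= pt_previous[2]              # Normalize homogeneous coordinate
    print(pt_previous[:2])                     # -> [100.  50.]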

if __name__ == "__main__":
    track_object()