mirror of
https://github.com/rendies/People-Counting-in-Real-Time.git
synced 2025-05-14 09:59:29 +07:00
163 lines
5.8 KiB
Python
163 lines
5.8 KiB
Python
# import the necessary packages
|
|
from scipy.spatial import distance as dist
|
|
from collections import OrderedDict
|
|
import numpy as np
|
|
|
|
class CentroidTracker:
    """Track objects across frames by matching bounding-box centroids.

    Every tracked object gets an integer ID. On each call to ``update`` the
    centroids of the supplied bounding boxes are greedily matched to the
    centroids of the currently tracked objects, nearest pair first. Tracked
    objects left without a match accumulate a "disappeared" count and are
    dropped once that count exceeds ``maxDisappeared``; detections left
    without a match are registered as new objects.

    Attributes:
        nextObjectID: ID that will be assigned to the next registered object.
        objects: ``OrderedDict`` mapping object ID -> centroid ``(cX, cY)``.
        disappeared: ``OrderedDict`` mapping object ID -> number of
            consecutive frames the object has gone unmatched.
        maxDisappeared: unmatched-frame count an object may reach before it
            is deregistered (it is removed when the count exceeds this).
        maxDistance: largest centroid distance at which a detection may be
            associated with an existing object.
    """

    def __init__(self, maxDisappeared=50, maxDistance=50):
        # initialize the next unique object ID along with two ordered
        # dictionaries used to keep track of mapping a given object
        # ID to its centroid and number of consecutive frames it has
        # been marked as "disappeared", respectively
        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()

        # store the number of maximum consecutive frames a given
        # object is allowed to be marked as "disappeared" until we
        # need to deregister the object from tracking
        self.maxDisappeared = maxDisappeared

        # store the maximum distance between centroids to associate
        # an object -- if the distance is larger than this maximum
        # distance we'll start to mark the object as "disappeared"
        self.maxDistance = maxDistance

    def register(self, centroid):
        """Start tracking ``centroid`` under the next available object ID."""
        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self, objectID):
        """Stop tracking ``objectID``.

        Raises:
            KeyError: if ``objectID`` is not currently tracked.
        """
        # to deregister an object ID we delete the object ID from
        # both of our respective dictionaries
        del self.objects[objectID]
        del self.disappeared[objectID]

    def _mark_disappeared(self, objectID):
        """Count one more missed frame for ``objectID``; drop it if over limit."""
        self.disappeared[objectID] += 1

        # if we have reached a maximum number of consecutive frames
        # where the object has been marked as missing, deregister it
        if self.disappeared[objectID] > self.maxDisappeared:
            self.deregister(objectID)

    def update(self, rects):
        """Update the tracker with the bounding boxes of the current frame.

        Args:
            rects: sequence of ``(startX, startY, endX, endY)`` boxes; may
                be empty.

        Returns:
            The tracker's own ``objects`` mapping (object ID -> centroid).
            This is the live dictionary, not a copy.
        """
        # no detections at all: every tracked object missed this frame
        if len(rects) == 0:
            # iterate over a snapshot of the keys because
            # _mark_disappeared may delete entries while we loop
            for objectID in list(self.disappeared.keys()):
                self._mark_disappeared(objectID)

            # return early as there are no centroids or tracking info
            # to update
            return self.objects

        # initialize an array of input centroids for the current frame
        inputCentroids = np.zeros((len(rects), 2), dtype="int")

        # use the bounding box coordinates to derive each centroid
        for (i, (startX, startY, endX, endY)) in enumerate(rects):
            cX = int((startX + endX) / 2.0)
            cY = int((startY + endY) / 2.0)
            inputCentroids[i] = (cX, cY)

        # if we are currently not tracking any objects take the input
        # centroids and register each of them
        if len(self.objects) == 0:
            for centroid in inputCentroids:
                self.register(centroid)
            return self.objects

        # otherwise, we are currently tracking objects so we need to
        # try to match the input centroids to existing object centroids
        objectIDs = list(self.objects.keys())
        objectCentroids = list(self.objects.values())

        # compute the distance between each pair of object centroids
        # (rows) and input centroids (columns)
        D = dist.cdist(np.array(objectCentroids), inputCentroids)

        # order the rows by their smallest distance so that the row
        # with the closest candidate is at the *front* of the list
        rows = D.min(axis=1).argsort()

        # for each row, taken in that same order, the column index of
        # its nearest input centroid
        cols = D.argmin(axis=1)[rows]

        # keep track of which row and column indexes were matched
        usedRows = set()
        usedCols = set()

        for (row, col) in zip(rows, cols):
            # each object and each detection may be matched only once
            if row in usedRows or col in usedCols:
                continue

            # if the distance between centroids is greater than the
            # maximum distance, do not associate the two centroids
            # to the same object
            if D[row, col] > self.maxDistance:
                continue

            # otherwise, grab the object ID for the current row, set
            # its new centroid, and reset the disappeared counter
            objectID = objectIDs[row]
            self.objects[objectID] = inputCentroids[col]
            self.disappeared[objectID] = 0

            usedRows.add(row)
            usedCols.add(col)

        # compute the row and column indexes that were NOT matched
        unusedRows = set(range(0, D.shape[0])).difference(usedRows)
        unusedCols = set(range(0, D.shape[1])).difference(usedCols)

        # Both sets can be non-empty at the same time (a pair rejected
        # by maxDistance, or two objects competing for the same nearest
        # detection), so both are always processed rather than choosing
        # one based on which side of D is larger.

        # tracked objects with no matching detection missed this frame
        for row in sorted(unusedRows):
            self._mark_disappeared(objectIDs[row])

        # detections with no matching object become new tracked objects
        for col in sorted(unusedCols):
            self.register(inputCentroids[col])

        # return the set of trackable objects
        return self.objects