The most straightforward way to compute a moving average is to store all incoming values in a list and calculate the average of the last size elements each time. When a new value arrives, we append it to the list, then sum the most recent values up to the window size. This approach is simple to implement but recalculates the sum from scratch on every call.
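1. Initialize an empty list to store the values and record the window size.
2. When next(val) is called, append the new value to the list.
3. Sum the last size elements (or all elements if fewer than size exist).
4. Return the sum divided by the number of elements in the window.

class MovingAverage:
    def __init__(self, size: int):
        self.size = size
        self.queue = []

    def next(self, val: int) -> float:
        size, queue = self.size, self.queue
        queue.append(val)
        # calculate the sum of the moving window
        window_sum = sum(queue[-size:])
        # divide by the number of values actually in the window
        return window_sum / min(len(queue), size)

Time complexity: $O(N \cdot M)$, because each call sums up to $N$ values.
Space complexity: $O(M)$, because the list keeps every value it has received.

Where $N$ is the size of the moving window and $M$ is the number of calls made to next.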
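To make the window behaviour concrete, the sketch below replays the same slice-and-sum idea over a short stream. The helper name `brute_force_averages` and the sample values are illustrative assumptions, not part of the original solution.

```python
def brute_force_averages(values, size):
    """Return the moving average after each value, recomputing the window sum every time."""
    seen = []
    averages = []
    for val in values:
        seen.append(val)
        window = seen[-size:]  # last `size` values, or all of them if fewer exist
        averages.append(sum(window) / len(window))
    return averages


result = brute_force_averages([1, 10, 3, 5], size=3)
print(result)  # windows are [1], [1, 10], [1, 10, 3], [10, 3, 5]
```

The fourth average covers only 10, 3 and 5, since the value 1 has already left the window.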
Instead of recalculating the sum each time, we can maintain a running sum and a queue that holds only the elements within the current window. When the window is full and a new element arrives, we remove the oldest element from both the queue and the running sum, then add the new element. This way, each next() call runs in constant time.
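1. Initialize a queue, a running sum window_sum, and a count of elements seen.
2. When next(val) is called:
   - Increment count and add the new value to the queue.
   - If count exceeds the window size, remove the front element from the queue and subtract it from the running sum.
   - Add the new value to the running sum.
   - Return the running sum divided by the number of elements in the window (the minimum of count and size).

from collections import deque

class MovingAverage:
    def __init__(self, size: int):
        self.size = size
        self.queue = deque()
        # running sum of the values currently in the window
        self.window_sum = 0
        # number of elements seen so far
        self.count = 0

    def next(self, val: int) -> float:
        self.count += 1
        # calculate the new sum by shifting the window
        self.queue.append(val)
        tail = self.queue.popleft() if self.count > self.size else 0
        self.window_sum = self.window_sum - tail + val
        return self.window_sum / min(self.size, self.count)

Time complexity: the time complexity of each next() call is $O(1)$. Therefore, the total time complexity for $M$ calls is $O(M)$.
Space complexity: $O(N)$, because the queue never holds more than the current window.

Where $N$ is the size of the moving window and $M$ is the number of calls made to next.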
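As a possible variation on the queue approach, Python's `collections.deque` accepts a `maxlen` argument and discards the oldest item automatically once it is full. The sketch below relies on that, so no separate count is needed. The class name `BoundedMovingAverage` is hypothetical, and the code assumes `size` is at least 1.

```python
from collections import deque


class BoundedMovingAverage:
    """Hypothetical variant: the deque itself enforces the window size."""

    def __init__(self, size: int):
        # assumption: size >= 1; appending to a full deque drops its oldest item
        self.queue = deque(maxlen=size)
        self.window_sum = 0

    def next(self, val: int) -> float:
        if len(self.queue) == self.queue.maxlen:
            # the front element is about to be evicted, so remove it from the sum first
            self.window_sum -= self.queue[0]
        self.queue.append(val)
        self.window_sum += val
        return self.window_sum / len(self.queue)


m = BoundedMovingAverage(3)
results = [m.next(v) for v in [1, 10, 3, 5]]
print(results)
```

The trade-off is that the eviction becomes implicit, so the subtraction has to happen before the append rather than after it.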
A circular queue (ring buffer) avoids the overhead of shifting elements when removing from the front. We use a fixed-size array where the head pointer wraps around when it reaches the end. The element being overwritten is the one leaving the window, so we subtract it from the running sum before adding the new value. This achieves constant time per operation with minimal memory overhead.
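1. Initialize a fixed-size array of length size, a head pointer, a running sum, and a count of elements seen.
2. When next(val) is called:
   - Increment count.
   - Compute the tail index as (head + 1) % size, which points to the oldest element that will be replaced.
   - Update the running sum by subtracting the value at the tail index and adding the new value.
   - Advance the head with head = (head + 1) % size and store the new value there.
   - Return the running sum divided by the minimum of size and count.

class MovingAverage:
    def __init__(self, size: int):
        self.size = size
        # fixed-size buffer; unused slots hold 0 so subtracting them is harmless
        self.queue = [0] * self.size
        self.head = self.window_sum = 0
        # number of elements seen so far
        self.count = 0

    def next(self, val: int) -> float:
        self.count += 1
        # calculate the new sum by shifting the window
        tail = (self.head + 1) % self.size
        self.window_sum = self.window_sum - self.queue[tail] + val
        # move on to the next head
        self.head = (self.head + 1) % self.size
        self.queue[self.head] = val
        return self.window_sum / min(self.size, self.count)

Time complexity: the time complexity of each next() call is $O(1)$. Therefore, the total time complexity for $M$ calls is $O(M)$.
Space complexity: $O(N)$ for the fixed-size array.

Where $N$ is the size of the moving window and $M$ is the number of calls made to next.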
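The wrap-around of the head pointer can be checked in isolation. The short sketch below applies the same update rule, head = (head + 1) % size, a few times for an assumed window size of 3 and records which slot each call would write to.

```python
size = 3
head = 0
visited = []
for _ in range(7):
    head = (head + 1) % size  # same update rule as in next()
    visited.append(head)

print(visited)  # [1, 2, 0, 1, 2, 0, 1]
```

Because head starts at 0 and is advanced before each write, the first value lands in slot 1 and slot 0 is filled on the third call. After that, every call overwrites the slot holding the oldest value.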
When fewer than size elements have been added, the average should be calculated using the actual count of elements, not the window size. Dividing by size when only a few elements exist produces incorrect results. For example, with size = 3 and only one element 5, the average should be 5.0, not 5/3 ≈ 1.67.
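A small sketch of this pitfall, using the same numbers as the example (size = 3 and a single value 5). The variable names are illustrative only.

```python
size = 3
values = [5]  # only one element has arrived so far

window = values[-size:]
wrong = sum(window) / size                    # divides by the window size regardless
right = sum(window) / min(len(values), size)  # divides by the elements actually present

print(round(wrong, 2), right)  # 1.67 5.0
```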
When using a running sum approach, forgetting to subtract the element leaving the window causes the sum to grow unbounded. After receiving more than size elements, each new element should trigger removal of the oldest one from both the queue and the running sum. Failing to do this results in the average including more elements than the window size allows.
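The effect is easy to reproduce. The sketch below runs a deliberately buggy running sum next to a correct one on an assumed stream with size = 2. Both loops are illustrative and not taken from the solutions above.

```python
from collections import deque

size = 2
stream = [4, 6, 8, 10]

# Buggy version: the running sum never drops the element leaving the window.
buggy_sum = 0
buggy = []
for count, val in enumerate(stream, start=1):
    buggy_sum += val
    buggy.append(buggy_sum / min(count, size))

# Correct version: evict the oldest value from both the queue and the sum.
queue, window_sum = deque(), 0
correct = []
for val in stream:
    queue.append(val)
    window_sum += val
    if len(queue) > size:
        window_sum -= queue.popleft()
    correct.append(window_sum / len(queue))

print(buggy)    # [4.0, 5.0, 9.0, 14.0]
print(correct)  # [4.0, 5.0, 7.0, 9.0]
```

The two lists agree until the window fills, then the buggy averages drift upward because old values are never removed from the sum.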