Prerequisites

Before attempting this problem, you should be comfortable with:

  • Queues/Deques - Understanding FIFO data structures for maintaining a sliding window of elements
  • Sliding Window - Tracking a fixed-size window of recent elements as new data arrives
  • Circular Arrays - Using modular arithmetic to implement efficient fixed-size buffers

1. Array or List

Intuition

The most straightforward way to compute a moving average is to store all incoming values in a list and calculate the average of the last size elements each time. When a new value arrives, we append it to the list, then sum the most recent values up to the window size. This approach is simple to implement but recalculates the sum from scratch on every call.

Algorithm

  1. Initialize an empty list to store all incoming values and save the window size.
  2. When next(val) is called, append the new value to the list.
  3. Calculate the sum of the last size elements (or all elements if fewer than size exist).
  4. Return the sum divided by the number of elements in the window (minimum of list length and size).
class MovingAverage:
    def __init__(self, size: int):
        self.size = size
        self.queue = []

    def next(self, val: int) -> float:
        size, queue = self.size, self.queue
        queue.append(val)
        # calculate the sum of the moving window
        window_sum = sum(queue[-size:])

        return window_sum / min(len(queue), size)

Time & Space Complexity

  • Time complexity: O(NM)O(N \cdot M)
  • Space complexity: O(M)O(M)

Where NN is the size of the moving window and MM is the number of calls made to next.


2. Double-ended Queue

Intuition

Instead of recalculating the sum each time, we can maintain a running sum and a queue that holds only the elements within the current window. When the window is full and a new element arrives, we remove the oldest element from both the queue and the running sum, then add the new element. This way, each next() call runs in constant time.

Algorithm

  1. Initialize a deque (double-ended queue), a running sum window_sum, and a count of elements seen.
  2. When next(val) is called:
    • Increment the count and add the new value to the queue.
    • If the count exceeds the window size, remove the front element from the queue and subtract it from the running sum.
    • Add the new value to the running sum.
  3. Return the running sum divided by the current window size (minimum of count and size).
class MovingAverage:
    def __init__(self, size: int):
        self.size = size
        self.queue = deque()
        # number of elements seen so far
        self.window_sum = 0
        self.count = 0

    def next(self, val: int) -> float:
        self.count += 1
        # calculate the new sum by shifting the window
        self.queue.append(val)
        tail = self.queue.popleft() if self.count > self.size else 0

        self.window_sum = self.window_sum - tail + val

        return self.window_sum / min(self.size, self.count)

Time & Space Complexity

  • Time complexity: O(M)O(M)
    • Time complexity per next() call is O(1)O(1). Therefore, the total time complexity for MM calls is O(M)O(M)
  • Space complexity: O(N)O(N)

Where NN is the size of the moving window and MM is the number of calls made to next.


3. Circular Queue with Array

Intuition

A circular queue (ring buffer) avoids the overhead of shifting elements when removing from the front. We use a fixed-size array where the head pointer wraps around when it reaches the end. The element being overwritten is the one leaving the window, so we subtract it from the running sum before adding the new value. This achieves constant time per operation with minimal memory overhead.

Algorithm

  1. Initialize a fixed-size array of length size, a head pointer, a running sum, and a count of elements seen.
  2. When next(val) is called:
    • Increment the count.
    • Calculate the tail position as (head + 1) % size, which points to the oldest element that will be replaced.
    • Subtract the value at the tail position from the running sum and add the new value.
    • Move the head pointer forward: head = (head + 1) % size.
    • Store the new value at the head position.
  3. Return the running sum divided by the current window size (minimum of count and size).
class MovingAverage:
    def __init__(self, size: int):
        self.size = size
        self.queue = [0] * self.size
        self.head = self.window_sum = 0

        # number of elements seen so far
        self.count = 0

    def next(self, val: int) -> float:
        self.count += 1

        # calculate the new sum by shifting the window
        tail = (self.head + 1) % self.size
        self.window_sum = self.window_sum - self.queue[tail] + val

        # move on to the next head
        self.head = (self.head + 1) % self.size
        self.queue[self.head] = val
        return self.window_sum / min(self.size, self.count)

Time & Space Complexity

  • Time complexity: O(M)O(M)
    • Time complexity per next() call is O(1)O(1). Therefore, the total time complexity for MM calls is O(M)O(M)
  • Space complexity: O(N)O(N)

Where NN is the size of the moving window and MM is the number of calls made to next.

Common Pitfalls

Dividing by Window Size Before It Is Full

When fewer than size elements have been added, the average should be calculated using the actual count of elements, not the window size. Dividing by size when only a few elements exist produces incorrect results. For example, with size = 3 and only one element 5, the average should be 5.0, not 5/3 = 1.67.

Not Removing the Oldest Element When Window Is Full

When using a running sum approach, forgetting to subtract the element leaving the window causes the sum to grow unbounded. After receiving more than size elements, each new element should trigger removal of the oldest one from both the queue and the running sum. Failing to do this results in the average including more elements than the window size allows.