Back to AI Coding

Streaming Analytics

One way to clear each phase. Compare the approach with your own, then try the patterns yourself.

src/window.py
"""Sliding-window aggregator for streaming events.

Phase 1 fixes two bugs in `SlidingWindow`:
  * eviction boundary was inclusive — events exactly at
    `current_time - window_size` were being thrown away.
  * `_min` / `_max` were never re-computed after eviction, so a freshly
    evicted minimum stayed cached as the live min.
"""

import math
from collections import deque
from typing import Any, Dict, List, Optional


class Event:
    """A timestamped event with a numeric value and optional tags.

    Args:
        timestamp: Event time. A `SlidingWindow` compares it against
            ``timestamp - window_size``, so it must use the same units as the
            window size.
        value: Numeric payload folded into the window's aggregations.
        tags: Optional string-to-string labels. A falsy argument (``None`` or
            an empty mapping) is replaced by a fresh empty dict; a non-empty
            dict is stored by reference, not copied.
    """

    def __init__(self, timestamp: float, value: float, tags: Optional[Dict[str, str]] = None):
        self.timestamp = timestamp
        self.value = value
        self.tags = tags or {}

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(timestamp={self.timestamp!r}, "
            f"value={self.value!r}, tags={self.tags!r})"
        )


class SlidingWindow:
    """Maintains a time-based sliding window of events and computes
    aggregations over it.

    The window includes events with timestamps in
    ``[now - window_size, now]`` (both ends inclusive), where ``now`` is the
    timestamp of the most recently added event.

    NOTE(review): eviction only inspects the oldest queued event, so it
    assumes events arrive in non-decreasing timestamp order. A late
    (out-of-order) event is appended at the back and can only be evicted
    after every earlier arrival has been — confirm callers guarantee order.
    """

    def __init__(self, window_size: float):
        """Create an empty window.

        Raises:
            ValueError: if ``window_size`` is not strictly positive.
        """
        if window_size <= 0:
            raise ValueError("window_size must be > 0")
        self._window_size = window_size
        self._events: deque = deque()  # insertion order; oldest at the left
        self._sum = 0.0
        self._count = 0
        self._min: Optional[float] = None
        self._max: Optional[float] = None

    @property
    def window_size(self) -> float:
        """Width of the window, in timestamp units."""
        return self._window_size

    def add_event(self, event: Event) -> None:
        """Add ``event`` to the window, then evict anything that fell out.

        The new event's timestamp is used as ``now`` for eviction.
        """
        self._events.append(event)
        self._sum += event.value
        self._count += 1
        if self._min is None or event.value < self._min:
            self._min = event.value
        if self._max is None or event.value > self._max:
            self._max = event.value
        self._evict(event.timestamp)

    def _evict(self, current_time: float) -> None:
        """Remove events older than ``current_time - window_size``.

        Events exactly at the cutoff survive (strict ``<``). After any
        eviction, the sum, min and max are recomputed from the surviving
        events:

        * min/max, because the evicted event may have been the cached extreme;
        * the sum, because repeatedly subtracting floats drifts (e.g. adding
          0.1 and 0.2, then evicting both after adding 0.3, would otherwise
          leave 0.30000000000000004).

        This makes an evicting call O(n) in the window size.
        """
        cutoff = current_time - self._window_size
        evicted_any = False
        while self._events and self._events[0].timestamp < cutoff:
            self._events.popleft()
            self._count -= 1
            evicted_any = True
        if not evicted_any:
            return
        if self._count == 0:
            self._sum = 0.0
            self._min = None
            self._max = None
            return
        values = [e.value for e in self._events]
        # fsum gives a correctly rounded total, discarding drift that
        # accumulated from earlier incremental additions.
        self._sum = math.fsum(values)
        self._min = min(values)
        self._max = max(values)

    def get_count(self) -> int:
        """Return the number of events currently in the window."""
        return self._count

    def get_sum(self) -> float:
        """Return the sum of values in the window (0.0 when empty)."""
        return self._sum

    def get_average(self) -> Optional[float]:
        """Return the mean of values in the window, or None when empty."""
        if self._count == 0:
            return None
        return self._sum / self._count

    def get_min(self) -> Optional[float]:
        """Return the smallest value in the window, or None when empty."""
        return self._min

    def get_max(self) -> Optional[float]:
        """Return the largest value in the window, or None when empty."""
        return self._max

    def get_percentile(self, p: float) -> Optional[float]:
        """Return the p-th percentile (0-100) of current window values.

        Uses the "lower" rule without interpolation: the element at index
        ``floor(p * (n - 1) / 100)`` of the sorted values. The result is
        always a value actually present in the window.

        Returns:
            The selected value, or None if the window is empty.

        Raises:
            ValueError: if p is outside [0, 100], including NaN.
        """
        # Chained comparison is False for NaN, so NaN is rejected too.
        if not 0 <= p <= 100:
            raise ValueError(f"p must be in [0, 100], got {p}")
        if self._count == 0:
            return None
        values = sorted(e.value for e in self._events)
        # Multiply before dividing: (p / 100) * (n - 1) misrounds integral p,
        # e.g. 0.29 * 100 == 28.999999999999996, which truncates to 28.
        idx = int(p * (len(values) - 1) / 100)
        return values[idx]

Discuss