One way to clear each phase. Compare the approach with your own, then try the patterns yourself.
"""Sliding-window aggregator for streaming events.
Phase 1 fixes two bugs in `SlidingWindow`:
* eviction boundary was inclusive — events exactly at
`current_time - window_size` were being thrown away.
* `_min` / `_max` were never re-computed after eviction, so a freshly
evicted minimum stayed cached as the live min.
"""
from typing import List, Optional, Dict, Any
from collections import deque
class Event:
"""A timestamped event with a numeric value and optional tags."""
def __init__(self, timestamp: float, value: float, tags: Optional[Dict[str, str]] = None):
self.timestamp = timestamp
self.value = value
self.tags = tags or {}
class SlidingWindow:
"""Maintains a time-based sliding window of events and computes
aggregations. The window includes events with timestamps in
[now - window_size, now] (boundary inclusive)."""
def __init__(self, window_size: float):
if window_size <= 0:
raise ValueError("window_size must be > 0")
self._window_size = window_size
self._events: deque = deque()
self._sum = 0.0
self._count = 0
self._min: Optional[float] = None
self._max: Optional[float] = None
@property
def window_size(self) -> float:
return self._window_size
def add_event(self, event: Event) -> None:
"""Add an event and evict expired events."""
self._events.append(event)
self._sum += event.value
self._count += 1
if self._min is None or event.value < self._min:
self._min = event.value
if self._max is None or event.value > self._max:
self._max = event.value
self._evict(event.timestamp)
def _evict(self, current_time: float) -> None:
"""Remove events that have fallen outside the window."""
cutoff = current_time - self._window_size
evicted_any = False
# Bug 1 fix: strict `<` so events at exactly the boundary survive.
while self._events and self._events[0].timestamp < cutoff:
old = self._events.popleft()
self._sum -= old.value
self._count -= 1
evicted_any = True
# Bug 2 fix: cached min/max may be stale if the evicted event WAS
# the min or max. Recompute from what's left.
if evicted_any:
if self._count == 0:
self._min = None
self._max = None
else:
self._min = min(e.value for e in self._events)
self._max = max(e.value for e in self._events)
def get_count(self) -> int:
return self._count
def get_sum(self) -> float:
return self._sum
def get_average(self) -> Optional[float]:
if self._count == 0:
return None
return self._sum / self._count
def get_min(self) -> Optional[float]:
return self._min
def get_max(self) -> Optional[float]:
return self._max
def get_percentile(self, p: float) -> Optional[float]:
"""Return the p-th percentile (0-100) of current window values.
Raises ValueError if p is outside [0, 100]."""
if p < 0 or p > 100:
raise ValueError(f"p must be in [0, 100], got {p}")
if self._count == 0:
return None
values = sorted(e.value for e in self._events)
idx = int((p / 100) * (len(values) - 1))
return values[idx]
Sign in to join the discussion