damccorm commented on code in PR #33994: URL: https://github.com/apache/beam/pull/33994#discussion_r1960051738
########## sdks/python/apache_beam/ml/anomaly/univariate/quantile.py: ########## @@ -0,0 +1,171 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Trackers for calculating quantiles in windowed fashion. + +This module defines different types of quantile trackers that operate on +windows of data. It includes: + + * `SimpleSlidingQuantileTracker`: Calculates quantile using numpy in a sliding + window. + * `BufferedLandmarkQuantileTracker`: Sortedlist based quantile tracker in + landmark window mode. + * `BufferedSlidingQuantileTracker`: Sortedlist based quantile tracker in + sliding window mode. +""" + +import math +import typing +import warnings + +import numpy as np +from sortedcontainers import SortedList + +from apache_beam.ml.anomaly.univariate.base import WindowedTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode + + +class QuantileTracker(WindowedTracker): + """Abstract base class for quantile trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for quantile trackers. + """ + pass + + +class SimpleSlidingQuantileTracker(QuantileTracker): + """Sliding window quantile tracker using NumPy. + + This tracker uses NumPy's `nanquantile` function to calculate the specified + quantile of the values currently in the sliding window. It's a simple, + non-incremental approach. + + Args: + window_size: The size of the sliding window. + q: The quantile to calculate, a float between 0 and 1 (inclusive). + """ + def __init__(self, window_size, q): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + assert 0 <= q <= 1, "quantile argument should be between 0 and 1" + self._q = q Review Comment: Nit: does it make sense to abstract this check out to the base class? ########## sdks/python/apache_beam/ml/anomaly/univariate/mean.py: ########## @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Trackers for calculating mean in windowed fashion. + +This module defines different types of mean trackers that operate on windows +of data. It includes: + + * `SimpleSlidingMeanTracker`: Calculates mean using numpy in a sliding window. + * `IncLandmarkMeanTracker`: Incremental mean tracker in landmark window mode. + * `IncSlidingMeanTracker`: Incremental mean tracker in sliding window mode. +""" + +import math +import warnings + +import numpy as np + +from apache_beam.ml.anomaly.univariate.base import WindowedTracker +from apache_beam.ml.anomaly.univariate.base import WindowMode + + +class MeanTracker(WindowedTracker): + """Abstract base class for mean trackers. + + Currently, it does not add any specific functionality but provides a type + hierarchy for mean trackers. + """ + pass + + +class SimpleSlidingMeanTracker(MeanTracker): + """Sliding window mean tracker that calculates mean using NumPy. + + This tracker uses NumPy's `nanmean` function to calculate the mean of the + values currently in the sliding window. It's a simple, non-incremental + approach. + + Args: + window_size: The size of the sliding window. + """ + def __init__(self, window_size): + super().__init__(window_mode=WindowMode.SLIDING, window_size=window_size) + + def get(self): + """Calculates and returns the mean of the current sliding window. + + Returns: + float: The mean of the values in the current sliding window. + Returns NaN if the window is empty. + """ + if len(self._queue) == 0: + return float('nan') + + with warnings.catch_warnings(record=False): + warnings.simplefilter("ignore") + return np.nanmean(self._queue) + + +class IncMeanTracker(MeanTracker): + """Base class for incremental mean trackers. + + This class implements incremental calculation of the mean, which is more + efficient for streaming data as it updates the mean with each new data point + instead of recalculating from scratch. + + Args: + window_mode: A `WindowMode` enum specifying whether the window is `LANDMARK` + or `SLIDING`. + **kwargs: Keyword arguments passed to the parent class constructor. + """ + def __init__(self, window_mode, **kwargs): + super().__init__(window_mode=window_mode, **kwargs) + self._mean = 0 + + def push(self, x): + """Pushes a new value and updates the incremental mean. + + Args: + x: The new value to be pushed. + """ + if not math.isnan(x): + self._n += 1 + delta = x - self._mean + else: + delta = 0 Review Comment: Should we just early return here? If `x` is not a number, I'd expect us to just ignore it (or maybe even throw) ########## sdks/python/apache_beam/ml/anomaly/univariate/base.py: ########## @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import abc +from collections import deque +from enum import Enum + + +class BaseTracker(abc.ABC): + """Abstract base class for all univariate trackers.""" + @abc.abstractmethod + def push(self, x): + """Push a new value to the tracker. + + Args: + x: The value to be pushed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def get(self): + """Get the current tracking value. + + Returns: + The current tracked value, the type of which depends on the specific + tracker implementation. + """ + raise NotImplementedError() + + +class WindowMode(Enum): + """Enum representing the window mode for windowed trackers.""" + #: operating on all data points from the beginning. Review Comment: I think reading other things, the answer is yes, but it would be good to be explicit here. ########## sdks/python/apache_beam/ml/anomaly/univariate/base.py: ########## @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import abc +from collections import deque +from enum import Enum + + +class BaseTracker(abc.ABC): + """Abstract base class for all univariate trackers.""" + @abc.abstractmethod + def push(self, x): + """Push a new value to the tracker. + + Args: + x: The value to be pushed. + """ + raise NotImplementedError() + + @abc.abstractmethod + def get(self): + """Get the current tracking value. + + Returns: + The current tracked value, the type of which depends on the specific + tracker implementation. + """ + raise NotImplementedError() + + +class WindowMode(Enum): + """Enum representing the window mode for windowed trackers.""" + #: operating on all data points from the beginning. Review Comment: I assume this doesn't mean we're buffering all data points we've ever seen, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
