This is an automated email from the ASF dual-hosted git repository. xhsun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 153c52d [TE] Speed up minute level detection (#4053) 153c52d is described below commit 153c52de62e282f5778e0534d626922701b96355 Author: Xiaohui Sun <xh...@linkedin.com> AuthorDate: Tue Apr 2 09:50:42 2019 -0700 [TE] Speed up minute level detection (#4053) * [TE] Speed up minute level detection * [TE] Speed up minute level detection --- .../pinot/thirdeye/detection/alert/AlertUtils.java | 3 -- .../detection/wrapper/AnomalyDetectorWrapper.java | 52 ++++++++++++++-------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java index 8e6adf3..a298267 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java @@ -24,9 +24,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Collections2; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; -import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType; import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; -import com.mysql.jdbc.StringUtils; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -38,7 +36,6 @@ import javax.mail.internet.InternetAddress; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; - public class AlertUtils { private AlertUtils() { //left blank diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java index b888379..8b499ca 100644 --- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java @@ -81,9 +81,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60); // disable minute level cache warm up private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1; - // fail detection job if it failed successively for the first 10 windows - private static final long EARLY_TERMINATE_WINDOW = 10; - + // fail detection job if it failed successively for the first 5 windows + private static final long EARLY_TERMINATE_WINDOW = 5; private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class); @@ -92,18 +91,18 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { private final int windowDelay; private final TimeUnit windowDelayUnit; - private final int windowSize; - private final TimeUnit windowUnit; + private int windowSize; + private TimeUnit windowUnit; private final MetricConfigDTO metric; private final MetricEntity metricEntity; private final boolean isMovingWindowDetection; // need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes. private final TimeGranularity functionFrequency; private final String detectorName; - private final long windowSizeMillis; + private long windowSizeMillis; private final DatasetConfigDTO dataset; private final DateTimeZone dateTimeZone; - private final Period bucketPeriod; + private Period bucketPeriod; private final long cachingPeriodLookback; public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) { @@ -142,6 +141,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { this.bucketPeriod = bucketStr == null ? 
this.getBucketSizePeriodForDataset() : Period.parse(bucketStr); this.cachingPeriodLookback = config.getProperties().containsKey(PROP_CACHE_PERIOD_LOOKBACK) ? MapUtils.getLong(config.getProperties(), PROP_CACHE_PERIOD_LOOKBACK) : getCachingPeriodLookback(this.dataset.bucketTimeGranularity()); + + speedUpMinuteLevelDetection(); } @Override @@ -168,12 +169,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { Interval window = monitoringWindows.get(i); List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>(); try { - LOG.info("[Pipeline] running detection for config {} metricUrn {} window ({}/{}) - start {} end {}", - config.getId(), metricUrn, i, monitoringWindows.size(), window.getStart(), window.getEnd()); + LOG.info("[Pipeline] start detection for config {} metricUrn {} window ({}/{}) - start {} end {}", + config.getId(), metricUrn, i + 1, monitoringWindows.size(), window.getStart(), window.getEnd()); long ts = System.currentTimeMillis(); anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn); - LOG.info("[Pipeline] run anomaly detection for window ({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies", - i, monitoringWindows.size(), window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size()); + LOG.info("[Pipeline] end detection for config {} metricUrn {} window ({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies", + config.getId(), metricUrn, i + 1, monitoringWindows.size(), window.getStart(), window.getEnd(), + System.currentTimeMillis() - ts, anomaliesForOneWindow.size()); successWindows++; } catch (DetectorDataInsufficientException e) { @@ -256,13 +258,6 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { for (Interval window : monitoringWindows){ LOG.info("Will run detection in window {}", window); } - // pre cache the time series for the whole detection time period instead of fetching for each window - if 
(this.cachingPeriodLookback >= 0) { MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - cachingPeriodLookback, endTime, this.metricEntity.getFilters(), toTimeGranularity(this.bucketPeriod)); this.provider.fetchTimeseries(Collections.singleton(cacheSlice)); } return monitoringWindows; } catch (Exception e) { LOG.info("can't generate moving monitoring windows, calling with single detection window", e); @@ -371,4 +366,23 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS); } } -} + + /** + * Speed up minute-level detection. + * + * Detection generates lots of small windows if the bucket size is 15 minutes or smaller and the overall detection range is 1 day or longer. + * This optimization changes the bucket period to 1 day and sets the window to 1 day (windowSize = 1, windowUnit = DAYS). + * Note that all three parameters must be changed together, since each detection window is: + * [bucketPeriod_end - windowSize * windowUnit, bucketPeriod_end] + * + * For example, the bucketPeriod can be 5 minutes while the window size is 6 hours. + */ + private void speedUpMinuteLevelDetection() { + if (bucketPeriod.getMinutes() <= 15 && endTime - startTime >= Period.days(1).getMillis()) { + bucketPeriod = Period.days(1); + windowSize = 1; + windowUnit = TimeUnit.DAYS; + windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, windowUnit); + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org