This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0b4fd51 [TE] Implement Threshold based Time Window Suppressor (#3463)
0b4fd51 is described below
commit 0b4fd51e7e442d07d3caf95117e1d9cc4e0e33ab
Author: Akshay Rai <[email protected]>
AuthorDate: Tue Nov 13 15:12:00 2018 -0800
[TE] Implement Threshold based Time Window Suppressor (#3463)
---
.../detection/alert/DetectionAlertTaskFactory.java | 7 +-
.../detection/alert/DetectionAlertTaskRunner.java | 3 +-
.../alert/suppress/DetectionAlertSuppressor.java | 7 +
.../DetectionAlertTimeWindowSuppressor.java | 127 +++++++++++++++++
.../DetectionTimeWindowSuppressorTest.java | 157 +++++++++++++++++++++
5 files changed, 295 insertions(+), 6 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
index 3a51c95..61d118f 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
@@ -85,8 +85,7 @@ public class DetectionAlertTaskFactory {
return detectionAlertSchemeSet;
}
- public Set<DetectionAlertSuppressor>
loadAlertSuppressors(DetectionAlertConfigDTO alertConfig,
- ThirdEyeAnomalyConfiguration thirdeyeConfig) throws Exception {
+ public Set<DetectionAlertSuppressor>
loadAlertSuppressors(DetectionAlertConfigDTO alertConfig) throws Exception {
Preconditions.checkNotNull(alertConfig);
Set<DetectionAlertSuppressor> detectionAlertSuppressors = new HashSet<>();
Map<String, Map<String, Object>> alertSuppressors =
alertConfig.getAlertSuppressors();
@@ -99,8 +98,8 @@ public class DetectionAlertTaskFactory {
Preconditions.checkNotNull(alertSuppressors.get(alertSuppressor));
Preconditions.checkNotNull(alertSuppressors.get(alertSuppressor).get("className"));
Constructor<?> constructor =
Class.forName(alertSuppressors.get(alertSuppressor).get("className").toString().trim())
- .getConstructor(DetectionAlertConfigDTO.class,
ThirdEyeAnomalyConfiguration.class);
- detectionAlertSuppressors.add((DetectionAlertSuppressor)
constructor.newInstance(alertConfig, thirdeyeConfig));
+ .getConstructor(DetectionAlertConfigDTO.class);
+ detectionAlertSuppressors.add((DetectionAlertSuppressor)
constructor.newInstance(alertConfig));
}
return detectionAlertSuppressors;
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 4bc9b3a..669b442 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -101,8 +101,7 @@ public class DetectionAlertTaskRunner implements TaskRunner
{
DetectionAlertFilterResult result = alertFilter.run();
// Suppress alerts if any and get the filtered anomalies to be notified
- Set<DetectionAlertSuppressor> alertSuppressors =
- detAlertTaskFactory.loadAlertSuppressors(alertConfig,
taskContext.getThirdEyeAnomalyConfiguration());
+ Set<DetectionAlertSuppressor> alertSuppressors =
detAlertTaskFactory.loadAlertSuppressors(alertConfig);
for (DetectionAlertSuppressor alertSuppressor : alertSuppressors) {
result = alertSuppressor.run(result);
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
index 72a67b0..6ff87b2 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
@@ -1,5 +1,6 @@
package com.linkedin.thirdeye.detection.alert.suppress;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
@@ -11,5 +12,11 @@ import
com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
*/
public abstract class DetectionAlertSuppressor {
+ protected final DetectionAlertConfigDTO config;
+
+ public DetectionAlertSuppressor(DetectionAlertConfigDTO config) {
+ this.config = config;
+ }
+
public abstract DetectionAlertFilterResult run(DetectionAlertFilterResult
result) throws Exception;
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
new file mode 100644
index 0000000..9fd5ded
--- /dev/null
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
@@ -0,0 +1,127 @@
+package com.linkedin.thirdeye.detection.alert.suppress;
+
+import com.google.common.base.Preconditions;
+import com.linkedin.thirdeye.anomalydetection.context.AnomalyFeedback;
+import com.linkedin.thirdeye.constant.AnomalyFeedbackType;
+import com.linkedin.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import com.linkedin.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datasource.DAORegistry;
+import com.linkedin.thirdeye.detection.ConfigUtils;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Suppress alerts from anomalies generated during a specific time period.
+ *
+ * This class enables 2 ways of suppressing alerts
+ * 1. Suppress all the alerts generated during the time window. No alerts will
be sent.
+ * ({@link #WINDOW_START_TIME_KEY} and {@link #WINDOW_END_TIME_KEY})
+ * 2. Suppress alerts in the time window based on some thresholds.
+ * ({@link #EXPECTED_CHANGE_KEY} and {@link #ACCEPTABLE_DEVIATION_KEY})
+ */
+public class DetectionAlertTimeWindowSuppressor extends
DetectionAlertSuppressor {
+ private static final Logger LOG =
LoggerFactory.getLogger(DetectionAlertTimeWindowSuppressor.class);
+
+ static final String TIME_WINDOW_SUPPRESSOR_KEY = "timeWindowSuppressor";
+ static final String TIME_WINDOWS_KEY = "timeWindows";
+
+ static final String WINDOW_START_TIME_KEY = "windowStartTime";
+ static final String WINDOW_END_TIME_KEY = "windowEndTime";
+ static final String IS_THRESHOLD_KEY = "isThresholdApplied";
+
+ // The expected rise or fall of a metric during the holiday or suppression
period (ex: -0.5 for 50% drop)
+ static final String EXPECTED_CHANGE_KEY = "expectedChange";
+
+ // The acceptable deviation from the dropped/risen value during the
suppression period (ex: 0.1 for +/- 10%)
+ static final String ACCEPTABLE_DEVIATION_KEY = "acceptableDeviation";
+
+ public DetectionAlertTimeWindowSuppressor(DetectionAlertConfigDTO config) {
+ super(config);
+ }
+
+ private boolean isAnomalySuppressedByThreshold(double anomalyWeight,
Map<String, Object> suppressWindowProps) {
+ double expectedDropOrSpike = (Double)
suppressWindowProps.get(EXPECTED_CHANGE_KEY);
+ double acceptableDeviation = (Double)
suppressWindowProps.get(ACCEPTABLE_DEVIATION_KEY);
+ if (anomalyWeight <= (expectedDropOrSpike + acceptableDeviation)
+ && anomalyWeight >= (expectedDropOrSpike - acceptableDeviation)) {
+ LOG.info("Anomaly id {} falls within the specified thresholds
(anomalyWeight = {}, expectedDropOrSpike = {},"
+ + " acceptableDeviation = {})", anomalyWeight,
expectedDropOrSpike, acceptableDeviation);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Check if the anomaly needs to be suppressed. An anomaly is suppressed if
the startTime
+ * of the anomaly falls in the suppression time window and is within the
user's expected range.
+ */
+ private boolean isAnomalySuppressed(MergedAnomalyResultDTO anomaly,
Map<String, Object> suppressWindowProps) {
+ boolean shouldSuppress = false;
+ try {
+ long windowStartTime = (Long)
suppressWindowProps.get(WINDOW_START_TIME_KEY);
+ long windowEndTime = (Long) suppressWindowProps.get(WINDOW_END_TIME_KEY);
+ if (anomaly.getStartTime() >= windowStartTime && anomaly.getStartTime()
< windowEndTime) {
+ LOG.info("Anomaly id {} falls in the suppression time window ({},
{})", anomaly.getId(), windowStartTime, windowEndTime);
+ if (suppressWindowProps.get(IS_THRESHOLD_KEY) != null && (Boolean)
suppressWindowProps.get(IS_THRESHOLD_KEY)) {
+ shouldSuppress = isAnomalySuppressedByThreshold(anomaly.getWeight(),
suppressWindowProps);
+ } else {
+ shouldSuppress = true;
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while suppressing anomaly id {} with suppress window
properties {}", anomaly.getId(),
+ suppressWindowProps, e);
+ }
+
+ return shouldSuppress;
+ }
+
+ private void filterOutSuppressedAnomalies(final Set<MergedAnomalyResultDTO>
anomalies) {
+ Iterator<MergedAnomalyResultDTO> anomaliesIt = anomalies.iterator();
+ MergedAnomalyResultManager anomalyMergedResultDAO =
DAORegistry.getInstance().getMergedAnomalyResultDAO();
+
+ List<Map<String, Object>> suppressWindowPropsList
+ =
ConfigUtils.getList(config.getAlertSuppressors().get(TIME_WINDOW_SUPPRESSOR_KEY).get(TIME_WINDOWS_KEY));
+
+ while (anomaliesIt.hasNext()) {
+ MergedAnomalyResultDTO anomaly = anomaliesIt.next();
+ for (Map<String, Object> suppressWindowProps : suppressWindowPropsList) {
+ if (isAnomalySuppressed(anomaly, suppressWindowProps)) {
+ LOG.info("Suppressing anomaly id {} with suppress properties {}.
Anomaly Details = {}", anomaly.getId(), suppressWindowProps, anomaly);
+ anomaliesIt.remove();
+ AnomalyFeedback feedback = anomaly.getFeedback();
+ if (feedback == null) {
+ feedback = new AnomalyFeedbackDTO();
+ }
+
+ // Suppressing is a way by which users admit that anomalies during
this period
+ // are expected. We also do not want the algorithm to readjust the
baseline.
+ feedback.setFeedbackType(AnomalyFeedbackType.ANOMALY);
+ feedback.setComment("Suppressed anomaly. Auto-labeling as true
anomaly.");
+
+ anomaly.setFeedback(feedback);
+ anomalyMergedResultDAO.updateAnomalyFeedback(anomaly);
+ }
+ }
+ }
+ }
+
+ @Override
+ public DetectionAlertFilterResult run(DetectionAlertFilterResult results)
throws Exception {
+ Preconditions.checkNotNull(results);
+ for (Set<MergedAnomalyResultDTO> anomalies : results.getResult().values())
{
+ filterOutSuppressedAnomalies(anomalies);
+ }
+
+ return results;
+ }
+}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
new file mode 100644
index 0000000..2dddd4a
--- /dev/null
+++
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
@@ -0,0 +1,157 @@
+package com.linkedin.thirdeye.detection.alert.suppress;
+
+import com.linkedin.thirdeye.datalayer.bao.DAOTestBase;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static
com.linkedin.thirdeye.detection.alert.suppress.DetectionAlertTimeWindowSuppressor.*;
+
+
+public class DetectionTimeWindowSuppressorTest {
+
+ private DAOTestBase testDAOProvider;
+ private Set<MergedAnomalyResultDTO> anomalies;
+ private DetectionAlertConfigDTO config;
+
+ private Map<String, Object> createSuppressWindow(long startTime, long
endTime, boolean isThreshold, double expectedChange,
+ double acceptableDeviation) {
+ Map<String, Object> suppressWindowProps = new HashMap<>();
+ suppressWindowProps.put(WINDOW_START_TIME_KEY, startTime);
+ suppressWindowProps.put(WINDOW_END_TIME_KEY, endTime);
+ suppressWindowProps.put(IS_THRESHOLD_KEY, isThreshold);
+ suppressWindowProps.put(EXPECTED_CHANGE_KEY, expectedChange);
+ suppressWindowProps.put(ACCEPTABLE_DEVIATION_KEY, acceptableDeviation);
+ return suppressWindowProps;
+ }
+
+ private MergedAnomalyResultDTO createAnomaly(long id, long startTime, long
endTime, double weight) {
+ MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+ anomaly.setId(id);
+ anomaly.setStartTime(startTime);
+ anomaly.setEndTime(endTime);
+ anomaly.setWeight(weight);
+ return anomaly;
+ }
+
+ private void initDetectionAlertConfig() {
+ config = new DetectionAlertConfigDTO();
+
+ List<Map<String, Object>> suppressWindowList = new ArrayList<>();
+ suppressWindowList.add(createSuppressWindow(1000, 3000, true, 0.5, 0.1));
+ suppressWindowList.add(createSuppressWindow(4500, 6000, true, 0.6, 0.2));
+
+ Map<String, Object> params = new HashMap<>();
+ params.put(TIME_WINDOWS_KEY, suppressWindowList);
+
+ Map<String, Map<String, Object>> alertSuppressors = new HashMap<>();
+ alertSuppressors.put(TIME_WINDOW_SUPPRESSOR_KEY, params);
+ config.setAlertSuppressors(alertSuppressors);
+ }
+
+ private void initAnomalies() {
+ anomalies = new HashSet<>();
+
+ anomalies.add(createAnomaly(1l, 500, 900, 0.5));
+ anomalies.add(createAnomaly(2l, 700, 1000, 0.8));
+ anomalies.add(createAnomaly(3l, 500, 1500, 0.2));
+ anomalies.add(createAnomaly(4l, 1000, 1500, 0.4));
+ anomalies.add(createAnomaly(5l, 1500, 2500, 0.6));
+ anomalies.add(createAnomaly(6l, 2500, 3000, 0.7));
+ anomalies.add(createAnomaly(7l, 2000, 3500, 0.5));
+ anomalies.add(createAnomaly(8l, 3000, 3500, 0.6));
+ anomalies.add(createAnomaly(9l, 3500, 4000, 0.1));
+ anomalies.add(createAnomaly(10l, 5000, 5500, 0.5));
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ this.testDAOProvider = DAOTestBase.getInstance();
+ }
+
+ @BeforeMethod
+ public void beforeMethod() throws Exception {
+ initAnomalies();
+ initDetectionAlertConfig();
+ }
+
+ @AfterClass(alwaysRun = true)
+ void afterClass() {
+ testDAOProvider.cleanup();
+ }
+
+ /**
+ * Anomaly distribution along with suppression windows.
+ *
+ * Anomalies 4, 5, 7, and 10 should be suppressed (not notified).
+ * Anomaly 6 is not suppressed because it falls outside the suppression
region.
+ *
+ * *-----3----* *------7-------*
+ * |
+ * | *-2-* *----5----* *--8-*
+ * | | |
+ * *-1-* *--4-* *--6-* *--9-* *---10---*
+ * | | |
+ * _____|_____|___________________|______________|________________|
+ * | | | | |
+ * 500 | | | |
+ * 1000----<window1>----3000 4500--<window2>--6000
+ */
+ @Test
+ public void testTimeWindowSuppressorWithThreshold() throws Exception {
+
+ DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+ result.addMapping(new
DetectionAlertFilterRecipients(Collections.singleton("test@test")), anomalies);
+
+ DetectionAlertTimeWindowSuppressor suppressor = new
DetectionAlertTimeWindowSuppressor(config);
+ DetectionAlertFilterResult resultsAfterSuppress = suppressor.run(result);
+
+ Set<Long> filteredAnomalyIds = new HashSet<>(Arrays.asList(1l, 2l, 3l, 6l,
8l, 9l));
+
+ Assert.assertEquals(resultsAfterSuppress.getAllAnomalies().size(), 6);
+ for (MergedAnomalyResultDTO anomaly :
resultsAfterSuppress.getAllAnomalies()) {
+ Assert.assertTrue(filteredAnomalyIds.contains(anomaly.getId()));
+ }
+ }
+
+ /**
+ * Overlapping time window suppressor without thresholds
+ */
+ @Test
+ public void testTimeWindowSuppressor() throws Exception {
+ List<Map<String, Object>> suppressWindowList = new ArrayList<>();
+ suppressWindowList.add(createSuppressWindow(500, 6000, false, 0, 0));
+
+ Map<String, Object> params = new HashMap<>();
+ params.put(TIME_WINDOWS_KEY, suppressWindowList);
+
+ Map<String, Map<String, Object>> alertSuppressors = new HashMap<>();
+ alertSuppressors.put(TIME_WINDOW_SUPPRESSOR_KEY, params);
+ config.setAlertSuppressors(alertSuppressors);
+
+ DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+ result.addMapping(new
DetectionAlertFilterRecipients(Collections.singleton("test@test")), anomalies);
+
+ DetectionAlertTimeWindowSuppressor suppressor = new
DetectionAlertTimeWindowSuppressor(config);
+ DetectionAlertFilterResult resultsAfterSuppress = suppressor.run(result);
+
+ Assert.assertEquals(resultsAfterSuppress.getAllAnomalies().size(), 0);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]