xiaohui-sun commented on a change in pull request #4777: [TE] add event driven scheduler
URL: https://github.com/apache/incubator-pinot/pull/4777#discussion_r346109592
 
 

 ##########
 File path: 
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/detection/trigger/DataAvailabilityTaskScheduler.java
 ##########
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.anomaly.detection.trigger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.anomaly.task.TaskInfoFactory;
+import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datalayer.pojo.DetectionConfigBean;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineTaskInfo;
+import org.apache.pinot.thirdeye.formatter.DetectionConfigFormatter;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is to schedule detection tasks based on data availability events.
+ */
+public class DataAvailabilityTaskScheduler implements Runnable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataAvailabilityTaskScheduler.class);
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private ScheduledExecutorService executorService;
+  private long delayInSec; // delay time after each run for the scheduler to 
reduce DB polling
+  private int maxNotRunThresholdInMin; // default threshold if detection level 
threshold is not set
+  private Map<Long, Long> lastTaskCreateTimeMap;
+  private TaskManager taskDAO;
+  private DetectionConfigManager detectionConfigDAO;
+  private DatasetConfigManager datasetConfigDAO;
+  /**
+   * Construct an instance of {@link DataAvailabilityTaskScheduler}
+   * @param delayInSec delay after each run to avoid polling the database too 
often
+   * @param maxNotRunThresholdInMin global threshold for fallback if detection 
level one is not set
+   */
+  public DataAvailabilityTaskScheduler(long delayInSec, int 
maxNotRunThresholdInMin) {
+    this.delayInSec = delayInSec;
+    this.maxNotRunThresholdInMin = maxNotRunThresholdInMin;
+    this.lastTaskCreateTimeMap = new HashMap<>();
+    this.executorService = Executors.newSingleThreadScheduledExecutor();
+    this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.detectionConfigDAO = 
DAORegistry.getInstance().getDetectionConfigManager();
+    this.datasetConfigDAO = DAORegistry.getInstance().getDatasetConfigDAO();
+  }
+
+  @Override
+  public void run() {
+    Map<DetectionConfigDTO, Set<String>> detection2DatasetMap = new 
HashMap<>();
+    Map<String, Long> dataset2RefreshTimeMap = new HashMap<>();
+    populateDetectionMapAndDataset2RefreshTimeMap(detection2DatasetMap, 
dataset2RefreshTimeMap);
+    Map<Long, TaskDTO> runningDetection = retrieveRunningDetection();
+    int taskCount = 0;
+    for (DetectionConfigDTO detectionConfig : detection2DatasetMap.keySet()) {
+      try {
+        long detectionConfigId = detectionConfig.getId();
+        if (!runningDetection.containsKey(detectionConfigId)) {
+          if (isAllDatasetUpdated(detectionConfig, 
detection2DatasetMap.get(detectionConfig), dataset2RefreshTimeMap)) {
+            //TODO: additional check is required if detection is based on 
aggregated value across multiple data points
+            createDetectionTask(detectionConfig);
+            taskCount++;
+          } else if (isTimeForFallback(detectionConfig)) {
+            LOG.info("Scheduling a task for detection {} due to the fallback 
mechanism", detectionConfigId);
+            createDetectionTask(detectionConfig);
+            taskCount++;
+          }
+        } else {
+          LOG.info("Skipping creating detection task for detection {} because 
task {} is not finished.",
+              detectionConfigId, runningDetection.get(detectionConfigId));
+        }
+      } catch (Exception e) {
+        LOG.error("Error in scheduling a detection...", e);
+      }
+    }
+    ThirdeyeMetricsUtil.eventDrivenScheduledTaskCounter.inc(taskCount);
+    LOG.info("Schedule {} tasks in this run...", taskCount);
+
+  }
+
+  public void start() {
+    executorService.scheduleWithFixedDelay(this, 0, delayInSec, 
TimeUnit.SECONDS);
+  }
+
+  public void close() {
+    executorService.shutdownNow();
+  }
+
+  private void populateDetectionMapAndDataset2RefreshTimeMap(
 
 Review comment:
   The method name is too long.
   Can we split it into two functions, one for each map it populates?
   1. initDetectionDatasetMap()
   2. initDatasetRefreshTimeMap()

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to