Jackie-Jiang commented on a change in pull request #3819: Refactor periodic task
URL: https://github.com/apache/incubator-pinot/pull/3819#discussion_r256193623
 
 

 ##########
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 ##########
 @@ -44,134 +43,126 @@
  * <p><code>PinotTaskManager</code> is also responsible for checking the 
health status on each type of tasks, detect and
  * fix issues accordingly.
  */
-public class PinotTaskManager extends ControllerPeriodicTask {
+public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTaskManager.class);
 
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
   private final ClusterInfoProvider _clusterInfoProvider;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
 
-  private Map<String, List<TableConfig>> _enabledTableConfigMap;
-  private Set<String> _taskTypes;
-  private int _numTaskTypes;
-  private Map<String, String> _tasksScheduled;
-
-  public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager 
helixTaskResourceManager,
-      @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull 
ControllerConf controllerConf,
-      @Nonnull ControllerMetrics controllerMetrics) {
+  public PinotTaskManager(PinotHelixTaskResourceManager 
helixTaskResourceManager,
+      PinotHelixResourceManager helixResourceManager, ControllerConf 
controllerConf,
+      ControllerMetrics controllerMetrics) {
     super("PinotTaskManager", 
controllerConf.getTaskManagerFrequencyInSeconds(),
         controllerConf.getPeriodicTaskInitialDelayInSeconds(), 
helixResourceManager, controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, 
helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
   }
 
-  @Override
-  protected void initTask() {
-
-  }
-
   /**
-   * Get the cluster info provider.
-   * <p>Cluster info provider might be needed to initialize task generators.
+   * Returns the cluster info provider.
+   * <p>
+   * Cluster info provider might be useful when initializing task generators.
    *
    * @return Cluster info provider
    */
-  @Nonnull
   public ClusterInfoProvider getClusterInfoProvider() {
     return _clusterInfoProvider;
   }
 
   /**
-   * Register a task generator.
-   * <p>This is for pluggable task generators.
+   * Registers a task generator.
+   * <p>
+   * This method can be used to plug in custom task generators.
    *
    * @param pinotTaskGenerator Task generator to be registered
    */
-  public void registerTaskGenerator(@Nonnull PinotTaskGenerator 
pinotTaskGenerator) {
+  public void registerTaskGenerator(PinotTaskGenerator pinotTaskGenerator) {
     _taskGeneratorRegistry.registerTaskGenerator(pinotTaskGenerator);
   }
 
   /**
    * Public API to schedule tasks. It doesn't matter whether current pinot 
controller is leader.
    */
-  public Map<String, String> scheduleTasks() {
-    process(_pinotHelixResourceManager.getAllTables());
-    return getTasksScheduled();
+  public synchronized Map<String, String> scheduleTasks() {
+    Map<String, String> tasksScheduled = 
scheduleTasks(_pinotHelixResourceManager.getAllTables());
+
+    // For non-leader controller, perform non-leader cleanup
+    if (!isStarted()) {
+      nonLeaderCleanUp();
+    }
+    return tasksScheduled;
   }
 
-  @Override
-  protected void preprocess() {
-    
_metricsRegistry.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
+  /**
+   * Check the Pinot cluster status and schedule new tasks for the given 
tables.
+   *
+   * @param tableNamesWithType List of table names with type suffix
+   * @return Map from task type to task scheduled
+   */
+  private Map<String, String> scheduleTasks(List<String> tableNamesWithType) {
+    
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
 
-    _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
-    _numTaskTypes = _taskTypes.size();
-    _enabledTableConfigMap = new HashMap<>(_numTaskTypes);
+    Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
+    int numTaskTypes = taskTypes.size();
+    Map<String, List<TableConfig>> enabledTableConfigMap = new 
HashMap<>(numTaskTypes);
 
-    for (String taskType : _taskTypes) {
-      _enabledTableConfigMap.put(taskType, new ArrayList<>());
+    for (String taskType : taskTypes) {
+      enabledTableConfigMap.put(taskType, new ArrayList<>());
 
       // Ensure all task queues exist
       _helixTaskResourceManager.ensureTaskQueueExists(taskType);
     }
-  }
 
-  @Override
-  protected void processTable(String tableNameWithType) {
-    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-    if (tableConfig != null) {
-      TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-      if (taskConfig != null) {
-        for (String taskType : _taskTypes) {
-          if (taskConfig.isTaskTypeEnabled(taskType)) {
-            _enabledTableConfigMap.get(taskType).add(tableConfig);
+    // Scan all table configs to get the tables with tasks enabled
+    for (String tableNameWithType : tableNamesWithType) {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig != null) {
+        TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+        if (taskConfig != null) {
+          for (String taskType : taskTypes) {
+            if (taskConfig.isTaskTypeEnabled(taskType)) {
+              enabledTableConfigMap.get(taskType).add(tableConfig);
+            }
           }
         }
       }
     }
-  }
-
-  @Override
-  protected void exceptionHandler(String tableNameWithType, Exception e) {
-    LOGGER.error("Exception in PinotTaskManager for table {}", 
tableNameWithType, e);
-  }
 
-  @Override
-  protected void postprocess() {
     // Generate each type of tasks
-    _tasksScheduled = new HashMap<>(_numTaskTypes);
-    for (String taskType : _taskTypes) {
+    Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
+    for (String taskType : taskTypes) {
       LOGGER.info("Generating tasks for task type: {}", taskType);
       PinotTaskGenerator pinotTaskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
-      List<PinotTaskConfig> pinotTaskConfigs = 
pinotTaskGenerator.generateTasks(_enabledTableConfigMap.get(taskType));
+      List<PinotTaskConfig> pinotTaskConfigs = 
pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
       int numTasks = pinotTaskConfigs.size();
       if (numTasks > 0) {
         LOGGER
             .info("Submitting {} tasks for task type: {} with task configs: 
{}", numTasks, taskType, pinotTaskConfigs);
-        _tasksScheduled.put(taskType, _helixTaskResourceManager
+        tasksScheduled.put(taskType, _helixTaskResourceManager
             .submitTask(pinotTaskConfigs, 
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
-        _metricsRegistry.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+        _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
       }
     }
-  }
 
-  /**
-   * Returns the tasks that have been scheduled as part of the postprocess
-   * @return
-   */
-  private Map<String, String> getTasksScheduled() {
-    return _tasksScheduled;
+    return tasksScheduled;
   }
 
-  /**
-   * Performs necessary cleanups (e.g. remove metrics) when the controller 
leadership changes.
-   */
-  @Override
-  public void stopTask() {
-    LOGGER.info("Perform task cleanups.");
-    // Performs necessary cleanups for each task type.
+  private void nonLeaderCleanUp() {
 
 Review comment:
   Added comments in BasePeriodicTask

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@pinot.apache.org
For additional commands, e-mail: dev-h...@pinot.apache.org

Reply via email to