Copilot commented on code in PR #16613:
URL: https://github.com/apache/pinot/pull/16613#discussion_r2279873670


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java:
##########
@@ -67,19 +67,19 @@ public RebalanceChecker(TableRebalanceManager 
tableRebalanceManager,
 
   @Override
   protected void processTables(List<String> tableNamesWithType, Properties 
periodicTaskProperties) {
-    int numTables = tableNamesWithType.size();
-    LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
-    int numTablesProcessed = retryRebalanceTables(new 
HashSet<>(tableNamesWithType));
-    
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
 _taskName,
-        numTablesProcessed);
-    LOGGER.info("Finish processing {}/{} tables in task: {}", 
numTablesProcessed, numTables, _taskName);
+    synchronized (_taskLock) {
+      int numTables = tableNamesWithType.size();
+      LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
+      // Rare but the task may be executed by more than one threads because 
user can trigger the periodic task to run

Review Comment:
   The comment contains a grammatical error. 'more than one threads' should be 
'more than one thread'.
   ```suggestion
         // Rare but the task may be executed by more than one thread because 
user can trigger the periodic task to run
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -619,102 +637,111 @@ public synchronized TaskSchedulingInfo 
scheduleTaskForDatabase(String taskType,
    *  - list of task scheduling errors if any
    */
   @Deprecated(forRemoval = true)
-  public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType, 
String tableNameWithType,
+  public TaskSchedulingInfo scheduleTaskForTable(String taskType, String 
tableNameWithType,
       @Nullable String minionInstanceTag) {
-    TaskSchedulingContext context = new TaskSchedulingContext()
-        .setTasksToSchedule(Collections.singleton(taskType))
-        .setTablesToSchedule(Collections.singleton(tableNameWithType))
-        .setMinionInstanceTag(minionInstanceTag);
-    return scheduleTasks(context).get(taskType);
+    synchronized (_taskLock) {
+      TaskSchedulingContext context = new TaskSchedulingContext()
+          .setTasksToSchedule(Collections.singleton(taskType))
+          .setTablesToSchedule(Collections.singleton(tableNameWithType))
+          .setMinionInstanceTag(minionInstanceTag);
+      return scheduleTasks(context).get(taskType);
+    }
   }
 
   /**
    * Helper method to schedule tasks (all task types) for the given tables 
that have the tasks enabled.
    * Returns a map from the task type to the {@link TaskSchedulingInfo} of the 
tasks scheduled.
    */
   @Deprecated(forRemoval = true)
-  protected synchronized Map<String, TaskSchedulingInfo> 
scheduleTasks(List<String> tableNamesWithType,
+  protected Map<String, TaskSchedulingInfo> scheduleTasks(List<String> 
tableNamesWithType,
       boolean isLeader, @Nullable String minionInstanceTag) {
-    TaskSchedulingContext context = new TaskSchedulingContext()
-        .setTablesToSchedule(new HashSet<>(tableNamesWithType))
-        .setLeader(isLeader)
-        .setMinionInstanceTag(minionInstanceTag);
-    return scheduleTasks(context);
+    synchronized (_taskLock) {
+      TaskSchedulingContext context = new TaskSchedulingContext()
+          .setTablesToSchedule(new HashSet<>(tableNamesWithType))
+          .setLeader(isLeader)
+          .setMinionInstanceTag(minionInstanceTag);
+      return scheduleTasks(context);
+    }
   }
 
   /**
    * Helper method to schedule tasks (all task types) for the given tables 
that have the tasks enabled.
    * Returns a map from the task type to the {@link TaskSchedulingInfo} of the 
tasks scheduled.
    */
-  public synchronized Map<String, TaskSchedulingInfo> 
scheduleTasks(TaskSchedulingContext context) {
-    
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
-
-    Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
-    Set<String> targetTables = context.getTablesToSchedule();
-    Set<String> targetDatabases = context.getDatabasesToSchedule();
-    Set<String> tasksToSchedule = context.getTasksToSchedule();
-    Set<String> consolidatedTables = new HashSet<>();
-    if (targetTables != null) {
-      consolidatedTables.addAll(targetTables);
-    }
-    if (targetDatabases != null) {
-      targetDatabases.forEach(database ->
-          
consolidatedTables.addAll(_pinotHelixResourceManager.getAllTables(database)));
-    }
-    for (String tableNameWithType : consolidatedTables.isEmpty()
-        ? _pinotHelixResourceManager.getAllTables() : consolidatedTables) {
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig != null && tableConfig.getTaskConfig() != null) {
-        Set<String> enabledTaskTypes = 
tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
-        Set<String> validTasks;
-        if (tasksToSchedule == null || tasksToSchedule.isEmpty()) {
-          // if no specific task types are provided schedule for all tasks
-          validTasks = enabledTaskTypes;
-        } else {
-          validTasks = new HashSet<>(tasksToSchedule);
-          validTasks.retainAll(enabledTaskTypes);
+  public Map<String, TaskSchedulingInfo> scheduleTasks(TaskSchedulingContext 
context) {
+    synchronized (_taskLock) {
+      
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
+
+      Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
+      Set<String> targetTables = context.getTablesToSchedule();
+      Set<String> targetDatabases = context.getDatabasesToSchedule();
+      Set<String> tasksToSchedule = context.getTasksToSchedule();
+      Set<String> consolidatedTables = new HashSet<>();
+      if (targetTables != null) {
+        consolidatedTables.addAll(targetTables);
+      }
+      if (targetDatabases != null) {
+        targetDatabases.forEach(database ->
+            
consolidatedTables.addAll(_pinotHelixResourceManager.getAllTables(database)));
+      }
+      for (String tableNameWithType : consolidatedTables.isEmpty()
+          ? _pinotHelixResourceManager.getAllTables() : consolidatedTables) {
+        TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+        if (tableConfig != null && tableConfig.getTaskConfig() != null) {
+          Set<String> enabledTaskTypes = 
tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
+          Set<String> validTasks;
+          if (tasksToSchedule == null || tasksToSchedule.isEmpty()) {
+            // if no specific task types are provided schedule for all tasks
+            validTasks = enabledTaskTypes;
+          } else {
+            validTasks = new HashSet<>(tasksToSchedule);
+            validTasks.retainAll(enabledTaskTypes);
+          }
+          for (String taskType : validTasks) {
+            enabledTableConfigMap.computeIfAbsent(taskType, k -> new 
ArrayList<>()).add(tableConfig);
+          }
         }
-        for (String taskType : validTasks) {
-          enabledTableConfigMap.computeIfAbsent(taskType, k -> new 
ArrayList<>()).add(tableConfig);
+      }
+
+      // Generate each type of tasks
+      Map<String, TaskSchedulingInfo> tasksScheduled = new HashMap<>();
+      for (Map.Entry<String, List<TableConfig>> entry : 
enabledTableConfigMap.entrySet()) {
+        String taskType = entry.getKey();
+        List<TableConfig> enabledTableConfigs = entry.getValue();
+        PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
+        if (taskGenerator != null) {
+          _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+          addTaskTypeMetricsUpdaterIfNeeded(taskType);
+          tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, context.isLeader(),
+              context.getMinionInstanceTag(), context.getTriggeredBy()));
+        } else {
+          List<String> enabledTables =
+              
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
+          String message =

Review Comment:
   This line appears to be incorrectly indented with extra spaces. The 
assignment should align with the surrounding code block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to