Copilot commented on code in PR #16613:
URL: https://github.com/apache/pinot/pull/16613#discussion_r2279873670
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java:
##########
@@ -67,19 +67,19 @@ public RebalanceChecker(TableRebalanceManager
tableRebalanceManager,
@Override
protected void processTables(List<String> tableNamesWithType, Properties
periodicTaskProperties) {
- int numTables = tableNamesWithType.size();
- LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
- int numTablesProcessed = retryRebalanceTables(new
HashSet<>(tableNamesWithType));
-
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
_taskName,
- numTablesProcessed);
- LOGGER.info("Finish processing {}/{} tables in task: {}",
numTablesProcessed, numTables, _taskName);
+ synchronized (_taskLock) {
+ int numTables = tableNamesWithType.size();
+ LOGGER.info("Processing {} tables in task: {}", numTables, _taskName);
+ // Rare but the task may be executed by more than one threads because
user can trigger the periodic task to run
Review Comment:
The comment contains a grammatical error. 'more than one threads' should be
'more than one thread'.
```suggestion
// Rare but the task may be executed by more than one thread because
user can trigger the periodic task to run
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -619,102 +637,111 @@ public synchronized TaskSchedulingInfo
scheduleTaskForDatabase(String taskType,
* - list of task scheduling errors if any
*/
@Deprecated(forRemoval = true)
- public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType,
String tableNameWithType,
+ public TaskSchedulingInfo scheduleTaskForTable(String taskType, String
tableNameWithType,
@Nullable String minionInstanceTag) {
- TaskSchedulingContext context = new TaskSchedulingContext()
- .setTasksToSchedule(Collections.singleton(taskType))
- .setTablesToSchedule(Collections.singleton(tableNameWithType))
- .setMinionInstanceTag(minionInstanceTag);
- return scheduleTasks(context).get(taskType);
+ synchronized (_taskLock) {
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTasksToSchedule(Collections.singleton(taskType))
+ .setTablesToSchedule(Collections.singleton(tableNameWithType))
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context).get(taskType);
+ }
}
/**
* Helper method to schedule tasks (all task types) for the given tables
that have the tasks enabled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of the
tasks scheduled.
*/
@Deprecated(forRemoval = true)
- protected synchronized Map<String, TaskSchedulingInfo>
scheduleTasks(List<String> tableNamesWithType,
+ protected Map<String, TaskSchedulingInfo> scheduleTasks(List<String>
tableNamesWithType,
boolean isLeader, @Nullable String minionInstanceTag) {
- TaskSchedulingContext context = new TaskSchedulingContext()
- .setTablesToSchedule(new HashSet<>(tableNamesWithType))
- .setLeader(isLeader)
- .setMinionInstanceTag(minionInstanceTag);
- return scheduleTasks(context);
+ synchronized (_taskLock) {
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(new HashSet<>(tableNamesWithType))
+ .setLeader(isLeader)
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context);
+ }
}
/**
* Helper method to schedule tasks (all task types) for the given tables
that have the tasks enabled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of the
tasks scheduled.
*/
- public synchronized Map<String, TaskSchedulingInfo>
scheduleTasks(TaskSchedulingContext context) {
-
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
-
- Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
- Set<String> targetTables = context.getTablesToSchedule();
- Set<String> targetDatabases = context.getDatabasesToSchedule();
- Set<String> tasksToSchedule = context.getTasksToSchedule();
- Set<String> consolidatedTables = new HashSet<>();
- if (targetTables != null) {
- consolidatedTables.addAll(targetTables);
- }
- if (targetDatabases != null) {
- targetDatabases.forEach(database ->
-
consolidatedTables.addAll(_pinotHelixResourceManager.getAllTables(database)));
- }
- for (String tableNameWithType : consolidatedTables.isEmpty()
- ? _pinotHelixResourceManager.getAllTables() : consolidatedTables) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig != null && tableConfig.getTaskConfig() != null) {
- Set<String> enabledTaskTypes =
tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
- Set<String> validTasks;
- if (tasksToSchedule == null || tasksToSchedule.isEmpty()) {
- // if no specific task types are provided schedule for all tasks
- validTasks = enabledTaskTypes;
- } else {
- validTasks = new HashSet<>(tasksToSchedule);
- validTasks.retainAll(enabledTaskTypes);
+ public Map<String, TaskSchedulingInfo> scheduleTasks(TaskSchedulingContext
context) {
+ synchronized (_taskLock) {
+
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
+
+ Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
+ Set<String> targetTables = context.getTablesToSchedule();
+ Set<String> targetDatabases = context.getDatabasesToSchedule();
+ Set<String> tasksToSchedule = context.getTasksToSchedule();
+ Set<String> consolidatedTables = new HashSet<>();
+ if (targetTables != null) {
+ consolidatedTables.addAll(targetTables);
+ }
+ if (targetDatabases != null) {
+ targetDatabases.forEach(database ->
+
consolidatedTables.addAll(_pinotHelixResourceManager.getAllTables(database)));
+ }
+ for (String tableNameWithType : consolidatedTables.isEmpty()
+ ? _pinotHelixResourceManager.getAllTables() : consolidatedTables) {
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null && tableConfig.getTaskConfig() != null) {
+ Set<String> enabledTaskTypes =
tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
+ Set<String> validTasks;
+ if (tasksToSchedule == null || tasksToSchedule.isEmpty()) {
+ // if no specific task types are provided schedule for all tasks
+ validTasks = enabledTaskTypes;
+ } else {
+ validTasks = new HashSet<>(tasksToSchedule);
+ validTasks.retainAll(enabledTaskTypes);
+ }
+ for (String taskType : validTasks) {
+ enabledTableConfigMap.computeIfAbsent(taskType, k -> new
ArrayList<>()).add(tableConfig);
+ }
}
- for (String taskType : validTasks) {
- enabledTableConfigMap.computeIfAbsent(taskType, k -> new
ArrayList<>()).add(tableConfig);
+ }
+
+ // Generate each type of tasks
+ Map<String, TaskSchedulingInfo> tasksScheduled = new HashMap<>();
+ for (Map.Entry<String, List<TableConfig>> entry :
enabledTableConfigMap.entrySet()) {
+ String taskType = entry.getKey();
+ List<TableConfig> enabledTableConfigs = entry.getValue();
+ PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
+ if (taskGenerator != null) {
+ _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+ addTaskTypeMetricsUpdaterIfNeeded(taskType);
+ tasksScheduled.put(taskType, scheduleTask(taskGenerator,
enabledTableConfigs, context.isLeader(),
+ context.getMinionInstanceTag(), context.getTriggeredBy()));
+ } else {
+ List<String> enabledTables =
+
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
+ String message =
Review Comment:
This line appears to be incorrectly indented with extra spaces. The
assignment should align with the surrounding code block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]