This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new eb7489c453 Track the actor that triggers the minion task (#14829)
eb7489c453 is described below
commit eb7489c453e2b82b8199670bbea931e52ae609db
Author: Shounak kulkarni <[email protected]>
AuthorDate: Thu Feb 13 09:55:11 2025 +0530
Track the actor that triggers the minion task (#14829)
* Track the actor that triggered the task
* add java docs
* simplified TaskSchedulingContext
* Track the actor that triggered the task
* add java docs
* simplified TaskSchedulingContext
* variable renames
* fixes
---
.../api/resources/PinotTaskRestletResource.java | 41 +++---
.../helix/core/minion/CronJobScheduleJob.java | 8 +-
.../core/minion/PinotHelixTaskResourceManager.java | 31 +++-
.../helix/core/minion/PinotTaskManager.java | 156 ++++++++++++---------
.../helix/core/minion/TaskSchedulingContext.java | 135 ++++++++++++++++++
.../helix/core/minion/TaskSchedulingInfo.java | 56 ++++++++
.../src/main/resources/app/pages/SubTaskDetail.tsx | 3 +
.../src/main/resources/app/pages/TaskDetail.tsx | 3 +
.../core/minion/PinotTaskManagerStatelessTest.java | 12 +-
.../integration/tests/MinionTaskTestUtils.java | 23 ++-
.../MergeRollupMinionClusterIntegrationTest.java | 55 +++++---
.../tests/PurgeMinionClusterIntegrationTest.java | 45 ++++--
...fflineSegmentsMinionClusterIntegrationTest.java | 19 ++-
...RefreshSegmentMinionClusterIntegrationTest.java | 44 ++++--
.../tests/SimpleMinionClusterIntegrationTest.java | 8 +-
.../integration/tests/TlsIntegrationTest.java | 3 +-
.../tests/UpsertTableIntegrationTest.java | 10 +-
.../tests/UrlAuthRealtimeIntegrationTest.java | 3 +-
.../apache/pinot/spi/utils/CommonConstants.java | 4 +
19 files changed, 496 insertions(+), 163 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 24cd444dc5..d76ea4f3a2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -80,12 +81,15 @@ import
org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.jersey.server.ManagedAsync;
@@ -646,29 +650,26 @@ public class PinotTaskRestletResource {
Map<String, String> response = new HashMap<>();
List<String> generationErrors = new ArrayList<>();
List<String> schedulingErrors = new ArrayList<>();
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
+ .setMinionInstanceTag(minionInstanceTag)
+ .setLeader(false);
if (taskType != null) {
- // Schedule task for the given task type
- PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null
- ? _pinotTaskManager.scheduleTaskForTable(taskType,
DatabaseUtils.translateTableName(tableName, headers),
- minionInstanceTag)
- : _pinotTaskManager.scheduleTaskForDatabase(taskType, database,
minionInstanceTag);
- response.put(taskType,
StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
- generationErrors.addAll(taskInfos.getGenerationErrors());
- schedulingErrors.addAll(taskInfos.getSchedulingErrors());
+ context.setTasksToSchedule(Collections.singleton(taskType));
+ }
+ if (tableName != null) {
+
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName,
headers)));
} else {
- // Schedule tasks for all task types
- Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos =
tableName != null
- ?
_pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName,
headers),
- minionInstanceTag)
- : _pinotTaskManager.scheduleAllTasksForDatabase(database,
minionInstanceTag);
- allTaskInfos.forEach((key, value) -> {
- if (value.getScheduledTaskNames() != null) {
- response.put(key, String.join(",", value.getScheduledTaskNames()));
- }
- generationErrors.addAll(value.getGenerationErrors());
- schedulingErrors.addAll(value.getSchedulingErrors());
- });
+ context.setDatabasesToSchedule(Collections.singleton(database));
}
+ Map<String, TaskSchedulingInfo> allTaskInfos =
_pinotTaskManager.scheduleTasks(context);
+ allTaskInfos.forEach((key, value) -> {
+ if (value.getScheduledTaskNames() != null) {
+ response.put(key, String.join(",", value.getScheduledTaskNames()));
+ }
+ generationErrors.addAll(value.getGenerationErrors());
+ schedulingErrors.addAll(value.getSchedulingErrors());
+ });
response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
return response;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
index f9b250b2bc..8a4751196f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.controller.helix.core.minion;
+import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -64,8 +66,12 @@ public class CronJobScheduleJob implements Job {
ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
return;
}
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(table))
+ .setTasksToSchedule(Collections.singleton(taskType))
+ .setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name());
long jobStartTime = System.currentTimeMillis();
- pinotTaskManager.scheduleTaskForTable(taskType, table, null);
+ pinotTaskManager.scheduleTasks(context);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is
{}", table, taskType,
jobExecutionContext.getNextFireTime());
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table,
taskType),
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 87580c30b0..bbbb3fcee2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -876,6 +876,11 @@ public class PinotHelixTaskResourceManager {
if (jobFinishTimeMs > 0) {
taskDebugInfo.setFinishTime(DateTimeUtils.epochToDefaultDateFormat(jobFinishTimeMs));
}
+ String triggeredBy =
jobConfig.getTaskConfigMap().values().stream().findFirst()
+ .map(TaskConfig::getConfigMap)
+ .map(taskConfigs -> taskConfigs.get(PinotTaskManager.TRIGGERED_BY))
+ .orElse("");
+ taskDebugInfo.setTriggeredBy(triggeredBy);
Set<Integer> partitionSet = jobContext.getPartitionSet();
TaskCount subtaskCount = new TaskCount();
for (int partition : partitionSet) {
@@ -890,6 +895,7 @@ public class PinotHelixTaskResourceManager {
String taskIdForPartition =
jobContext.getTaskIdForPartition(partition);
subtaskDebugInfo.setTaskId(taskIdForPartition);
subtaskDebugInfo.setState(partitionState);
+ subtaskDebugInfo.setTriggeredBy(triggeredBy);
long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition);
if (subtaskStartTimeMs > 0) {
subtaskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(subtaskStartTimeMs));
@@ -987,7 +993,8 @@ public class PinotHelixTaskResourceManager {
return
MinionTaskMetadataUtils.getAllTaskMetadataLastUpdateTimeMs(propertyStore);
}
- @JsonPropertyOrder({"taskState", "subtaskCount", "startTime",
"executionStartTime", "finishTime", "subtaskInfos"})
+ @JsonPropertyOrder({"taskState", "subtaskCount", "startTime",
"executionStartTime", "finishTime", "triggeredBy",
+ "subtaskInfos"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class TaskDebugInfo {
// Time at which the task (which may have multiple subtasks) got created.
@@ -998,6 +1005,7 @@ public class PinotHelixTaskResourceManager {
private String _finishTime;
private TaskState _taskState;
private TaskCount _subtaskCount;
+ private String _triggeredBy;
private List<SubtaskDebugInfo> _subtaskInfos;
public TaskDebugInfo() {
@@ -1046,6 +1054,15 @@ public class PinotHelixTaskResourceManager {
return _taskState;
}
+ public String getTriggeredBy() {
+ return _triggeredBy;
+ }
+
+ public TaskDebugInfo setTriggeredBy(String triggeredBy) {
+ _triggeredBy = triggeredBy;
+ return this;
+ }
+
public TaskCount getSubtaskCount() {
return _subtaskCount;
}
@@ -1055,7 +1072,7 @@ public class PinotHelixTaskResourceManager {
}
}
- @JsonPropertyOrder({"taskId", "state", "startTime", "finishTime",
"participant", "info", "taskConfig"})
+ @JsonPropertyOrder({"taskId", "state", "startTime", "finishTime",
"participant", "info", "triggeredBy", "taskConfig"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class SubtaskDebugInfo {
private String _taskId;
@@ -1064,6 +1081,7 @@ public class PinotHelixTaskResourceManager {
private String _finishTime;
private String _participant;
private String _info;
+ private String _triggeredBy;
private PinotTaskConfig _taskConfig;
public SubtaskDebugInfo() {
@@ -1121,6 +1139,15 @@ public class PinotHelixTaskResourceManager {
return _info;
}
+ public String getTriggeredBy() {
+ return _triggeredBy;
+ }
+
+ public SubtaskDebugInfo setTriggeredBy(String triggeredBy) {
+ _triggeredBy = triggeredBy;
+ return this;
+ }
+
public PinotTaskConfig getTaskConfig() {
return _taskConfig;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 93002f9100..9f1f938187 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -94,6 +95,7 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
private static final String TASK_QUEUE_PATH_PATTERN =
"/TaskRebalancer/TaskQueue_%s/Context";
+ public static final String TRIGGERED_BY = "triggeredBy";
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
private final ClusterInfoAccessor _clusterInfoAccessor;
@@ -208,6 +210,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
continue;
}
+ pinotTaskConfigs.forEach(pinotTaskConfig -> pinotTaskConfig.getConfigs()
+ .computeIfAbsent(TRIGGERED_BY, k ->
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name()));
LOGGER.info("Submitting ad-hoc task for task type: {} with task configs:
{}", taskType, pinotTaskConfigs);
_controllerMetrics.addMeteredTableValue(taskType,
ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
responseMap.put(tableNameWithType,
@@ -488,8 +492,11 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* It might be called from the non-leader controller.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of
tasks scheduled.
*/
+ @Deprecated(forRemoval = true)
public synchronized Map<String, TaskSchedulingInfo>
scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
- return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false,
minionInstanceTag);
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context);
}
/**
@@ -497,9 +504,13 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* It might be called from the non-leader controller.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of
tasks scheduled.
*/
+ @Deprecated(forRemoval = true)
public synchronized Map<String, TaskSchedulingInfo>
scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
- return scheduleTasks(_pinotHelixResourceManager.getAllTables(database),
false, minionInstanceTag);
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setDatabasesToSchedule(Collections.singleton(database))
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context);
}
/**
@@ -507,9 +518,13 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* It might be called from the non-leader controller.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of
tasks scheduled.
*/
+ @Deprecated(forRemoval = true)
public synchronized Map<String, TaskSchedulingInfo>
scheduleAllTasksForTable(String tableNameWithType,
@Nullable String minionInstanceTag) {
- return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(tableNameWithType))
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context);
}
/**
@@ -521,8 +536,12 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
+ @Deprecated(forRemoval = true)
public synchronized TaskSchedulingInfo scheduleTaskForAllTables(String
taskType, @Nullable String minionInstanceTag) {
- return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(),
minionInstanceTag);
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTasksToSchedule(Collections.singleton(taskType))
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context).get(taskType);
}
/**
@@ -534,9 +553,14 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
+ @Deprecated(forRemoval = true)
public synchronized TaskSchedulingInfo scheduleTaskForDatabase(String
taskType, @Nullable String database,
@Nullable String minionInstanceTag) {
- return scheduleTask(taskType,
_pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTasksToSchedule(Collections.singleton(taskType))
+ .setDatabasesToSchedule(Collections.singleton(database))
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context).get(taskType);
}
/**
@@ -548,27 +572,64 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
+ @Deprecated(forRemoval = true)
public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType,
String tableNameWithType,
@Nullable String minionInstanceTag) {
- return scheduleTask(taskType, List.of(tableNameWithType),
minionInstanceTag);
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTasksToSchedule(Collections.singleton(taskType))
+ .setTablesToSchedule(Collections.singleton(tableNameWithType))
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context).get(taskType);
}
/**
* Helper method to schedule tasks (all task types) for the given tables
that have the tasks enabled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of the
tasks scheduled.
*/
+ @Deprecated(forRemoval = true)
protected synchronized Map<String, TaskSchedulingInfo>
scheduleTasks(List<String> tableNamesWithType,
boolean isLeader, @Nullable String minionInstanceTag) {
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(new HashSet<>(tableNamesWithType))
+ .setLeader(isLeader)
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context);
+ }
+
+ /**
+ * Helper method to schedule tasks (all task types) for the given tables
that have the tasks enabled.
+ * Returns a map from the task type to the {@link TaskSchedulingInfo} of the
tasks scheduled.
+ */
+ public synchronized Map<String, TaskSchedulingInfo>
scheduleTasks(TaskSchedulingContext context) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
1L);
- // Scan all table configs to get the tables with tasks enabled
Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
- for (String tableNameWithType : tableNamesWithType) {
+ Set<String> targetTables = context.getTablesToSchedule();
+ Set<String> targetDatabases = context.getDatabasesToSchedule();
+ Set<String> tasksToSchedule = context.getTasksToSchedule();
+ Set<String> consolidatedTables = new HashSet<>();
+ if (targetTables != null) {
+ consolidatedTables.addAll(targetTables);
+ }
+ if (targetDatabases != null) {
+ targetDatabases.forEach(database ->
+
consolidatedTables.addAll(_pinotHelixResourceManager.getAllTables(database)));
+ }
+ for (String tableNameWithType : consolidatedTables.isEmpty()
+ ? _pinotHelixResourceManager.getAllTables() : consolidatedTables) {
TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig != null && tableConfig.getTaskConfig() != null) {
Set<String> enabledTaskTypes =
tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
- for (String enabledTaskType : enabledTaskTypes) {
- enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new
ArrayList<>()).add(tableConfig);
+ Set<String> validTasks;
+ if (tasksToSchedule == null || tasksToSchedule.isEmpty()) {
+ // if no specific task types are provided schedule for all tasks
+ validTasks = enabledTaskTypes;
+ } else {
+ validTasks = new HashSet<>(tasksToSchedule);
+ validTasks.retainAll(enabledTaskTypes);
+ }
+ for (String taskType : validTasks) {
+ enabledTableConfigMap.computeIfAbsent(taskType, k -> new
ArrayList<>()).add(tableConfig);
}
}
}
@@ -579,13 +640,14 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
String taskType = entry.getKey();
List<TableConfig> enabledTableConfigs = entry.getValue();
PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
- List<String> enabledTables =
-
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
if (taskGenerator != null) {
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
- tasksScheduled.put(taskType, scheduleTask(taskGenerator,
enabledTableConfigs, isLeader, minionInstanceTag));
+ tasksScheduled.put(taskType, scheduleTask(taskGenerator,
enabledTableConfigs, context.isLeader(),
+ context.getMinionInstanceTag(), context.getTriggeredBy()));
} else {
+ List<String> enabledTables =
+
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
String message = "Task type: " + taskType + " is not registered,
cannot enable it for tables: " + enabledTables;
LOGGER.warn(message);
TaskSchedulingInfo taskSchedulingInfo = new TaskSchedulingInfo();
@@ -597,24 +659,16 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
return tasksScheduled;
}
+ @Deprecated(forRemoval = true)
protected synchronized TaskSchedulingInfo scheduleTask(String taskType,
List<String> tables,
@Nullable String minionInstanceTag) {
- PinotTaskGenerator taskGenerator =
_taskGeneratorRegistry.getTaskGenerator(taskType);
- Preconditions.checkState(taskGenerator != null, "Task type: %s is not
registered", taskType);
-
- // Scan all table configs to get the tables with task enabled
- List<TableConfig> enabledTableConfigs = new ArrayList<>();
- for (String tableNameWithType : tables) {
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig != null && tableConfig.getTaskConfig() != null &&
tableConfig.getTaskConfig()
- .isTaskTypeEnabled(taskType)) {
- enabledTableConfigs.add(tableConfig);
- }
- }
-
- _helixTaskResourceManager.ensureTaskQueueExists(taskType);
- addTaskTypeMetricsUpdaterIfNeeded(taskType);
- return scheduleTask(taskGenerator, enabledTableConfigs, false,
minionInstanceTag);
+
Preconditions.checkState(_taskGeneratorRegistry.getAllTaskTypes().contains(taskType),
+ "Task type: %s is not registered", taskType);
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(new HashSet<>(tables))
+ .setTasksToSchedule(Collections.singleton(taskType))
+ .setMinionInstanceTag(minionInstanceTag);
+ return scheduleTasks(context).get(taskType);
}
/**
@@ -626,8 +680,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
* - list of task scheduling errors if any
*/
protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator,
List<TableConfig> enabledTableConfigs,
- boolean isLeader, @Nullable String minionInstanceTagForTask) {
- TaskSchedulingInfo response = new TaskSchedulingInfo();
+ boolean isLeader, @Nullable String minionInstanceTagForTask, String
triggeredBy) {
+ TaskSchedulingInfo response = new TaskSchedulingInfo();
String taskType = taskGenerator.getTaskType();
List<String> enabledTables =
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
@@ -693,6 +747,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
// This might lead to lot of logs, maybe sum it up and move outside
the loop
LOGGER.info("Submitting {} tasks for task type: {} to
minionInstance: {} with task configs: {}", numTasks,
taskType, minionInstanceTag, pinotTaskConfigs);
+ pinotTaskConfigs.forEach(pinotTaskConfig ->
+ pinotTaskConfig.getConfigs().computeIfAbsent(TRIGGERED_BY, k ->
triggeredBy));
String submittedTaskName =
_helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag,
taskGenerator.getTaskTimeoutMs(),
taskGenerator.getNumConcurrentTasksPerInstance(),
taskGenerator.getMaxAttemptsPerTask());
@@ -718,7 +774,11 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
@Override
protected void processTables(List<String> tableNamesWithType, Properties
taskProperties) {
- scheduleTasks(tableNamesWithType, true, null);
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setLeader(true)
+ .setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name());
+ // cron schedule
+ scheduleTasks(context);
}
@Override
@@ -781,36 +841,4 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
}
return true;
}
-
- public static class TaskSchedulingInfo {
- private List<String> _scheduledTaskNames;
- private final List<String> _generationErrors = new ArrayList<>();
- private final List<String> _schedulingErrors = new ArrayList<>();
-
- @Nullable
- public List<String> getScheduledTaskNames() {
- return _scheduledTaskNames;
- }
-
- public TaskSchedulingInfo setScheduledTaskNames(List<String>
scheduledTaskNames) {
- _scheduledTaskNames = scheduledTaskNames;
- return this;
- }
-
- public List<String> getGenerationErrors() {
- return _generationErrors;
- }
-
- public void addGenerationError(String generationError) {
- _generationErrors.add(generationError);
- }
-
- public List<String> getSchedulingErrors() {
- return _schedulingErrors;
- }
-
- public void addSchedulingError(String schedulingError) {
- _schedulingErrors.add(schedulingError);
- }
- }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java
new file mode 100644
index 0000000000..2a685c2ab7
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.Set;
+
+
+/**
+ * Wrapper class to manage all the inputs passed to schedule a task on minion.
+ * Tasks will be scheduled based on the combination of tables, databases and
taskTypes passed.
+ * <p>Y -> contains elements
+ * <p>N -> is null or empty
+ * <table>
+ * <tr>
+ * <th> tablesToSchedule </th> <th> databasesToSchedule </th> <th>
tasksToSchedule </th>
+ * <th> {@link PinotTaskManager} behavior </th>
+ * </tr>
+ * <tr>
+ * <td> N </td> <td> N </td> <td> N </td>
+ * <td>schedule all the configured tasks on all tables</td>
+ * </tr>
+ * <tr>
+ * <td> Y </td> <td> N </td> <td> N </td>
+ * <td>schedule all the configured tasks on tables in tablesToSchedule</td>
+ * </tr>
+ * <tr>
+ * <td> N </td> <td> Y </td> <td> N </td>
+ * <td>schedule all the configured tasks on all tables under the databases
in databasesToSchedule</td>
+ * </tr>
+ * <tr>
+ * <td> N </td> <td> N </td> <td> Y </td>
+ * <td>schedule tasksToSchedule on all tables</td>
+ * </tr>
+ * <tr>
+ * <td> N </td> <td> Y </td> <td> Y </td>
+ * <td>schedule tasksToSchedule on all tables under the databases in
databasesToSchedule</td>
+ * </tr>
+ * <tr>
+ * <td> Y </td> <td> N </td> <td> Y </td>
+ * <td>schedule tasksToSchedule on tables in tablesToSchedule</td>
+ * </tr>
+ * <tr>
+ * <td> Y </td> <td> Y </td> <td> N </td>
+ * <td>schedule all the configured tasks on tables in tablesToSchedule
+ * and also on all tables under the databases in databasesToSchedule</td>
+ * </tr>
+ * <tr>
+ * <td> Y </td> <td> Y </td> <td> Y </td>
+ * <td>schedule tasksToSchedule on tables in tablesToSchedule and also
+ * on all tables under the databases in databasesToSchedule</td>
+ * </tr>
+ * </table>
+ *
+ * In short, an empty tasksToSchedule will schedule tasks for all types, and
+ * empty tablesToSchedule and databasesToSchedule will schedule tasks for all
tables.
+ *
+ */
+public class TaskSchedulingContext {
+ private Set<String> _tablesToSchedule;
+ private Set<String> _tasksToSchedule;
+ private Set<String> _databasesToSchedule;
+ private String _triggeredBy;
+ private String _minionInstanceTag;
+ private boolean _isLeader;
+
+ public Set<String> getTablesToSchedule() {
+ return _tablesToSchedule;
+ }
+
+ public TaskSchedulingContext setTablesToSchedule(Set<String>
tablesToSchedule) {
+ _tablesToSchedule = tablesToSchedule;
+ return this;
+ }
+
+ public Set<String> getTasksToSchedule() {
+ return _tasksToSchedule;
+ }
+
+ public TaskSchedulingContext setTasksToSchedule(Set<String> tasksToSchedule)
{
+ _tasksToSchedule = tasksToSchedule;
+ return this;
+ }
+
+ public Set<String> getDatabasesToSchedule() {
+ return _databasesToSchedule;
+ }
+
+ public TaskSchedulingContext setDatabasesToSchedule(Set<String>
databasesToSchedule) {
+ _databasesToSchedule = databasesToSchedule;
+ return this;
+ }
+
+ public String getTriggeredBy() {
+ return _triggeredBy;
+ }
+
+ public TaskSchedulingContext setTriggeredBy(String triggeredBy) {
+ _triggeredBy = triggeredBy;
+ return this;
+ }
+
+ public String getMinionInstanceTag() {
+ return _minionInstanceTag;
+ }
+
+ public TaskSchedulingContext setMinionInstanceTag(String minionInstanceTag) {
+ _minionInstanceTag = minionInstanceTag;
+ return this;
+ }
+
+ public boolean isLeader() {
+ return _isLeader;
+ }
+
+ public TaskSchedulingContext setLeader(boolean leader) {
+ _isLeader = leader;
+ return this;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
new file mode 100644
index 0000000000..2ffa11676f
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+
+public class TaskSchedulingInfo {
+ private List<String> _scheduledTaskNames;
+ private final List<String> _generationErrors = new ArrayList<>();
+ private final List<String> _schedulingErrors = new ArrayList<>();
+
+ @Nullable
+ public List<String> getScheduledTaskNames() {
+ return _scheduledTaskNames;
+ }
+
+ public TaskSchedulingInfo setScheduledTaskNames(List<String>
scheduledTaskNames) {
+ _scheduledTaskNames = scheduledTaskNames;
+ return this;
+ }
+
+ public List<String> getGenerationErrors() {
+ return _generationErrors;
+ }
+
+ public void addGenerationError(String generationError) {
+ _generationErrors.add(generationError);
+ }
+
+ public List<String> getSchedulingErrors() {
+ return _schedulingErrors;
+ }
+
+ public void addSchedulingError(String schedulingError) {
+ _schedulingErrors.add(schedulingError);
+ }
+}
diff --git a/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx
b/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx
index f94cd4d820..27ab5dc273 100644
--- a/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx
+++ b/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx
@@ -113,6 +113,9 @@ const TaskDetail = (props) => {
<Grid item xs={12}>
<strong>Finish Time:</strong> {get(taskDebugData, 'finishTime',
'')}
</Grid>
+ <Grid item xs={12}>
+ <strong>Triggered By:</strong> {get(taskDebugData, 'triggeredBy',
'')}
+ </Grid>
<Grid item xs={12}>
<strong>Minion Host Name:</strong> {get(taskDebugData,
'participant', '')}
</Grid>
diff --git a/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx
b/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx
index f825cfbd05..eff82fd3fd 100644
--- a/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx
+++ b/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx
@@ -124,6 +124,9 @@ const TaskDetail = (props) => {
<Grid item xs={12}>
<strong>Finish Time:</strong> {get(taskDebugData, 'finishTime',
'')}
</Grid>
+ <Grid item xs={12}>
+ <strong>Triggered By:</strong> {get(taskDebugData, 'triggeredBy',
'')}
+ </Grid>
<Grid item xs={12}>
<strong>Number of Sub Tasks:</strong> {get(taskDebugData,
'subtaskCount.total', '')}
</Grid>
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
index 132e109796..15b584eabf 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
@@ -65,6 +65,7 @@ import static org.testng.Assert.*;
@Test(groups = "stateless")
public class PinotTaskManagerStatelessTest extends ControllerTest {
private static final String RAW_TABLE_NAME = "myTable";
+ public static final String TABLE_NAME_WITH_TYPE = "myTable_OFFLINE";
private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
private static final long TIMEOUT_IN_MS = 10_000L;
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotTaskManagerStatelessTest.class);
@@ -192,10 +193,15 @@ public class PinotTaskManagerStatelessTest extends
ControllerTest {
public void testPinotTaskManagerScheduleTaskWithStoppedTaskQueue()
throws Exception {
testValidateTaskGeneration(taskManager -> {
+ String taskName = "SegmentGenerationAndPushTask";
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(TABLE_NAME_WITH_TYPE))
+ .setTasksToSchedule(Collections.singleton(taskName));
// Validate schedule tasks for table when task queue is in stopped state
- List<String> taskIDs =
taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable",
null)
- .getScheduledTaskNames();
- assertNull(taskIDs);
+ TaskSchedulingInfo info =
taskManager.scheduleTasks(context).get(taskName);
+ assertNotNull(info);
+ assertNull(info.getScheduledTaskNames());
+ assertFalse(info.getSchedulingErrors().isEmpty());
return null;
});
}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java
index 849a8b8bfd..2291bbb858 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java
@@ -20,6 +20,8 @@ package org.apache.pinot.integration.tests;
import java.util.Map;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -29,25 +31,18 @@ public class MinionTaskTestUtils {
private MinionTaskTestUtils() {
}
- public static void assertNoTaskSchedule(String tableNameWithType, String
taskType, PinotTaskManager taskManager) {
- PinotTaskManager.TaskSchedulingInfo info =
- taskManager.scheduleAllTasksForTable(tableNameWithType,
null).get(taskType);
- assertNoTaskSchedule(info);
- }
-
- public static void assertNoTaskSchedule(String taskType, PinotTaskManager
taskManager) {
- PinotTaskManager.TaskSchedulingInfo info =
taskManager.scheduleTaskForAllTables(taskType, null);
- assertNoTaskSchedule(info);
- }
-
- public static void assertNoTaskSchedule(PinotTaskManager taskManager) {
- Map<String, PinotTaskManager.TaskSchedulingInfo> infoMap =
taskManager.scheduleAllTasksForAllTables(null);
+ public static void assertNoTaskSchedule(TaskSchedulingContext context,
PinotTaskManager taskManager) {
+ Map<String, TaskSchedulingInfo> infoMap =
taskManager.scheduleTasks(context);
infoMap.forEach((key, value) -> assertNoTaskSchedule(value));
}
- public static void assertNoTaskSchedule(PinotTaskManager.TaskSchedulingInfo
info) {
+ public static void assertNoTaskSchedule(TaskSchedulingInfo info) {
assertNotNull(info.getScheduledTaskNames());
assertTrue(info.getScheduledTaskNames().isEmpty());
+ assertNoTaskErrors(info);
+ }
+
+ public static void assertNoTaskErrors(TaskSchedulingInfo info) {
assertNotNull(info.getGenerationErrors());
assertTrue(info.getGenerationErrors().isEmpty());
assertNotNull(info.getSchedulingErrors());
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index b8833d10b1..71a55da67d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.core.common.MinionConstants;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -408,18 +409,20 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
long expectedWatermark = 16000 * 86_400_000L;
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_TEST_TABLE);
int numTasks = 0;
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName));
List<String> taskList;
- for (String tasks =
_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ for (String tasks = _taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
tasks != null;
- taskList = _taskManager.scheduleAllTasksForTable(offlineTableName,
null)
+ taskList = _taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0)
: null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNull(_taskManager.scheduleTasks(context)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
@@ -524,18 +527,20 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
long expectedWatermark = 16000 * 86_400_000L;
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
int numTasks = 0;
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName));
List<String> taskList;
- for (String tasks =
_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ for (String tasks = _taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
tasks != null;
- taskList = _taskManager.scheduleAllTasksForTable(offlineTableName,
null)
+ taskList = _taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0)
: null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNull(_taskManager.scheduleTasks(context)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
@@ -633,18 +638,20 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
long expectedWatermark = 16050 * 86_400_000L;
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_ROLLUP_TEST_TABLE);
int numTasks = 0;
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName));
List<String> taskList;
- for (String tasks =
_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ for (String tasks = _taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
tasks != null;
- taskList = _taskManager.scheduleAllTasksForTable(offlineTableName,
null)
+ taskList = _taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0)
: null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNull(_taskManager.scheduleTasks(context)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
@@ -785,18 +792,20 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(MULTI_LEVEL_CONCAT_TEST_TABLE);
int numTasks = 0;
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName));
List<String> taskList;
- for (String tasks =
_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ for (String tasks = _taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
tasks != null;
- taskList = _taskManager.scheduleAllTasksForTable(offlineTableName,
null)
+ taskList = _taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0)
: null, numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNull(_taskManager.scheduleTasks(context)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
@@ -918,11 +927,13 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
long expectedWatermark = 16000 * 86_400_000L;
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
int numTasks = 0;
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(realtimeTableName));
List<String> taskList;
- for (String tasks =
taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ for (String tasks = taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
tasks != null;
- taskList = taskManager.scheduleAllTasksForTable(realtimeTableName,
null)
+ taskList = taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0)
: null, numTasks++) {
// assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(),
expectedNumSubTasks[numTasks]);
@@ -930,7 +941,7 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ assertNull(taskManager.scheduleTasks(context)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
@@ -1024,17 +1035,19 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1};
String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
int numTasks = 0;
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(realtimeTableName));
List<String> taskList;
- for (String tasks =
taskManager.scheduleAllTasksForTable(realtimeTableName, null).
-
get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
tasks != null;
- taskList = taskManager.scheduleAllTasksForTable(realtimeTableName,
null)
+ for (String tasks = taskManager.scheduleTasks(context)
+
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
tasks != null;
+ taskList = taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0)
: null, numTasks++) {
assertTrue(helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ assertNull(taskManager.scheduleTasks(context)
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();
@@ -1066,10 +1079,10 @@ public class MergeRollupMinionClusterIntegrationTest
extends BaseClusterIntegrat
uploadSegments(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE,
TableType.REALTIME, _tarDir5);
waitForAllDocsLoaded(600_000L);
- for (String tasks =
taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ for (String tasks = taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
tasks != null;
- taskList = taskManager.scheduleAllTasksForTable(realtimeTableName,
null)
+ taskList = taskManager.scheduleTasks(context)
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0)
: null, numTasks++) {
waitForTaskToComplete();
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index fed10b9f1b..19459894c7 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -185,12 +186,16 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
// 5. Check the purge process itself by setting an expecting number of rows
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
- assertNotNull(
- _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
+ .get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+ _taskManager);
waitForTaskToComplete();
// Check that metadata contains expected values
@@ -200,7 +205,10 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE +
MinionConstants.TASK_TIME_SUFFIX));
}
// Should not generate new purge task as the last time purge is not
greater than last + 1day (default purge delay)
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+ _taskManager);
// 52 rows with ArrTime = 1
// 115545 totals rows
@@ -231,11 +239,16 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
assertNotNull(
- _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
+ _taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
+ .get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+ _taskManager);
waitForTaskToComplete();
// Check that metadata contains expected values
@@ -247,7 +260,10 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) <
86400000);
}
// Should not generate new purge task as the last time purge is not
greater than last + 1day (default purge delay)
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+ _taskManager);
// 52 rows with ArrTime = 1
// 115545 totals rows
@@ -279,7 +295,10 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
// No task should be schedule as the delay is not passed
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+ _taskManager);
for (SegmentZKMetadata metadata :
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Check purge time
String purgeTime =
@@ -330,11 +349,15 @@ public class PurgeMinionClusterIntegrationTest extends
BaseClusterIntegrationTes
// schedule purge tasks
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
- assertNotNull(
- _taskManager.scheduleAllTasksForTable(offlineTableName,
null).get(MinionConstants.PurgeTask.TASK_TYPE));
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
+ .get(MinionConstants.PurgeTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+ _taskManager);
waitForTaskToComplete();
// Check that metadata contains expected values
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 296c981c18..15829f57b0 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -231,13 +232,16 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
long expectedWatermark = _dataSmallestTimeMs + 86400000;
for (int i = 0; i < 3; i++) {
// Schedule task
- assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName,
null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(_realtimeTableName)))
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
- MinionTaskTestUtils.assertNoTaskSchedule(_realtimeTableName,
- MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
_taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(_realtimeTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)),
+ _taskManager);
// Wait at most 600 seconds for all tasks COMPLETED
waitForTaskToComplete(expectedWatermark, _realtimeTableName);
@@ -283,13 +287,16 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
_taskManager.cleanUpTask();
for (int i = 0; i < 3; i++) {
// Schedule task
-
assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName,
null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+
.setTablesToSchedule(Collections.singleton(_realtimeMetadataTableName)))
.get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
- MinionTaskTestUtils.assertNoTaskSchedule(_realtimeMetadataTableName,
- MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
_taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+
.setTablesToSchedule(Collections.singleton(_realtimeMetadataTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)),
+ _taskManager);
// Wait at most 600 seconds for all tasks COMPLETED
waitForTaskToComplete(expectedWatermark, _realtimeMetadataTableName);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
index c14f278cf6..9a0b23bae4 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -108,12 +109,15 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
public void testFirstSegmentRefresh() {
// This will create the inverted index as we disable inverted index
creation during segment push.
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
- assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.RefreshSegmentTask.TASK_TYPE,
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
_taskManager);
waitForTaskToComplete();
@@ -128,7 +132,9 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
}
// This should be no-op as nothing changes.
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.RefreshSegmentTask.TASK_TYPE,
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
_taskManager);
for (SegmentZKMetadata metadata :
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
@@ -153,12 +159,15 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.STRING);
addSchema(schema);
- assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.RefreshSegmentTask.TASK_TYPE,
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
_taskManager);
waitForTaskToComplete();
@@ -232,12 +241,15 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
updateTableConfig(tableConfig);
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
- assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName,
MinionConstants.RefreshSegmentTask.TASK_TYPE,
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+
.setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
_taskManager);
waitForTaskToComplete();
@@ -323,12 +335,15 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
- assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+ .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
_taskManager);
waitForTaskToComplete();
@@ -401,12 +416,15 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
updateTableConfig(tableConfig);
- assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName)))
.get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
// Will not schedule task if there's incomplete task
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+ .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
_taskManager);
waitForTaskToComplete();
@@ -423,7 +441,9 @@ public class RefreshSegmentMinionClusterIntegrationTest
extends BaseClusterInteg
}
// This should be no-op as nothing changes.
- MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(offlineTableName))
+ .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
_taskManager);
for (SegmentZKMetadata metadata :
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
// Get the value in segment metadata
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 3071d9c7fb..00dce3341c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.MetricValueUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
@@ -137,7 +138,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
// Should create the task queues and generate a task in the same minion
instance
List<String> task1 =
- _taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE).getScheduledTaskNames();
+ _taskManager.scheduleTasks(new TaskSchedulingContext()).get(TASK_TYPE).getScheduledTaskNames();
assertNotNull(task1);
assertEquals(task1.size(), 1);
assertTrue(_helixTaskResourceManager.getTaskQueues()
@@ -151,7 +152,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
verifyTaskCount(task1.get(0), 0, 1, 1, 2);
// Should generate one more task, with two sub-tasks. Both of these
sub-tasks will wait
// since we have one minion instance that is still running one of the
sub-tasks.
- List<String> task2 = _taskManager.scheduleTaskForAllTables(TASK_TYPE, null).getScheduledTaskNames();
+ List<String> task2 = _taskManager.scheduleTasks(new TaskSchedulingContext()).get(TASK_TYPE).getScheduledTaskNames();
assertNotNull(task2);
assertEquals(task2.size(), 1);
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
@@ -160,8 +161,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
// Should not generate more tasks since
SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
// Our test task generator does not generate if there are already this
many sub-tasks in the
// running+waiting count already.
- MinionTaskTestUtils.assertNoTaskSchedule(_taskManager);
- MinionTaskTestUtils.assertNoTaskSchedule(TASK_TYPE, _taskManager);
+ MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext(), _taskManager);
// Wait at most 60 seconds for all tasks IN_PROGRESS
TestUtils.waitForCondition(input -> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
index e5c35f6dbd..eceafc732c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
@@ -58,6 +58,7 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.core.common.MinionConstants;
import
org.apache.pinot.integration.tests.access.CertBasedTlsChannelAccessControlFactory;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -478,7 +479,7 @@ public class TlsIntegrationTest extends
BaseClusterIntegrationTest {
Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
// schedule offline segment generation
- Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
+ Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks(new TaskSchedulingContext()));
// wait for offline segments
JsonNode offlineSegments = TestUtils.waitForResult(() -> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 6d965ffae0..3aa15e1fb0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -34,6 +34,7 @@ import
org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.config.TagNameUtils;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
@@ -468,7 +469,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTest {
sendPostRequest(_controllerRequestURLBuilder.forResumeConsumption(tableName));
waitForNumQueriedSegmentsToConverge(tableName, 600_000L, 5, 2);
String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
- assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(realtimeTableName)))
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
// 2 segments should be compacted (351 rows -> 1 row; 500 rows -> 2 rows),
1 segment (149 rows) should be deleted
@@ -501,7 +503,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTest {
// NOTE: When in-memory valid doc ids are used, no need to pause/resume
consumption to trigger the snapshot.
String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
- assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(realtimeTableName)))
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
// 1 segment should be compacted (500 rows -> 2 rows)
@@ -544,7 +547,8 @@ public class UpsertTableIntegrationTest extends
BaseClusterIntegrationTest {
waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5, 2);
String realtimeTableName =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
- assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+ assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+ .setTablesToSchedule(Collections.singleton(realtimeTableName)))
.get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
waitForTaskToComplete();
// 1 segment should be compacted (351 rows -> 1 rows), 2 segments (500
rows, 151 rows) should be deleted
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
index 22d24115d9..ade04ba12e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
@@ -32,6 +32,7 @@ import
org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.auth.UrlAuthProvider;
import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -176,7 +177,7 @@ public class UrlAuthRealtimeIntegrationTest extends
BaseClusterIntegrationTest {
Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
// schedule offline segment generation
- Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
+ Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks(new TaskSchedulingContext()));
// wait for offline segments
List<String> offlineSegments = TestUtils.waitForResult(() -> {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 19fbb75216..f1f197bfcd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -79,6 +79,10 @@ public class CommonConstants {
CONSUMING, NOT_CONSUMING // In error state
}
+ public enum TaskTriggers {
+ CRON_TRIGGER, MANUAL_TRIGGER, ADHOC_TRIGGER, UNKNOWN
+ }
+
public static class Table {
public static final String PUSH_FREQUENCY_HOURLY = "hourly";
public static final String PUSH_FREQUENCY_DAILY = "daily";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]