This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c79d029384 allow to config task expire time (#9530) c79d029384 is described below commit c79d029384c25241a4ee114fd5472cd6b4cb6d47 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Wed Oct 5 09:20:31 2022 -0700 allow to config task expire time (#9530) * allow to config task expire time, i.e. how often to purge ended tasks from ZK * comment the config for clarity --- .../org/apache/pinot/controller/BaseControllerStarter.java | 3 ++- .../main/java/org/apache/pinot/controller/ControllerConf.java | 8 ++++++++ .../helix/core/minion/PinotHelixTaskResourceManager.java | 10 +++++++++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 9ba042bbcb..697c8f2a06 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -402,7 +402,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { LOGGER.info("Starting task resource manager"); _helixTaskResourceManager = - new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager)); + new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager), + _config.getPinotTaskExpireTimeInMs()); // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager LOGGER.info("Starting realtime segment manager"); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index a3ae9316eb..9b1081d6d4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -148,6 +148,10 @@ public class ControllerConf extends PinotConfiguration { "controller.minion.task.metrics.emitter.frequencyPeriod"; public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled"; + // This is the expiry for the ended tasks. Helix cleans up the task info from ZK after the expiry time from the + // end of the task. + public static final String PINOT_TASK_EXPIRE_TIME_MS = "controller.task.expire.time.ms"; + @Deprecated // RealtimeSegmentRelocator has been rebranded as SegmentRelocator public static final String DEPRECATED_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = @@ -825,6 +829,10 @@ public class ControllerConf extends PinotConfiguration { return getProperty(ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, false); } + public long getPinotTaskExpireTimeInMs() { + return getProperty(ControllerPeriodicTasksConf.PINOT_TASK_EXPIRE_TIME_MS, TimeUnit.HOURS.toMillis(24)); + } + /** * RealtimeSegmentRelocator has been rebranded to SegmentRelocator. * Check for SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS property, if not found, return diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index 85b913c471..4e5c7e5963 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -83,10 +83,17 @@ public class PinotHelixTaskResourceManager { private final TaskDriver _taskDriver; private final PinotHelixResourceManager _helixResourceManager; + private final long _taskExpireTimeMs; public PinotHelixTaskResourceManager(PinotHelixResourceManager helixResourceManager, TaskDriver taskDriver) { + this(helixResourceManager, taskDriver, TimeUnit.HOURS.toMillis(24)); + } + + public PinotHelixTaskResourceManager(PinotHelixResourceManager helixResourceManager, TaskDriver taskDriver, + long taskExpireTimeMs) { _helixResourceManager = helixResourceManager; _taskDriver = taskDriver; + _taskExpireTimeMs = taskExpireTimeMs; } /** @@ -288,7 +295,8 @@ public class PinotHelixTaskResourceManager { JobConfig.Builder jobBuilder = new JobConfig.Builder().addTaskConfigs(helixTaskConfigs).setInstanceGroupTag(minionInstanceTag) .setTimeoutPerTask(taskTimeoutMs).setNumConcurrentTasksPerInstance(numConcurrentTasksPerInstance) - .setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE); + .setIgnoreDependentJobFailure(true).setMaxAttemptsPerTask(1).setFailureThreshold(Integer.MAX_VALUE) + .setExpiry(_taskExpireTimeMs); _taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName, jobBuilder); // Wait until task state is available --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org