This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7805427c81 add_init (#15471)
7805427c81 is described below
commit 7805427c8140f6f0d288a432bda576a1af840b4b
Author: Songqiao Su <[email protected]>
AuthorDate: Mon Apr 7 08:20:49 2025 -0700
add_init (#15471)
---
.../apache/pinot/controller/BaseControllerStarter.java | 1 +
.../controller/helix/core/minion/PinotTaskManager.java | 18 +++++++++++++++---
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 532d628c41..bd77104c45 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -816,6 +816,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
_taskManagerStatusCache = getTaskManagerStatusCache();
// Create and add task manager
_taskManager = createTaskManager();
+ _taskManager.init();
periodicTasks.add(_taskManager);
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(_helixResourceManager, _config, _executorService, _connectionManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index c62d7a1035..eab658a521 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -110,6 +110,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
+ private final boolean _isPinotTaskManagerSchedulerEnabled;
+
// For metrics
private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap = new ConcurrentHashMap<>();
private final Map<TaskState, Integer> _taskStateToCountMap = new ConcurrentHashMap<>();
@@ -135,9 +137,21 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
_skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
_maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
- if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
+ _isPinotTaskManagerSchedulerEnabled = controllerConf.isPinotTaskManagerSchedulerEnabled();
+ if (_isPinotTaskManagerSchedulerEnabled) {
try {
_scheduler = new StdSchedulerFactory().getScheduler();
+ } catch (SchedulerException e) {
+ throw new RuntimeException("Caught exception while setting up the scheduler", e);
+ }
+ } else {
+ _scheduler = null;
+ }
+ }
+
+ public void init() {
+ if (_isPinotTaskManagerSchedulerEnabled) {
+ try {
_scheduler.start();
synchronized (_zkTableConfigChangeListener) {
// Subscribe child changes before reading the data to avoid missing changes
@@ -153,8 +167,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
} catch (SchedulerException e) {
throw new RuntimeException("Caught exception while setting up the scheduler", e);
}
- } else {
- _scheduler = null;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]