This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 25ef4806fb make pinot task manager pluggable (#15429)
25ef4806fb is described below
commit 25ef4806fbb800cd72d3df9eed648ac7f8e3c84e
Author: Songqiao Su <[email protected]>
AuthorDate: Wed Apr 2 11:32:52 2025 -0700
make pinot task manager pluggable (#15429)
---
.../pinot/controller/BaseControllerStarter.java | 28 ++++++++++++++++++----
.../apache/pinot/spi/utils/CommonConstants.java | 6 +++++
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 106cfdf59e..532d628c41 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -133,6 +133,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -813,10 +814,8 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
LOGGER.info("Setting up periodic tasks");
List<PeriodicTask> periodicTasks = new ArrayList<>();
_taskManagerStatusCache = getTaskManagerStatusCache();
- _taskManager =
- new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager,
_leadControllerManager, _config,
- _controllerMetrics, _taskManagerStatusCache, _executorService,
_connectionManager,
- _resourceUtilizationManager);
+ // Create and add task manager
+ _taskManager = createTaskManager();
periodicTasks.add(_taskManager);
BrokerServiceHelper brokerServiceHelper =
new BrokerServiceHelper(_helixResourceManager, _config,
_executorService, _connectionManager);
@@ -866,6 +865,27 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
return periodicTasks;
}
+ /**
+ * Creates a TaskManager instance as specified in the configuration.
+ */
+ private PinotTaskManager createTaskManager() {
+ String taskManagerClass =
_config.getProperty(CommonConstants.Controller.CONFIG_OF_TASK_MANAGER_CLASS,
+ CommonConstants.Controller.DEFAULT_TASK_MANAGER_CLASS);
+ LOGGER.info("Creating TaskManager with class: {}", taskManagerClass);
+ try {
+ return PluginManager.get().createInstance(taskManagerClass,
+ new Class[]{PinotHelixTaskResourceManager.class,
PinotHelixResourceManager.class, LeadControllerManager.class,
+ ControllerConf.class, ControllerMetrics.class,
TaskManagerStatusCache.class,
+ Executor.class, PoolingHttpClientConnectionManager.class,
ResourceUtilizationManager.class},
+ new Object[]{_helixTaskResourceManager, _helixResourceManager,
_leadControllerManager,
+ _config, _controllerMetrics, _taskManagerStatusCache,
_executorService,
+ _connectionManager, _resourceUtilizationManager});
+ } catch (Exception e) {
+ LOGGER.error("Failed to create task manager with class: {}",
taskManagerClass, e);
+ throw new RuntimeException("Failed to create task manager with class: "
+ taskManagerClass, e);
+ }
+ }
+
@Override
public void stop() {
switch (_controllerMode) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7eb62120f7..a5a4b60bff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1116,6 +1116,12 @@ public class CommonConstants {
public static final String CONFIG_OF_INSTANCE_ID =
"pinot.controller.instance.id";
public static final String CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES
=
"pinot.controller.query.rewriter.class.names";
+
+ // Task Manager configuration
+ public static final String CONFIG_OF_TASK_MANAGER_CLASS =
"pinot.controller.task.manager.class";
+ public static final String DEFAULT_TASK_MANAGER_CLASS =
+ "org.apache.pinot.controller.helix.core.minion.PinotTaskManager";
+
//Set to true to load all services tagged and compiled with
hk2-metadata-generator. Default to False
public static final String CONTROLLER_SERVICE_AUTO_DISCOVERY =
"pinot.controller.service.auto.discovery";
public static final String CONFIG_OF_LOGGER_ROOT_DIR =
"pinot.controller.logger.root.dir";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]