This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f701245ca7 Make MSE worker manager pluggable (#17787)
7f701245ca7 is described below

commit 7f701245ca71c482ce71456e4e5082bfa82d5e14
Author: Yash Mayya <[email protected]>
AuthorDate: Sat Feb 28 22:47:31 2026 -0800

    Make MSE worker manager pluggable (#17787)
---
 .../broker/broker/helix/BaseBrokerStarter.java     | 26 ++++++++-
 .../BrokerRequestHandlerDelegate.java              |  5 ++
 .../MultiStageBrokerRequestHandler.java            | 13 ++---
 .../MultiStageWithoutStatsIntegrationTest.java     |  2 +-
 .../apache/pinot/query/routing/WorkerManager.java  | 24 ++++++--
 .../KeepPipelineBreakerStatsPredicate.java         |  2 +-
 .../apache/pinot/query/runtime/QueryRunner.java    | 68 ++++++++++++++++------
 .../pinot/query/runtime}/SendStatsPredicate.java   |  2 +-
 .../query/service/dispatch/QueryDispatcher.java    |  4 ++
 .../apache/pinot/query/QueryServerEnclosure.java   |  3 +-
 .../pinot/server/starter/ServerInstance.java       |  4 +-
 .../server/starter/helix/BaseServerStarter.java    |  2 +
 .../pinot/server/worker/WorkerQueryServer.java     |  8 +--
 13 files changed, 118 insertions(+), 45 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 44105c5d22f..5ce5521662c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -87,11 +87,13 @@ import org.apache.pinot.core.instance.context.BrokerContext;
 import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
 import org.apache.pinot.core.routing.MultiClusterRoutingContext;
+import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.transport.NettyInspector;
 import 
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
+import org.apache.pinot.query.routing.WorkerManager;
 import 
org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
 import 
org.apache.pinot.query.runtime.operator.factory.QueryOperatorFactoryProvider;
 import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
@@ -138,7 +140,6 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
   protected String _instanceId;
   private volatile boolean _isStarting = false;
   private volatile boolean _isShuttingDown = false;
-
   // Dedicated handler for listening to cluster config changes
   protected final DefaultClusterConfigChangeHandler 
_clusterConfigChangeHandler =
       new DefaultClusterConfigChangeHandler();
@@ -237,6 +238,14 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     return DefaultQueryOperatorFactoryProvider.INSTANCE;
   }
 
+  /**
+   * Override to customize the {@link WorkerManager} used for multi-stage 
query engine worker assignment.
+   */
+  protected WorkerManager createWorkerManager(String brokerId, String 
hostname, int port,
+      RoutingManager routingManager) {
+    return new WorkerManager(brokerId, hostname, port, routingManager);
+  }
+
   private void setupHelixSystemProperties() {
     // NOTE: Helix will disconnect the manager and disable the instance if it 
detects flapping (too frequent disconnect
     // from ZooKeeper). Setting flapping time window to a small value can 
avoid this from happening. Helix ignores the
@@ -438,10 +447,21 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       // multi-stage request handler uses both Netty and GRPC ports.
       // worker requires both the "Netty port" for protocol transport; and 
"GRPC port" for mailbox transport.
       // TODO: decouple protocol and engine selection.
+      String queryRunnerHostname = 
_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
+      int queryRunnerPort = 
Integer.parseInt(_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
+      WorkerManager workerManager =
+          createWorkerManager(brokerId, queryRunnerHostname, queryRunnerPort, 
_routingManager);
+      WorkerManager multiClusterWorkerManager;
+      if (multiClusterRoutingContext != null) {
+        multiClusterWorkerManager = createWorkerManager(brokerId, 
queryRunnerHostname, queryRunnerPort,
+            multiClusterRoutingContext.getMultiClusterRoutingManager());
+      } else {
+        multiClusterWorkerManager = workerManager;
+      }
       multiStageBrokerRequestHandler =
           new MultiStageBrokerRequestHandler(_brokerConf, brokerId, 
requestIdGenerator, _routingManager,
               _accessControlFactory, _queryQuotaManager, _tableCache, 
_multiStageQueryThrottler, _failureDetector,
-              _threadAccountant, multiClusterRoutingContext);
+              _threadAccountant, multiClusterRoutingContext, workerManager, 
multiClusterWorkerManager);
       MultiStageBrokerRequestHandler finalHandler = 
multiStageBrokerRequestHandler;
       _routingManager.setServerReenableCallback(
           serverInstance -> 
finalHandler.getQueryDispatcher().resetClientConnectionBackoff(serverInstance));
@@ -724,7 +744,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     }
   }
 
-  private boolean updatePortIfNeeded(Map<String, String> 
instanceConfigSimpleFields, String key, int port) {
+  protected boolean updatePortIfNeeded(Map<String, String> 
instanceConfigSimpleFields, String key, int port) {
     String existingPortStr = instanceConfigSimpleFields.get(key);
     if (port > 0) {
       String portStr = Integer.toString(port);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index 3bd3cb9a1a3..002de0e727a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -62,6 +62,11 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
     _responseStore = responseStore;
   }
 
+  @Nullable
+  public MultiStageBrokerRequestHandler getMultiStageBrokerRequestHandler() {
+    return _multiStageBrokerRequestHandler;
+  }
+
   @Override
   public void start() {
     _singleStageBrokerRequestHandler.start();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 8d8afe0598a..55990a7116f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -148,20 +148,15 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       BrokerRequestIdGenerator requestIdGenerator, RoutingManager 
routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager 
queryQuotaManager, TableCache tableCache,
       MultiStageQueryThrottler queryThrottler, FailureDetector 
failureDetector, ThreadAccountant threadAccountant,
-      MultiClusterRoutingContext multiClusterRoutingContext) {
+      MultiClusterRoutingContext multiClusterRoutingContext,
+      WorkerManager workerManager, WorkerManager multiClusterWorkerManager) {
     super(config, brokerId, requestIdGenerator, routingManager, 
accessControlFactory, queryQuotaManager, tableCache,
         threadAccountant, multiClusterRoutingContext);
     String hostname = 
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     int port = 
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
 
-    _workerManager = new WorkerManager(_brokerId, hostname, port, 
_routingManager);
-    if (multiClusterRoutingContext != null) {
-      _multiClusterWorkerManager = new WorkerManager(_brokerId, hostname, port,
-          multiClusterRoutingContext.getMultiClusterRoutingManager());
-    } else {
-      // if multi-cluster routing is not enabled, use the same worker manager.
-      _multiClusterWorkerManager = _workerManager;
-    }
+    _workerManager = workerManager;
+    _multiClusterWorkerManager = multiClusterWorkerManager;
 
     TlsConfig tlsConfig = 
config.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
         CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? 
TlsUtils.extractTlsConfig(config,
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
index b74c5ac2217..fc4f146acc5 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.pinot.server.starter.helix.SendStatsPredicate;
+import org.apache.pinot.query.runtime.SendStatsPredicate;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index a65d1cfcfc6..2c89904755e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -205,7 +205,7 @@ public class WorkerManager {
   // --------------------------------------------------------------------------
   // Intermediate stage assign logic
   // --------------------------------------------------------------------------
-  private void assignWorkersToIntermediateFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
+  protected void assignWorkersToIntermediateFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
     List<PlanFragment> children = fragment.getChildren();
     Map<Integer, DispatchablePlanMetadata> metadataMap = 
context.getDispatchablePlanMetadataMap();
     DispatchablePlanMetadata metadata = 
metadataMap.get(fragment.getFragmentId());
@@ -300,11 +300,13 @@ public class WorkerManager {
             childWorkerIdToSegmentsMap.put(workerId, replicatedSegments);
           }
         } else {
-          int numWorkers = candidateServers.size();
+          List<QueryServerInstance> replicatedLeafServers =
+              getCandidateServersForReplicatedLeaf(context, candidateServers);
+          int numWorkers = replicatedLeafServers.size();
           childWorkerIdToServerInstanceMap = 
Maps.newHashMapWithExpectedSize(numWorkers);
           childWorkerIdToSegmentsMap = 
Maps.newHashMapWithExpectedSize(numWorkers);
           for (int workerId = 0; workerId < numWorkers; workerId++) {
-            childWorkerIdToServerInstanceMap.put(workerId, 
candidateServers.get(workerId));
+            childWorkerIdToServerInstanceMap.put(workerId, 
replicatedLeafServers.get(workerId));
             childWorkerIdToSegmentsMap.put(workerId, replicatedSegments);
           }
         }
@@ -359,7 +361,7 @@ public class WorkerManager {
   /**
    * Returns the servers serving any segment of the tables in the query.
    */
-  private List<QueryServerInstance> 
getCandidateServers(DispatchablePlanContext context) {
+  protected List<QueryServerInstance> 
getCandidateServers(DispatchablePlanContext context) {
     List<QueryServerInstance> candidateServers;
     if (context.isUseLeafServerForIntermediateStage()) {
       Set<QueryServerInstance> leafServerInstances = 
context.getLeafServerInstances();
@@ -385,7 +387,7 @@ public class WorkerManager {
     return candidateServers;
   }
 
-  private List<QueryServerInstance> 
getCandidateServersPerTables(DispatchablePlanContext context) {
+  protected List<QueryServerInstance> 
getCandidateServersPerTables(DispatchablePlanContext context) {
     Set<String> nonLookupTables = context.getNonLookupTables();
     assert !nonLookupTables.isEmpty();
     Set<String> servers = new HashSet<>();
@@ -437,6 +439,18 @@ public class WorkerManager {
     return candidateServers;
   }
 
+  /**
+   * Returns the instances to assign to replicated leaf stage children when 
there is no local exchange peer. By default,
+   * uses the same candidates as the intermediate stage.
+   *
+   * <p>Subclasses can override to use different instances for replicated leaf 
stages (e.g., when intermediate stages
+   * run on non-server instances that cannot scan segments).</p>
+   */
+  protected List<QueryServerInstance> 
getCandidateServersForReplicatedLeaf(DispatchablePlanContext context,
+      List<QueryServerInstance> intermediateStageWorkers) {
+    return intermediateStageWorkers;
+  }
+
   private void assignWorkersToLeafFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
     DispatchablePlanMetadata metadata = 
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/KeepPipelineBreakerStatsPredicate.java
similarity index 98%
rename from pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/KeepPipelineBreakerStatsPredicate.java
index 374fa19770d..4bd5d3ea75f 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/KeepPipelineBreakerStatsPredicate.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.starter.helix;
+package org.apache.pinot.query.runtime;
 
 import java.util.Map;
 import java.util.Set;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d19fa564f1a..b8f356ee2b9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -111,6 +111,7 @@ public class QueryRunner {
   private ExecutorService _executorService;
   private OpChainSchedulerService _opChainScheduler;
   private MailboxService _mailboxService;
+  private boolean _ownsMailboxService = true;
   private QueryExecutor _leafQueryExecutor;
 
   // Group-by settings
@@ -143,10 +144,14 @@ public class QueryRunner {
   private BooleanSupplier _keepPipelineBreakerStats;
 
   /**
-   * Initializes the query executor.
+   * Initializes the query runner.
    * <p>Should be called only once and before calling any other method.
+   *
+   * @param instanceDataManager when non-null, the leaf query executor and 
time series visitor are initialized
+   *                            for processing leaf-stage queries. When null, 
only intermediate-stage execution
+   *                            is supported.
    */
-  public void init(PinotConfiguration serverConf, InstanceDataManager 
instanceDataManager,
+  public void init(PinotConfiguration serverConf, String instanceId, @Nullable 
InstanceDataManager instanceDataManager,
       @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats, 
BooleanSupplier keepPipelineBreakerStats) {
     String hostname = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
@@ -221,19 +226,25 @@ public class QueryRunner {
     _executorService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
         _executorService, serverConf, "multi-stage executor");
 
-    _opChainScheduler = new 
OpChainSchedulerService(instanceDataManager.getInstanceId(), _executorService, 
serverConf);
-    _mailboxService = new MailboxService(hostname, port, InstanceType.SERVER, 
serverConf, tlsConfig);
-    try {
-      _leafQueryExecutor = new ServerQueryExecutorV1Impl();
-      
_leafQueryExecutor.init(serverConf.subset(Server.QUERY_EXECUTOR_CONFIG_PREFIX), 
instanceDataManager,
-          serverMetrics);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    _opChainScheduler = new OpChainSchedulerService(instanceId, 
_executorService, serverConf);
+    if (_mailboxService == null) {
+      _mailboxService = new MailboxService(hostname, port, 
InstanceType.SERVER, serverConf, tlsConfig);
     }
-    if 
(StringUtils.isNotBlank(serverConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey())))
 {
-      _timeSeriesPhysicalPlanVisitor =
-          new PhysicalTimeSeriesServerPlanVisitor(_leafQueryExecutor, 
_executorService, serverMetrics);
-      TimeSeriesBuilderFactoryProvider.init(serverConf);
+
+    if (instanceDataManager != null) {
+      try {
+        _leafQueryExecutor = new ServerQueryExecutorV1Impl();
+        
_leafQueryExecutor.init(serverConf.subset(Server.QUERY_EXECUTOR_CONFIG_PREFIX), 
instanceDataManager,
+            serverMetrics);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      if (StringUtils.isNotBlank(
+          
serverConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey())))
 {
+        _timeSeriesPhysicalPlanVisitor =
+            new PhysicalTimeSeriesServerPlanVisitor(_leafQueryExecutor, 
_executorService, serverMetrics);
+        TimeSeriesBuilderFactoryProvider.init(serverConf);
+      }
     }
 
     _sendStats = sendStats;
@@ -242,14 +253,35 @@ public class QueryRunner {
     LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", 
hostname, port);
   }
 
+  /**
+   * Initializes the query runner with a shared {@link MailboxService}.
+   */
+  public void init(PinotConfiguration serverConf, String instanceId, @Nullable 
InstanceDataManager instanceDataManager,
+      @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats, 
BooleanSupplier keepPipelineBreakerStats,
+      @Nullable MailboxService sharedMailboxService) {
+    if (sharedMailboxService != null) {
+      _mailboxService = sharedMailboxService;
+      _ownsMailboxService = false;
+    }
+    init(serverConf, instanceId, instanceDataManager, tlsConfig, sendStats, 
keepPipelineBreakerStats);
+  }
+
   public void start() {
-    _mailboxService.start();
-    _leafQueryExecutor.start();
+    if (_ownsMailboxService) {
+      _mailboxService.start();
+    }
+    if (_leafQueryExecutor != null) {
+      _leafQueryExecutor.start();
+    }
   }
 
   public void shutDown() {
-    _leafQueryExecutor.shutDown();
-    _mailboxService.shutdown();
+    if (_leafQueryExecutor != null) {
+      _leafQueryExecutor.shutDown();
+    }
+    if (_ownsMailboxService) {
+      _mailboxService.shutdown();
+    }
     ExecutorServiceUtils.close(_executorService);
   }
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/SendStatsPredicate.java
similarity index 99%
rename from pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/SendStatsPredicate.java
index 27c610a83e9..d5d0d861102 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/SendStatsPredicate.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.server.starter.helix;
+package org.apache.pinot.query.runtime;
 
 import java.util.HashMap;
 import java.util.List;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index dc80478d9cc..89d9e227e1f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -133,6 +133,10 @@ public class QueryDispatcher {
     }
   }
 
+  public MailboxService getMailboxService() {
+    return _mailboxService;
+  }
+
   public void start() {
     _mailboxService.start();
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 7dbf60c8dd4..0e6b7c8f1cf 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -62,7 +62,8 @@ public class QueryServerEnclosure {
     
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
 _queryRunnerPort);
     InstanceDataManager instanceDataManager = 
factory.buildInstanceDataManager();
     _queryRunner = new QueryRunner();
-    _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager, null, () -> true, () -> true);
+    _queryRunner.init(new PinotConfiguration(runnerConfig), 
instanceDataManager.getInstanceId(), instanceDataManager,
+        null, () -> true, () -> true);
   }
 
   public int getPort() {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 88c45b32d8b..4b62ad492dd 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -42,14 +42,14 @@ import 
org.apache.pinot.core.transport.ChannelHandlerFactory;
 import org.apache.pinot.core.transport.InstanceRequestHandler;
 import org.apache.pinot.core.transport.QueryServer;
 import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
+import org.apache.pinot.query.runtime.KeepPipelineBreakerStatsPredicate;
+import org.apache.pinot.query.runtime.SendStatsPredicate;
 import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
 import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
 import org.apache.pinot.server.access.AccessControl;
 import org.apache.pinot.server.access.AccessControlFactory;
 import org.apache.pinot.server.access.AllowAllAccessFactory;
 import org.apache.pinot.server.conf.ServerConf;
-import org.apache.pinot.server.starter.helix.KeepPipelineBreakerStatsPredicate;
-import org.apache.pinot.server.starter.helix.SendStatsPredicate;
 import org.apache.pinot.server.worker.WorkerQueryServer;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 3fd2b58d9e0..9aeb6a1732e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -91,6 +91,8 @@ import org.apache.pinot.core.transport.ListenerConfig;
 import org.apache.pinot.core.transport.NettyInspector;
 import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
+import org.apache.pinot.query.runtime.KeepPipelineBreakerStatsPredicate;
+import org.apache.pinot.query.runtime.SendStatsPredicate;
 import 
org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
 import 
org.apache.pinot.query.runtime.operator.factory.QueryOperatorFactoryProvider;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshManager;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 4c343416429..f38c162b952 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -21,10 +21,10 @@ package org.apache.pinot.server.worker;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.query.runtime.KeepPipelineBreakerStatsPredicate;
 import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.SendStatsPredicate;
 import org.apache.pinot.query.service.server.QueryServer;
-import org.apache.pinot.server.starter.helix.KeepPipelineBreakerStatsPredicate;
-import org.apache.pinot.server.starter.helix.SendStatsPredicate;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -44,8 +44,8 @@ public class WorkerQueryServer {
     _queryServicePort = 
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
         MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     QueryRunner queryRunner = new QueryRunner();
-    queryRunner.init(serverConf, instanceDataManager, tlsConfig, 
sendStats::isSendStats,
-        keepPipelineBreakerStatsPredicate::isEnabled);
+    queryRunner.init(serverConf, instanceDataManager.getInstanceId(), 
instanceDataManager, tlsConfig,
+        sendStats::isSendStats, keepPipelineBreakerStatsPredicate::isEnabled);
     _queryWorkerService =
         new QueryServer(serverConf, instanceDataManager.getInstanceId(), 
_queryServicePort, queryRunner, tlsConfig,
             threadAccountant);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to