This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7f701245ca7 Make MSE worker manager pluggable (#17787)
7f701245ca7 is described below
commit 7f701245ca71c482ce71456e4e5082bfa82d5e14
Author: Yash Mayya <[email protected]>
AuthorDate: Sat Feb 28 22:47:31 2026 -0800
Make MSE worker manager pluggable (#17787)
---
.../broker/broker/helix/BaseBrokerStarter.java | 26 ++++++++-
.../BrokerRequestHandlerDelegate.java | 5 ++
.../MultiStageBrokerRequestHandler.java | 13 ++---
.../MultiStageWithoutStatsIntegrationTest.java | 2 +-
.../apache/pinot/query/routing/WorkerManager.java | 24 ++++++--
.../KeepPipelineBreakerStatsPredicate.java | 2 +-
.../apache/pinot/query/runtime/QueryRunner.java | 68 ++++++++++++++++------
.../pinot/query/runtime}/SendStatsPredicate.java | 2 +-
.../query/service/dispatch/QueryDispatcher.java | 4 ++
.../apache/pinot/query/QueryServerEnclosure.java | 3 +-
.../pinot/server/starter/ServerInstance.java | 4 +-
.../server/starter/helix/BaseServerStarter.java | 2 +
.../pinot/server/worker/WorkerQueryServer.java | 8 +--
13 files changed, 118 insertions(+), 45 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 44105c5d22f..5ce5521662c 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -87,11 +87,13 @@ import org.apache.pinot.core.instance.context.BrokerContext;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
import org.apache.pinot.core.routing.MultiClusterRoutingContext;
+import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.NettyInspector;
import
org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
+import org.apache.pinot.query.routing.WorkerManager;
import
org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
import
org.apache.pinot.query.runtime.operator.factory.QueryOperatorFactoryProvider;
import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
@@ -138,7 +140,6 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
protected String _instanceId;
private volatile boolean _isStarting = false;
private volatile boolean _isShuttingDown = false;
-
// Dedicated handler for listening to cluster config changes
protected final DefaultClusterConfigChangeHandler
_clusterConfigChangeHandler =
new DefaultClusterConfigChangeHandler();
@@ -237,6 +238,14 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
return DefaultQueryOperatorFactoryProvider.INSTANCE;
}
+ /**
+ * Override to customize the {@link WorkerManager} used for multi-stage
query engine worker assignment.
+ */
+ protected WorkerManager createWorkerManager(String brokerId, String
hostname, int port,
+ RoutingManager routingManager) {
+ return new WorkerManager(brokerId, hostname, port, routingManager);
+ }
+
private void setupHelixSystemProperties() {
// NOTE: Helix will disconnect the manager and disable the instance if it
detects flapping (too frequent disconnect
// from ZooKeeper). Setting flapping time window to a small value can
avoid this from happening. Helix ignores the
@@ -438,10 +447,21 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
// multi-stage request handler uses both Netty and GRPC ports.
// worker requires both the "Netty port" for protocol transport; and
"GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
+ String queryRunnerHostname =
_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
+ int queryRunnerPort =
Integer.parseInt(_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
+ WorkerManager workerManager =
+ createWorkerManager(brokerId, queryRunnerHostname, queryRunnerPort,
_routingManager);
+ WorkerManager multiClusterWorkerManager;
+ if (multiClusterRoutingContext != null) {
+ multiClusterWorkerManager = createWorkerManager(brokerId,
queryRunnerHostname, queryRunnerPort,
+ multiClusterRoutingContext.getMultiClusterRoutingManager());
+ } else {
+ multiClusterWorkerManager = workerManager;
+ }
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId,
requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, _tableCache,
_multiStageQueryThrottler, _failureDetector,
- _threadAccountant, multiClusterRoutingContext);
+ _threadAccountant, multiClusterRoutingContext, workerManager,
multiClusterWorkerManager);
MultiStageBrokerRequestHandler finalHandler =
multiStageBrokerRequestHandler;
_routingManager.setServerReenableCallback(
serverInstance ->
finalHandler.getQueryDispatcher().resetClientConnectionBackoff(serverInstance));
@@ -724,7 +744,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
}
}
- private boolean updatePortIfNeeded(Map<String, String>
instanceConfigSimpleFields, String key, int port) {
+ protected boolean updatePortIfNeeded(Map<String, String>
instanceConfigSimpleFields, String key, int port) {
String existingPortStr = instanceConfigSimpleFields.get(key);
if (port > 0) {
String portStr = Integer.toString(port);
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index 3bd3cb9a1a3..002de0e727a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -62,6 +62,11 @@ public class BrokerRequestHandlerDelegate implements
BrokerRequestHandler {
_responseStore = responseStore;
}
+ @Nullable
+ public MultiStageBrokerRequestHandler getMultiStageBrokerRequestHandler() {
+ return _multiStageBrokerRequestHandler;
+ }
+
@Override
public void start() {
_singleStageBrokerRequestHandler.start();
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 8d8afe0598a..55990a7116f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -148,20 +148,15 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
BrokerRequestIdGenerator requestIdGenerator, RoutingManager
routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
MultiStageQueryThrottler queryThrottler, FailureDetector
failureDetector, ThreadAccountant threadAccountant,
- MultiClusterRoutingContext multiClusterRoutingContext) {
+ MultiClusterRoutingContext multiClusterRoutingContext,
+ WorkerManager workerManager, WorkerManager multiClusterWorkerManager) {
super(config, brokerId, requestIdGenerator, routingManager,
accessControlFactory, queryQuotaManager, tableCache,
threadAccountant, multiClusterRoutingContext);
String hostname =
config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port =
Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
- _workerManager = new WorkerManager(_brokerId, hostname, port,
_routingManager);
- if (multiClusterRoutingContext != null) {
- _multiClusterWorkerManager = new WorkerManager(_brokerId, hostname, port,
- multiClusterRoutingContext.getMultiClusterRoutingManager());
- } else {
- // if multi-cluster routing is not enabled, use the same worker manager.
- _multiClusterWorkerManager = _workerManager;
- }
+ _workerManager = workerManager;
+ _multiClusterWorkerManager = multiClusterWorkerManager;
TlsConfig tlsConfig =
config.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ?
TlsUtils.extractTlsConfig(config,
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
index b74c5ac2217..fc4f146acc5 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java
@@ -26,7 +26,7 @@ import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.pinot.server.starter.helix.SendStatsPredicate;
+import org.apache.pinot.query.runtime.SendStatsPredicate;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index a65d1cfcfc6..2c89904755e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -205,7 +205,7 @@ public class WorkerManager {
// --------------------------------------------------------------------------
// Intermediate stage assign logic
// --------------------------------------------------------------------------
- private void assignWorkersToIntermediateFragment(PlanFragment fragment,
DispatchablePlanContext context) {
+ protected void assignWorkersToIntermediateFragment(PlanFragment fragment,
DispatchablePlanContext context) {
List<PlanFragment> children = fragment.getChildren();
Map<Integer, DispatchablePlanMetadata> metadataMap =
context.getDispatchablePlanMetadataMap();
DispatchablePlanMetadata metadata =
metadataMap.get(fragment.getFragmentId());
@@ -300,11 +300,13 @@ public class WorkerManager {
childWorkerIdToSegmentsMap.put(workerId, replicatedSegments);
}
} else {
- int numWorkers = candidateServers.size();
+ List<QueryServerInstance> replicatedLeafServers =
+ getCandidateServersForReplicatedLeaf(context, candidateServers);
+ int numWorkers = replicatedLeafServers.size();
childWorkerIdToServerInstanceMap =
Maps.newHashMapWithExpectedSize(numWorkers);
childWorkerIdToSegmentsMap =
Maps.newHashMapWithExpectedSize(numWorkers);
for (int workerId = 0; workerId < numWorkers; workerId++) {
- childWorkerIdToServerInstanceMap.put(workerId,
candidateServers.get(workerId));
+ childWorkerIdToServerInstanceMap.put(workerId,
replicatedLeafServers.get(workerId));
childWorkerIdToSegmentsMap.put(workerId, replicatedSegments);
}
}
@@ -359,7 +361,7 @@ public class WorkerManager {
/**
* Returns the servers serving any segment of the tables in the query.
*/
- private List<QueryServerInstance>
getCandidateServers(DispatchablePlanContext context) {
+ protected List<QueryServerInstance>
getCandidateServers(DispatchablePlanContext context) {
List<QueryServerInstance> candidateServers;
if (context.isUseLeafServerForIntermediateStage()) {
Set<QueryServerInstance> leafServerInstances =
context.getLeafServerInstances();
@@ -385,7 +387,7 @@ public class WorkerManager {
return candidateServers;
}
- private List<QueryServerInstance>
getCandidateServersPerTables(DispatchablePlanContext context) {
+ protected List<QueryServerInstance>
getCandidateServersPerTables(DispatchablePlanContext context) {
Set<String> nonLookupTables = context.getNonLookupTables();
assert !nonLookupTables.isEmpty();
Set<String> servers = new HashSet<>();
@@ -437,6 +439,18 @@ public class WorkerManager {
return candidateServers;
}
+ /**
+ * Returns the instances to assign to replicated leaf stage children when
there is no local exchange peer. By default,
+ * uses the same candidates as the intermediate stage.
+ *
+ * <p>Subclasses can override to use different instances for replicated leaf
stages (e.g., when intermediate stages
+ * run on non-server instances that cannot scan segments).</p>
+ */
+ protected List<QueryServerInstance>
getCandidateServersForReplicatedLeaf(DispatchablePlanContext context,
+ List<QueryServerInstance> intermediateStageWorkers) {
+ return intermediateStageWorkers;
+ }
+
private void assignWorkersToLeafFragment(PlanFragment fragment,
DispatchablePlanContext context) {
DispatchablePlanMetadata metadata =
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/KeepPipelineBreakerStatsPredicate.java
similarity index 98%
rename from
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
rename to
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/KeepPipelineBreakerStatsPredicate.java
index 374fa19770d..4bd5d3ea75f 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/KeepPipelineBreakerStatsPredicate.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/KeepPipelineBreakerStatsPredicate.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.starter.helix;
+package org.apache.pinot.query.runtime;
import java.util.Map;
import java.util.Set;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index d19fa564f1a..b8f356ee2b9 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -111,6 +111,7 @@ public class QueryRunner {
private ExecutorService _executorService;
private OpChainSchedulerService _opChainScheduler;
private MailboxService _mailboxService;
+ private boolean _ownsMailboxService = true;
private QueryExecutor _leafQueryExecutor;
// Group-by settings
@@ -143,10 +144,14 @@ public class QueryRunner {
private BooleanSupplier _keepPipelineBreakerStats;
/**
- * Initializes the query executor.
+ * Initializes the query runner.
* <p>Should be called only once and before calling any other method.
+ *
+ * @param instanceDataManager when non-null, the leaf query executor and
time series visitor are initialized
+ * for processing leaf-stage queries. When null,
only intermediate-stage execution
+ * is supported.
*/
- public void init(PinotConfiguration serverConf, InstanceDataManager
instanceDataManager,
+ public void init(PinotConfiguration serverConf, String instanceId, @Nullable
InstanceDataManager instanceDataManager,
@Nullable TlsConfig tlsConfig, BooleanSupplier sendStats,
BooleanSupplier keepPipelineBreakerStats) {
String hostname =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
@@ -221,19 +226,25 @@ public class QueryRunner {
_executorService = ThrottleOnCriticalHeapUsageExecutor.maybeWrap(
_executorService, serverConf, "multi-stage executor");
- _opChainScheduler = new
OpChainSchedulerService(instanceDataManager.getInstanceId(), _executorService,
serverConf);
- _mailboxService = new MailboxService(hostname, port, InstanceType.SERVER,
serverConf, tlsConfig);
- try {
- _leafQueryExecutor = new ServerQueryExecutorV1Impl();
-
_leafQueryExecutor.init(serverConf.subset(Server.QUERY_EXECUTOR_CONFIG_PREFIX),
instanceDataManager,
- serverMetrics);
- } catch (Exception e) {
- throw new RuntimeException(e);
+ _opChainScheduler = new OpChainSchedulerService(instanceId,
_executorService, serverConf);
+ if (_mailboxService == null) {
+ _mailboxService = new MailboxService(hostname, port,
InstanceType.SERVER, serverConf, tlsConfig);
}
- if
(StringUtils.isNotBlank(serverConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey())))
{
- _timeSeriesPhysicalPlanVisitor =
- new PhysicalTimeSeriesServerPlanVisitor(_leafQueryExecutor,
_executorService, serverMetrics);
- TimeSeriesBuilderFactoryProvider.init(serverConf);
+
+ if (instanceDataManager != null) {
+ try {
+ _leafQueryExecutor = new ServerQueryExecutorV1Impl();
+
_leafQueryExecutor.init(serverConf.subset(Server.QUERY_EXECUTOR_CONFIG_PREFIX),
instanceDataManager,
+ serverMetrics);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (StringUtils.isNotBlank(
+
serverConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey())))
{
+ _timeSeriesPhysicalPlanVisitor =
+ new PhysicalTimeSeriesServerPlanVisitor(_leafQueryExecutor,
_executorService, serverMetrics);
+ TimeSeriesBuilderFactoryProvider.init(serverConf);
+ }
}
_sendStats = sendStats;
@@ -242,14 +253,35 @@ public class QueryRunner {
LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}",
hostname, port);
}
+ /**
+ * Initializes the query runner with a shared {@link MailboxService}.
+ */
+ public void init(PinotConfiguration serverConf, String instanceId, @Nullable
InstanceDataManager instanceDataManager,
+ @Nullable TlsConfig tlsConfig, BooleanSupplier sendStats,
BooleanSupplier keepPipelineBreakerStats,
+ @Nullable MailboxService sharedMailboxService) {
+ if (sharedMailboxService != null) {
+ _mailboxService = sharedMailboxService;
+ _ownsMailboxService = false;
+ }
+ init(serverConf, instanceId, instanceDataManager, tlsConfig, sendStats,
keepPipelineBreakerStats);
+ }
+
public void start() {
- _mailboxService.start();
- _leafQueryExecutor.start();
+ if (_ownsMailboxService) {
+ _mailboxService.start();
+ }
+ if (_leafQueryExecutor != null) {
+ _leafQueryExecutor.start();
+ }
}
public void shutDown() {
- _leafQueryExecutor.shutDown();
- _mailboxService.shutdown();
+ if (_leafQueryExecutor != null) {
+ _leafQueryExecutor.shutDown();
+ }
+ if (_ownsMailboxService) {
+ _mailboxService.shutdown();
+ }
ExecutorServiceUtils.close(_executorService);
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/SendStatsPredicate.java
similarity index 99%
rename from
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
rename to
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/SendStatsPredicate.java
index 27c610a83e9..d5d0d861102 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/SendStatsPredicate.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.server.starter.helix;
+package org.apache.pinot.query.runtime;
import java.util.HashMap;
import java.util.List;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index dc80478d9cc..89d9e227e1f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -133,6 +133,10 @@ public class QueryDispatcher {
}
}
+ public MailboxService getMailboxService() {
+ return _mailboxService;
+ }
+
public void start() {
_mailboxService.start();
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 7dbf60c8dd4..0e6b7c8f1cf 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -62,7 +62,8 @@ public class QueryServerEnclosure {
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
_queryRunnerPort);
InstanceDataManager instanceDataManager =
factory.buildInstanceDataManager();
_queryRunner = new QueryRunner();
- _queryRunner.init(new PinotConfiguration(runnerConfig),
instanceDataManager, null, () -> true, () -> true);
+ _queryRunner.init(new PinotConfiguration(runnerConfig),
instanceDataManager.getInstanceId(), instanceDataManager,
+ null, () -> true, () -> true);
}
public int getPort() {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 88c45b32d8b..4b62ad492dd 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -42,14 +42,14 @@ import
org.apache.pinot.core.transport.ChannelHandlerFactory;
import org.apache.pinot.core.transport.InstanceRequestHandler;
import org.apache.pinot.core.transport.QueryServer;
import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
+import org.apache.pinot.query.runtime.KeepPipelineBreakerStatsPredicate;
+import org.apache.pinot.query.runtime.SendStatsPredicate;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.local.utils.ServerReloadJobStatusCache;
import org.apache.pinot.server.access.AccessControl;
import org.apache.pinot.server.access.AccessControlFactory;
import org.apache.pinot.server.access.AllowAllAccessFactory;
import org.apache.pinot.server.conf.ServerConf;
-import org.apache.pinot.server.starter.helix.KeepPipelineBreakerStatsPredicate;
-import org.apache.pinot.server.starter.helix.SendStatsPredicate;
import org.apache.pinot.server.worker.WorkerQueryServer;
import org.apache.pinot.spi.accounting.ThreadAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 3fd2b58d9e0..9aeb6a1732e 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -91,6 +91,8 @@ import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.NettyInspector;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
+import org.apache.pinot.query.runtime.KeepPipelineBreakerStatsPredicate;
+import org.apache.pinot.query.runtime.SendStatsPredicate;
import
org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
import
org.apache.pinot.query.runtime.operator.factory.QueryOperatorFactoryProvider;
import
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshManager;
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 4c343416429..f38c162b952 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -21,10 +21,10 @@ package org.apache.pinot.server.worker;
import javax.annotation.Nullable;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.query.runtime.KeepPipelineBreakerStatsPredicate;
import org.apache.pinot.query.runtime.QueryRunner;
+import org.apache.pinot.query.runtime.SendStatsPredicate;
import org.apache.pinot.query.service.server.QueryServer;
-import org.apache.pinot.server.starter.helix.KeepPipelineBreakerStatsPredicate;
-import org.apache.pinot.server.starter.helix.SendStatsPredicate;
import org.apache.pinot.spi.accounting.ThreadAccountant;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -44,8 +44,8 @@ public class WorkerQueryServer {
_queryServicePort =
serverConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
QueryRunner queryRunner = new QueryRunner();
- queryRunner.init(serverConf, instanceDataManager, tlsConfig,
sendStats::isSendStats,
- keepPipelineBreakerStatsPredicate::isEnabled);
+ queryRunner.init(serverConf, instanceDataManager.getInstanceId(),
instanceDataManager, tlsConfig,
+ sendStats::isSendStats, keepPipelineBreakerStatsPredicate::isEnabled);
_queryWorkerService =
new QueryServer(serverConf, instanceDataManager.getInstanceId(),
_queryServicePort, queryRunner, tlsConfig,
threadAccountant);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org