This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 733cc2fa6b [multistage] Use Different Thread Pool for QueryServer
(#10349)
733cc2fa6b is described below
commit 733cc2fa6ba7ec4fc022a4a1d951028b2156e43d
Author: Ankit Sultana <[email protected]>
AuthorDate: Wed Mar 1 01:18:32 2023 +0530
[multistage] Use Different Thread Pool for QueryServer (#10349)
---
.../apache/pinot/query/runtime/QueryRunner.java | 43 +++++++++++++++++++---
.../apache/pinot/query/service/QueryServer.java | 7 ++--
.../pinot/query/service/QueryDispatcherTest.java | 9 +++--
.../pinot/query/service/QueryServerTest.java | 9 +++--
4 files changed, 53 insertions(+), 15 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 3846e4d424..4c7380943e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.combine.BaseCombineOperator;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
@@ -84,7 +85,32 @@ public class QueryRunner {
private String _hostname;
private int _port;
private VirtualServerAddress _rootServer;
- private ExecutorService _executorService;
+ // Query worker threads are used for (1) running intermediate stage
operators (2) running segment level operators
+ /**
+ * Query worker threads are used for:
+ * <ol>
+ * <li>
+ * Running intermediate stage operators (v2 engine operators).
+ * </li>
+ * <li>
+ * Running per-segment operators submitted in {@link
BaseCombineOperator}.
+ * </li>
+ * </ol>
+ */
+ private ExecutorService _queryWorkerExecutorService;
+ /**
+ * Query runner threads are used for:
+ * <ol>
+ * <li>
+ * Merging results in BaseCombineOperator for leaf stages. Results are
provided by per-segment operators run in
+ * worker threads
+ * </li>
+ * <li>
+ * Building the OperatorChain and submitting to the scheduler for
non-leaf stages (intermediate stages).
+ * </li>
+ * </ol>
+ */
+ private ExecutorService _queryRunnerExecutorService;
private OpChainSchedulerService _scheduler;
/**
@@ -103,10 +129,13 @@ public class QueryRunner {
try {
long releaseMs =
config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
QueryConfig.DEFAULT_SCHEDULER_RELEASE_TIMEOUT_MS);
- _executorService = Executors.newFixedThreadPool(
+ _queryWorkerExecutorService = Executors.newFixedThreadPool(
ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
new NamedThreadFactory("query_worker_on_" + _port + "_port"));
- _scheduler = new OpChainSchedulerService(new
RoundRobinScheduler(releaseMs), _executorService);
+ _queryRunnerExecutorService = Executors.newFixedThreadPool(
+ ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
+ new NamedThreadFactory("query_runner_on_" + _port + "_port"));
+ _scheduler = new OpChainSchedulerService(new
RoundRobinScheduler(releaseMs), _queryWorkerExecutorService);
_mailboxService = MultiplexingMailboxService.newInstance(_hostname,
_port, config, _scheduler::onDataAvailable);
_serverExecutor = new ServerQueryExecutorV1Impl();
_serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX),
instanceDataManager, serverMetrics);
@@ -173,8 +202,12 @@ public class QueryRunner {
}
}
- public ExecutorService getExecutorService() {
- return _executorService;
+ public ExecutorService getQueryWorkerExecutorService() {
+ return _queryWorkerExecutorService;
+ }
+
+ public ExecutorService getQueryRunnerExecutorService() {
+ return _queryRunnerExecutorService;
}
private static List<ServerPlanRequestContext>
constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index c6a2552a60..86605c4a8a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -52,12 +52,11 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
public QueryServer(int port, QueryRunner queryRunner) {
_server =
ServerBuilder.forPort(port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
_queryRunner = queryRunner;
- _executorService = queryRunner.getExecutorService();
- LOGGER.info("Initialized QueryServer on port: {}", port);
+ _executorService = queryRunner.getQueryRunnerExecutorService();
}
public void start() {
- LOGGER.info("Starting QueryWorker");
+ LOGGER.info("Starting QueryServer");
try {
_queryRunner.start();
_server.start();
@@ -67,7 +66,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
}
public void shutdown() {
- LOGGER.info("Shutting down QueryWorker");
+ LOGGER.info("Shutting down QueryServer");
try {
_queryRunner.shutDown();
_server.shutdown();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
index b523013ce4..5d8226d681 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java
@@ -44,8 +44,10 @@ import org.testng.annotations.Test;
public class QueryDispatcherTest extends QueryTestSet {
private static final Random RANDOM_REQUEST_ID_GEN = new Random();
private static final int QUERY_SERVER_COUNT = 2;
- private static final ExecutorService EXECUTOR_SERVICE =
Executors.newFixedThreadPool(
- ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new
NamedThreadFactory("QueryDispatcherTestExecutorService"));
+ private static final ExecutorService WORKER_EXECUTOR_SERVICE =
Executors.newFixedThreadPool(
+ ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new
NamedThreadFactory("QueryDispatcherTest_Worker"));
+ private static final ExecutorService RUNNER_EXECUTOR_SERVICE =
Executors.newFixedThreadPool(
+ ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new
NamedThreadFactory("QueryDispatcherTest_Runner"));
private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
@@ -59,7 +61,8 @@ public class QueryDispatcherTest extends QueryTestSet {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = Mockito.mock(QueryRunner.class);;
-
Mockito.when(queryRunner.getExecutorService()).thenReturn(EXECUTOR_SERVICE);
+
Mockito.when(queryRunner.getQueryWorkerExecutorService()).thenReturn(WORKER_EXECUTOR_SERVICE);
+
Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index de19b218eb..4ba37cb707 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -58,8 +58,10 @@ public class QueryServerTest extends QueryTestSet {
private static final int QUERY_SERVER_COUNT = 2;
private static final String KEY_OF_SERVER_INSTANCE_HOST =
"pinot.query.runner.server.hostname";
private static final String KEY_OF_SERVER_INSTANCE_PORT =
"pinot.query.runner.server.port";
- private static final ExecutorService EXECUTOR_SERVICE =
Executors.newFixedThreadPool(
- ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new
NamedThreadFactory("QueryServerTestExecutorService"));
+ private static final ExecutorService WORKER_EXECUTOR_SERVICE =
Executors.newFixedThreadPool(
+ ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new
NamedThreadFactory("QueryServerTest_Worker"));
+ private static final ExecutorService RUNNER_EXECUTOR_SERVICE =
Executors.newFixedThreadPool(
+ ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new
NamedThreadFactory("QueryServerTest_Runner"));
private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
private final Map<Integer, ServerInstance> _queryServerInstanceMap = new
HashMap<>();
@@ -74,7 +76,8 @@ public class QueryServerTest extends QueryTestSet {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
-
Mockito.when(queryRunner.getExecutorService()).thenReturn(EXECUTOR_SERVICE);
+
Mockito.when(queryRunner.getQueryWorkerExecutorService()).thenReturn(WORKER_EXECUTOR_SERVICE);
+
Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]