yashmayya commented on code in PR #15445:
URL: https://github.com/apache/pinot/pull/15445#discussion_r2044463716
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -329,13 +364,12 @@ private <W> void submitStage(
* applying the submitFunction to each worker and the consumer to the list of results.
Review Comment:
The Javadoc needs to be updated here - it currently says `Submits each stage
in the request to the workers and waits for all workers to complete` which is
now incorrect.
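For reference, here is a minimal sketch of the non-blocking pattern the updated Javadoc would need to describe. It assumes a simplified shape: `submitPerWorker` and its flat `List<W>` of workers are hypothetical stand-ins for `forEachStageAndWorker` and the real stage/worker metadata. The point is that the method returns one future per worker immediately and leaves waiting, timeouts, and error reporting to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class AsyncSubmitSketch {
  /**
   * Asynchronously applies {@code submitFunction} to each worker on {@code executor}.
   *
   * <p>This method does not wait for the workers. It returns one future per worker right away,
   * and it is up to the caller to combine them, apply a timeout and report errors.
   */
  static <W, R> List<CompletableFuture<R>> submitPerWorker(
      List<W> workers, Function<W, R> submitFunction, ExecutorService executor) {
    List<CompletableFuture<R>> futures = new ArrayList<>(workers.size());
    for (W worker : workers) {
      futures.add(CompletableFuture.supplyAsync(() -> submitFunction.apply(worker), executor));
    }
    return futures;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      List<CompletableFuture<Integer>> futures = submitPerWorker(List.of(1, 2, 3), w -> w * 10, executor);
      // Nothing has been awaited yet; the caller decides when (and whether) to block.
      CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
      futures.forEach(f -> System.out.println(f.join()));
    } finally {
      executor.shutdown();
    }
  }
}
```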
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -898,10 +898,33 @@ public static class Server {
public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000;
// TODO: Merge this with "mse"
+ /**
+ * The ExecutorServiceProvider to use for execution threads, which are the ones that execute
+ * MultiStageOperators (and SSE operators in the leaf stages).
+ *
+ * It is recommended to use cached. In case fixed is used, it should use a large enough number of threads or
+ * parent operators may consume all threads.
+ * In Java 21 or newer, virtual threads are a good solution. Although Apache Pinot doesn't include this option yet,
+ * it is trivial to implement that plugin.
+ *
+ * See QueryRunner
+ */
public static final String MULTISTAGE_EXECUTOR = "multistage.executor";
public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX =
QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR;
public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached";
+ /**
+ * The ExecutorServiceProvider to be used for submission threads, which are the ones
+ * that receive requests in protobuf and transform them into MultiStageOperators.
+ *
+ * It is recommended to use a fixed thread pool here, although defaults to cached for historical
+ * reasons.
Review Comment:
Why aren't we changing this to `fixed` with your changes in this patch? Is
it in order to be sure that we don't introduce a regression?
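A rough illustration of the trade-off behind the `fixed` vs `cached` recommendation, using plain `java.util.concurrent` pools rather than Pinot's `ExecutorServiceProvider` plugins (the `peakConcurrency` helper is hypothetical). A fixed pool caps how many submissions run at once. A cached pool keeps creating threads while tasks block, which is what the execution executor needs so that parent operators cannot starve their children.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSizingSketch {
  /** Runs {@code tasks} briefly-blocking tasks on {@code pool}, shuts it down and returns the peak concurrency. */
  static int peakConcurrency(ExecutorService pool, int tasks) {
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(tasks);
    try {
      for (int i = 0; i < tasks; i++) {
        pool.execute(() -> {
          peak.accumulateAndGet(running.incrementAndGet(), Math::max);
          try {
            Thread.sleep(50); // simulate a task that blocks for a while
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            running.decrementAndGet();
            done.countDown();
          }
        });
      }
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }

  public static void main(String[] args) {
    // A fixed pool never runs more than nThreads tasks at once; extra work waits in its queue.
    System.out.println("fixed(2): " + peakConcurrency(Executors.newFixedThreadPool(2), 8));
    // A cached pool creates threads on demand, so its peak grows with the number of blocked tasks.
    System.out.println("cached:   " + peakConcurrency(Executors.newCachedThreadPool(), 8));
  }
}
```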
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -159,37 +164,49 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo
return;
}
- try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
- QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
+ try (QueryThreadContext.CloseableContext qClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+ QueryThreadContext.CloseableContext mseCloseable = MseWorkerThreadContext.open()) {
+
long requestId = QueryThreadContext.getRequestId();
QueryThreadContext.setQueryEngine("mse");
- Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
- ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
- try {
- forEachStage(request,
- (stagePlan, workerMetadata) -> {
+ // Submit the stage for each worker
+ List<CompletableFuture<List<Object>>> futures = forEachStageAndWorker(request,
+ (stagePlan, workerMetadata) -> {
+ Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
+ ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+
+ try {
_queryRunner.processQuery(workerMetadata, stagePlan, reqMetadata, parentContext);
- return null;
- },
- (ignored) -> {
- });
- } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
- LOGGER.error("Caught exception while submitting request: {}", requestId, e);
- String errorMsg = "Caught exception while submitting request: " + e.getMessage();
- responseObserver.onNext(Worker.QueryResponse.newBuilder()
- .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
- .build());
- responseObserver.onCompleted();
- return;
- } finally {
- Tracing.ThreadAccountantOps.clear();
- }
- responseObserver.onNext(
- Worker.QueryResponse.newBuilder()
- .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
+ } finally {
+ Tracing.ThreadAccountantOps.clear();
+ }
+ return null;
+ });
+
+ // A completable future that will finish when all submit task finish or on timoeut
Review Comment:
```suggestion
// A completable future that will finish when all submit task finish or on timeout
```
nit
##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -898,10 +898,33 @@ public static class Server {
public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000;
// TODO: Merge this with "mse"
+ /**
+ * The ExecutorServiceProvider to use for execution threads, which are the ones that execute
+ * MultiStageOperators (and SSE operators in the leaf stages).
+ *
+ * It is recommended to use cached. In case fixed is used, it should use a large enough number of threads or
+ * parent operators may consume all threads.
+ * In Java 21 or newer, virtual threads are a good solution. Although Apache Pinot doesn't include this option yet,
+ * it is trivial to implement that plugin.
+ *
+ * See QueryRunner
+ */
public static final String MULTISTAGE_EXECUTOR = "multistage.executor";
public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX =
QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR;
public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached";
+ /**
+ * The ExecutorServiceProvider to be used for submission threads, which are the ones
+ * that receive requests in protobuf and transform them into MultiStageOperators.
+ *
+ * It is recommended to use a fixed thread pool here, although defaults to cached for historical
+ * reasons.
+ *
+ * See QueryServer
+ */
+ public static final String MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX =
+ QUERY_EXECUTOR_CONFIG_PREFIX + "multistage.submission";
Review Comment:
This needs a `.` separator after `QUERY_EXECUTOR_CONFIG_PREFIX`
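To make the effect concrete, here is a small sketch of the resulting keys. The value of `QUERY_EXECUTOR_CONFIG_PREFIX` below is an assumption for illustration only; what matters is the concatenation.

```java
public class ConfigPrefixSketch {
  // Assumed value for illustration only; the real constant lives in CommonConstants.
  static final String QUERY_EXECUTOR_CONFIG_PREFIX = "pinot.server.query.executor";

  // As written in the patch: no separator, so the two segments run together.
  static final String WITHOUT_SEPARATOR = QUERY_EXECUTOR_CONFIG_PREFIX + "multistage.submission";

  // With the separator, consistent with how MULTISTAGE_EXECUTOR_CONFIG_PREFIX is built.
  static final String WITH_SEPARATOR = QUERY_EXECUTOR_CONFIG_PREFIX + "." + "multistage.submission";

  public static void main(String[] args) {
    System.out.println(WITHOUT_SEPARATOR); // pinot.server.query.executormultistage.submission
    System.out.println(WITH_SEPARATOR);    // pinot.server.query.executor.multistage.submission
  }
}
```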
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -159,37 +164,49 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo
return;
}
- try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
- QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
+ try (QueryThreadContext.CloseableContext qClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+ QueryThreadContext.CloseableContext mseCloseable = MseWorkerThreadContext.open()) {
+
long requestId = QueryThreadContext.getRequestId();
QueryThreadContext.setQueryEngine("mse");
- Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
- ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
- try {
- forEachStage(request,
- (stagePlan, workerMetadata) -> {
+ // Submit the stage for each worker
+ List<CompletableFuture<List<Object>>> futures = forEachStageAndWorker(request,
+ (stagePlan, workerMetadata) -> {
+ Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE);
+ ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
+
+ try {
_queryRunner.processQuery(workerMetadata, stagePlan, reqMetadata, parentContext);
- return null;
- },
- (ignored) -> {
- });
- } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
- LOGGER.error("Caught exception while submitting request: {}", requestId, e);
- String errorMsg = "Caught exception while submitting request: " + e.getMessage();
- responseObserver.onNext(Worker.QueryResponse.newBuilder()
- .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
- .build());
- responseObserver.onCompleted();
- return;
- } finally {
- Tracing.ThreadAccountantOps.clear();
- }
- responseObserver.onNext(
- Worker.QueryResponse.newBuilder()
- .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
+ } finally {
+ Tracing.ThreadAccountantOps.clear();
+ }
+ return null;
+ });
+
+ // A completable future that will finish when all submit task finish or on timoeut
+ CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+ .orTimeout(QueryThreadContext.getDeadlineMs(), TimeUnit.MILLISECONDS);
+ // When this future completes, notify the broker.
+ allCompleted.handle((result, error) -> {
Review Comment:
Why not `whenComplete` instead?
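The difference in a nutshell: `handle` must return a value, and its returned future completes normally even when `allOf` failed (unless the function itself throws). `whenComplete` only observes the outcome and propagates the original result or exception. For a callback that only notifies the broker, the latter seems the more natural fit. Below is a minimal sketch with hypothetical names (`notifyWhenDone`, plus a `status` holder standing in for the `responseObserver` calls). Note that `orTimeout` (Java 9+) expects a relative duration rather than an absolute deadline.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WhenCompleteSketch {
  /**
   * Combines the per-worker futures, bounds them with a relative timeout and records the outcome.
   * The returned future completes exactly like the combined one (normally or exceptionally).
   */
  static CompletableFuture<Void> notifyWhenDone(
      List<? extends CompletableFuture<?>> futures, long timeoutMs, AtomicReference<String> status) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
        // orTimeout (Java 9+) takes a duration, not an absolute timestamp.
        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
        // Side effect only: no value to return, and the failure (if any) is not swallowed.
        .whenComplete((ignored, error) -> status.set(error == null ? "OK" : "ERROR: " + error));
  }

  public static void main(String[] args) {
    AtomicReference<String> status = new AtomicReference<>();

    notifyWhenDone(List.of(CompletableFuture.completedFuture(42)), 1_000, status).join();
    System.out.println(status.get()); // OK

    CompletableFuture<Integer> neverCompletes = new CompletableFuture<>();
    try {
      notifyWhenDone(List.of(neverCompletes), 50, status).join();
    } catch (CompletionException e) {
      // With whenComplete, the timeout still surfaces to whoever joins the returned future.
      System.out.println(status.get()); // ERROR: java.util.concurrent.TimeoutException
    }
  }
}
```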
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -208,43 +225,61 @@ public void explain(Worker.QueryRequest request, StreamObserver<Worker.ExplainRe
return;
}
- try (QueryThreadContext.CloseableContext queryTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
+ try (QueryThreadContext.CloseableContext qTlClosable = QueryThreadContext.openFromRequestMetadata(reqMetadata);
QueryThreadContext.CloseableContext mseTlCloseable = MseWorkerThreadContext.open()) {
- try {
- forEachStage(request,
- (stagePlan, workerMetadata) -> _queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata),
- (plans) -> {
- Worker.ExplainResponse.Builder builder = Worker.ExplainResponse.newBuilder();
- for (StagePlan plan : plans) {
- ByteString rootAsBytes = PlanNodeSerializer.process(plan.getRootNode()).toByteString();
+ // Explain the stage for each worker
+ List<CompletableFuture<List<StagePlan>>> futures = forEachStageAndWorker(request,
+ (stagePlan, workerMetadata) -> _queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata));
+ CompletableFuture<?>[] responseFutures = futures.stream()
+ .map(plansFuture ->
+ plansFuture.thenApply(plans -> {
+ Worker.ExplainResponse.Builder builder = Worker.ExplainResponse.newBuilder();
+ for (StagePlan plan : plans) {
+ ByteString rootAsBytes = PlanNodeSerializer.process(plan.getRootNode()).toByteString();
+
+ StageMetadata metadata = plan.getStageMetadata();
+ List<Worker.WorkerMetadata> protoWorkerMetadataList =
+ QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
- StageMetadata metadata = plan.getStageMetadata();
- List<Worker.WorkerMetadata> protoWorkerMetadataList =
- QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
+ builder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(rootAsBytes).setStageMetadata(
+ Worker.StageMetadata.newBuilder().setStageId(metadata.getStageId())
+ .addAllWorkerMetadata(protoWorkerMetadataList)
+ .setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
+ }
+ builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK, "");
+ synchronized (responseObserver) {
+ responseObserver.onNext(builder.build());
+ }
+ return null;
+ })
+ ).toArray(CompletableFuture[]::new);
- builder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(rootAsBytes).setStageMetadata(
- Worker.StageMetadata.newBuilder().setStageId(metadata.getStageId())
- .addAllWorkerMetadata(protoWorkerMetadataList)
- .setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
- }
- builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK, "");
- responseObserver.onNext(builder.build());
- });
- } catch (ExecutionException | InterruptedException | TimeoutException | RuntimeException e) {
- long requestId = QueryThreadContext.getRequestId();
- LOGGER.error("Caught exception while submitting request: {}", requestId, e);
- String errorMsg = "Caught exception while submitting request: " + e.getMessage();
- responseObserver.onNext(Worker.ExplainResponse.newBuilder()
- .putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_ERROR, errorMsg)
+ // A completable future that will finish when all submit task finish or on timoeut
+ CompletableFuture<Void> allCompleted = CompletableFuture.allOf(responseFutures)
+ .orTimeout(QueryThreadContext.getDeadlineMs(), TimeUnit.MILLISECONDS);
+ // When this future completes, notify the broker.
+ allCompleted.handle((result, error) -> {
+ if (error != null) {
+ long requestId = QueryThreadContext.getRequestId();
+ LOGGER.error("Caught exception while submitting request: {}", requestId, error);
+ String errorMsg = "Caught exception while submitting request: " + error.getMessage();
+ synchronized (responseObserver) {
Review Comment:
Nice catch, I wasn't aware that `StreamObserver` doesn't guarantee thread
safety.
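For context, the gRPC Java Javadoc for `StreamObserver` states that implementations are not thread-safe and that applications writing to one from multiple threads must synchronize the calls themselves. The patch does this with `synchronized (responseObserver)` at each call site. An alternative is a small wrapper that serializes every call. The sketch below uses a stand-in `Observer` interface (not the real `io.grpc.stub.StreamObserver`) so it compiles without gRPC on the classpath; `SynchronizedObserver` and `sendConcurrently` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SynchronizedObserverSketch {
  /** Stand-in for io.grpc.stub.StreamObserver, declared here so the sketch has no dependencies. */
  interface Observer<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
  }

  /** Serializes every call onto the delegate with a single lock. */
  static final class SynchronizedObserver<T> implements Observer<T> {
    private final Observer<T> _delegate;
    private final Object _lock = new Object();

    SynchronizedObserver(Observer<T> delegate) {
      _delegate = delegate;
    }

    @Override public void onNext(T value) { synchronized (_lock) { _delegate.onNext(value); } }
    @Override public void onError(Throwable t) { synchronized (_lock) { _delegate.onError(t); } }
    @Override public void onCompleted() { synchronized (_lock) { _delegate.onCompleted(); } }
  }

  /** A deliberately non-thread-safe observer that just records values in an ArrayList. */
  static final class RecordingObserver implements Observer<Integer> {
    final List<Integer> _values = new ArrayList<>();
    @Override public void onNext(Integer value) { _values.add(value); }
    @Override public void onError(Throwable t) { }
    @Override public void onCompleted() { }
  }

  /** Sends {@code perThread} values from each of {@code threads} threads and returns how many arrived. */
  static int sendConcurrently(int threads, int perThread) {
    RecordingObserver recorder = new RecordingObserver();
    Observer<Integer> safe = new SynchronizedObserver<>(recorder);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<CompletableFuture<Void>> writers = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        writers.add(CompletableFuture.runAsync(() -> {
          for (int i = 0; i < perThread; i++) {
            safe.onNext(i);
          }
        }, pool));
      }
      // join() makes the writers' effects visible to this thread before we read the list.
      CompletableFuture.allOf(writers.toArray(new CompletableFuture<?>[0])).join();
    } finally {
      pool.shutdown();
    }
    return recorder._values.size();
  }

  public static void main(String[] args) {
    System.out.println(sendConcurrently(4, 10_000)); // 40000
  }
}
```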
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java:
##########
@@ -130,6 +124,15 @@ public void start() {
}
}
+ private <T extends ServerBuilder<T>> Server buildGrpcServer(ServerBuilder<T> builder) {
+ return builder
+ // By using directExecutor, GRPC doesn't need to manage its own thread pool
Review Comment:
IIUC, this means that the request processing callbacks will happen directly
on the transport thread - i.e., the Netty thread? And this is only recommended
when the request processing is non-blocking - however, we still do proto deser
on the request processing thread. I assume that that's okay because it should
have minimal overhead and we don't really risk blocking Netty threads for
handling other connections and requests?
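That matches the gRPC Java documentation for `ServerBuilder.directExecutor()`: application code runs directly on the transport thread, which can improve performance but requires the application never to block there. The stdlib-only sketch below uses a hypothetical `whereDoesItRun` helper, with `Runnable::run` standing in for a direct executor. It shows where a dispatched callback ends up with a direct executor versus a separate pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DirectExecutorSketch {
  /**
   * Simulates a transport thread (named "transport") that dispatches a handler callback through
   * {@code callbackExecutor}, and returns the name of the thread the callback actually ran on.
   */
  static String whereDoesItRun(Executor callbackExecutor) {
    ExecutorService transport = Executors.newSingleThreadExecutor(r -> new Thread(r, "transport"));
    try {
      return CompletableFuture
          .supplyAsync(() -> {
            CompletableFuture<String> handled = new CompletableFuture<>();
            // The "transport" thread hands the callback to the configured executor.
            callbackExecutor.execute(() -> handled.complete(Thread.currentThread().getName()));
            return handled;
          }, transport)
          .thenCompose(handled -> handled)
          .join();
    } finally {
      transport.shutdown();
    }
  }

  public static void main(String[] args) {
    // Direct executor: the callback runs inline on the calling (transport) thread.
    Executor direct = Runnable::run;
    System.out.println(whereDoesItRun(direct)); // transport

    // Separate pool: the callback is offloaded, leaving the transport thread free.
    ExecutorService workers = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
    try {
      System.out.println(whereDoesItRun(workers)); // worker
    } finally {
      workers.shutdown();
    }
  }
}
```

Under this model, short non-blocking work is tolerable inline, while anything that may block belongs on a separate executor.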
--
This is an automated message from the Apache Git Service.