gortiz commented on code in PR #15445:
URL: https://github.com/apache/pinot/pull/15445#discussion_r2028246011
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -257,54 +258,57 @@ public void processQuery(WorkerMetadata workerMetadata,
StagePlan stagePlan, Map
Map<String, String> opChainMetadata =
consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
// run pre-stage execution for all pipeline breakers
- PipelineBreakerResult pipelineBreakerResult =
+ CompletableFuture<PipelineBreakerResult> pipelineBreakerResultFuture =
CompletableFuture.supplyAsync(() ->
PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler,
_mailboxService, workerMetadata, stagePlan,
- opChainMetadata, requestId, deadlineMs, parentContext,
_sendStats.getAsBoolean());
-
- // Send error block to all the receivers if pipeline breaker fails
- if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock()
!= null) {
- TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock();
- int stageId = stageMetadata.getStageId();
- LOGGER.error("Error executing pipeline breaker for request: {}, stage:
{}, sending error block: {}", requestId,
- stageId, errorBlock.getExceptions());
- MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode();
- List<RoutingInfo> routingInfos = new ArrayList<>();
- for (Integer receiverStageId : rootNode.getReceiverStageIds()) {
- List<MailboxInfo> receiverMailboxInfos =
-
workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
- List<RoutingInfo> stageRoutingInfos =
- MailboxIdUtils.toRoutingInfos(requestId, stageId,
workerMetadata.getWorkerId(), receiverStageId,
- receiverMailboxInfos);
- routingInfos.addAll(stageRoutingInfos);
- }
- for (RoutingInfo routingInfo : routingInfos) {
- try {
- StatMap<MailboxSendOperator.StatKey> statMap = new
StatMap<>(MailboxSendOperator.StatKey.class);
- _mailboxService.getSendingMailbox(routingInfo.getHostname(),
routingInfo.getPort(),
- routingInfo.getMailboxId(), deadlineMs,
statMap).send(errorBlock);
- } catch (TimeoutException e) {
- LOGGER.warn("Timed out sending error block to mailbox: {} for
request: {}, stage: {}",
- routingInfo.getMailboxId(), requestId, stageId, e);
- } catch (Exception e) {
- LOGGER.error("Caught exception sending error block to mailbox: {}
for request: {}, stage: {}",
- routingInfo.getMailboxId(), requestId, stageId, e);
+ opChainMetadata, requestId, deadlineMs, parentContext,
_sendStats.getAsBoolean()),
+ _executorService);
+
+ pipelineBreakerResultFuture.thenAcceptAsync(pipelineBreakerResult -> {
Review Comment:
No, we don't need that. I don't know why I decided to refactor it that way.
The performance impact shouldn't be significant, but I've changed the code to
resemble the previous version by creating a blocking internal method that
contains the same code we had before.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]