siddharthteotia commented on code in PR #9064:
URL: https://github.com/apache/pinot/pull/9064#discussion_r927808971
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -101,25 +107,72 @@ public void processQuery(DistributedStagePlan
distributedStagePlan, ExecutorServ
BaseDataBlock dataBlock;
try {
DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest,
executorService, null);
- // this works because default DataTableImplV3 will have a version
number at beginning,
- // which maps to ROW type of version 3.
- dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+ if (!dataTable.getExceptions().isEmpty()) {
+ // if contains exception, directly return a metadata block with the
exceptions.
+ dataBlock =
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+ } else {
+ // this works because default DataTableImplV3 will have a version
number at beginning:
+ // the new DataBlock encodes lower 16 bites as version and upper 16
bites as type (ROW, COLUMNAR, METADATA)
+ dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+ }
} catch (IOException e) {
- throw new RuntimeException("Unable to convert byte buffer", e);
+ dataBlock = DataBlockUtils.getErrorDataBlock(e);
}
MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
StageMetadata receivingStageMetadata =
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
MailboxSendOperator mailboxSendOperator =
- new MailboxSendOperator(_mailboxService, dataBlock,
receivingStageMetadata.getServerInstances(),
- sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
_hostname, _port,
- serverQueryRequest.getRequestId(), sendNode.getStageId());
+ new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(),
+ new LeafStageTransferableBlockOperator(dataBlock,
sendNode.getDataSchema()),
+ receivingStageMetadata.getServerInstances(),
sendNode.getExchangeType(),
+ sendNode.getPartitionKeySelector(), _hostname, _port,
serverQueryRequest.getRequestId(),
+ sendNode.getStageId());
mailboxSendOperator.nextBlock();
+ if (dataBlock.getExceptions().isEmpty()) {
+ mailboxSendOperator.nextBlock();
+ }
} else {
_workerExecutor.processQuery(distributedStagePlan, requestMetadataMap,
executorService);
}
}
+ private static class LeafStageTransferableBlockOperator extends
BaseOperator<TransferableBlock> {
+ private static final String EXPLAIN_NAME = "FAKE_TRANSFER_OPERATOR";
Review Comment:
Why do we need this ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]