walterddr commented on code in PR #9064:
URL: https://github.com/apache/pinot/pull/9064#discussion_r927823286
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -101,25 +107,72 @@ public void processQuery(DistributedStagePlan
distributedStagePlan, ExecutorServ
BaseDataBlock dataBlock;
try {
DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest,
executorService, null);
- // this works because default DataTableImplV3 will have a version
number at beginning,
- // which maps to ROW type of version 3.
- dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+ if (!dataTable.getExceptions().isEmpty()) {
+ // if contains exception, directly return a metadata block with the
exceptions.
+ dataBlock =
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+ } else {
+ // this works because default DataTableImplV3 will have a version
number at beginning:
+ // the new DataBlock encodes lower 16 bites as version and upper 16
bites as type (ROW, COLUMNAR, METADATA)
+ dataBlock =
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+ }
} catch (IOException e) {
- throw new RuntimeException("Unable to convert byte buffer", e);
+ dataBlock = DataBlockUtils.getErrorDataBlock(e);
}
MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
StageMetadata receivingStageMetadata =
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
MailboxSendOperator mailboxSendOperator =
- new MailboxSendOperator(_mailboxService, dataBlock,
receivingStageMetadata.getServerInstances(),
- sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
_hostname, _port,
- serverQueryRequest.getRequestId(), sendNode.getStageId());
+ new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(),
+ new LeafStageTransferableBlockOperator(dataBlock,
sendNode.getDataSchema()),
+ receivingStageMetadata.getServerInstances(),
sendNode.getExchangeType(),
+ sendNode.getPartitionKeySelector(), _hostname, _port,
serverQueryRequest.getRequestId(),
+ sendNode.getStageId());
mailboxSendOperator.nextBlock();
+ if (dataBlock.getExceptions().isEmpty()) {
+ mailboxSendOperator.nextBlock();
+ }
} else {
_workerExecutor.processQuery(distributedStagePlan, requestMetadataMap,
executorService);
}
}
+ private static class LeafStageTransferableBlockOperator extends
BaseOperator<TransferableBlock> {
+ private static final String EXPLAIN_NAME = "FAKE_TRANSFER_OPERATOR";
Review Comment:
because leaf stage only returns one DataTable. we need to create the logic
for `nextBlock()`
- if the datatable is not empty and contains no error, then send in a
datablock, the next time `getBlock()` is called. it should send the metadata of
the datablock
- if the datatable contains error, the send the error as metadata block and
nothing should be return if `getBlock()` is called again
previously this was implemented in mailbox and it was a bit tedious to
maintain both side so I factor it out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]