gortiz commented on code in PR #15245:
URL: https://github.com/apache/pinot/pull/15245#discussion_r2052037174
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java:
##########
@@ -610,18 +604,32 @@ public void testRemoteEarlyTerminated()
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
// receiving-side early terminates after pulling the first block
TestUtils.waitForCondition(aVoid -> {
- TransferableBlock block = receivingMailbox.poll();
- return block != null && block.getNumRows() == 1;
+ MseBlock block = readBlock(receivingMailbox);
+ return block != null && block.isData() && ((MseBlock.Data) block).getNumRows() == 1;
Review Comment:
Nice catch
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MailboxServiceTest.java:
##########
@@ -610,18 +604,32 @@ public void testRemoteEarlyTerminated()
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
// receiving-side early terminates after pulling the first block
TestUtils.waitForCondition(aVoid -> {
- TransferableBlock block = receivingMailbox.poll();
- return block != null && block.getNumRows() == 1;
+ MseBlock block = readBlock(receivingMailbox);
+ return block != null && block.isData() && ((MseBlock.Data) block).getNumRows() == 1;
}, 1000L, "Failed to deliver mails");
receivingMailbox.earlyTerminate();
// send another block b/c it doesn't guarantee the next block must be EOS
sendingMailbox.send(OperatorTestUtil.block(DATA_SCHEMA, new Object[]{0}));
// send a metadata block
- sendingMailbox.send(TransferableBlockTestUtils.getEndOfStreamTransferableBlock(SENDER_STAGE_ID));
+ sendingMailbox.send(SuccessMseBlock.INSTANCE, MultiStageQueryStats.emptyStats(SENDER_STAGE_ID).serialize());
sendingMailbox.complete();
// sending side should early terminate
TestUtils.waitForCondition(aVoid -> sendingMailbox.isEarlyTerminated(),
1000L, "Failed to early-terminate sender");
}
+
+ private static List<Object[]> getRows(ReceivingMailbox receivingMailbox) {
+ ReceivingMailbox.MseBlockWithStats block = receivingMailbox.poll();
+ assertNotNull(block);
+ assertTrue(block.getBlock().isData());
+ List<Object[]> rows = ((MseBlock.Data) block).asRowHeap().getRows();
+ return rows;
+ }
+
+ public static MseBlock readBlock(ReceivingMailbox receivingMailbox) {
+ ReceivingMailbox.MseBlockWithStats block = receivingMailbox.poll();
+ assertNotNull(block);
+ return block.getBlock();
Review Comment:
Nice catch
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]