Jackie-Jiang commented on code in PR #11746:
URL: https://github.com/apache/pinot/pull/11746#discussion_r1355940162
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -49,6 +50,8 @@ public class ReceivingMailbox {
// TODO: Revisit if this is the correct way to apply back pressure
private final BlockingQueue<TransferableBlock> _blocks = new
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
private final AtomicReference<TransferableBlock> _errorBlock = new
AtomicReference<>();
+ private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean(false);
Review Comment:
(minor) This can be defined as a volatile boolean
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusObserver.java:
##########
@@ -34,12 +34,20 @@ public class MailboxStatusObserver implements
StreamObserver<MailboxStatus> {
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
private final AtomicInteger _bufferSize = new
AtomicInteger(DEFAULT_MAILBOX_QUEUE_CAPACITY);
+ private final AtomicBoolean _isEarlyTerminated = new AtomicBoolean();
Review Comment:
(minor) This can be defined as a volatile boolean
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -118,19 +118,24 @@ public String toExplainString() {
protected TransferableBlock getNextBlock() {
try {
TransferableBlock block = _sourceOperator.nextBlock();
+ boolean isEarlyTerminated;
if (block.isSuccessfulEndOfStreamBlock()) {
// Stats need to be populated here because the block is being sent to
the mailbox
// and the receiving opChain will not be able to access the stats from
the previous opChain
TransferableBlock eosBlockWithStats =
TransferableBlockUtils.getEndOfStreamTransferableBlock(
OperatorUtils.getMetadataFromOperatorStats(_opChainStats.getOperatorStatsMap()));
sendTransferableBlock(eosBlockWithStats);
+ // when sending an EOS block already, early termination flag is
ignored even if receiver has requested it.
+ isEarlyTerminated = false;
} else {
- sendTransferableBlock(block);
+ isEarlyTerminated = sendTransferableBlock(block);
Review Comment:
Can be simplified (remove `isEarlyTerminated`)
```suggestion
if (sendTransferableBlock(block)) {
earlyTerminate();
}
```
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java:
##########
@@ -71,6 +71,16 @@ public void cancel(Throwable t) {
cancelRemainingMailboxes();
}
+ public void earlyTerminate() {
+ earlyTerminateMailboxes();
+ }
+
+ protected void earlyTerminateMailboxes() {
Review Comment:
Any specific reason why making this a separate method?
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -70,11 +74,12 @@ public void send(TransferableBlock block)
@Override
public void complete() {
+ _isTerminated = true;
}
@Override
public void cancel(Throwable t) {
- if (_isTerminated) {
+ if (_isEarlyTerminated || _isTerminated) {
Review Comment:
We should not change this. Cancel should be applied even if early terminate
is called
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]