This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 529432c7d7 [multistage] Add Callbacks for Complete Events (#10564)
529432c7d7 is described below
commit 529432c7d7b26cb9690611196210f187def473a3
Author: Ankit Sultana <[email protected]>
AuthorDate: Sat Apr 22 02:27:13 2023 +0530
[multistage] Add Callbacks for Complete Events (#10564)
* [multistage] Add Callbacks for Complete Events
* Add callback for cancel
* Add tests
---
.../pinot/query/mailbox/InMemorySendingMailbox.java | 2 ++
.../mailbox/channel/MailboxContentStreamObserver.java | 1 +
.../pinot/query/mailbox/GrpcMailboxServiceTest.java | 8 +++++++-
.../pinot/query/mailbox/InMemoryMailboxServiceTest.java | 15 +++++++++++++--
4 files changed, 23 insertions(+), 3 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 808011e2a2..18cdd5db36 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -52,6 +52,7 @@ public class InMemorySendingMailbox implements
SendingMailbox<TransferableBlock>
public void complete()
throws Exception {
_transferStream.complete();
+ _gotMailCallback.accept(_mailboxId);
}
@Override
@@ -64,6 +65,7 @@ public class InMemorySendingMailbox implements
SendingMailbox<TransferableBlock>
if (isInitialized() && !_transferStream.isCancelled()) {
_transferStream.cancel();
}
+ _gotMailCallback.accept(_mailboxId);
}
private void initialize() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index c8375d4eed..622dfe5742 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -148,6 +148,7 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
@Override
public void onCompleted() {
_isCompleted.set(true);
+ _gotMailCallback.accept(_mailboxId);
_responseObserver.onCompleted();
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 7d08439601..6f2802fa48 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -86,7 +86,11 @@ public class GrpcMailboxServiceTest {
SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId, deadlineMs);
ReceivingMailbox<TransferableBlock> receivingMailbox =
_mailboxService2.getReceivingMailbox(mailboxId);
CountDownLatch gotData = new CountDownLatch(1);
- _mail2GotData.set(ignored -> gotData.countDown());
+ CountDownLatch timesCallbackCalled = new CountDownLatch(2);
+ _mail2GotData.set(ignored -> {
+ gotData.countDown();
+ timesCallbackCalled.countDown();
+ });
// When:
TransferableBlock testBlock = getTestTransferableBlock();
@@ -98,6 +102,8 @@ public class GrpcMailboxServiceTest {
Assert.assertEquals(receivedBlock.getDataBlock().toBytes(),
testBlock.getDataBlock().toBytes());
sendingMailbox.complete();
+ Assert.assertTrue(timesCallbackCalled.await(1, TimeUnit.SECONDS));
+
TestUtils.waitForCondition(aVoid -> {
return receivingMailbox.isClosed();
}, 5000L, "Receiving mailbox is not closed properly!");
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index 78edcec092..386716332c 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -49,7 +50,10 @@ public class InMemoryMailboxServiceTest {
public void testHappyPath()
throws Exception {
long deadlineMs = System.currentTimeMillis() + 10_000;
- InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
+ AtomicInteger timesCallbackCalled = new AtomicInteger(0);
+ InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> {
+ timesCallbackCalled.incrementAndGet();
+ });
InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox)
mailboxService.getReceivingMailbox(
MAILBOX_ID);
InMemorySendingMailbox sendingMailbox =
@@ -60,6 +64,8 @@ public class InMemoryMailboxServiceTest {
sendingMailbox.send(getTestTransferableBlock(i, i + 1 == NUM_ENTRIES));
}
sendingMailbox.complete();
+ // The callback should be called for each send and complete call
+ Assert.assertEquals(NUM_ENTRIES + 1, timesCallbackCalled.get());
// Iterate 1 less time than the loop above
for (int i = 0; i + 1 < NUM_ENTRIES; i++) {
@@ -141,7 +147,10 @@ public class InMemoryMailboxServiceTest {
public void testInMemoryStreamCancellationBySender()
throws Exception {
long deadlineMs = System.currentTimeMillis() + 10_000;
- InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
+ AtomicInteger timesCallbackCalled = new AtomicInteger(0);
+ InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> {
+ timesCallbackCalled.incrementAndGet();
+ });
SendingMailbox<TransferableBlock> sendingMailbox =
mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs);
ReceivingMailbox<TransferableBlock> receivingMailbox =
mailboxService.getReceivingMailbox(MAILBOX_ID);
@@ -154,6 +163,8 @@ public class InMemoryMailboxServiceTest {
sendingMailbox.cancel(new RuntimeException("foo"));
+ // If the sender cancels the stream, the receiver should get a callback
+ Assert.assertEquals(2, timesCallbackCalled.get());
// After the stream is cancelled, receiver will get error-blocks
receivedBlock = receivingMailbox.receive();
Assert.assertNotNull(receivedBlock);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]