Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177059420


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -18,59 +18,49 @@
  */
 package org.apache.pinot.query.mailbox;
 
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class InMemorySendingMailbox implements 
SendingMailbox<TransferableBlock> {
-  private final MailboxIdentifier _mailboxId;
-  private final Supplier<InMemoryTransferStream> _transferStreamProvider;
-  private final Consumer<MailboxIdentifier> _gotMailCallback;
+public class InMemorySendingMailbox implements SendingMailbox {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
-  private InMemoryTransferStream _transferStream;
+  private final String _id;
+  private final MailboxService _mailboxService;
+  private final long _deadlineMs;
 
-  public InMemorySendingMailbox(MailboxIdentifier mailboxId, 
Supplier<InMemoryTransferStream> transferStreamProvider,
-      Consumer<MailboxIdentifier> gotMailCallback) {
-    _mailboxId = mailboxId;
-    _transferStreamProvider = transferStreamProvider;
-    _gotMailCallback = gotMailCallback;
-  }
+  private ReceivingMailbox _receivingMailbox;
 
-  @Override
-  public void send(TransferableBlock data)
-      throws Exception {
-    if (!isInitialized()) {
-      initialize();
-    }
-    _transferStream.send(data);
-    _gotMailCallback.accept(_mailboxId);
+  public InMemorySendingMailbox(String id, MailboxService mailboxService, long 
deadlineMs) {
+    _id = id;
+    _mailboxService = mailboxService;
+    _deadlineMs = deadlineMs;
   }
 
   @Override
-  public void complete()
-      throws Exception {
-    _transferStream.complete();
-    _gotMailCallback.accept(_mailboxId);
+  public void send(TransferableBlock block) {
+    if (_receivingMailbox == null) {
+      _receivingMailbox = _mailboxService.getReceivingMailbox(_id);
+    }
+    long timeoutMs = _deadlineMs - System.currentTimeMillis();
+    if (!_receivingMailbox.offer(block, timeoutMs)) {
+      throw new RuntimeException(String.format("Failed to offer block into 
mailbox: %s within: %dms", _id, timeoutMs));

Review Comment:
   IMO, `send()` should throw an exception (similar to 
`GrpcSendingMailbox.send()` when the receiver has already completed/errored). 
After it throws the exception, the caller will call `cancel()` to release the 
resources.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to