walterddr commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177074193


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java:
##########
@@ -18,55 +18,118 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import javax.annotation.Nullable;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is with the MailboxService, which is unlike
- * the {@link SendingMailbox} whose ownership lies with the {@link MailboxSendOperator}. This is because the
- * ReceivingMailbox can be initialized even before the corresponding OpChain is registered on the receiver, whereas
- * the SendingMailbox is initialized when the MailboxSendOperator is running. Also see {@link #isInitialized()}.
- *
- * @param <T> the unit of data that each {@link #receive()} call returns.
+ * the {@link SendingMailbox} whose ownership lies with the send operator. This is because the ReceivingMailbox can be
+ * initialized even before the corresponding OpChain is registered on the receiver, whereas the SendingMailbox is
+ * initialized when the send operator is running.
  */
-public interface ReceivingMailbox<T> {
+public class ReceivingMailbox {
+  public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ReceivingMailbox.class);
+  private static final TransferableBlock CANCELLED_ERROR_BLOCK =
+      TransferableBlockUtils.getErrorTransferableBlock(new 
RuntimeException("Cancelled by receiver"));
+
+  private final String _id;
+  private final Consumer<String> _receiveMailCallback;
+  // TODO: Make the queue size configurable
+  // TODO: Revisit if this is the correct way to apply back pressure
+  private final BlockingQueue<TransferableBlock> _blocks = new 
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
+  private final AtomicReference<TransferableBlock> _errorBlock = new 
AtomicReference<>();
+
+  public ReceivingMailbox(String id, Consumer<String> receiveMailCallback) {
+    _id = id;
+    _receiveMailCallback = receiveMailCallback;
+  }
 
-  MailboxIdentifier getId();
+  public String getId() {
+    return _id;
+  }
 
   /**
-   * Returns a unit of data. Implementations are allowed to return null, in 
which case {@link MailboxReceiveOperator}
-   * will assume that this mailbox doesn't have any data to return and it will 
instead poll the other mailbox (if any).
+   * Offers a non-error block into the mailbox within the timeout specified, returns whether the block is successfully
+   * added. If the block is not added, an error block is added to the mailbox.
    */
-  @Nullable
-  T receive() throws Exception;
+  public boolean offer(TransferableBlock block, long timeoutMs) {
+    if (_errorBlock.get() != null) {
+      LOGGER.debug("Mailbox: {} is already cancelled or errored out, ignoring 
the late block", _id);
+      return false;
+    }
+    if (timeoutMs < 0) {
+      LOGGER.debug("Mailbox: {} is already timed out", _id);
+      setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
+          new TimeoutException("Timed out while offering data to mailbox: " + 
_id)));
+      return false;
+    }
+    try {
+      if (_blocks.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
+        if (_errorBlock.get() == null) {
+          _receiveMailCallback.accept(_id);
+          return true;
+        } else {
+          LOGGER.debug("Mailbox: {} is already cancelled or errored out, 
ignoring the late block", _id);
+          _blocks.clear();
+          return false;
+        }
+      } else {
+        LOGGER.debug("Failed to offer block into mailbox: {} within: {}ms", 
_id, timeoutMs);
+        setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(
+            new TimeoutException("Timed out while waiting for receive operator 
to consume data from mailbox: " + _id)));
+        return false;
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error("Interrupted while offering block into mailbox: {}", _id);
+      setErrorBlock(TransferableBlockUtils.getErrorTransferableBlock(e));
+      return false;
+    }
+  }
 
   /**
-   * A ReceivingMailbox is considered initialized when it has a reference to 
the underlying channel used for receiving
-   * the data. The underlying channel may be a gRPC stream, in-memory queue, 
etc. Once a receiving mailbox is
-   * initialized, it has the ability to close the underlying channel via the 
{@link #cancel()} method.
+   * Sets an error block into the mailbox. No more blocks are accepted after calling this method.
    */
-  boolean isInitialized();
+  public void setErrorBlock(TransferableBlock errorBlock) {
+    if (_errorBlock.compareAndSet(null, errorBlock)) {
+      _blocks.clear();
+      _receiveMailCallback.accept(_id);
+    }
+  }
 
   /**
-   * A ReceivingMailbox is considered closed if it has sent all the data to 
the receiver and doesn't have any more data
-   * to send.
+   * Returns the first block from the mailbox, or {@code null} if there is no block received yet. Error block is
+   * returned if exists.
    */
-  boolean isClosed();
+  @Nullable
+  public TransferableBlock poll() {
+    TransferableBlock errorBlock = _errorBlock.get();
+    return errorBlock != null ? errorBlock : _blocks.poll();
+  }
 
   /**
-   * A ReceivingMailbox may hold a reference to the underlying channel. 
Usually the channel would be automatically
-   * closed once all the data has been received by the receiver, and in such 
cases {@link #isClosed()} returns true.
-   * However in failure scenarios the underlying channel may not be released, 
and the receiver can use this method to
-   * ensure the same.
-   *
-   * This API should ensure that the underlying channel is "released" if it 
hasn't been already. If the channel has
-   * already been released, the API shouldn't throw and instead return 
gracefully.
-   *
-   * <p>
-   *   This method may be called multiple times, so implementations should 
ensure this is idempotent.
-   * </p>
+   * Cancels the mailbox. No more blocks are accepted after calling this method. Should only be called by the receive
+   * operator to clean up the remaining blocks.
    */
-  void cancel();
+  public void cancel() {
+    LOGGER.debug("Cancelling mailbox: {}", _id);
+    if (_errorBlock.compareAndSet(null, CANCELLED_ERROR_BLOCK)) {
+      _blocks.clear();
+    }
+  }

Review Comment:
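   This should accept the upstream throwable here, so that the error block carries the actual cause instead of the generic "Cancelled by receiver" error: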
   ```suggestion
     public void cancel(Throwable t) {
       LOGGER.debug("Cancelling mailbox: {}", _id, t);
       if (_errorBlock.compareAndSet(null, TransferableBlockUtils.getErrorTransferableBlock(t))) {
         _blocks.clear();
       }
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to