Jackie-Jiang commented on code in PR #10681:
URL: https://github.com/apache/pinot/pull/10681#discussion_r1177071005


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -40,86 +36,67 @@
 /**
  * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is 
created on the first call to {@link #send}.
  */
-public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+public class GrpcSendingMailbox implements SendingMailbox {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcSendingMailbox.class);
 
-  private final String _mailboxId;
-  private final Function<Long, StreamObserver<MailboxContent>> 
_mailboxContentStreamObserverSupplier;
-  private final MailboxStatusStreamObserver _statusObserver;
+  private final String _id;
+  private final ChannelManager _channelManager;
+  private final String _hostname;
+  private final int _port;
   private final long _deadlineMs;
-  private final AtomicBoolean _initialized = new AtomicBoolean(false);
+  private final MailboxStatusObserver _statusObserver = new 
MailboxStatusObserver();
 
-  private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
-  private TransferableBlock _errorBlock;
+  private StreamObserver<MailboxContent> _contentObserver;
 
-  public GrpcSendingMailbox(MailboxIdentifier mailboxId, 
MailboxStatusStreamObserver statusObserver,
-      Function<Long, StreamObserver<MailboxContent>> 
contentStreamObserverSupplier, long deadlineMs) {
-    _mailboxId = mailboxId.toString();
-    _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
-    _statusObserver = statusObserver;
+  public GrpcSendingMailbox(String id, ChannelManager channelManager, String 
hostname, int port, long deadlineMs) {
+    _id = id;
+    _channelManager = channelManager;
+    _hostname = hostname;
+    _port = port;
     _deadlineMs = deadlineMs;
-    _errorBlock = null;
   }
 
   @Override
   public void send(TransferableBlock block)
-      throws Exception {
-    if (!_initialized.get()) {
-      open();
+      throws IOException {
+    if (_contentObserver == null) {
+      _contentObserver = getContentObserver();
     }
-    Preconditions.checkState(!_statusObserver.isFinished() || _errorBlock != 
null,
-        "Called send when stream is already closed for mailbox=%s", 
_mailboxId);
-    MailboxContent data = toMailboxContent(block.getDataBlock());
-    _mailboxContentStreamObserver.onNext(data);
+    Preconditions.checkState(!_statusObserver.isFinished(), "Mailbox: %s is 
already closed", _id);
+    _contentObserver.onNext(toMailboxContent(block));
   }
 
   @Override
-  public void complete()
-      throws Exception {
-    // TODO: should wait for _mailboxContentStreamObserver.onNext() finish 
before calling onComplete().
-    _mailboxContentStreamObserver.onCompleted();
-  }
-
-  @Override
-  public boolean isInitialized() {
-    return _initialized.get();
+  public void complete() {
+    _contentObserver.onCompleted();
   }
 
   @Override
   public void cancel(Throwable t) {
-    if (_initialized.get() && !_statusObserver.isFinished()) {
-      LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
+    if (!_statusObserver.isFinished()) {
+      LOGGER.debug("Cancelling mailbox: {}", _id);
+      if (_contentObserver == null) {
+        _contentObserver = getContentObserver();
+      }
       try {
-        RuntimeException e = new RuntimeException("Cancelled by the sender");
-        _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
-        
_mailboxContentStreamObserver.onNext(toMailboxContent(_errorBlock.getDataBlock()));
-        
_mailboxContentStreamObserver.onError(Status.fromThrowable(e).asRuntimeException());
-      } catch (Exception e) {
-        // TODO: We don't necessarily need to log this since this is 
relatively quite likely to happen. Logging this
-        //  anyways as info for now so we can see how frequently this happens.
-        LOGGER.info("Unexpected error issuing onError to 
MailboxContentStreamObserver: {}", e.getMessage());
+        // NOTE: DO NOT use onError() because it will terminate the stream, 
and receiver might not get the callback
+        _contentObserver.onNext(toMailboxContent(
+            TransferableBlockUtils.getErrorTransferableBlock(new 
RuntimeException("Cancelled by sender", t))));
+        _contentObserver.onCompleted();
+      } catch (Exception ignored) {
+        // Exception can be thrown if the stream is already closed, so we 
simply ignore it

Review Comment:
   It is normal for resource cleanup (`cancel()`) to try to send a message to a 
closed stream (e.g. `complete()` has been called, but the callback from the 
receiver hasn't arrived yet). Logging the error could potentially flood the 
log. We do log warnings when the receiver buffer is not properly cleaned up. I 
can add a debug log here so that we can see the exception when debugging an 
issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to