This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f7c281f2dc [multistage] [bugfix] Throw error when GrpcMailbox
receiving buffer is full (#9969)
f7c281f2dc is described below
commit f7c281f2dc5d76ed787e0aeebbc6c629c8cec50e
Author: Yao Liu <[email protected]>
AuthorDate: Wed Dec 14 11:15:36 2022 -0800
[multistage] [bugfix] Throw error when GrpcMailbox receiving buffer is full
(#9969)
* error out when receiving buffer full
* fix race condition
---
.../pinot/query/mailbox/GrpcReceivingMailbox.java | 1 -
.../channel/MailboxContentStreamObserver.java | 47 +++++++++++++++++++---
.../channel/MailboxStatusStreamObserver.java | 1 -
3 files changed, 42 insertions(+), 7 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index bb6dce4b76..ce0152f1e6 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -94,7 +94,6 @@ public class GrpcReceivingMailbox implements
ReceivingMailbox<TransferableBlock>
return isInitialized() && _contentStreamObserver.isCompleted();
}
- // TODO: fix busy wait. This should be guarded by timeout.
private boolean waitForInitialize()
throws Exception {
if (_initializationLatch.getCount() > 0) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 1b6c8a8130..238c3cf60c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -23,6 +23,8 @@ import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
@@ -43,6 +45,14 @@ import org.slf4j.LoggerFactory;
*/
public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.MailboxContent> {
private static final Logger LOGGER =
LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+
+ private static Mailbox.MailboxContent createErrorContent(Throwable e)
+ throws IOException {
+ return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
+ TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes()))
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build();
+ }
+
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
private final GrpcMailboxService _mailboxService;
private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
@@ -50,6 +60,9 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
private final ArrayBlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
+
+ ReadWriteLock _errorLock = new ReentrantReadWriteLock();
+ private Mailbox.MailboxContent _errorContent = null; // Guarded by _errorLock.
private StringMailboxIdentifier _mailboxId;
private Consumer<MailboxIdentifier> _gotMailCallback;
@@ -73,6 +86,14 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
* to indicate when to call this method.
*/
public Mailbox.MailboxContent poll() {
+ try {
+ _errorLock.readLock().lock();
+ if (_errorContent != null) {
+ return _errorContent;
+ }
+ } finally {
+ _errorLock.readLock().unlock();
+ }
if (isCompleted()) {
return null;
}
@@ -93,7 +114,22 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
if (!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY)) {
// when the receiving end receives a message put it in the mailbox queue.
- _receivingBuffer.offer(mailboxContent);
+ // TODO: pass a timeout to _receivingBuffer.
+ if (!_receivingBuffer.offer(mailboxContent)) {
+ // TODO: close the stream.
+ RuntimeException e = new RuntimeException("Mailbox receivingBuffer is full:" + _mailboxId);
+ LOGGER.error(e.getMessage());
+ try {
+ _errorLock.writeLock().lock();
+ _errorContent = createErrorContent(e);
+ } catch (IOException ioe) {
+ e = new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
+ LOGGER.error(e.getMessage());
+ throw e;
+ } finally {
+ _errorLock.writeLock().unlock();
+ }
+ }
_gotMailCallback.accept(_mailboxId);
if (_isEnabledFeedback) {
@@ -116,14 +152,15 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
@Override
public void onError(Throwable e) {
try {
- _receivingBuffer.offer(Mailbox.MailboxContent.newBuilder()
- .setPayload(ByteString.copyFrom(
- TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(e)).getDataBlock().toBytes()))
- .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build());
+ _errorLock.writeLock().lock();
+ _errorContent = createErrorContent(e);
_gotMailCallback.accept(_mailboxId);
+ // TODO: close the stream.
throw new RuntimeException(e);
} catch (IOException ioe) {
throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
+ } finally {
+ _errorLock.writeLock().unlock();
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index e482583bd3..b758f0793d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
public class MailboxStatusStreamObserver implements
StreamObserver<Mailbox.MailboxStatus> {
private static final Logger LOGGER =
LoggerFactory.getLogger(MailboxStatusStreamObserver.class);
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
- private static final long DEFAULT_MAILBOX_POLL_TIMEOUT_MS = 1000L;
private final AtomicInteger _bufferSize = new AtomicInteger(5);
private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]