This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5b289fc2e5 [multistage] Fix Leaks in Mailbox (#10322)
5b289fc2e5 is described below
commit 5b289fc2e5acb14388f84cb60b7c5d2d71f5e6cf
Author: Ankit Sultana <[email protected]>
AuthorDate: Fri Mar 10 23:31:25 2023 +0530
[multistage] Fix Leaks in Mailbox (#10322)
- Mailbox ownership model:
* ----------------------------------------------------------------------------
* (Operator Layer)
*          MailboxSendOperator  --------->  MailboxReceiveOperator
*                   |                                  |
*                   |                                  |
* ------------------|----------------------------------|----------------------
*                   |                                  |
* (MailboxService)  |                                  | ( WAIT ON INIT )
*                  \_/                                \_/
*            SendingMailbox                    ReceivingMailbox
* ------------------|---------------------------------/^\---------------------
* (Physical Layer)  |                                  |
* (e.g. GRPC)       |                                  | ( INITIALIZE )
*                   |                                  |
*                  \_/                                 |
*            StreamObserver -------------------> StreamObserver
* ----------------------------------------------------------------------------
- Work items done in this PR
* clean up MailboxService API to sanitize cancel/release.
* completely move SendMailbox management out of MailboxService
* make sending mailbox lazy-open GRPC observers
* add queue offer timeout to avoid infinite OOM blow up
* handle close/cancel on leaf-stage operator + minor log change
* enhanced in-mem transfer stream
---
.../pinot/query/mailbox/GrpcMailboxService.java | 112 +++++++----
.../pinot/query/mailbox/GrpcReceivingMailbox.java | 39 +++-
.../pinot/query/mailbox/GrpcSendingMailbox.java | 73 ++++---
.../query/mailbox/InMemoryMailboxService.java | 65 ++++--
.../query/mailbox/InMemoryReceivingMailbox.java | 34 ++--
.../query/mailbox/InMemorySendingMailbox.java | 49 +++--
.../apache/pinot/query/mailbox/MailboxService.java | 44 +++-
.../query/mailbox/MultiplexingMailboxService.java | 15 +-
.../pinot/query/mailbox/ReceivingMailbox.java | 50 +++--
.../apache/pinot/query/mailbox/SendingMailbox.java | 42 ++--
.../mailbox/channel/InMemoryTransferStream.java | 101 ++++++++++
.../channel/MailboxContentStreamObserver.java | 162 ++++++---------
.../channel/MailboxStatusStreamObserver.java | 12 +-
.../apache/pinot/query/runtime/QueryRunner.java | 65 +++---
.../runtime/executor/OpChainSchedulerService.java | 9 +-
.../runtime/operator/MailboxReceiveOperator.java | 16 ++
.../runtime/operator/MailboxSendOperator.java | 27 ++-
.../runtime/operator/exchange/BlockExchange.java | 31 ++-
.../operator/exchange/BroadcastExchange.java | 3 +-
.../runtime/operator/exchange/HashExchange.java | 3 +-
.../runtime/operator/exchange/RandomExchange.java | 3 +-
.../operator/exchange/SingletonExchange.java | 3 +-
.../query/runtime/plan/PhysicalPlanVisitor.java | 3 +-
.../query/runtime/plan/PlanRequestContext.java | 9 +-
.../runtime/plan/ServerRequestPlanVisitor.java | 4 +-
.../plan/server/ServerPlanRequestContext.java | 6 +-
.../query/mailbox/GrpcMailboxServiceTest.java | 224 +++++++++++++++++++--
.../query/mailbox/InMemoryMailboxServiceTest.java | 121 ++++++++++-
.../mailbox/MultiplexingMailboxServiceTest.java | 8 +-
.../executor/OpChainSchedulerServiceTest.java | 4 +-
.../runtime/operator/MailboxSendOperatorTest.java | 33 ++-
.../operator/exchange/BlockExchangeTest.java | 12 +-
.../operator/exchange/BroadcastExchangeTest.java | 3 +-
.../operator/exchange/HashExchangeTest.java | 3 +-
.../operator/exchange/RandomExchangeTest.java | 3 +-
.../operator/exchange/SingletonExchangeTest.java | 3 +-
36 files changed, 1020 insertions(+), 374 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index 37fd81b3d2..744c681752 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -18,45 +18,61 @@
*/
package org.apache.pinot.query.mailbox;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be
cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
*
- * <p>It maintains a collection of connected mailbox servers and clients to
remote hosts. All indexed by the
- * mailboxID in the format of:
<code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ * <li>When the OpChain corresponding to the receiver was never
registered.</li>
+ * <li>When the receiving OpChain exited before data was sent for the first
time by the sender.</li>
+ * </ol>
*
- * <p>Connections are established/initiated from the sender side and only
tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on
a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- * <li>the latter part of the mailboxID consist of the channelID.</li>
- * <li>the job_id should be uniquely identifying a send/receving pair, for
example if one bundle job requires
- * to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to
distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a
time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel
to ensure the underlying stream is also
+ * released.
*/
public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcMailboxService.class);
// channel manager
+ private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY =
Duration.ofMinutes(5);
private final ChannelManager _channelManager;
private final String _hostname;
private final int _mailboxPort;
- // maintaining a list of registered mailboxes.
- private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>>
_receivingMailboxMap =
- new ConcurrentHashMap<>();
+ // We use a cache to ensure that the receiving mailbox and the underlying
gRPC stream are not leaked in the cases
+ // where the corresponding OpChain is either never registered or died before
the sender sent data for the first time.
+ private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(),
TimeUnit.MINUTES)
+ .removalListener(new RemovalListener<String, GrpcReceivingMailbox>()
{
+ @Override
+ public void onRemoval(RemovalNotification<String,
GrpcReceivingMailbox> notification) {
+ if (notification.wasEvicted()) {
+ // TODO: This should be tied with query deadline, but for that
we need to know the query deadline
+ // when the GrpcReceivingMailbox is initialized in
MailboxContentStreamObserver.
+ LOGGER.warn("Removing dangling GrpcReceivingMailbox: {}",
notification.getKey());
+ notification.getValue().cancel();
+ }
+ }
+ })
+ .build();
private final Consumer<MailboxIdentifier> _gotMailCallback;
public GrpcMailboxService(String hostname, int mailboxPort,
PinotConfiguration extraConfig,
@@ -88,29 +104,57 @@ public class GrpcMailboxService implements
MailboxService<TransferableBlock> {
}
/**
- * Register a mailbox, mailbox needs to be registered before use.
- * @param mailboxId the id of the mailbox.
+ * {@inheritDoc}
*/
- public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier
mailboxId) {
- ManagedChannel channel = getChannel(mailboxId.toString());
- PinotMailboxGrpc.PinotMailboxStub stub = PinotMailboxGrpc.newStub(channel);
- CountDownLatch latch = new CountDownLatch(1);
- StreamObserver<Mailbox.MailboxContent> mailboxContentStreamObserver =
- stub.open(new MailboxStatusStreamObserver(latch));
- GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(),
mailboxContentStreamObserver, latch);
+ @Override
+ public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier
mailboxId, long deadlineMs) {
+ MailboxStatusStreamObserver statusStreamObserver = new
MailboxStatusStreamObserver();
+
+ GrpcSendingMailbox mailbox = new GrpcSendingMailbox(mailboxId.toString(),
statusStreamObserver, (deadline) -> {
+ ManagedChannel channel = getChannel(mailboxId.toString());
+ PinotMailboxGrpc.PinotMailboxStub stub =
+ PinotMailboxGrpc.newStub(channel)
+ .withDeadlineAfter(Math.max(0L, deadline -
System.currentTimeMillis()), TimeUnit.MILLISECONDS);
+ return stub.open(statusStreamObserver);
+ }, deadlineMs);
return mailbox;
}
/**
- * Register a mailbox, mailbox needs to be registered before use.
- * @param mailboxId the id of the mailbox.
+ * {@inheritDoc} See {@link GrpcMailboxService} for details on the design.
*/
+ @Override
public ReceivingMailbox<TransferableBlock>
getReceivingMailbox(MailboxIdentifier mailboxId) {
- return _receivingMailboxMap.computeIfAbsent(mailboxId.toString(),
- (mId) -> new GrpcReceivingMailbox(mId, _gotMailCallback));
+ try {
+ return _receivingMailboxCache.get(mailboxId.toString(),
+ () -> new GrpcReceivingMailbox(mailboxId.toString(),
_gotMailCallback));
+ } catch (ExecutionException e) {
+ LOGGER.error(String.format("Error getting receiving mailbox: %s",
mailboxId), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * If there's a cached receiving mailbox and it isn't closed (i.e. query
didn't finish successfully), then this
+ * calls a cancel to ensure that the underlying gRPC stream is closed. After
that the receiving mailbox is removed
+ * from the cache.
+ * <p>
+ * Also refer to the definition in the interface:
+ * </p>
+ * <p>
+ * {@inheritDoc}
+ * </p>
+ */
+ @Override
+ public void releaseReceivingMailbox(MailboxIdentifier mailboxId) {
+ GrpcReceivingMailbox receivingMailbox =
_receivingMailboxCache.getIfPresent(mailboxId.toString());
+ if (receivingMailbox != null && !receivingMailbox.isClosed()) {
+ receivingMailbox.cancel();
+ }
+ _receivingMailboxCache.invalidate(mailboxId.toString());
}
- public ManagedChannel getChannel(String mailboxId) {
+ private ManagedChannel getChannel(String mailboxId) {
return _channelManager.getChannel(Utils.constructChannelId(mailboxId));
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
index 464c092e87..f6dde0c823 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java
@@ -18,34 +18,41 @@
*/
package org.apache.pinot.query.mailbox;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * GRPC implementation of the {@link ReceivingMailbox}.
+ * GRPC implementation of the {@link ReceivingMailbox}. This mailbox doesn't
hold any resources upon creation.
+ * Instead an explicit {@link #init} call is made when the sender sends the
first data-block which attaches
+ * references to the {@link StreamObserver} to this mailbox.
*/
public class GrpcReceivingMailbox implements
ReceivingMailbox<TransferableBlock> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcReceivingMailbox.class);
private static final long DEFAULT_MAILBOX_INIT_TIMEOUT = 100L;
private final String _mailboxId;
- private Consumer<MailboxIdentifier> _gotMailCallback;
+ private final Consumer<MailboxIdentifier> _gotMailCallback;
private final CountDownLatch _initializationLatch;
- private final AtomicInteger _totalMsgReceived = new AtomicInteger(0);
private MailboxContentStreamObserver _contentStreamObserver;
+ private StreamObserver<Mailbox.MailboxStatus> _statusStreamObserver;
public GrpcReceivingMailbox(String mailboxId, Consumer<MailboxIdentifier>
gotMailCallback) {
_mailboxId = mailboxId;
@@ -53,9 +60,11 @@ public class GrpcReceivingMailbox implements
ReceivingMailbox<TransferableBlock>
_initializationLatch = new CountDownLatch(1);
}
- public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver
streamObserver) {
+ public Consumer<MailboxIdentifier> init(MailboxContentStreamObserver
streamObserver,
+ StreamObserver<Mailbox.MailboxStatus> statusStreamObserver) {
if (_initializationLatch.getCount() > 0) {
_contentStreamObserver = streamObserver;
+ _statusStreamObserver = statusStreamObserver;
_initializationLatch.countDown();
}
return _gotMailCallback;
@@ -70,29 +79,37 @@ public class GrpcReceivingMailbox implements
ReceivingMailbox<TransferableBlock>
* 2. If the received block from the sender is a data-block with 0 rows.
* </p>
*/
+ @Nullable
@Override
- public TransferableBlock receive()
- throws Exception {
+ public TransferableBlock receive() throws Exception {
if (!waitForInitialize()) {
return null;
}
MailboxContent mailboxContent = _contentStreamObserver.poll();
- _totalMsgReceived.incrementAndGet();
return mailboxContent == null ? null : fromMailboxContent(mailboxContent);
}
@Override
public boolean isInitialized() {
- return _initializationLatch.getCount() <= 0;
+ return _initializationLatch.getCount() == 0;
}
@Override
public boolean isClosed() {
- return isInitialized() && _contentStreamObserver.isCompleted();
+ return isInitialized() && _contentStreamObserver.hasConsumedAllData();
}
@Override
- public void cancel(Throwable e) {
+ public void cancel() {
+ if (isInitialized()) {
+ try {
+ _statusStreamObserver.onError(Status.CANCELLED.asRuntimeException());
+ } catch (Exception e) {
+ // TODO: This can happen if the call is already closed. Consider
removing this log altogether or find a way
+ // to check if the stream is already closed.
+ LOGGER.info("Tried to cancel receiving mailbox", e);
+ }
+ }
}
private boolean waitForInitialize()
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index d2f4de89c2..3fa6d6c229 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -18,79 +18,94 @@
*/
package org.apache.pinot.query.mailbox;
+import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
+import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * GRPC implementation of the {@link SendingMailbox}.
+ * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is
created on the first call to {@link #send}.
*/
public class GrpcSendingMailbox implements SendingMailbox<TransferableBlock> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcSendingMailbox.class);
private final String _mailboxId;
private final AtomicBoolean _initialized = new AtomicBoolean(false);
- private final AtomicInteger _totalMsgSent = new AtomicInteger(0);
- private final CountDownLatch _finishLatch;
- private final StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+ private StreamObserver<MailboxContent> _mailboxContentStreamObserver;
+ private final Function<Long, StreamObserver<MailboxContent>>
_mailboxContentStreamObserverSupplier;
+ private final MailboxStatusStreamObserver _statusObserver;
+ private final long _deadlineMs;
- public GrpcSendingMailbox(String mailboxId, StreamObserver<MailboxContent>
mailboxContentStreamObserver,
- CountDownLatch latch) {
+ public GrpcSendingMailbox(String mailboxId, MailboxStatusStreamObserver
statusObserver,
+ Function<Long, StreamObserver<MailboxContent>>
contentStreamObserverSupplier, long deadlineMs) {
_mailboxId = mailboxId;
- _mailboxContentStreamObserver = mailboxContentStreamObserver;
- _finishLatch = latch;
- _initialized.set(false);
+ _mailboxContentStreamObserverSupplier = contentStreamObserverSupplier;
+ _statusObserver = statusObserver;
+ _deadlineMs = deadlineMs;
}
@Override
public void send(TransferableBlock block)
- throws UnsupportedOperationException {
+ throws Exception {
if (!_initialized.get()) {
- // initialization is special
open();
}
+ Preconditions.checkState(!_statusObserver.isFinished(),
+ "Called send when stream is already closed for mailbox=" + _mailboxId);
MailboxContent data = toMailboxContent(block.getDataBlock());
_mailboxContentStreamObserver.onNext(data);
- _totalMsgSent.incrementAndGet();
}
@Override
- public void complete() {
+ public void complete()
+ throws Exception {
_mailboxContentStreamObserver.onCompleted();
}
@Override
- public void open() {
- // TODO: Get rid of init call.
- // send a begin-of-stream message.
-
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
- .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY,
"true").build());
- _initialized.set(true);
+ public boolean isInitialized() {
+ return _initialized.get();
}
@Override
- public String getMailboxId() {
- return _mailboxId;
+ public void cancel(Throwable t) {
+ if (_initialized.get() && !_statusObserver.isFinished()) {
+ LOGGER.warn("GrpcSendingMailbox={} cancelling stream", _mailboxId);
+ try {
+ _mailboxContentStreamObserver.onError(Status.fromThrowable(
+ new RuntimeException("Cancelled by the
sender")).asRuntimeException());
+ } catch (Exception e) {
+ // TODO: We don't necessarily need to log this since this is
relatively quite likely to happen. Logging this
+ // anyways as info for now so we can see how frequently this happens.
+ LOGGER.info("Unexpected error issuing onError to
MailboxContentStreamObserver: {}", e.getMessage());
+ }
+ }
}
@Override
- public void waitForFinish(long timeout, TimeUnit unit)
- throws InterruptedException {
- _finishLatch.await(timeout, unit);
+ public String getMailboxId() {
+ return _mailboxId;
}
- @Override
- public void cancel(Throwable t) {
+ private void open() {
+ _mailboxContentStreamObserver =
_mailboxContentStreamObserverSupplier.apply(_deadlineMs);
+ _initialized.set(true);
+ // send a begin-of-stream message.
+
_mailboxContentStreamObserver.onNext(MailboxContent.newBuilder().setMailboxId(_mailboxId)
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY,
"true").build());
}
private MailboxContent toMailboxContent(DataBlock dataBlock) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
index b5bcb9057b..c15546b25a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java
@@ -19,21 +19,41 @@
package org.apache.pinot.query.mailbox;
import com.google.common.base.Preconditions;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class InMemoryMailboxService implements
MailboxService<TransferableBlock> {
- // channel manager
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InMemoryMailboxService.class);
+ private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY =
Duration.ofMinutes(5);
private final String _hostname;
private final int _mailboxPort;
private final Consumer<MailboxIdentifier> _receivedMailContentCallback;
- private final ConcurrentHashMap<String, ReceivingMailbox> _receivingMailbox
= new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, BlockingQueue> _mailboxQueue = new
ConcurrentHashMap<>();
+ private final Cache<String, InMemoryReceivingMailbox> _receivingMailboxCache
=
+
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(),
TimeUnit.MINUTES)
+ .removalListener(new RemovalListener<String,
InMemoryReceivingMailbox>() {
+ @Override
+ public void onRemoval(RemovalNotification<String,
InMemoryReceivingMailbox> notification) {
+ if (notification.wasEvicted()) {
+ LOGGER.info("Evicting dangling InMemoryReceivingMailbox: {}",
notification.getKey());
+ // TODO: This should be tied to the query deadline. Unlike
GrpcMailboxService, the change here is
+ // simpler.
+ notification.getValue().cancel();
+ }
+ }
+ })
+ .build();
public InMemoryMailboxService(String hostname, int mailboxPort,
Consumer<MailboxIdentifier> receivedMailContentCallback) {
@@ -64,25 +84,32 @@ public class InMemoryMailboxService implements
MailboxService<TransferableBlock>
return _mailboxPort;
}
- public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier
mailboxId) {
+ @Override
+ public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier
mailboxId, long deadlineMs) {
Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory
mailbox service for non-local transport");
- String mId = mailboxId.toString();
- // for now, we use an unbounded blocking queue as the means of
communication between
- // in memory mailboxes - the reason for this is that unless we implement
flow control,
- // blocks will sit in memory either way (blocking the sender from sending
doesn't prevent
- // more blocks from being generated from upstream). on the other hand,
having a capacity
- // for the queue causes the sending thread to occupy a task pool thread
and prevents other
- // threads (most importantly, the receiving thread) from running - which
can cause unnecessary
- // failure situations
- // TODO: when we implement flow control, we should swap this out with a
bounded abstraction
return new InMemorySendingMailbox(mailboxId.toString(),
- _mailboxQueue.computeIfAbsent(mId, id -> new LinkedBlockingQueue<>()),
getReceivedMailContentCallback());
+ () -> new InMemoryTransferStream(mailboxId, this, deadlineMs),
+ getReceivedMailContentCallback());
}
+ @Override
public ReceivingMailbox<TransferableBlock>
getReceivingMailbox(MailboxIdentifier mailboxId) {
Preconditions.checkState(mailboxId.isLocal(), "Cannot use in-memory
mailbox service for non-local transport");
String mId = mailboxId.toString();
- BlockingQueue mailboxQueue = _mailboxQueue.computeIfAbsent(mId, id -> new
LinkedBlockingQueue<>());
- return _receivingMailbox.computeIfAbsent(mId, id -> new
InMemoryReceivingMailbox(mId, mailboxQueue));
+ try {
+ return _receivingMailboxCache.get(mId, () -> new
InMemoryReceivingMailbox(mId));
+ } catch (ExecutionException e) {
+ LOGGER.error(String.format("Error getting in-memory receiving
mailbox=%s", mailboxId), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void releaseReceivingMailbox(MailboxIdentifier mailboxId) {
+ InMemoryReceivingMailbox receivingMailbox =
_receivingMailboxCache.getIfPresent(mailboxId.toString());
+ if (receivingMailbox != null) {
+ receivingMailbox.cancel();
+ _receivingMailboxCache.invalidate(mailboxId.toString());
+ }
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
index 43f32c61c5..57e6cce087 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java
@@ -18,30 +18,30 @@
*/
package org.apache.pinot.query.mailbox;
-import java.util.concurrent.BlockingQueue;
+import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
public class InMemoryReceivingMailbox implements
ReceivingMailbox<TransferableBlock> {
private final String _mailboxId;
- private final BlockingQueue<TransferableBlock> _queue;
- private volatile boolean _closed;
+ private InMemoryTransferStream _transferStream;
+ private volatile boolean _closed = false;
- public InMemoryReceivingMailbox(String mailboxId,
BlockingQueue<TransferableBlock> queue) {
+ public InMemoryReceivingMailbox(String mailboxId) {
_mailboxId = mailboxId;
- _queue = queue;
- _closed = false;
}
- @Override
- public String getMailboxId() {
- return _mailboxId;
+ public void init(InMemoryTransferStream transferStream) {
+ _transferStream = transferStream;
}
@Override
public TransferableBlock receive()
throws Exception {
- TransferableBlock block = _queue.poll();
+ if (_transferStream == null) {
+ return null;
+ }
+ TransferableBlock block = _transferStream.poll();
if (block == null) {
return null;
@@ -56,15 +56,23 @@ public class InMemoryReceivingMailbox implements
ReceivingMailbox<TransferableBl
@Override
public boolean isInitialized() {
- return true;
+ return _transferStream != null;
}
@Override
public boolean isClosed() {
- return _closed && _queue.size() == 0;
+ return _closed;
}
@Override
- public void cancel(Throwable e) {
+ public void cancel() {
+ if (_transferStream != null) {
+ _transferStream.cancel();
+ }
+ }
+
+ @Override
+ public String getMailboxId() {
+ return _mailboxId;
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
index 18dcd8ffd3..035ee16f2b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java
@@ -18,56 +18,61 @@
*/
package org.apache.pinot.query.mailbox;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.pinot.query.mailbox.channel.InMemoryTransferStream;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
public class InMemorySendingMailbox implements
SendingMailbox<TransferableBlock> {
private final Consumer<MailboxIdentifier> _gotMailCallback;
- private final String _mailboxId;
+ private final JsonMailboxIdentifier _mailboxId;
- // TODO: changed to 2-way communication channel.
- private BlockingQueue<TransferableBlock> _queue;
+ private Supplier<InMemoryTransferStream> _transferStreamProvider;
+ private InMemoryTransferStream _transferStream;
- public InMemorySendingMailbox(String mailboxId,
BlockingQueue<TransferableBlock> queue,
+ public InMemorySendingMailbox(String mailboxId,
Supplier<InMemoryTransferStream> transferStreamProvider,
Consumer<MailboxIdentifier> gotMailCallback) {
- _mailboxId = mailboxId;
- _queue = queue;
+ _mailboxId = JsonMailboxIdentifier.parse(mailboxId);
+ _transferStreamProvider = transferStreamProvider;
_gotMailCallback = gotMailCallback;
}
- @Override
- public void open() {
- }
-
@Override
public String getMailboxId() {
- return _mailboxId;
+ return _mailboxId.toString();
}
@Override
public void send(TransferableBlock data)
- throws UnsupportedOperationException {
- if (!_queue.offer(data)) {
- // this should never happen, since we use a LinkedBlockingQueue
- // which does not have capacity bounds
- throw new IllegalStateException("Failed to insert into in-memory mailbox
" + _mailboxId);
+ throws Exception {
+ if (!isInitialized()) {
+ initialize();
}
- _gotMailCallback.accept(JsonMailboxIdentifier.parse(_mailboxId));
+ _transferStream.send(data);
+ _gotMailCallback.accept(_mailboxId);
}
@Override
- public void complete() {
+ public void complete() throws Exception {
+ _transferStream.complete();
}
@Override
- public void waitForFinish(long timeout, TimeUnit unit)
- throws InterruptedException {
+ public boolean isInitialized() {
+ return _transferStream != null;
}
@Override
public void cancel(Throwable t) {
+ if (isInitialized() && !_transferStream.isCancelled()) {
+ _transferStream.cancel();
+ }
+ }
+
+ private void initialize() {
+ if (_transferStream == null) {
+ _transferStream = _transferStreamProvider.get();
+ }
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 234dd78e98..b4d16b0a85 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.query.mailbox;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.OpChain;
+
+
/**
* Mailbox service that handles transfer for mailbox contents.
*
@@ -31,12 +35,12 @@ public interface MailboxService<T> {
void start();
/**
- * Shutting down the mailbox service.s
+ * Shutting down the mailbox service.
*/
void shutdown();
/**
- * Get the host name on which this mailbox service is runnning on.
+ * Get the host name on which this mailbox service is running on.
*
* @return the host.
*/
@@ -50,10 +54,7 @@ public interface MailboxService<T> {
int getMailboxPort();
/**
- * Look up a receiving mailbox by {@link MailboxIdentifier}.
- *
- * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist
already, but it might not have been
- * initialized.
+ * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
*
* @param mailboxId mailbox identifier.
* @return a receiving mailbox.
@@ -61,10 +62,37 @@ public interface MailboxService<T> {
ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
/**
- * Look up a sending mailbox by {@link MailboxIdentifier}.
+ * Return a sending-mailbox for the given {@link MailboxIdentifier}. The
returned {@link SendingMailbox} is
+ * uninitialized, i.e. it will not open the underlying channel or acquire
any additional resources. Instead the
+ * {@link SendingMailbox} will initialize lazily when the data is sent for
the first time through it.
*
* @param mailboxId mailbox identifier.
+ * @param deadlineMs deadline in milliseconds, which is usually the same as
the query deadline.
* @return a sending mailbox.
*/
- SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+ SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long
deadlineMs);
+
+ /**
+ * A {@link ReceivingMailbox} for a given {@link OpChain} may be created
before the OpChain is even registered.
+ * Reason being that the sender starts sending data, and the receiver starts
receiving the same without waiting for
+ * the OpChain to be registered. The ownership for the ReceivingMailbox
hence lies with the MailboxService and not
+ * the OpChain. There are two ways in which a MailboxService may release its
references to a ReceivingMailbox and
+ * the underlying resources:
+ *
+ * <ol>
+ * <li>
+ * If the OpChain corresponding to a ReceivingMailbox was closed or
cancelled. In that case,
+ * {@link MailboxReceiveOperator} will call this method as part of its
close/cancel call. This is the main
+ * reason why this method exists.
+ * </li>
+ * <li>
+ * There can be cases where the corresponding OpChain was never
registered with the scheduler. In that case, it
+ * is up to the {@link MailboxService} to ensure that there are no leaks
of resources. E.g. it could setup a
+ * periodic job to detect such mailbox and do any clean-up. Note that
for this case, it is not mandatory for
+ * the {@link MailboxService} to use this method. It can use any
internal method it needs to do the clean-up.
+ * </li>
+ * </ol>
+ * @param mailboxId
+ */
+ void releaseReceivingMailbox(MailboxIdentifier mailboxId);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
index e80a65ce71..c4f9c6fdec 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
@@ -72,11 +72,20 @@ public class MultiplexingMailboxService implements MailboxService<TransferableBl
}
@Override
- public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier
mailboxId) {
+ public SendingMailbox<TransferableBlock> getSendingMailbox(MailboxIdentifier
mailboxId, long deadlineMs) {
if (mailboxId.isLocal()) {
- return _inMemoryMailboxService.getSendingMailbox(mailboxId);
+ return _inMemoryMailboxService.getSendingMailbox(mailboxId, deadlineMs);
}
- return _grpcMailboxService.getSendingMailbox(mailboxId);
+ return _grpcMailboxService.getSendingMailbox(mailboxId, deadlineMs);
+ }
+
+ @Override
+ public void releaseReceivingMailbox(MailboxIdentifier mailboxId) {
+ if (mailboxId.isLocal()) {
+ _inMemoryMailboxService.releaseReceivingMailbox(mailboxId);
+ return;
+ }
+ _grpcMailboxService.releaseReceivingMailbox(mailboxId);
}
public static MultiplexingMailboxService newInstance(String hostname, int
port,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 377430883c..98893aa5e8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -18,42 +18,58 @@
*/
package org.apache.pinot.query.mailbox;
+import javax.annotation.Nullable;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+
+
/**
- * Mailbox is used to send and receive data.
+ * Mailbox that's used to receive data. Ownership of the ReceivingMailbox is
with the MailboxService, which is unlike
+ * the {@link SendingMailbox} whose ownership lies with the {@link
MailboxSendOperator}. This is because the
+ * ReceivingMailbox can be initialized even before the corresponding OpChain
is registered on the receiver, whereas
+ * the SendingMailbox is initialized when the MailboxSendOperator is running.
Also see {@link #isInitialized()}.
*
- * Mailbox should be instantiated on both side of MailboxServer.
- *
- * @param <T> type of data carried over the mailbox.
+ * @param <T> the unit of data that each {@link #receive()} call returns.
*/
public interface ReceivingMailbox<T> {
/**
* Get the unique identifier for the mailbox.
- *
- * @return Mailbox ID.
*/
String getMailboxId();
/**
- * Receive a data packet from the mailbox. Depending on the implementation,
this may return null. The caller should
- * use {@link ReceivingMailbox#isClosed()} to determine if the sender is
done sending and the channel is closed.
- * @return data packet.
- * @throws Exception
+ * Returns a unit of data. Implementations are allowed to return null, in
which case {@link MailboxReceiveOperator}
+ * will assume that this mailbox doesn't have any data to return and it will
instead poll the other mailbox (if any).
*/
- T receive()
- throws Exception;
+ @Nullable
+ T receive() throws Exception;
/**
- * Check if receiving mailbox is initialized.
- * @return
+ * A ReceivingMailbox is considered initialized when it has a reference to
the underlying channel used for receiving
+ * the data. The underlying channel may be a gRPC stream, in-memory queue,
etc. Once a receiving mailbox is
+ * initialized, it has the ability to close the underlying channel via the
{@link #cancel()} method.
*/
boolean isInitialized();
/**
- * Check if mailbox is closed.
- * @return
+ * A ReceivingMailbox is considered closed if it has sent all the data to
the receiver and doesn't have any more data
+ * to send.
*/
boolean isClosed();
- void cancel(Throwable e);
+ /**
+ * A ReceivingMailbox may hold a reference to the underlying channel.
Usually the channel would be automatically
+ * closed once all the data has been received by the receiver, and in such
cases {@link #isClosed()} returns true.
+ * However in failure scenarios the underlying channel may not be released,
and the receiver can use this method to
+ * ensure the same.
+ *
+ * This API should ensure that the underlying channel is "released" if it
hasn't been already. If the channel has
+ * already been released, the API shouldn't throw and instead return
gracefully.
+ *
+ * <p>
+ * This method may be called multiple times, so implementations should
ensure this is idempotent.
+ * </p>
+ */
+ void cancel();
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
index 6cc162b0ec..24651348ac 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/SendingMailbox.java
@@ -18,20 +18,15 @@
*/
package org.apache.pinot.query.mailbox;
-import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
/**
- * Mailbox is used to send and receive data.
+ * Mailbox that's used to send data.
*
- * Mailbox should be instantiated on both side of MailboxServer.
- *
- * @param <T> type of data carried over the mailbox.
+ * @param <T> unit of data sent in one {@link #send} call.
*/
public interface SendingMailbox<T> {
-
- void open();
-
/**
* get the unique identifier for the mailbox.
*
@@ -40,19 +35,36 @@ public interface SendingMailbox<T> {
String getMailboxId();
/**
- * send a data packet through the mailbox.
- * @param data
- * @throws UnsupportedOperationException
+ * Send a single unit of data to a receiver. Note that SendingMailbox are
required to acquire resources lazily in
+ * this call and they should <b>not</b> acquire any resources when they are
created. This method should throw if there
+ * was an error sending the data, since that would allow {@link
BlockExchange} to exit early.
*/
void send(T data)
- throws UnsupportedOperationException;
+ throws Exception;
/**
- * Complete delivery of the current mailbox.
+ * Called when there is no more data to be sent by the {@link
BlockExchange}. This is also a signal for the
+ * SendingMailbox that the sender is done sending data from its end. Note
that this doesn't mean that the receiver
+ * has received all the data.
+ *
+ * <p>
+ * <b>Note:</b> While this is similar to a close() method that's usually
provided with objects that hold releasable
+ * resources, the key difference is that a SendingMailbox cannot completely
release the resources on its end
+ * gracefully, since it would be waiting for the receiver to ack that it has
received all the data. See
+ * {@link #cancel} which can allow callers to force release the underlying
resources.
+ * </p>
*/
- void complete();
+ void complete()
+ throws Exception;
- void waitForFinish(long timeout, TimeUnit unit) throws InterruptedException;
+ /**
+ * A SendingMailbox is considered initialized after it has acquired a
reference to the underlying channel that will
+ * be used to send data to the receiver.
+ */
+ boolean isInitialized();
+ /**
+ * Allows terminating the underlying channel.
+ */
void cancel(Throwable t);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java
new file mode 100644
index 0000000000..8bf65110be
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryTransferStream.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.annotation.Nullable;
+import org.apache.pinot.query.mailbox.InMemoryMailboxService;
+import org.apache.pinot.query.mailbox.InMemoryReceivingMailbox;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+public class InMemoryTransferStream {
+
+ private MailboxIdentifier _mailboxId;
+ private BlockingQueue<TransferableBlock> _queue;
+ private InMemoryMailboxService _mailboxService;
+ private final long _deadlineMs;
+ private boolean _receivingMailboxInitialized = false;
+ private boolean _isCancelled = false;
+ private boolean _isCompleted = false;
+
+ public InMemoryTransferStream(MailboxIdentifier mailboxId,
InMemoryMailboxService mailboxService, long deadlineMs) {
+ _mailboxId = mailboxId;
+ _queue = new LinkedBlockingQueue<>();
+ _mailboxService = mailboxService;
+ _deadlineMs = deadlineMs;
+ }
+
+ public void send(TransferableBlock block) {
+ Preconditions.checkState(!isCancelled(), "Tried to send on a cancelled InMemory stream");
+ // TODO: Deadline check can be more efficient.
+ // While, in most cases the receiver would have anyways called cancel, for
expensive queries it is possible that
+ // the receiver may have hung-up before it could get a reference to the
stream. This can happen if the sending
+ // OpChain was running an expensive operation (like a large hash-join).
+ long currentTime = System.currentTimeMillis();
+ Preconditions.checkState(currentTime < _deadlineMs,
+ String.format("Deadline exceeded by %s ms", currentTime -
_deadlineMs));
+ if (!_receivingMailboxInitialized) {
+ InMemoryReceivingMailbox receivingMailbox =
+ (InMemoryReceivingMailbox)
_mailboxService.getReceivingMailbox(_mailboxId);
+ receivingMailbox.init(this);
+ _receivingMailboxInitialized = true;
+ }
+ _queue.offer(block);
+ }
+
+ @Nullable
+ public TransferableBlock poll()
+ throws InterruptedException {
+ if (_isCancelled) {
+ return TransferableBlockUtils.getErrorTransferableBlock(
+ new RuntimeException("InMemoryTransferStream is cancelled"));
+ } else if (System.currentTimeMillis() > _deadlineMs) {
+ return TransferableBlockUtils.getErrorTransferableBlock(
+ new RuntimeException("Deadline reached for in-memory transfer stream"));
+ }
+ return _queue.poll();
+ }
+
+ public void complete() {
+ _isCompleted = true;
+ }
+
+ public int size() {
+ return _queue.size();
+ }
+
+ public void cancel() {
+ _isCancelled = true;
+ // Eagerly lose references to the underlying data.
+ _queue.clear();
+ }
+
+ public boolean isCompleted() {
+ return _isCompleted;
+ }
+
+ public boolean isCancelled() {
+ return _isCancelled;
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
index 220e429caa..c8375d4eed 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java
@@ -19,15 +19,15 @@
package org.apache.pinot.query.mailbox.channel;
import com.google.protobuf.ByteString;
+import io.grpc.Context;
+import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
-import javax.annotation.concurrent.GuardedBy;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.query.mailbox.GrpcMailboxService;
import org.apache.pinot.query.mailbox.GrpcReceivingMailbox;
@@ -37,8 +37,6 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.lang.Math.max;
-
/**
* {@code MailboxContentStreamObserver} is the content streaming observer used
to receive mailbox content.
@@ -48,59 +46,41 @@ import static java.lang.Math.max;
* to the sender side.
*/
public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.MailboxContent> {
+ public static final int DEFAULT_MAX_PENDING_MAILBOX_CONTENT = 5;
+
private static final Logger LOGGER =
LoggerFactory.getLogger(MailboxContentStreamObserver.class);
+ private static final Mailbox.MailboxContent DEFAULT_ERROR_MAILBOX_CONTENT;
+ // This delta is added to the buffer offer on top of the query timeout to
avoid a race between client cancellation
+ // due to deadline and server-side cancellation due to the receiving buffer
being full.
+ private static final long BUFFER_OFFER_TIMEOUT_DELTA_MS = 1_000;
- private static Mailbox.MailboxContent createErrorContent(Throwable e)
- throws IOException {
- return Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
- TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException(e)).getDataBlock().toBytes()))
- .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY,
"true").build();
+ static {
+ try {
+ RuntimeException exception = new RuntimeException(
+ "Error creating error-content.. please file a bug in the apache/pinot repo");
+ DEFAULT_ERROR_MAILBOX_CONTENT = Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
+ TransferableBlockUtils.getErrorTransferableBlock(exception).getDataBlock().toBytes()))
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY, "true").build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
private final GrpcMailboxService _mailboxService;
private final StreamObserver<Mailbox.MailboxStatus> _responseObserver;
- private final boolean _isEnabledFeedback;
private final AtomicBoolean _isCompleted = new AtomicBoolean(false);
private final BlockingQueue<Mailbox.MailboxContent> _receivingBuffer;
- private ReadWriteLock _bufferSizeLock = new ReentrantReadWriteLock();
- @GuardedBy("bufferSizeLock")
- private int _maxBufferSize = 0;
- private ReadWriteLock _errorLock = new ReentrantReadWriteLock();
- @GuardedBy("_errorLock")
private Mailbox.MailboxContent _errorContent = null;
private JsonMailboxIdentifier _mailboxId;
private Consumer<MailboxIdentifier> _gotMailCallback;
- private void updateMaxBufferSize() {
- _bufferSizeLock.writeLock().lock();
- _maxBufferSize = max(_maxBufferSize, _receivingBuffer.size());
- _bufferSizeLock.writeLock().unlock();
- }
-
- private int getMaxBufferSize() {
- try {
- _bufferSizeLock.readLock().lock();
- return _maxBufferSize;
- } finally {
- _bufferSizeLock.readLock().unlock();
- }
- }
-
public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
StreamObserver<Mailbox.MailboxStatus> responseObserver) {
- this(mailboxService, responseObserver, false);
- }
-
- public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
- StreamObserver<Mailbox.MailboxStatus> responseObserver, boolean
isEnabledFeedback) {
_mailboxService = mailboxService;
_responseObserver = responseObserver;
- // TODO: Replace unbounded queue with bounded queue when we have
backpressure in place.
- // It is possible this will create high memory pressure since we have
memory leak issues.
- _receivingBuffer = new LinkedBlockingQueue();
- _isEnabledFeedback = isEnabledFeedback;
+ _receivingBuffer = new
ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_MAILBOX_CONTENT);
}
/**
@@ -110,92 +90,82 @@ public class MailboxContentStreamObserver implements
StreamObserver<Mailbox.Mail
* to indicate when to call this method.
*/
public Mailbox.MailboxContent poll() {
- try {
- _errorLock.readLock().lock();
- if (_errorContent != null) {
- return _errorContent;
- }
- } finally {
- _errorLock.readLock().unlock();
+ if (_errorContent != null) {
+ return _errorContent;
}
- if (isCompleted()) {
+ if (hasConsumedAllData()) {
return null;
}
return _receivingBuffer.poll();
}
- public boolean isCompleted() {
- return _isCompleted.get() && _receivingBuffer.isEmpty();
- }
-
@Override
public void onNext(Mailbox.MailboxContent mailboxContent) {
_mailboxId = JsonMailboxIdentifier.parse(mailboxContent.getMailboxId());
+ long remainingTimeMs =
Context.current().getDeadline().timeRemaining(TimeUnit.MILLISECONDS);
GrpcReceivingMailbox receivingMailbox = (GrpcReceivingMailbox)
_mailboxService.getReceivingMailbox(_mailboxId);
- _gotMailCallback = receivingMailbox.init(this);
+ _gotMailCallback = receivingMailbox.init(this, _responseObserver);
+ if (_errorContent != null) {
+ // This should never happen because gRPC calls StreamObserver in a
single-threaded fashion, and if error-content
+ // is not null then we would have already issued a onError which means
gRPC will not call onNext again.
+ LOGGER.warn("onNext called even though already errored out");
+ return;
+ }
if
(!mailboxContent.getMetadataMap().containsKey(ChannelUtils.MAILBOX_METADATA_BEGIN_OF_STREAM_KEY))
{
// when the receiving end receives a message put it in the mailbox queue.
- // TODO: pass a timeout to _receivingBuffer.
- if (!_receivingBuffer.offer(mailboxContent)) {
- // TODO: close the stream.
- RuntimeException e = new RuntimeException("Mailbox receivingBuffer is full:" + _mailboxId);
- LOGGER.error(e.getMessage());
- try {
- _errorLock.writeLock().lock();
- _errorContent = createErrorContent(e);
- } catch (IOException ioe) {
- e = new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
- LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:",
_mailboxId);
+ try {
+ final long offerTimeoutMs = remainingTimeMs +
BUFFER_OFFER_TIMEOUT_DELTA_MS;
+ if (!_receivingBuffer.offer(mailboxContent, offerTimeoutMs,
TimeUnit.MILLISECONDS)) {
+ RuntimeException e = new RuntimeException("Timed out offering to the receivingBuffer: " + _mailboxId);
LOGGER.error(e.getMessage());
- throw e;
- } finally {
- _errorLock.writeLock().unlock();
+ _errorContent = createErrorContent(e);
+ try {
+ _responseObserver.onError(Status.CANCELLED.asRuntimeException());
+ } catch (Exception ignored) {
+ // Exception can be thrown if the stream deadline has already been
reached, so we simply ignore it.
+ }
}
+ } catch (InterruptedException e) {
+ _errorContent = createErrorContent(e);
+ LOGGER.error("Interrupted while polling receivingBuffer", e);
+ _responseObserver.onError(Status.CANCELLED.asRuntimeException());
}
_gotMailCallback.accept(_mailboxId);
-
- updateMaxBufferSize();
-
- if (_isEnabledFeedback) {
- // TODO: this has race conditions with onCompleted() because sender
blindly closes connection channels once
- // it has finished sending all the data packets.
- int remainingCapacity = _receivingBuffer.remainingCapacity() - 1;
- Mailbox.MailboxStatus.Builder builder =
- Mailbox.MailboxStatus.newBuilder().setMailboxId(mailboxContent.getMailboxId())
- .putMetadata(ChannelUtils.MAILBOX_METADATA_BUFFER_SIZE_KEY, String.valueOf(remainingCapacity));
- if
(mailboxContent.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY)
!= null) {
- builder.putAllMetadata(mailboxContent.getMetadataMap());
- builder.putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY,
"true");
- }
- Mailbox.MailboxStatus status = builder.build();
- // returns the buffer available size to sender for rate controller /
throttling.
- _responseObserver.onNext(status);
- }
}
}
@Override
public void onError(Throwable e) {
- try {
- _errorLock.writeLock().lock();
+ if (_errorContent == null) {
_errorContent = createErrorContent(e);
- _gotMailCallback.accept(_mailboxId);
- throw new RuntimeException(e);
- } catch (IOException ioe) {
- throw new RuntimeException("Unable to encode exception for cascade reporting: " + e, ioe);
- } finally {
- _errorLock.writeLock().unlock();
- LOGGER.error("MaxBufferSize:", getMaxBufferSize(), " for mailbox:",
_mailboxId);
}
+ _gotMailCallback.accept(_mailboxId);
}
@Override
public void onCompleted() {
_isCompleted.set(true);
_responseObserver.onCompleted();
- LOGGER.debug("MaxBufferSize:", getMaxBufferSize(), " for mailbox:",
_mailboxId);
+ }
+
+ /**
+ * @return true if all data has been received via {@link #poll()}.
+ */
+ public boolean hasConsumedAllData() {
+ return _isCompleted.get() && _receivingBuffer.isEmpty();
+ }
+
+ private static Mailbox.MailboxContent createErrorContent(Throwable e) {
+ try {
+ return
Mailbox.MailboxContent.newBuilder().setPayload(ByteString.copyFrom(
+ TransferableBlockUtils.getErrorTransferableBlock(new
RuntimeException(e)).getDataBlock().toBytes()))
+ .putMetadata(ChannelUtils.MAILBOX_METADATA_END_OF_STREAM_KEY,
"true").build();
+ } catch (IOException ioException) {
+ LOGGER.error("Error creating error MailboxContent", ioException);
+ return DEFAULT_ERROR_MAILBOX_CONTENT;
+ }
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
index fd7443db12..291a38325b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxStatusStreamObserver.java
@@ -38,10 +38,9 @@ public class MailboxStatusStreamObserver implements StreamObserver<Mailbox.Mailb
private static final int DEFAULT_MAILBOX_QUEUE_CAPACITY = 5;
private final AtomicInteger _bufferSize = new AtomicInteger(5);
- private CountDownLatch _finishLatch;
+ private final CountDownLatch _finishLatch = new CountDownLatch(1);
- public MailboxStatusStreamObserver(CountDownLatch finishLatch) {
- _finishLatch = finishLatch;
+ public MailboxStatusStreamObserver() {
}
@Override
@@ -60,12 +59,15 @@ public class MailboxStatusStreamObserver implements
StreamObserver<Mailbox.Mailb
@Override
public void onError(Throwable e) {
_finishLatch.countDown();
- LOGGER.error("Receiving error msg from grpc mailbox status stream:", e);
- throw new RuntimeException(e);
+ LOGGER.error("[mailbox] Server returned onError", e);
}
@Override
public void onCompleted() {
_finishLatch.countDown();
}
+
+ public boolean isFinished() {
+ return _finishLatch.getCount() == 0;
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index a1d22c678f..f7967322a0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -161,13 +161,38 @@ public class QueryRunner {
public void processQuery(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap) {
long requestId =
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+ long timeoutMs =
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
+ long deadlineMs = System.currentTimeMillis() + timeoutMs;
if (isLeafStage(distributedStagePlan)) {
- // TODO: make server query request return via mailbox, this is a hack to
gather the non-streaming data table
- // and package it here for return. But we should really use a
MailboxSendOperator directly put into the
- // server executor.
+ runLeafStage(distributedStagePlan, requestMetadataMap, deadlineMs,
requestId);
+ } else {
+ StageNode stageRoot = distributedStagePlan.getStageRoot();
+ OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+ new PlanRequestContext(_mailboxService, requestId,
stageRoot.getStageId(), timeoutMs, deadlineMs,
+ new VirtualServerAddress(distributedStagePlan.getServer()),
distributedStagePlan.getMetadataMap()));
+ _scheduler.register(rootOperator);
+ }
+ }
+
+ public ExecutorService getQueryWorkerExecutorService() {
+ return _queryWorkerExecutorService;
+ }
+
+ public ExecutorService getQueryRunnerExecutorService() {
+ return _queryRunnerExecutorService;
+ }
+
+ private void runLeafStage(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap,
+ long deadlineMs, long requestId) {
+ // TODO: make server query request return via mailbox, this is a hack to
gather the non-streaming data table
+ // and package it here for return. But we should really use a
MailboxSendOperator directly put into the
+ // server executor.
+ MailboxSendOperator mailboxSendOperator = null;
+ try {
long leafStageStartMillis = System.currentTimeMillis();
List<ServerPlanRequestContext> serverQueryRequests =
- constructServerQueryRequests(distributedStagePlan,
requestMetadataMap, _helixPropertyStore, _mailboxService);
+ constructServerQueryRequests(distributedStagePlan,
requestMetadataMap, _helixPropertyStore, _mailboxService,
+ deadlineMs);
// send the data table via mailbox in one-off fashion (e.g. no
block-level split, one data table/partition key)
List<InstanceResponseBlock> serverQueryResults = new
ArrayList<>(serverQueryRequests.size());
@@ -181,37 +206,27 @@ public class QueryRunner {
+ (System.currentTimeMillis() - leafStageStartMillis) + " ms");
MailboxSendNode sendNode = (MailboxSendNode)
distributedStagePlan.getStageRoot();
StageMetadata receivingStageMetadata =
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
- MailboxSendOperator mailboxSendOperator = new
MailboxSendOperator(_mailboxService,
+ mailboxSendOperator = new MailboxSendOperator(_mailboxService,
new LeafStageTransferableBlockOperator(serverQueryResults,
sendNode.getDataSchema(), requestId,
sendNode.getStageId(), _rootServer),
receivingStageMetadata.getServerInstances(),
sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
_rootServer, requestId,
- sendNode.getStageId(), sendNode.getReceiverStageId());
+ sendNode.getStageId(), sendNode.getReceiverStageId(), deadlineMs);
int blockCounter = 0;
while
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
LOGGER.debug("Acquired transferable block: {}", blockCounter++);
}
- mailboxSendOperator.toExplainString();
- } else {
- long timeoutMs =
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
- StageNode stageRoot = distributedStagePlan.getStageRoot();
- OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
- new PlanRequestContext(_mailboxService, requestId,
stageRoot.getStageId(), timeoutMs,
- new VirtualServerAddress(distributedStagePlan.getServer()),
distributedStagePlan.getMetadataMap()));
- _scheduler.register(rootOperator);
+ mailboxSendOperator.close();
+ } catch (Exception e) {
+ LOGGER.error(String.format("Error running leafStage for requestId=%s",
requestId), e);
+ if (mailboxSendOperator != null) {
+ mailboxSendOperator.cancel(e);
+ }
}
}
- public ExecutorService getQueryWorkerExecutorService() {
- return _queryWorkerExecutorService;
- }
-
- public ExecutorService getQueryRunnerExecutorService() {
- return _queryRunnerExecutorService;
- }
-
private static List<ServerPlanRequestContext>
constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord>
helixPropertyStore,
- MailboxService<TransferableBlock> mailboxService) {
+ MailboxService<TransferableBlock> mailboxService, long deadlineMs) {
StageMetadata stageMetadata =
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
"Server request for V2 engine should only have 1 scan table per request.");
@@ -231,7 +246,7 @@ public class QueryRunner {
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
requests.add(
ServerRequestPlanVisitor.build(mailboxService,
distributedStagePlan, requestMetadataMap, tableConfig,
- schema, stageMetadata.getTimeBoundaryInfo(),
TableType.OFFLINE, tableEntry.getValue()));
+ schema, stageMetadata.getTimeBoundaryInfo(),
TableType.OFFLINE, tableEntry.getValue(), deadlineMs));
} else if (TableType.REALTIME.name().equals(tableType)) {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
@@ -239,7 +254,7 @@ public class QueryRunner {
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
requests.add(
ServerRequestPlanVisitor.build(mailboxService,
distributedStagePlan, requestMetadataMap, tableConfig,
- schema, stageMetadata.getTimeBoundaryInfo(),
TableType.REALTIME, tableEntry.getValue()));
+ schema, stageMetadata.getTimeBoundaryInfo(),
TableType.REALTIME, tableEntry.getValue(), deadlineMs));
} else {
throw new IllegalArgumentException("Unsupported table type key: " +
tableType);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 15ae722142..5e4260368b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -68,6 +68,7 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
@Override
public void runJob() {
boolean isFinished = false;
+ boolean returnedErrorBlock = false;
Throwable thrown = null;
try {
LOGGER.trace("({}): Executing", operatorChain);
@@ -88,6 +89,7 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
} else {
isFinished = true;
if (result.isErrorBlock()) {
+ returnedErrorBlock = true;
LOGGER.error("({}): Completed erroneously {} {}",
operatorChain, operatorChain.getStats(),
result.getDataBlock().getExceptions());
} else {
@@ -99,11 +101,10 @@ public class OpChainSchedulerService extends
AbstractExecutionThreadService {
LOGGER.error("({}): Failed to execute operator chain! {}",
operatorChain, operatorChain.getStats(), e);
thrown = e;
} finally {
- if (isFinished) {
- closeOpChain(operatorChain);
- } else if (thrown != null) {
- // TODO: It would make sense to cancel OpChains if they returned
an error-block.
+ if (returnedErrorBlock || thrown != null) {
cancelOpChain(operatorChain, thrown);
+ } else if (isFinished) {
+ closeOpChain(operatorChain);
}
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index 0b16a47229..74bd60e10d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -186,4 +186,20 @@ public class MailboxReceiveOperator extends
MultiStageOperator {
: TransferableBlockUtils.getEndOfStreamTransferableBlock();
return block;
}
+
+ @Override
+ public void close() {
+ super.close();
+ for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+ _mailboxService.releaseReceivingMailbox(sendingMailbox);
+ }
+ }
+
+ @Override
+ public void cancel(Throwable t) {
+ super.cancel(t);
+ for (MailboxIdentifier sendingMailbox : _sendingMailbox) {
+ _mailboxService.releaseReceivingMailbox(sendingMailbox);
+ }
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index 504a0d997b..f4e7976527 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -59,7 +59,8 @@ public class MailboxSendOperator extends MultiStageOperator {
@VisibleForTesting
interface BlockExchangeFactory {
BlockExchange build(MailboxService<TransferableBlock> mailboxService,
List<MailboxIdentifier> destinations,
- RelDistribution.Type exchange, KeySelector<Object[], Object[]>
selector, BlockSplitter splitter);
+ RelDistribution.Type exchange, KeySelector<Object[], Object[]>
selector, BlockSplitter splitter,
+ long deadlineMs);
}
@VisibleForTesting
@@ -70,10 +71,10 @@ public class MailboxSendOperator extends MultiStageOperator
{
public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer>
receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]>
keySelector,
- VirtualServerAddress sendingServer, long jobId, int senderStageId, int
receiverStageId) {
+ VirtualServerAddress sendingServer, long jobId, int senderStageId, int
receiverStageId, long deadlineMs) {
this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances,
exchangeType, keySelector,
server -> toMailboxId(server, jobId, senderStageId, receiverStageId,
sendingServer), BlockExchange::getExchange,
- jobId, senderStageId, receiverStageId, sendingServer);
+ jobId, senderStageId, receiverStageId, sendingServer, deadlineMs);
}
@VisibleForTesting
@@ -81,7 +82,7 @@ public class MailboxSendOperator extends MultiStageOperator {
MultiStageOperator dataTableBlockBaseOperator, List<VirtualServer>
receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]>
keySelector,
MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory
blockExchangeFactory, long jobId, int senderStageId,
- int receiverStageId, VirtualServerAddress serverAddress) {
+ int receiverStageId, VirtualServerAddress serverAddress, long
deadlineMs) {
super(jobId, senderStageId, serverAddress);
_dataTableBlockBaseOperator = dataTableBlockBaseOperator;
@@ -111,7 +112,8 @@ public class MailboxSendOperator extends MultiStageOperator
{
}
BlockSplitter splitter = TransferableBlockUtils::splitBlock;
- _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes,
exchangeType, keySelector, splitter);
+ _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes,
exchangeType, keySelector, splitter,
+ deadlineMs);
Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
String.format("Exchange type '%s' is not supported yet",
exchangeType));
@@ -141,9 +143,6 @@ public class MailboxSendOperator extends MultiStageOperator
{
transferableBlock = _dataTableBlockBaseOperator.nextBlock();
}
} catch (final Exception e) {
- // ideally, MailboxSendOperator doesn't ever throw an exception because
- // it will just get swallowed, in this scenario at least we can forward
- // any upstream exceptions as an error block
transferableBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
try {
_exchange.send(transferableBlock);
@@ -154,6 +153,18 @@ public class MailboxSendOperator extends
MultiStageOperator {
return transferableBlock;
}
+ @Override
+ public void close() {
+ super.close();
+ _exchange.close();
+ }
+
+ @Override
+ public void cancel(Throwable t) {
+ super.cancel(t);
+ _exchange.cancel(t);
+ }
+
private static JsonMailboxIdentifier toMailboxId(
VirtualServer destination, long jobId, int senderStageId, int
receiverStageId, VirtualServerAddress sender) {
return new JsonMailboxIdentifier(
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index e6f131b33e..03d9ee2d23 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -29,6 +29,8 @@ import org.apache.pinot.query.mailbox.SendingMailbox;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.runtime.blocks.BlockSplitter;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -36,6 +38,7 @@ import
org.apache.pinot.query.runtime.blocks.TransferableBlock;
* exchanging data across different servers.
*/
public abstract class BlockExchange {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BlockExchange.class);
// TODO: Deduct this value via grpc config maximum byte size; and make it
configurable with override.
// TODO: Max block size is a soft limit. only counts fixedSize datatable
byte buffer
private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
@@ -44,10 +47,10 @@ public abstract class BlockExchange {
public static BlockExchange getExchange(MailboxService<TransferableBlock>
mailboxService,
List<MailboxIdentifier> destinations, RelDistribution.Type exchangeType,
KeySelector<Object[], Object[]> selector,
- BlockSplitter splitter) {
+ BlockSplitter splitter, long deadlineMs) {
List<SendingMailbox<TransferableBlock>> sendingMailboxes = new
ArrayList<>();
for (MailboxIdentifier mid : destinations) {
- sendingMailboxes.add(mailboxService.getSendingMailbox(mid));
+ sendingMailboxes.add(mailboxService.getSendingMailbox(mid, deadlineMs));
}
switch (exchangeType) {
case SINGLETON:
@@ -71,15 +74,19 @@ public abstract class BlockExchange {
_splitter = splitter;
}
- public void send(TransferableBlock block) {
+ public void send(TransferableBlock block)
+ throws Exception {
if (block.isEndOfStreamBlock()) {
- _sendingMailboxes.forEach(destination -> sendBlock(destination, block));
+ for (SendingMailbox<TransferableBlock> sendingMailbox :
_sendingMailboxes) {
+ sendBlock(sendingMailbox, block);
+ }
return;
}
route(_sendingMailboxes, block);
}
- protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox,
TransferableBlock block) {
+ protected void sendBlock(SendingMailbox<TransferableBlock> sendingMailbox,
TransferableBlock block)
+ throws Exception {
if (block.isEndOfStreamBlock()) {
sendingMailbox.send(block);
sendingMailbox.complete();
@@ -93,5 +100,17 @@ public abstract class BlockExchange {
}
}
- protected abstract void route(List<SendingMailbox<TransferableBlock>>
destinations, TransferableBlock block);
+ protected abstract void route(List<SendingMailbox<TransferableBlock>>
destinations, TransferableBlock block)
+ throws Exception;
+
+ // Called when the OpChain gracefully returns.
+ // TODO: This is a no-op right now.
+ public void close() {
+ }
+
+ public void cancel(Throwable t) {
+ for (SendingMailbox<TransferableBlock> sendingMailbox : _sendingMailboxes)
{
+ sendingMailbox.cancel(t);
+ }
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
index 932f7593b6..e9c44d7502 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchange.java
@@ -34,7 +34,8 @@ class BroadcastExchange extends BlockExchange {
}
@Override
- protected void route(List<SendingMailbox<TransferableBlock>> destinations,
TransferableBlock block) {
+ protected void route(List<SendingMailbox<TransferableBlock>> destinations,
TransferableBlock block)
+ throws Exception {
for (SendingMailbox mailbox : destinations) {
sendBlock(mailbox, block);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
index 99f0e04f91..b7d5a4a15c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java
@@ -43,7 +43,8 @@ class HashExchange extends BlockExchange {
}
@Override
- protected void route(List<SendingMailbox<TransferableBlock>> destinations,
TransferableBlock block) {
+ protected void route(List<SendingMailbox<TransferableBlock>> destinations,
TransferableBlock block)
+ throws Exception {
List<Object[]>[] destIdxToRows = new List[destinations.size()];
for (Object[] row : block.getContainer()) {
int partition = _keySelector.computeHash(row) % destinations.size();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
index 0073e28bf0..dc015378ca 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchange.java
@@ -48,7 +48,8 @@ class RandomExchange extends BlockExchange {
}
@Override
- protected void route(List<SendingMailbox<TransferableBlock>> destinations,
TransferableBlock block) {
+ protected void route(List<SendingMailbox<TransferableBlock>> destinations,
TransferableBlock block)
+ throws Exception {
int destinationIdx = _rand.apply(destinations.size());
sendBlock(destinations.get(destinationIdx), block);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
index 713cfa5ba2..ebe9eb370e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchange.java
@@ -35,7 +35,8 @@ class SingletonExchange extends BlockExchange {
}
@Override
- protected void route(List<SendingMailbox<TransferableBlock>> mailbox,
TransferableBlock block) {
+ protected void route(List<SendingMailbox<TransferableBlock>> mailbox,
TransferableBlock block)
+ throws Exception {
for (SendingMailbox sendingMailbox : mailbox) {
sendBlock(sendingMailbox, block);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index dec70bd4e2..8ff1c041d1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -80,7 +80,8 @@ public class PhysicalPlanVisitor implements
StageNodeVisitor<MultiStageOperator,
StageMetadata receivingStageMetadata =
context.getMetadataMap().get(node.getReceiverStageId());
return new MailboxSendOperator(context.getMailboxService(), nextOperator,
receivingStageMetadata.getServerInstances(), node.getExchangeType(),
node.getPartitionKeySelector(),
- context.getServer(), context.getRequestId(), node.getStageId(),
node.getReceiverStageId());
+ context.getServer(), context.getRequestId(), node.getStageId(),
node.getReceiverStageId(),
+ context.getDeadlineMs());
}
@Override
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
index db5b0e028a..dac0f97575 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -33,18 +33,21 @@ public class PlanRequestContext {
protected final MailboxService<TransferableBlock> _mailboxService;
protected final long _requestId;
protected final int _stageId;
+ // TODO: Timeout is not needed since deadline is already present.
private final long _timeoutMs;
+ private final long _deadlineMs;
protected final VirtualServerAddress _server;
protected final Map<Integer, StageMetadata> _metadataMap;
protected final List<MailboxIdentifier> _receivingMailboxes = new
ArrayList<>();
public PlanRequestContext(MailboxService<TransferableBlock> mailboxService,
long requestId, int stageId,
- long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata>
metadataMap) {
+ long timeoutMs, long deadlineMs, VirtualServerAddress server,
Map<Integer, StageMetadata> metadataMap) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
_timeoutMs = timeoutMs;
+ _deadlineMs = deadlineMs;
_server = server;
_metadataMap = metadataMap;
}
@@ -61,6 +64,10 @@ public class PlanRequestContext {
return _timeoutMs;
}
+ public long getDeadlineMs() {
+ return _deadlineMs;
+ }
+
public VirtualServerAddress getServer() {
return _server;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
index 55141cc739..b6194bd15b 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -91,7 +91,7 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
public static ServerPlanRequestContext
build(MailboxService<TransferableBlock> mailboxService,
DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap,
TableConfig tableConfig, Schema schema,
- TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String>
segmentList) {
+ TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String>
segmentList, long deadlineMs) {
// Before-visit: construct the ServerPlanRequestContext baseline
// Making a unique requestId for leaf stages otherwise it causes problem
on stats/metrics/tracing.
long requestId =
(Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID))
<< 16)
@@ -107,7 +107,7 @@ public class ServerRequestPlanVisitor implements
StageNodeVisitor<Void, ServerPl
LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
pinotQuery.setExplain(false);
ServerPlanRequestContext context =
- new ServerPlanRequestContext(mailboxService, requestId,
stagePlan.getStageId(), timeoutMs,
+ new ServerPlanRequestContext(mailboxService, requestId,
stagePlan.getStageId(), timeoutMs, deadlineMs,
new VirtualServerAddress(stagePlan.getServer()),
stagePlan.getMetadataMap(), pinotQuery, tableType,
timeBoundaryInfo);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 35142fe0cf..4403d35e6d 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -42,9 +42,9 @@ public class ServerPlanRequestContext extends
PlanRequestContext {
protected InstanceRequest _instanceRequest;
public ServerPlanRequestContext(MailboxService<TransferableBlock>
mailboxService, long requestId, int stageId,
- long timeoutMs, VirtualServerAddress server, Map<Integer, StageMetadata>
metadataMap, PinotQuery pinotQuery,
- TableType tableType, TimeBoundaryInfo timeBoundaryInfo) {
- super(mailboxService, requestId, stageId, timeoutMs, server, metadataMap);
+ long timeoutMs, long deadlineMs, VirtualServerAddress server,
Map<Integer, StageMetadata> metadataMap,
+ PinotQuery pinotQuery, TableType tableType, TimeBoundaryInfo
timeBoundaryInfo) {
+ super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server,
metadataMap);
_pinotQuery = pinotQuery;
_tableType = tableType;
_timeBoundaryInfo = timeBoundaryInfo;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
index 9b7d0426de..7d08439601 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java
@@ -21,12 +21,16 @@ package org.apache.pinot.query.mailbox;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import org.apache.commons.collections.MapUtils;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.channel.MailboxContentStreamObserver;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.service.QueryConfig;
@@ -76,13 +80,10 @@ public class GrpcMailboxServiceTest {
@Test(timeOut = 10_000L)
public void testHappyPath()
throws Exception {
+ final long deadlineMs = System.currentTimeMillis() + 10_000;
// Given:
- JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
- "happypath",
- new VirtualServerAddress("localhost",
_mailboxService1.getMailboxPort(), 0),
- new VirtualServerAddress("localhost",
_mailboxService2.getMailboxPort(), 0),
- DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
- SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId);
+ JsonMailboxIdentifier mailboxId = createMailboxId("happypath");
+ SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId, deadlineMs);
ReceivingMailbox<TransferableBlock> receivingMailbox =
_mailboxService2.getReceivingMailbox(mailboxId);
CountDownLatch gotData = new CountDownLatch(1);
_mail2GotData.set(ignored -> gotData.countDown());
@@ -109,14 +110,10 @@ public class GrpcMailboxServiceTest {
@Test(timeOut = 10_000L)
public void testGrpcException()
throws Exception {
+ final long deadlineMs = System.currentTimeMillis() + 10_000;
// Given:
- JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
- "exception",
- new VirtualServerAddress("localhost",
_mailboxService1.getMailboxPort(), 0),
- new VirtualServerAddress("localhost",
_mailboxService2.getMailboxPort(), 0),
- DEFAULT_SENDER_STAGE_ID,
- DEFAULT_RECEIVER_STAGE_ID);
- SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId);
+ JsonMailboxIdentifier mailboxId = createMailboxId("exception");
+ SendingMailbox<TransferableBlock> sendingMailbox =
_mailboxService1.getSendingMailbox(mailboxId, deadlineMs);
ReceivingMailbox<TransferableBlock> receivingMailbox =
_mailboxService2.getReceivingMailbox(mailboxId);
CountDownLatch gotData = new CountDownLatch(1);
_mail2GotData.set(ignored -> gotData.countDown());
@@ -134,6 +131,207 @@ public class GrpcMailboxServiceTest {
Assert.assertFalse(receivedDataBlock.getExceptions().isEmpty());
}
+ /**
+ * When the connection reaches its deadline before the EOS block could be sent,
the receiving mailbox should return an
+ * error block.
+ */
+ @Test
+ public void testGrpcStreamDeadline()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 1_000;
+ JsonMailboxIdentifier mailboxId = createMailboxId("conndeadline");
+
+ GrpcSendingMailbox grpcSendingMailbox =
+ (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId,
deadlineMs);
+ GrpcReceivingMailbox grpcReceivingMailbox =
+ (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);
+
+ CountDownLatch latch = new CountDownLatch(2);
+ Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() {
+ @Override
+ public void accept(MailboxIdentifier mailboxIdentifier) {
+ latch.countDown();
+ }
+ };
+ _mail2GotData.set(callback);
+
+ // Send 1 normal block.
+ grpcSendingMailbox.send(getTestTransferableBlock());
+
+ // Latch had started with count=2. We don't send any EOS block and instead
wait for connection deadline to
+ // trigger the next callback. The latch won't await the full wait timeout
and instead should return immediately
+ // as soon as the deadline is hit and MailboxContentStreamObserver#onError
is called.
+ Assert.assertTrue(latch.await(4_000, TimeUnit.SECONDS));
+
+ // In case of errors, MailboxContentStreamObserver short-circuits and
skips returning the normal data-block.
+ TransferableBlock receivedBlock = grpcReceivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertTrue(receivedBlock.isErrorBlock());
+ Map<Integer, String> exceptions =
receivedBlock.getDataBlock().getExceptions();
+ Assert.assertTrue(MapUtils.isNotEmpty(exceptions));
+ String exceptionMessage = exceptions.values().iterator().next();
+ Assert.assertTrue(exceptionMessage.contains("CANCELLED"));
+
+ // GrpcReceivingMailbox#cancel shouldn't throw and should instead silently
swallow the exception
+ grpcReceivingMailbox.cancel();
+ }
+
+ /**
+ * This test ensures that when the buffer in MailboxContentStreamObserver is
full:
+ *
+ * 1. The sender is not blocked and can complete successfully.
+ * 2. The gotMail callback is called (bufferSize + 1) times.
+ * 3. The offer to the buffer in MailboxContentStreamObserver times out
around the time the query deadline is reached.
+ * 4. An error-block is returned by a subsequent {@link
GrpcReceivingMailbox#receive()} call.
+ */
+ @Test
+ public void testMailboxContentStreamBufferFull()
+ throws Exception {
+ final int bufferSize =
MailboxContentStreamObserver.DEFAULT_MAX_PENDING_MAILBOX_CONTENT;
+ long queryTimeoutMs = 2_000;
+ long deadlineMs = System.currentTimeMillis() + queryTimeoutMs;
+ int blocksSent = 20;
+ JsonMailboxIdentifier mailboxId = createMailboxId("buffer-full");
+
+ GrpcSendingMailbox grpcSendingMailbox =
+ (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId,
deadlineMs);
+ GrpcReceivingMailbox grpcReceivingMailbox =
+ (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);
+
+ CountDownLatch bufferSizeLatch = new CountDownLatch(bufferSize);
+ CountDownLatch bufferSizePlusOneLatch = new CountDownLatch(bufferSize + 1);
+ CountDownLatch bufferSizePlusTwoLatch = new CountDownLatch(bufferSize + 2);
+ CountDownLatch bufferSizePlusThreeLatch = new CountDownLatch(bufferSize +
3);
+ Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() {
+ @Override
+ public void accept(MailboxIdentifier mailboxIdentifier) {
+ bufferSizeLatch.countDown();
+ bufferSizePlusOneLatch.countDown();
+ bufferSizePlusTwoLatch.countDown();
+ bufferSizePlusThreeLatch.countDown();
+ }
+ };
+ _mail2GotData.set(callback);
+
+ // Sending mailbox will not be blocked if receiver buffer is full
+ for (int i = 0; i < blocksSent; i++) {
+ grpcSendingMailbox.send(getTestTransferableBlock());
+ }
+ grpcSendingMailbox.complete();
+
+ // Ensure that the buffer is completely filled
+ Assert.assertTrue(bufferSizeLatch.await(1, TimeUnit.SECONDS));
+ // Wait for the buffer offer to fail. After it fails, gotMail callback
will be called once more in onNext
+ Assert.assertTrue(bufferSizePlusOneLatch.await(queryTimeoutMs + 1_000,
TimeUnit.MILLISECONDS));
+ // Since buffer offer fails after the stream deadline has already been
reached,
+ // MailboxContentStreamObserver#onError will be called
+ Assert.assertTrue(bufferSizePlusTwoLatch.await(1, TimeUnit.SECONDS));
+ // gotMail callback will be called (bufferSize + 1) times from onNext and
once from onError, for a total of
+ // (bufferSize + 2) calls. The following latch await ensures that the
callback is never called more than that.
+ Assert.assertFalse(bufferSizePlusThreeLatch.await(1, TimeUnit.SECONDS));
+
+ // Ensure that an error-block is returned by the receiving mailbox.
+ TransferableBlock receivedBlock = grpcReceivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertTrue(receivedBlock.isErrorBlock());
+ Map<Integer, String> exceptions =
receivedBlock.getDataBlock().getExceptions();
+ Assert.assertTrue(exceptions.size() > 0);
+ Assert.assertTrue(exceptions.values().iterator().next().contains("Timed
out offering to the receivingBuffer"));
+ }
+
+ /**
+ * This test ensures that when a stream is cancelled by the receiver, any
future sends by the sender will throw.
+ */
+ @Test
+ public void testStreamCancellationByReceiver()
+ throws Exception {
+ // set a large deadline
+ long deadlineMs = System.currentTimeMillis() + 120_000;
+ JsonMailboxIdentifier mailboxId = createMailboxId("recv-cancel");
+
+ GrpcSendingMailbox grpcSendingMailbox =
+ (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId,
deadlineMs);
+ GrpcReceivingMailbox grpcReceivingMailbox =
+ (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);
+
+ CountDownLatch receivedDataLatch = new CountDownLatch(1);
+ Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() {
+ @Override
+ public void accept(MailboxIdentifier mailboxIdentifier) {
+ receivedDataLatch.countDown();
+ }
+ };
+ _mail2GotData.set(callback);
+
+ // Send and receive 1 data block to ensure stream is established
+ grpcSendingMailbox.send(getTestTransferableBlock());
+ Assert.assertTrue(receivedDataLatch.await(1, TimeUnit.SECONDS));
+ TransferableBlock receivedBlock = grpcReceivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+ // Receiver issues a cancellation
+ grpcReceivingMailbox.cancel();
+
+ // Send from sender will now throw. We await a few milliseconds since
cancellation may have a lag in getting
+ // processed at the other side.
+ CountDownLatch neverEndingLatch = new CountDownLatch(1);
+ try {
+ Assert.assertFalse(neverEndingLatch.await(100, TimeUnit.MILLISECONDS));
+ grpcSendingMailbox.send(getTestTransferableBlock());
+ Assert.fail("Send call above should have thrown since the stream is
cancelled");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Called send when stream is
already closed"));
+ }
+ }
+
+ @Test
+ public void testStreamCancellationBySender()
+ throws Exception {
+ // set a large deadline
+ long deadlineMs = System.currentTimeMillis() + 120_000;
+ JsonMailboxIdentifier mailboxId = createMailboxId("sender-cancel");
+
+ GrpcSendingMailbox grpcSendingMailbox =
+ (GrpcSendingMailbox) _mailboxService1.getSendingMailbox(mailboxId,
deadlineMs);
+ GrpcReceivingMailbox grpcReceivingMailbox =
+ (GrpcReceivingMailbox) _mailboxService2.getReceivingMailbox(mailboxId);
+
+ CountDownLatch receivedDataLatch = new CountDownLatch(1);
+ Consumer<MailboxIdentifier> callback = new Consumer<MailboxIdentifier>() {
+ @Override
+ public void accept(MailboxIdentifier mailboxIdentifier) {
+ receivedDataLatch.countDown();
+ }
+ };
+ _mail2GotData.set(callback);
+
+ // Send and receive 1 data block to ensure stream is established
+ grpcSendingMailbox.send(getTestTransferableBlock());
+ Assert.assertTrue(receivedDataLatch.await(1, TimeUnit.SECONDS));
+ TransferableBlock receivedBlock = grpcReceivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+ // Sender issues a cancellation
+ grpcSendingMailbox.cancel(new RuntimeException("foo"));
+
+ // receiving mailbox should return an error-block
+ CountDownLatch neverEndingLatch = new CountDownLatch(1);
+ Assert.assertFalse(neverEndingLatch.await(100, TimeUnit.MILLISECONDS));
+ receivedBlock = grpcReceivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertTrue(receivedBlock.isErrorBlock());
+ }
+
+ private JsonMailboxIdentifier createMailboxId(String jobId) {
+ return new JsonMailboxIdentifier(
+ jobId,
+ new VirtualServerAddress("localhost",
_mailboxService1.getMailboxPort(), 0),
+ new VirtualServerAddress("localhost",
_mailboxService2.getMailboxPort(), 0),
+ DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
+ }
+
private TransferableBlock getTestTransferableBlock() {
List<Object[]> rows = new ArrayList<>();
rows.add(createRow(0, "test_string"));
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
index 0c1351fd04..78edcec092 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/InMemoryMailboxServiceTest.java
@@ -20,11 +20,13 @@ package org.apache.pinot.query.mailbox;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.datablock.DataBlock;
-import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -33,6 +35,12 @@ public class InMemoryMailboxServiceTest {
private static final int DEFAULT_SENDER_STAGE_ID = 0;
private static final int DEFAULT_RECEIVER_STAGE_ID = 1;
+ private static final JsonMailboxIdentifier MAILBOX_ID = new
JsonMailboxIdentifier(
+ String.format("%s_%s", 1234, DEFAULT_RECEIVER_STAGE_ID),
+ new VirtualServerAddress("localhost", 0, 0),
+ new VirtualServerAddress("localhost", 0, 0),
+ DEFAULT_SENDER_STAGE_ID,
+ DEFAULT_RECEIVER_STAGE_ID);
private static final DataSchema TEST_DATA_SCHEMA = new DataSchema(new
String[]{"foo", "bar"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING});
private static final int NUM_ENTRIES = 5;
@@ -40,13 +48,12 @@ public class InMemoryMailboxServiceTest {
@Test
public void testHappyPath()
throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
- final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
- "happyPathJob", new VirtualServerAddress("localhost", 0, 0), new
VirtualServerAddress("localhost", 0, 0),
- DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
InMemoryReceivingMailbox receivingMailbox = (InMemoryReceivingMailbox)
mailboxService.getReceivingMailbox(
- mailboxId);
- InMemorySendingMailbox sendingMailbox = (InMemorySendingMailbox)
mailboxService.getSendingMailbox(mailboxId);
+ MAILBOX_ID);
+ InMemorySendingMailbox sendingMailbox =
+ (InMemorySendingMailbox) mailboxService.getSendingMailbox(MAILBOX_ID,
deadlineMs);
// Sends are non-blocking as long as channel capacity is not breached
for (int i = 0; i < NUM_ENTRIES; i++) {
@@ -77,14 +84,15 @@ public class InMemoryMailboxServiceTest {
*/
@Test
public void testNonLocalMailboxId() {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
- final JsonMailboxIdentifier mailboxId = new JsonMailboxIdentifier(
+ final JsonMailboxIdentifier nonLocalMailboxId = new JsonMailboxIdentifier(
"happyPathJob", new VirtualServerAddress("localhost", 0, 0), new
VirtualServerAddress("localhost", 1, 0),
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID);
// Test getReceivingMailbox
try {
- mailboxService.getReceivingMailbox(mailboxId);
+ mailboxService.getReceivingMailbox(nonLocalMailboxId);
Assert.fail("Method call above should have failed");
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("non-local transport"));
@@ -92,16 +100,109 @@ public class InMemoryMailboxServiceTest {
// Test getSendingMailbox
try {
- mailboxService.getSendingMailbox(mailboxId);
+ mailboxService.getSendingMailbox(nonLocalMailboxId, deadlineMs);
Assert.fail("Method call above should have failed");
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("non-local transport"));
}
}
+ @Test
+ public void testInMemoryStreamCancellationByReceiver()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
+ InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
+
+ SendingMailbox<TransferableBlock> sendingMailbox =
mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs);
+ ReceivingMailbox<TransferableBlock> receivingMailbox =
mailboxService.getReceivingMailbox(MAILBOX_ID);
+
+ // Send and receive one data block
+ sendingMailbox.send(getTestTransferableBlock(0, false));
+ TransferableBlock receivedBlock = receivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+ receivingMailbox.cancel();
+
+ // After the stream is cancelled, sender will start seeing errors
+ try {
+ sendingMailbox.send(getTestTransferableBlock(1, false));
+ Assert.fail("Method call above should have failed");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("cancelled InMemory"));
+ }
+
+ // Cancel is idempotent for both sending and receiving mailbox so safe to
call multiple times
+ receivingMailbox.cancel();
+ sendingMailbox.cancel(new RuntimeException("foo"));
+ }
+
+ @Test
+ public void testInMemoryStreamCancellationBySender()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
+ InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
+
+ SendingMailbox<TransferableBlock> sendingMailbox =
mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs);
+ ReceivingMailbox<TransferableBlock> receivingMailbox =
mailboxService.getReceivingMailbox(MAILBOX_ID);
+
+ // Send and receive one data block
+ sendingMailbox.send(getTestTransferableBlock(0, false));
+ TransferableBlock receivedBlock = receivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+ sendingMailbox.cancel(new RuntimeException("foo"));
+
+ // After the stream is cancelled, receiver will get error-blocks
+ receivedBlock = receivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertTrue(receivedBlock.isErrorBlock());
+
+ // Cancel is idempotent for both sending and receiving mailbox so safe to
call multiple times
+ sendingMailbox.cancel(new RuntimeException("foo"));
+ receivingMailbox.cancel();
+ }
+
+ @Test
+ public void testInMemoryStreamTimeOut()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 1000;
+ InMemoryMailboxService mailboxService = new
InMemoryMailboxService("localhost", 0, ignored -> { });
+
+ SendingMailbox<TransferableBlock> sendingMailbox =
mailboxService.getSendingMailbox(MAILBOX_ID, deadlineMs);
+ ReceivingMailbox<TransferableBlock> receivingMailbox =
mailboxService.getReceivingMailbox(MAILBOX_ID);
+
+ // Send and receive one data block
+ sendingMailbox.send(getTestTransferableBlock(0, false));
+ TransferableBlock receivedBlock = receivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertEquals(receivedBlock.getNumRows(), 1);
+
+ CountDownLatch neverEndingLatch = new CountDownLatch(1);
+ Assert.assertFalse(neverEndingLatch.await(1, TimeUnit.SECONDS));
+
+ // Sends for the mailbox will throw
+ try {
+ sendingMailbox.send(getTestTransferableBlock(0, false));
+ Assert.fail("Method call above should have failed");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Deadline"));
+ }
+
+ // Receiver will receive error-blocks after stream timeout
+ receivedBlock = receivingMailbox.receive();
+ Assert.assertNotNull(receivedBlock);
+ Assert.assertTrue(receivedBlock.isErrorBlock());
+
+ // Cancel will be a no-op and will not throw.
+ sendingMailbox.cancel(new RuntimeException("foo"));
+ receivingMailbox.cancel();
+ }
+
private TransferableBlock getTestTransferableBlock(int index, boolean
isEndOfStream) {
if (isEndOfStream) {
- return new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock());
+ return TransferableBlockUtils.getEndOfStreamTransferableBlock();
}
List<Object[]> rows = new ArrayList<>(index);
rows.add(new Object[]{index, "test_data"});
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
index 01232aa03e..89d5e722b3 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/MultiplexingMailboxServiceTest.java
@@ -45,11 +45,11 @@ public class MultiplexingMailboxServiceTest {
Mockito.doReturn(1000).when(grpcMailboxService).getMailboxPort();
Mockito.doReturn(1000).when(inMemoryMailboxService).getMailboxPort();
Mockito.doReturn(Mockito.mock(InMemorySendingMailbox.class)).when(inMemoryMailboxService).getSendingMailbox(
- Mockito.any());
+ Mockito.any(), Mockito.anyLong());
Mockito.doReturn(Mockito.mock(InMemoryReceivingMailbox.class)).when(inMemoryMailboxService).getReceivingMailbox(
Mockito.any());
Mockito.doReturn(Mockito.mock(GrpcSendingMailbox.class)).when(grpcMailboxService).getSendingMailbox(
- Mockito.any());
+ Mockito.any(), Mockito.anyLong());
Mockito.doReturn(Mockito.mock(GrpcReceivingMailbox.class)).when(grpcMailboxService).getReceivingMailbox(
Mockito.any());
@@ -66,8 +66,8 @@ public class MultiplexingMailboxServiceTest {
Assert.assertEquals("localhost", multiplexService.getHostname());
Assert.assertEquals(1000, multiplexService.getMailboxPort());
- Assert.assertTrue(multiplexService.getSendingMailbox(LOCAL_MAILBOX_ID)
instanceof InMemorySendingMailbox);
- Assert.assertTrue(multiplexService.getSendingMailbox(NON_LOCAL_MAILBOX_ID)
instanceof GrpcSendingMailbox);
+ Assert.assertTrue(multiplexService.getSendingMailbox(LOCAL_MAILBOX_ID, -1)
instanceof InMemorySendingMailbox);
+ Assert.assertTrue(multiplexService.getSendingMailbox(NON_LOCAL_MAILBOX_ID,
-1) instanceof GrpcSendingMailbox);
Assert.assertTrue(multiplexService.getReceivingMailbox(LOCAL_MAILBOX_ID)
instanceof InMemoryReceivingMailbox);
Assert.assertTrue(multiplexService.getReceivingMailbox(NON_LOCAL_MAILBOX_ID)
instanceof GrpcReceivingMailbox);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index aea548607a..4ba8090e09 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -183,7 +183,7 @@ public class OpChainSchedulerServiceTest {
}
@Test
- public void shouldCallCloseOnOperatorsThatReturnErrorBlock()
+ public void shouldCallCancelOnOperatorsThatReturnErrorBlock()
throws InterruptedException {
initExecutor(1);
OpChain opChain = getChain(_operatorA);
@@ -196,7 +196,7 @@ public class OpChainSchedulerServiceTest {
Mockito.doAnswer(inv -> {
latch.countDown();
return null;
- }).when(_operatorA).close();
+ }).when(_operatorA).cancel(Mockito.any());
scheduler.startAsync().awaitRunning();
scheduler.register(opChain);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index b916b5cc5e..ea7df49829 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -64,7 +64,8 @@ public class MailboxSendOperatorTest {
@BeforeMethod
public void setUp() {
_mocks = MockitoAnnotations.openMocks(this);
- Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any()))
+ Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(),
+ Mockito.anyLong()))
.thenReturn(_exchange);
Mockito.when(_server.getHostname()).thenReturn("mock");
@@ -79,13 +80,15 @@ public class MailboxSendOperatorTest {
}
@Test
- public void shouldSwallowNoOpBlockFromUpstream() {
+ public void shouldSwallowNoOpBlockFromUpstream()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
// Given:
MailboxSendOperator operator = new MailboxSendOperator(_mailboxService,
_input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2",
DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1,
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
- new VirtualServerAddress(_server));
+ new VirtualServerAddress(_server), deadlineMs);
Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
// When:
@@ -97,13 +100,15 @@ public class MailboxSendOperatorTest {
}
@Test
- public void shouldSendErrorBlock() {
+ public void shouldSendErrorBlock()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
// Given:
MailboxSendOperator operator = new MailboxSendOperator(_mailboxService,
_input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2",
DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1,
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
- new VirtualServerAddress(_server));
+ new VirtualServerAddress(_server), deadlineMs);
TransferableBlock errorBlock =
TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"));
Mockito.when(_input.nextBlock()).thenReturn(errorBlock);
@@ -116,13 +121,15 @@ public class MailboxSendOperatorTest {
}
@Test
- public void shouldSendErrorBlockWhenInputThrows() {
+ public void shouldSendErrorBlockWhenInputThrows()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
// Given:
MailboxSendOperator operator = new MailboxSendOperator(_mailboxService,
_input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2",
DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1,
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
- new VirtualServerAddress(_server));
+ new VirtualServerAddress(_server), deadlineMs);
Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!"));
ArgumentCaptor<TransferableBlock> captor =
ArgumentCaptor.forClass(TransferableBlock.class);
@@ -136,13 +143,15 @@ public class MailboxSendOperatorTest {
}
@Test
- public void shouldSendEosBlock() {
+ public void shouldSendEosBlock()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
// Given:
MailboxSendOperator operator = new MailboxSendOperator(_mailboxService,
_input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2",
DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1,
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
- new VirtualServerAddress(_server));
+ new VirtualServerAddress(_server), deadlineMs);
TransferableBlock eosBlock =
TransferableBlockUtils.getEndOfStreamTransferableBlock();
Mockito.when(_input.nextBlock()).thenReturn(eosBlock);
@@ -155,13 +164,15 @@ public class MailboxSendOperatorTest {
}
@Test
- public void shouldSendDataBlock() {
+ public void shouldSendDataBlock()
+ throws Exception {
+ long deadlineMs = System.currentTimeMillis() + 10_000;
// Given:
MailboxSendOperator operator = new MailboxSendOperator(_mailboxService,
_input, ImmutableList.of(_server),
RelDistribution.Type.HASH_DISTRIBUTED, _selector,
server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2",
DEFAULT_SENDER_STAGE_ID,
DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1,
DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID,
- new VirtualServerAddress(_server));
+ new VirtualServerAddress(_server), deadlineMs);
TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new
DataSchema.ColumnDataType[]{}));
Mockito.when(_input.nextBlock()).thenReturn(dataBlock)
.thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index 033f010a8c..6afdf89a03 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -58,7 +58,8 @@ public class BlockExchangeTest {
}
@Test
- public void shouldSendEosBlockToAllDestinations() {
+ public void shouldSendEosBlockToAllDestinations()
+ throws Exception {
// Given:
List<SendingMailbox<TransferableBlock>> destinations =
ImmutableList.of(_mailbox1, _mailbox2);
BlockExchange exchange = new TestBlockExchange(destinations);
@@ -78,7 +79,8 @@ public class BlockExchangeTest {
}
@Test
- public void shouldSendDataBlocksOnlyToTargetDestination() {
+ public void shouldSendDataBlocksOnlyToTargetDestination()
+ throws Exception {
// Given:
List<SendingMailbox<TransferableBlock>> destinations =
ImmutableList.of(_mailbox1);
BlockExchange exchange = new TestBlockExchange(destinations);
@@ -97,7 +99,8 @@ public class BlockExchangeTest {
}
@Test
- public void shouldSplitBlocks() {
+ public void shouldSplitBlocks()
+ throws Exception {
// Given:
List<SendingMailbox<TransferableBlock>> destinations =
ImmutableList.of(_mailbox1);
@@ -138,7 +141,8 @@ public class BlockExchangeTest {
}
@Override
- protected void route(List<SendingMailbox<TransferableBlock>> destinations,
TransferableBlock block) {
+ protected void route(List<SendingMailbox<TransferableBlock>> destinations,
TransferableBlock block)
+ throws Exception {
for (SendingMailbox mailbox : destinations) {
sendBlock(mailbox, block);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
index b39d8e43fb..f815e00b78 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BroadcastExchangeTest.java
@@ -57,7 +57,8 @@ public class BroadcastExchangeTest {
}
@Test
- public void shouldBroadcast() {
+ public void shouldBroadcast()
+ throws Exception {
// Given:
ImmutableList<SendingMailbox<TransferableBlock>> destinations =
ImmutableList.of(_mailbox1, _mailbox2);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
index 50b8b4bdc9..1e140553ca 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java
@@ -63,7 +63,8 @@ public class HashExchangeTest {
}
@Test
- public void shouldSplitAndRouteBlocksBasedOnPartitionKey() {
+ public void shouldSplitAndRouteBlocksBasedOnPartitionKey()
+ throws Exception {
// Given:
TestSelector selector = new TestSelector(Iterators.forArray(2, 0, 1));
Mockito.when(_block.getContainer()).thenReturn(ImmutableList.of(new
Object[]{0}, new Object[]{1}, new Object[]{2}));
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
index ab97ea6196..aca4180174 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/RandomExchangeTest.java
@@ -56,7 +56,8 @@ public class RandomExchangeTest {
}
@Test
- public void shouldRouteRandomly() {
+ public void shouldRouteRandomly()
+ throws Exception {
// Given:
ImmutableList<SendingMailbox<TransferableBlock>> destinations =
ImmutableList.of(_mailbox1, _mailbox2);
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
index 09855be2a4..3a964494ec 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/SingletonExchangeTest.java
@@ -54,7 +54,8 @@ public class SingletonExchangeTest {
}
@Test
- public void shouldRouteSingleton() {
+ public void shouldRouteSingleton()
+ throws Exception {
// Given:
ImmutableList<SendingMailbox<TransferableBlock>> destinations =
ImmutableList.of(_mailbox1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]