walterddr commented on code in PR #10322:
URL: https://github.com/apache/pinot/pull/10322#discussion_r1126672929
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java:
##########
@@ -18,45 +18,62 @@
*/
package org.apache.pinot.query.mailbox;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
+import javax.annotation.Nullable;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be
cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
*
- * <p>It maintains a collection of connected mailbox servers and clients to
remote hosts. All indexed by the
- * mailboxID in the format of:
<code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ * <li>When the OpChain corresponding to the receiver was never
registered.</li>
+ * <li>When the receiving OpChain exited before data was sent for the first
time by the sender.</li>
+ * </ol>
*
- * <p>Connections are established/initiated from the sender side and only
tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on
a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- * <li>the latter part of the mailboxID consist of the channelID.</li>
- * <li>the job_id should be uniquely identifying a send/receving pair, for
example if one bundle job requires
- * to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to
distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a
time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel
to ensure the underlying stream is also
+ * released.
*/
public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcMailboxService.class);
// channel manager
+ private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY =
Duration.ofMinutes(5);
private final ChannelManager _channelManager;
private final String _hostname;
private final int _mailboxPort;
- // maintaining a list of registered mailboxes.
- private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>>
_receivingMailboxMap =
- new ConcurrentHashMap<>();
+ // We use a cache to ensure that the receiving mailbox and the underlying
gRPC stream are not leaked in the cases
+ // where the corresponding OpChain is either never registered or died before
the sender sent data for the first time.
+ private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(),
TimeUnit.MINUTES)
+ .removalListener(new RemovalListener<String, GrpcReceivingMailbox>()
{
+ @Override
+ public void onRemoval(RemovalNotification<String,
GrpcReceivingMailbox> notification) {
+ if (notification.wasEvicted()) {
+ // TODO: This should be tied with query deadline, but for that
we need to know the query deadline
+ // when the GrpcReceivingMailbox is initialized in
MailboxContentStreamObserver.
+ LOGGER.info("Removing dangling GrpcReceivingMailbox: {}",
notification.getKey());
+ notification.getValue().cancel();
Review Comment:
`cancel()` needs to be idempotent; there's a chance it gets called multiple
times.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java:
##########
@@ -18,45 +18,62 @@
*/
package org.apache.pinot.query.mailbox;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import io.grpc.ManagedChannel;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.apache.pinot.common.proto.Mailbox;
+import javax.annotation.Nullable;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.MailboxStatusStreamObserver;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * GRPC-based implementation of {@link MailboxService}.
+ * GRPC-based implementation of {@link MailboxService}. Note that there can be
cases where the ReceivingMailbox
+ * and/or the underlying connection can be leaked:
*
- * <p>It maintains a collection of connected mailbox servers and clients to
remote hosts. All indexed by the
- * mailboxID in the format of:
<code>"jobId:partitionKey:senderHost:senderPort:receiverHost:receiverPort"</code>
+ * <ol>
+ * <li>When the OpChain corresponding to the receiver was never
registered.</li>
+ * <li>When the receiving OpChain exited before data was sent for the first
time by the sender.</li>
+ * </ol>
*
- * <p>Connections are established/initiated from the sender side and only
tier-down from the sender side as well.
- * In the event of exception or timed out, the connection is cloased based on
a mutually agreed upon timeout period
- * after the last successful message sent/received.
- *
- * <p>Noted that:
- * <ul>
- * <li>the latter part of the mailboxID consist of the channelID.</li>
- * <li>the job_id should be uniquely identifying a send/receving pair, for
example if one bundle job requires
- * to open 2 mailboxes, they should use {job_id}_1 and {job_id}_2 to
distinguish the 2 different mailbox.</li>
- * </ul>
+ * To handle these cases, we store the {@link ReceivingMailbox} entries in a
time-expiring cache. If there was a
+ * leak, the entry would be evicted, and in that case we also issue a cancel
to ensure the underlying stream is also
+ * released.
*/
public class GrpcMailboxService implements MailboxService<TransferableBlock> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcMailboxService.class);
// channel manager
+ private static final Duration DANGLING_RECEIVING_MAILBOX_EXPIRY =
Duration.ofMinutes(5);
private final ChannelManager _channelManager;
private final String _hostname;
private final int _mailboxPort;
- // maintaining a list of registered mailboxes.
- private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>>
_receivingMailboxMap =
- new ConcurrentHashMap<>();
+ // We use a cache to ensure that the receiving mailbox and the underlying
gRPC stream are not leaked in the cases
+ // where the corresponding OpChain is either never registered or died before
the sender sent data for the first time.
+ private final Cache<String, GrpcReceivingMailbox> _receivingMailboxCache =
+
CacheBuilder.newBuilder().expireAfterAccess(DANGLING_RECEIVING_MAILBOX_EXPIRY.toMinutes(),
TimeUnit.MINUTES)
+ .removalListener(new RemovalListener<String, GrpcReceivingMailbox>()
{
+ @Override
+ public void onRemoval(RemovalNotification<String,
GrpcReceivingMailbox> notification) {
+ if (notification.wasEvicted()) {
+ // TODO: This should be tied with query deadline, but for that
we need to know the query deadline
+ // when the GrpcReceivingMailbox is initialized in
MailboxContentStreamObserver.
+ LOGGER.info("Removing dangling GrpcReceivingMailbox: {}",
notification.getKey());
Review Comment:
Should the log level be WARN? Ideally the OpChain should cancel its mailboxes;
gradually we should make this less dependent on cache eviction.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
int getMailboxPort();
/**
- * Look up a receiving mailbox by {@link MailboxIdentifier}.
- *
- * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist
already, but it might not have been
- * initialized.
+ * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
*
* @param mailboxId mailbox identifier.
* @return a receiving mailbox.
*/
ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
/**
- * Look up a sending mailbox by {@link MailboxIdentifier}.
+ * Same as {@link #getReceivingMailbox} but this would return null if the
mailbox isn't already created.
+ */
+ @Nullable
+ ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier
mailboxId);
+
+ /**
+ * Return a sending-mailbox for the given {@link MailboxIdentifier}. The
returned {@link SendingMailbox} is
+ * uninitialized, i.e. it will not open the underlying channel or acquire
any additional resources. Instead the
+ * {@link SendingMailbox} will initialize lazily when the data is sent for
the first time through it.
*
* @param mailboxId mailbox identifier.
+ * @param deadlineMs deadline in milliseconds, which is usually the same as
the query deadline.
* @return a sending mailbox.
*/
- SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+ SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long
deadlineMs);
+
+ /**
+ * A {@link ReceivingMailbox} for a given {@link OpChain} may be created
before the OpChain is even registered.
+ * Reason being that the sender starts sending data, and the receiver starts
receiving the same without waiting for
+ * the OpChain to be registered. The ownership for the ReceivingMailbox
hence lies with the MailboxService and not
+ * the OpChain. There are two ways in which a MailboxService may release its
references to a ReceivingMailbox and
+ * the underlying resources:
+ *
+ * <ol>
+ * <li>
+ * If the OpChain corresponding to a ReceivingMailbox was closed or
cancelled. In that case,
+ * {@link MailboxReceiveOperator} will call this method as part of its
close/cancel call. This is the main
+ * reason why this method exists.
+ * </li>
+ * <li>
+ * There can be cases where the corresponding OpChain was never
registered with the scheduler. In that case, it
+ * is up to the {@link MailboxService} to ensure that there are no leaks
of resources. E.g. it could setup a
+ * periodic job to detect such mailbox and do any clean-up. Note that
for this case, it is not mandatory for
+ * the {@link MailboxService} to use this method. It can use any
internal method it needs to do the clean-up.
+ * </li>
+ * </ol>
+ * @param mailboxId
+ */
+ void releaseReceivingMailbox(MailboxIdentifier mailboxId);
Review Comment:
IMO:
1. `MailboxService` ALWAYS owns the `MailboxContentStreamObserver` (and thus
the `GrpcReceivingMailbox`).
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
int getMailboxPort();
/**
- * Look up a receiving mailbox by {@link MailboxIdentifier}.
- *
- * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist
already, but it might not have been
- * initialized.
+ * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
*
* @param mailboxId mailbox identifier.
* @return a receiving mailbox.
*/
ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
/**
- * Look up a sending mailbox by {@link MailboxIdentifier}.
+ * Same as {@link #getReceivingMailbox} but this would return null if the
mailbox isn't already created.
+ */
+ @Nullable
+ ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier
mailboxId);
Review Comment:
This API is not needed. Make it private and annotate the method above as
`@Nullable`.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -50,21 +55,51 @@
int getMailboxPort();
/**
- * Look up a receiving mailbox by {@link MailboxIdentifier}.
- *
- * <p>the acquired {@link ReceivingMailbox} will be constructed if not exist
already, but it might not have been
- * initialized.
+ * Return a {@link ReceivingMailbox} for the given {@link MailboxIdentifier}.
*
* @param mailboxId mailbox identifier.
* @return a receiving mailbox.
*/
ReceivingMailbox<T> getReceivingMailbox(MailboxIdentifier mailboxId);
/**
- * Look up a sending mailbox by {@link MailboxIdentifier}.
+ * Same as {@link #getReceivingMailbox} but this would return null if the
mailbox isn't already created.
+ */
+ @Nullable
+ ReceivingMailbox<T> getReceivingMailboxIfPresent(MailboxIdentifier
mailboxId);
+
+ /**
+ * Return a sending-mailbox for the given {@link MailboxIdentifier}. The
returned {@link SendingMailbox} is
+ * uninitialized, i.e. it will not open the underlying channel or acquire
any additional resources. Instead the
+ * {@link SendingMailbox} will initialize lazily when the data is sent for
the first time through it.
*
* @param mailboxId mailbox identifier.
+ * @param deadlineMs deadline in milliseconds, which is usually the same as
the query deadline.
* @return a sending mailbox.
*/
- SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId);
+ SendingMailbox<T> getSendingMailbox(MailboxIdentifier mailboxId, long
deadlineMs);
+
+ /**
+ * A {@link ReceivingMailbox} for a given {@link OpChain} may be created
before the OpChain is even registered.
+ * Reason being that the sender starts sending data, and the receiver starts
receiving the same without waiting for
+ * the OpChain to be registered. The ownership for the ReceivingMailbox
hence lies with the MailboxService and not
+ * the OpChain. There are two ways in which a MailboxService may release its
references to a ReceivingMailbox and
+ * the underlying resources:
+ *
+ * <ol>
+ * <li>
+ * If the OpChain corresponding to a ReceivingMailbox was closed or
cancelled. In that case,
+ * {@link MailboxReceiveOperator} will call this method as part of its
close/cancel call. This is the main
+ * reason why this method exists.
+ * </li>
+ * <li>
+ * There can be cases where the corresponding OpChain was never
registered with the scheduler. In that case, it
+ * is up to the {@link MailboxService} to ensure that there are no leaks
of resources. E.g. it could setup a
+ * periodic job to detect such mailbox and do any clean-up. Note that
for this case, it is not mandatory for
+ * the {@link MailboxService} to use this method. It can use any
internal method it needs to do the clean-up.
+ * </li>
+ * </ol>
+ * @param mailboxId
+ */
+ void releaseReceivingMailbox(MailboxIdentifier mailboxId);
Review Comment:
I think `releaseReceivingMailbox` should only invalidate the mailbox cached
inside the `MailboxService`.
The MailboxOperator will have:
1. info regarding the actual mailbox;
2. the mailbox service that caches the mailboxId -> mailbox object mapping
entry.
We need to make the ownership relation a bit clearer. To me, with the
current model, cancel/close needs to be idempotent and state needs to be kept,
even though it is only done once per mailbox. Each call into these state
transitions will have to be guarded by locks, which is not ideal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]