This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ef36e829412 Fix resource tracking for gRPC ReceivingMailbox (#17797)
ef36e829412 is described below
commit ef36e829412b3f535e941325277092e54b93aac3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 3 14:13:38 2026 -0800
Fix resource tracking for gRPC ReceivingMailbox (#17797)
---
.../pinot/query/mailbox/ReceivingMailbox.java | 93 +++++++++++++++++++---
.../operator/BaseMailboxReceiveOperator.java | 6 +-
.../pinot/query/mailbox/ReceivingMailboxTest.java | 71 +++++++++++++----
3 files changed, 144 insertions(+), 26 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 4cc69962539..0abf4a782c8 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.mailbox;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.ThreadSafe;
import java.nio.ByteBuffer;
@@ -39,22 +40,27 @@ import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.SerializedDataBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.apache.pinot.segment.spi.memory.DataBuffer;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
+import org.apache.pinot.spi.accounting.TrackingScope;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.TerminationException;
+import org.apache.pinot.spi.query.QueryExecutionContext;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/// Mailbox that's used to receive data. Ownership of the ReceivingMailbox is
with the MailboxService, which is unlike
-/// the [SendingMailbox] whose ownership lies with the send operator. This is
because the ReceivingMailbox can be
-/// initialized even before the corresponding OpChain is registered on the
receiver, whereas the SendingMailbox is
+
+/// Mailbox that's used to receive data. Ownership of the [ReceivingMailbox]
is with the [MailboxService], unlike the
+/// [SendingMailbox] whose ownership lies with the send operator. This is
because the [ReceivingMailbox] can be
+/// initialized even before the corresponding OpChain is registered on the
receiver, whereas the [SendingMailbox] is
/// initialized when the send operator is running.
///
-/// There is a single ReceivingMailbox for each pair of (sender, receiver).
This means that each receive operator will
-/// have multiple ReceivingMailbox instances, one for each sender. They are
coordinated by a
+/// There is a single [ReceivingMailbox] for each pair of (sender, receiver).
This means that each receive operator will
+/// have multiple [ReceivingMailbox] instances, one for each sender. They are
coordinated by a
///
[org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer].
///
-/// A ReceivingMailbox can have at most one reader and one writer at any given
time. This means that different threads
+/// A [ReceivingMailbox] can have at most one reader and one writer at any
given time. This means that different threads
/// writing to the same mailbox must be externally synchronized.
///
/// The offer methods will be called when new blocks are received from
different sources. For example local workers will
@@ -63,6 +69,23 @@ import org.slf4j.LoggerFactory;
///
/// All exceptions thrown from the offer methods should be handled within this
class, and converted into a proper error
/// block to be consumed by the reader.
+///
+/// Resource tracking (CPU / memory usage):
+///
+/// - For blocks received from local workers:
+/// [#offer(MseBlock, List, long)] is invoked by the sender's query
execution thread.
+/// Since this thread is already associated with the query, resource usage
is tracked automatically.
+///
+/// - For blocks received from remote workers:
+/// [#offerRaw(List, long)] is invoked by a shared gRPC executor thread.
+/// Because this thread is shared across multiple queries, resource usage is
not tracked by default.
+///
+/// To enable tracking,
[#registerReceiveOperatorThreadContext(QueryThreadContext)] can be used to
register the
+/// mailbox receive operator’s [QueryThreadContext].
+///
+/// NOTE:
+/// Blocks may arrive before or after the mailbox receive operator is
constructed. Therefore, resource usage must be
+/// accumulated even before the [QueryThreadContext] is registered, and then
reconciled when registration occurs.
@ThreadSafe
public class ReceivingMailbox {
public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
@@ -74,10 +97,15 @@ public class ReceivingMailbox {
// TODO: Apply backpressure at the sender side when the queue is full.
/// The queue where blocks are going to be stored.
private final CancellableBlockingQueue _blocks;
- private long _lastArriveTime = System.currentTimeMillis();
private final StatMap<StatKey> _stats = new StatMap<>(StatKey.class);
+ /// The following variables are guarded by synchronizing on this mailbox instance.
+ private long _lastArriveTime = System.currentTimeMillis();
+ private QueryThreadContext _receiveOperatorThreadContext;
+ private long _untrackedCpuTimeNs;
+ private long _untrackedAllocatedBytes;
+
public ReceivingMailbox(String id, int maxPendingBlocks) {
_id = id;
_blocks = new CancellableBlockingQueue(id, maxPendingBlocks);
@@ -91,14 +119,43 @@ public class ReceivingMailbox {
_blocks.registerReader(reader);
}
+ /// Registers the [QueryThreadContext] of the mailbox receive operator.
+ public synchronized void registerReceiveOperatorThreadContext(@Nullable
QueryThreadContext threadContext) {
+ assert _receiveOperatorThreadContext == null;
+ // NOTE: In production code, threadContext should never be null. It might
be null in tests when QueryThreadContext
+ // is not set up.
+ if (threadContext == null) {
+ return;
+ }
+ _receiveOperatorThreadContext = threadContext;
+ if (_untrackedCpuTimeNs > 0 || _untrackedAllocatedBytes > 0) {
+ updateResourceUsage(threadContext, _untrackedCpuTimeNs,
_untrackedAllocatedBytes);
+ _untrackedCpuTimeNs = 0;
+ _untrackedAllocatedBytes = 0;
+ }
+ }
+
+ @VisibleForTesting
+ void updateResourceUsage(QueryThreadContext threadContext, long cpuTimeNs,
long allocatedBytes) {
+ ThreadAccountant accountant = threadContext.getAccountant();
+ QueryExecutionContext executionContext =
threadContext.getExecutionContext();
+ accountant.updateUntrackedResourceUsage(executionContext.getCid(),
cpuTimeNs, allocatedBytes, TrackingScope.QUERY);
+
accountant.updateUntrackedResourceUsage(executionContext.getWorkloadName(),
cpuTimeNs, allocatedBytes,
+ TrackingScope.WORKLOAD);
+ }
+
public String getId() {
return _id;
}
/// Offers a raw block into the mailbox within the timeout specified,
returns the status of the mailbox.
+ ///
+ /// NOTE:
+ /// This method is executed by a shared gRPC executor thread rather than a
query execution thread.
+ /// Therefore, CPU and memory usage must be tracked explicitly and
independently.
public ReceivingMailboxStatus offerRaw(List<ByteBuffer> byteBuffers, long
timeoutMs) {
+ ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
updateWaitCpuTime();
-
MseBlock block;
List<DataBuffer> stats;
try {
@@ -129,7 +186,12 @@ public class ReceivingMailbox {
}
} catch (Exception e) {
// Use the terminate exception when query is explicitly terminated.
- TerminationException terminateException =
QueryThreadContext.getTerminateException();
+ TerminationException terminateException = null;
+ synchronized (this) {
+ if (_receiveOperatorThreadContext != null) {
+ terminateException =
_receiveOperatorThreadContext.getExecutionContext().getTerminateException();
+ }
+ }
if (terminateException != null) {
block = ErrorMseBlock.fromException(terminateException);
} else {
@@ -139,7 +201,18 @@ public class ReceivingMailbox {
}
stats = List.of();
}
- return offerPrivate(block, stats, timeoutMs);
+ ReceivingMailboxStatus status = offerPrivate(block, stats, timeoutMs);
+ long cpuTimeNs = resourceSnapshot.getCpuTimeNs();
+ long allocatedBytes = resourceSnapshot.getAllocatedBytes();
+ synchronized (this) {
+ if (_receiveOperatorThreadContext != null) {
+ updateResourceUsage(_receiveOperatorThreadContext, cpuTimeNs,
allocatedBytes);
+ } else {
+ _untrackedCpuTimeNs += cpuTimeNs;
+ _untrackedAllocatedBytes += allocatedBytes;
+ }
+ }
+ return status;
}
/// Offers a block into the mailbox within the timeout specified, returns
the status of the mailbox.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 54cce0d78cf..635fd3ace2a 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -34,6 +34,7 @@ import
org.apache.pinot.query.runtime.operator.utils.AsyncStream;
import
org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
/**
@@ -72,8 +73,9 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
List<ReadMailboxAsyncStream> asyncStreams = new
ArrayList<>(numMailboxes);
_receivingStats = new ArrayList<>(numMailboxes);
for (String mailboxId : _mailboxIds) {
- ReadMailboxAsyncStream asyncStream =
- new
ReadMailboxAsyncStream(_mailboxService.getReceivingMailbox(mailboxId), this);
+ ReceivingMailbox receivingMailbox =
_mailboxService.getReceivingMailbox(mailboxId);
+
receivingMailbox.registerReceiveOperatorThreadContext(QueryThreadContext.getIfAvailable());
+ ReadMailboxAsyncStream asyncStream = new
ReadMailboxAsyncStream(receivingMailbox, this);
asyncStreams.add(asyncStream);
_receivingStats.add(asyncStream._mailbox.getStatMap());
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
index 063d4a1cc20..4074a1bd060 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
@@ -25,20 +25,21 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.mockito.Mockito;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.*;
public class ReceivingMailboxTest {
@@ -48,9 +49,15 @@ public class ReceivingMailboxTest {
private static final MseBlock.Data DATA_BLOCK = new
RowHeapDataBlock(List.of(), DATA_SCHEMA, null);
private ReceivingMailbox.Reader _reader;
- @BeforeMethod
+ @BeforeClass
public void setUp() {
- _reader = Mockito.mock(ReceivingMailbox.Reader.class);
+ ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+ ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+ }
+
+ @BeforeMethod
+ public void setUpMethod() {
+ _reader = mock(ReceivingMailbox.Reader.class);
}
@Test
@@ -108,10 +115,10 @@ public class ReceivingMailboxTest {
ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
receivingMailbox.registeredReader(_reader);
- MseBlock[] offeredBlocks = new MseBlock[] {
- new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
- new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
- new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+ MseBlock[] offeredBlocks = new MseBlock[]{
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+ new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
};
for (MseBlock block : offeredBlocks) {
ReceivingMailbox.ReceivingMailboxStatus status =
receivingMailbox.offer(block, List.of(), 10);
@@ -132,7 +139,7 @@ public class ReceivingMailboxTest {
ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
receivingMailbox.registeredReader(_reader);
- MseBlock[] offeredBlocks = new MseBlock[] {
+ MseBlock[] offeredBlocks = new MseBlock[]{
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
@@ -172,7 +179,7 @@ public class ReceivingMailboxTest {
ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
receivingMailbox.registeredReader(_reader);
- MseBlock[] offeredBlocks = new MseBlock[] {
+ MseBlock[] offeredBlocks = new MseBlock[]{
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
@@ -201,7 +208,7 @@ public class ReceivingMailboxTest {
receivingMailbox.registeredReader(_reader);
ErrorMseBlock errorBlock = ErrorMseBlock.fromException(new
RuntimeException("Test error"));
- MseBlock[] offeredBlocks = new MseBlock[] {
+ MseBlock[] offeredBlocks = new MseBlock[]{
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
@@ -328,4 +335,40 @@ public class ReceivingMailboxTest {
ExecutorService executor) {
return CompletableFuture.supplyAsync(() -> receivingMailbox.offer(block,
List.of(), 10_000), executor);
}
+
+ @Test
+ public void testResourceTracking()
+ throws Exception {
+ QueryThreadContext threadContext = mock(QueryThreadContext.class);
+
+ // Receive after setting thread context
+ TestReceivingMailbox receivingMailbox = new TestReceivingMailbox("id",
threadContext);
+ receivingMailbox.registerReceiveOperatorThreadContext(threadContext);
+
receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()),
10_000);
+ assertTrue(receivingMailbox._resourceUsageUpdated);
+
+ // Receive before setting thread context
+ receivingMailbox = new TestReceivingMailbox("id", threadContext);
+
receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()),
10_000);
+ receivingMailbox.registerReceiveOperatorThreadContext(threadContext);
+ assertTrue(receivingMailbox._resourceUsageUpdated);
+ }
+
+ private static class TestReceivingMailbox extends ReceivingMailbox {
+ final QueryThreadContext _expectedThreadContext;
+ boolean _resourceUsageUpdated;
+
+ public TestReceivingMailbox(String id, QueryThreadContext
expectedThreadContext) {
+ super(id);
+ _expectedThreadContext = expectedThreadContext;
+ }
+
+ @Override
+ void updateResourceUsage(QueryThreadContext threadContext, long cpuTimeNs,
long allocatedBytes) {
+ assertSame(threadContext, _expectedThreadContext);
+ assertTrue(cpuTimeNs > 0);
+ assertTrue(allocatedBytes > 0);
+ _resourceUsageUpdated = true;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]