This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ef36e829412 Fix resource tracking for gRPC ReceivingMailbox (#17797)
ef36e829412 is described below

commit ef36e829412b3f535e941325277092e54b93aac3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Mar 3 14:13:38 2026 -0800

    Fix resource tracking for gRPC ReceivingMailbox (#17797)
---
 .../pinot/query/mailbox/ReceivingMailbox.java      | 93 +++++++++++++++++++---
 .../operator/BaseMailboxReceiveOperator.java       |  6 +-
 .../pinot/query/mailbox/ReceivingMailboxTest.java  | 71 +++++++++++++----
 3 files changed, 144 insertions(+), 26 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
index 4cc69962539..0abf4a782c8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/ReceivingMailbox.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.errorprone.annotations.ThreadSafe;
 import java.nio.ByteBuffer;
@@ -39,22 +40,27 @@ import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.SerializedDataBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.apache.pinot.segment.spi.memory.DataBuffer;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
+import org.apache.pinot.spi.accounting.TrackingScope;
 import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.exception.TerminationException;
+import org.apache.pinot.spi.query.QueryExecutionContext;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/// Mailbox that's used to receive data. Ownership of the ReceivingMailbox is with the MailboxService, which is unlike
-/// the [SendingMailbox] whose ownership lies with the send operator. This is because the ReceivingMailbox can be
-/// initialized even before the corresponding OpChain is registered on the receiver, whereas the SendingMailbox is
+
+/// Mailbox that's used to receive data. Ownership of the [ReceivingMailbox] is with the [MailboxService], unlike the
+/// [SendingMailbox] whose ownership lies with the send operator. This is because the [ReceivingMailbox] can be
+/// initialized even before the corresponding OpChain is registered on the receiver, whereas the [SendingMailbox] is
 /// initialized when the send operator is running.
 ///
-/// There is a single ReceivingMailbox for each pair of (sender, receiver). This means that each receive operator will
-/// have multiple ReceivingMailbox instances, one for each sender. They are coordinated by a
+/// There is a single [ReceivingMailbox] for each pair of (sender, receiver). This means that each receive operator will
+/// have multiple [ReceivingMailbox] instances, one for each sender. They are coordinated by a
 /// [org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer].
 ///
-/// A ReceivingMailbox can have at most one reader and one writer at any given time. This means that different threads
+/// A [ReceivingMailbox] can have at most one reader and one writer at any given time. This means that different threads
 /// writing to the same mailbox must be externally synchronized.
 ///
 /// The offer methods will be called when new blocks are received from different sources. For example local workers will
@@ -63,6 +69,23 @@ import org.slf4j.LoggerFactory;
 ///
 /// All exceptions thrown from the offer methods should be handled within this class, and converted into a proper error
 /// block to be consumed by the reader.
+///
+/// Resource tracking (CPU / memory usage):
+///
+/// - For blocks received from local workers:
+///   [#offer(MseBlock, List, long)] is invoked by the sender's query execution thread.
+///   Since this thread is already associated with the query, resource usage is tracked automatically.
+///
+/// - For blocks received from remote workers:
+///   [#offerRaw(List, long)] is invoked by a shared gRPC executor thread.
+///   Because this thread is shared across multiple queries, resource usage is not tracked by default.
+///
+///   To enable tracking, [#registerReceiveOperatorThreadContext(QueryThreadContext)] can be used to register the
+///   mailbox receive operator’s [QueryThreadContext].
+///
+///   NOTE:
+///   Blocks may arrive before or after the mailbox receive operator is constructed. Therefore, resource usage must be
+///   accumulated even before the [QueryThreadContext] is registered, and then reconciled when registration occurs.
 @ThreadSafe
 public class ReceivingMailbox {
   public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
@@ -74,10 +97,15 @@ public class ReceivingMailbox {
   // TODO: Apply backpressure at the sender side when the queue is full.
   /// The queue where blocks are going to be stored.
   private final CancellableBlockingQueue _blocks;
-  private long _lastArriveTime = System.currentTimeMillis();
 
   private final StatMap<StatKey> _stats = new StatMap<>(StatKey.class);
 
+  /// Following variables are protected with synchronized block.
+  private long _lastArriveTime = System.currentTimeMillis();
+  private QueryThreadContext _receiveOperatorThreadContext;
+  private long _untrackedCpuTimeNs;
+  private long _untrackedAllocatedBytes;
+
   public ReceivingMailbox(String id, int maxPendingBlocks) {
     _id = id;
     _blocks = new CancellableBlockingQueue(id, maxPendingBlocks);
@@ -91,14 +119,43 @@ public class ReceivingMailbox {
     _blocks.registerReader(reader);
   }
 
+  /// Registers the [QueryThreadContext] of the mailbox receive operator.
+  public synchronized void registerReceiveOperatorThreadContext(@Nullable QueryThreadContext threadContext) {
+    assert _receiveOperatorThreadContext == null;
+    // NOTE: In production code, threadContext should never be null. It might be null in tests when QueryThreadContext
+    //       is not set up.
+    if (threadContext == null) {
+      return;
+    }
+    _receiveOperatorThreadContext = threadContext;
+    if (_untrackedCpuTimeNs > 0 || _untrackedAllocatedBytes > 0) {
+      updateResourceUsage(threadContext, _untrackedCpuTimeNs, _untrackedAllocatedBytes);
+      _untrackedCpuTimeNs = 0;
+      _untrackedAllocatedBytes = 0;
+    }
+  }
+
+  @VisibleForTesting
+  void updateResourceUsage(QueryThreadContext threadContext, long cpuTimeNs, long allocatedBytes) {
+    ThreadAccountant accountant = threadContext.getAccountant();
+    QueryExecutionContext executionContext = threadContext.getExecutionContext();
+    accountant.updateUntrackedResourceUsage(executionContext.getCid(), cpuTimeNs, allocatedBytes, TrackingScope.QUERY);
+    accountant.updateUntrackedResourceUsage(executionContext.getWorkloadName(), cpuTimeNs, allocatedBytes,
+        TrackingScope.WORKLOAD);
+  }
+
   public String getId() {
     return _id;
   }
 
   /// Offers a raw block into the mailbox within the timeout specified, returns the status of the mailbox.
+  ///
+  /// NOTE:
+  /// This method is executed by a shared gRPC executor thread rather than a query execution thread.
+  /// Therefore, CPU and memory usage must be tracked explicitly and independently.
   public ReceivingMailboxStatus offerRaw(List<ByteBuffer> byteBuffers, long timeoutMs) {
+    ThreadResourceSnapshot resourceSnapshot = new ThreadResourceSnapshot();
     updateWaitCpuTime();
-
     MseBlock block;
     List<DataBuffer> stats;
     try {
@@ -129,7 +186,12 @@ public class ReceivingMailbox {
       }
     } catch (Exception e) {
       // Use the terminate exception when query is explicitly terminated.
-      TerminationException terminateException = QueryThreadContext.getTerminateException();
+      TerminationException terminateException = null;
+      synchronized (this) {
+        if (_receiveOperatorThreadContext != null) {
+          terminateException = _receiveOperatorThreadContext.getExecutionContext().getTerminateException();
+        }
+      }
       if (terminateException != null) {
         block = ErrorMseBlock.fromException(terminateException);
       } else {
@@ -139,7 +201,18 @@ public class ReceivingMailbox {
       }
       stats = List.of();
     }
-    return offerPrivate(block, stats, timeoutMs);
+    ReceivingMailboxStatus status = offerPrivate(block, stats, timeoutMs);
+    long cpuTimeNs = resourceSnapshot.getCpuTimeNs();
+    long allocatedBytes = resourceSnapshot.getAllocatedBytes();
+    synchronized (this) {
+      if (_receiveOperatorThreadContext != null) {
+        updateResourceUsage(_receiveOperatorThreadContext, cpuTimeNs, allocatedBytes);
+      } else {
+        _untrackedCpuTimeNs += cpuTimeNs;
+        _untrackedAllocatedBytes += allocatedBytes;
+      }
+    }
+    return status;
   }
 
   /// Offers a block into the mailbox within the timeout specified, returns the status of the mailbox.
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 54cce0d78cf..635fd3ace2a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.query.runtime.operator.utils.AsyncStream;
 import org.apache.pinot.query.runtime.operator.utils.BlockingMultiStreamConsumer;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
 
 
 /**
@@ -72,8 +73,9 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
       List<ReadMailboxAsyncStream> asyncStreams = new ArrayList<>(numMailboxes);
       _receivingStats = new ArrayList<>(numMailboxes);
       for (String mailboxId : _mailboxIds) {
-        ReadMailboxAsyncStream asyncStream =
-            new ReadMailboxAsyncStream(_mailboxService.getReceivingMailbox(mailboxId), this);
+        ReceivingMailbox receivingMailbox = _mailboxService.getReceivingMailbox(mailboxId);
+        receivingMailbox.registerReceiveOperatorThreadContext(QueryThreadContext.getIfAvailable());
+        ReadMailboxAsyncStream asyncStream = new ReadMailboxAsyncStream(receivingMailbox, this);
         asyncStreams.add(asyncStream);
         _receivingStats.add(asyncStream._mailbox.getStatMap());
       }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
index 063d4a1cc20..4074a1bd060 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/ReceivingMailboxTest.java
@@ -25,20 +25,21 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.mockito.Mockito;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.*;
 
 
 public class ReceivingMailboxTest {
@@ -48,9 +49,15 @@ public class ReceivingMailboxTest {
   private static final MseBlock.Data DATA_BLOCK = new RowHeapDataBlock(List.of(), DATA_SCHEMA, null);
   private ReceivingMailbox.Reader _reader;
 
-  @BeforeMethod
+  @BeforeClass
   public void setUp() {
-    _reader = Mockito.mock(ReceivingMailbox.Reader.class);
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(true);
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true);
+  }
+
+  @BeforeMethod
+  public void setUpMethod() {
+    _reader = mock(ReceivingMailbox.Reader.class);
   }
 
   @Test
@@ -108,10 +115,10 @@ public class ReceivingMailboxTest {
     ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
     receivingMailbox.registeredReader(_reader);
 
-    MseBlock[] offeredBlocks = new MseBlock[] {
-      new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
-      new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
-      new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
+    MseBlock[] offeredBlocks = new MseBlock[]{
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
+        new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
     };
     for (MseBlock block : offeredBlocks) {
       ReceivingMailbox.ReceivingMailboxStatus status = receivingMailbox.offer(block, List.of(), 10);
@@ -132,7 +139,7 @@ public class ReceivingMailboxTest {
     ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
     receivingMailbox.registeredReader(_reader);
 
-    MseBlock[] offeredBlocks = new MseBlock[] {
+    MseBlock[] offeredBlocks = new MseBlock[]{
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
@@ -172,7 +179,7 @@ public class ReceivingMailboxTest {
     ReceivingMailbox receivingMailbox = new ReceivingMailbox("id", 10);
     receivingMailbox.registeredReader(_reader);
 
-    MseBlock[] offeredBlocks = new MseBlock[] {
+    MseBlock[] offeredBlocks = new MseBlock[]{
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
@@ -201,7 +208,7 @@ public class ReceivingMailboxTest {
     receivingMailbox.registeredReader(_reader);
     ErrorMseBlock errorBlock = ErrorMseBlock.fromException(new RuntimeException("Test error"));
 
-    MseBlock[] offeredBlocks = new MseBlock[] {
+    MseBlock[] offeredBlocks = new MseBlock[]{
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null),
         new RowHeapDataBlock(List.of(), DATA_SCHEMA, null)
@@ -328,4 +335,40 @@ public class ReceivingMailboxTest {
       ExecutorService executor) {
     return CompletableFuture.supplyAsync(() -> receivingMailbox.offer(block, List.of(), 10_000), executor);
   }
+
+  @Test
+  public void testResourceTracking()
+      throws Exception {
+    QueryThreadContext threadContext = mock(QueryThreadContext.class);
+
+    // Receive after setting thread context
+    TestReceivingMailbox receivingMailbox = new TestReceivingMailbox("id", threadContext);
+    receivingMailbox.registerReceiveOperatorThreadContext(threadContext);
+    receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+    assertTrue(receivingMailbox._resourceUsageUpdated);
+
+    // Receive before setting thread context
+    receivingMailbox = new TestReceivingMailbox("id", threadContext);
+    receivingMailbox.offerRaw(DataBlockUtils.serialize(DATA_BLOCK.asSerialized().getDataBlock()), 10_000);
+    receivingMailbox.registerReceiveOperatorThreadContext(threadContext);
+    assertTrue(receivingMailbox._resourceUsageUpdated);
+  }
+
+  private static class TestReceivingMailbox extends ReceivingMailbox {
+    final QueryThreadContext _expectedThreadContext;
+    boolean _resourceUsageUpdated;
+
+    public TestReceivingMailbox(String id, QueryThreadContext expectedThreadContext) {
+      super(id);
+      _expectedThreadContext = expectedThreadContext;
+    }
+
+    @Override
+    void updateResourceUsage(QueryThreadContext threadContext, long cpuTimeNs, long allocatedBytes) {
+      assertSame(threadContext, _expectedThreadContext);
+      assertTrue(cpuTimeNs > 0);
+      assertTrue(allocatedBytes > 0);
+      _resourceUsageUpdated = true;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to