This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b87ef4cbe9 Add broker extension seams for query-thread estimation and routing-metadata listeners (#18800)
4b87ef4cbe9 is described below

commit 4b87ef4cbe90dd158a9a6bab7cc53d97aaa8f2f3
Author: Krishan Goyal <[email protected]>
AuthorDate: Fri Jun 19 01:58:13 2026 +0530

    Add broker extension seams for query-thread estimation and routing-metadata listeners (#18800)
---
 .../MultiStageBrokerRequestHandler.java            | 11 ++++++++-
 .../routing/manager/BaseBrokerRoutingManager.java  | 26 ++++++++++++++++++++++
 .../planner/physical/DispatchableSubPlan.java      | 24 +++++++++++++++-----
 3 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index ddd8c00da83..725c0925bf7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -608,6 +608,15 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
       RequestContext requestContext, HttpHeaders httpHeaders) {
   }
 
+  /**
+   * Estimates the total number of server query threads the given plan will consume; used to acquire permits from the
+   * multi-stage query throttler before dispatch. Subclasses can override to accurately
+   * estimate the real per-server work.
+   */
+  protected int getEstimatedNumQueryThreads(DispatchableSubPlan dispatchableSubPlan) {
+    return dispatchableSubPlan.getEstimatedNumQueryThreads();
+  }
+
   private long getTimeoutMs(Map<String, String> queryOptions) {
     Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions);
     return timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs;
@@ -696,7 +705,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     // Short-circuit: if all leaf stages are empty (all segments pruned or table has no data),
     // run only the broker reduce stage locally. No server dispatch is attempted.
     boolean allLeafStagesEmpty = dispatchableSubPlan.isAllLeafStagesEmpty();
-    int estimatedNumQueryThreads = dispatchableSubPlan.getEstimatedNumQueryThreads();
+    int estimatedNumQueryThreads = getEstimatedNumQueryThreads(dispatchableSubPlan);
     try {
       // It's fine to block in this thread because we use a separate thread pool from the main Jersey server to process
       // these requests.
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
index 13adc526656..df5c1c558cb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -39,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.commons.collections4.CollectionUtils;
@@ -143,6 +145,14 @@ public abstract class BaseBrokerRoutingManager implements RoutingManager, Cluste
   @Nullable
   private Consumer<ServerInstance> _serverReenableCallback;
 
+  // Providers of extra per-table segment ZK metadata fetch listeners, registered alongside the built-in segment
+  // pruners on every table's routing entry. Each provider is invoked once per table (with the table name with type)
+  // and may return null to skip that table. Lets deployments attach routing-adjacent metadata caches that observe the
+  // same ZNRecords the routing manager already fetches, without extra ZK reads. CopyOnWriteArrayList because providers
+  // are added once at startup but iterated concurrently across per-table routing builds.
+  private final List<Function<String, SegmentZkMetadataFetchListener>> _extraFetchListenerProviders =
+      new CopyOnWriteArrayList<>();
+
   // Global read-write lock for protecting the global data structures such as _enabledServerInstanceMap,
   // _excludedServers, and _routableServerInstanceMap. Write lock must be held if any of these are modified, read lock
   // must be held otherwise
@@ -209,6 +219,16 @@ public abstract class BaseBrokerRoutingManager implements RoutingManager, Cluste
     _serverReenableCallback = callback;
   }
 
+  /**
+   * Registers a provider of an extra {@link SegmentZkMetadataFetchListener} that is attached to every table's routing
+   * entry, alongside the built-in segment pruners. The provider is invoked once per table with the table name with
+   * type and may return {@code null} to skip a table. Must be called before routing is built (i.e. before the cluster
+   * change mediator starts) so that all tables pick it up.
+   */
+  public void addSegmentZkMetadataFetchListenerProvider(Function<String, SegmentZkMetadataFetchListener> provider) {
+    _extraFetchListenerProviders.add(provider);
+  }
+
   private Object getRoutingTableBuildLock(String tableNameWithType) {
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     return _routingTableBuildLocks.computeIfAbsent(rawTableName, k -> new Object());
@@ -833,6 +853,12 @@ public abstract class BaseBrokerRoutingManager implements RoutingManager, Cluste
       if (partitionMetadataManager != null) {
         segmentZkMetadataFetcher.register(partitionMetadataManager);
       }
+      for (Function<String, SegmentZkMetadataFetchListener> provider : _extraFetchListenerProviders) {
+        SegmentZkMetadataFetchListener listener = provider.apply(tableNameWithType);
+        if (listener != null) {
+          segmentZkMetadataFetcher.register(listener);
+        }
+      }
       segmentZkMetadataFetcher.init(idealState, externalView, preSelectedOnlineSegments);
 
       // Build table sampler contexts keyed by normalized sampler name.
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
index 7e1ecb2fd80..1711b67abe9 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.function.ToIntFunction;
 import org.apache.calcite.runtime.PairList;
 import org.apache.pinot.core.util.QueryMultiThreadingUtils;
 
@@ -147,6 +148,18 @@ public class DispatchableSubPlan {
    * Get the estimated total number of threads that will be spawned for this query (across all stages and servers).
    */
   public int getEstimatedNumQueryThreads() {
+    return getEstimatedNumQueryThreads(segment -> 1);
+  }
+
+  /**
+   * Get the estimated total number of threads that will be spawned for this query (across all stages and servers),
+   * weighting each leaf-stage segment by the number of work units it represents.
+   *
+   * <p>{@code segmentWorkUnits} maps a leaf-stage segment name to its work-unit count (default 1). A caller can
+   *  return a value that more accurately
+   *  reflects the real work the server will perform rather than the number of routed entries.
+   */
+  public int getEstimatedNumQueryThreads(ToIntFunction<String> segmentWorkUnits) {
     if (_allLeafStagesEmpty) {
       return 0;
     }
@@ -159,11 +172,12 @@ public class DispatchableSubPlan {
       } else {
         // Leaf stage
         for (Map<String, List<String>> segmentsMap : stage.getWorkerIdToSegmentsMap().values()) {
-          int numSegments = segmentsMap
-              .values()
-              .stream()
-              .mapToInt(List::size)
-              .sum();
+          int numSegments = 0;
+          for (List<String> segments : segmentsMap.values()) {
+            for (String segment : segments) {
+              numSegments += segmentWorkUnits.applyAsInt(segment);
+            }
+          }
 
           // The leaf stage operator itself spawns a thread for each server query request
           estimatedNumQueryThreads++;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to