This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4b87ef4cbe9 Add broker extension seams for query-thread estimation and
routing-metadata listeners (#18800)
4b87ef4cbe9 is described below
commit 4b87ef4cbe90dd158a9a6bab7cc53d97aaa8f2f3
Author: Krishan Goyal <[email protected]>
AuthorDate: Fri Jun 19 01:58:13 2026 +0530
Add broker extension seams for query-thread estimation and routing-metadata
listeners (#18800)
---
.../MultiStageBrokerRequestHandler.java | 11 ++++++++-
.../routing/manager/BaseBrokerRoutingManager.java | 26 ++++++++++++++++++++++
.../planner/physical/DispatchableSubPlan.java | 24 +++++++++++++++-----
3 files changed, 55 insertions(+), 6 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index ddd8c00da83..725c0925bf7 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -608,6 +608,15 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
RequestContext requestContext, HttpHeaders httpHeaders) {
}
+ /**
+ * Estimates the total number of server query threads the given plan will consume; the estimate is used to
+ * acquire permits from the multi-stage query throttler before dispatch. By default this delegates to
+ * {@link DispatchableSubPlan#getEstimatedNumQueryThreads()}; subclasses can override it to be more accurate.
+ */
+ protected int getEstimatedNumQueryThreads(DispatchableSubPlan
dispatchableSubPlan) {
+ return dispatchableSubPlan.getEstimatedNumQueryThreads();
+ }
+
private long getTimeoutMs(Map<String, String> queryOptions) {
Long timeoutMsFromQueryOption =
QueryOptionsUtils.getTimeoutMs(queryOptions);
return timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption :
_brokerTimeoutMs;
@@ -696,7 +705,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
// Short-circuit: if all leaf stages are empty (all segments pruned or
table has no data),
// run only the broker reduce stage locally. No server dispatch is
attempted.
boolean allLeafStagesEmpty = dispatchableSubPlan.isAllLeafStagesEmpty();
- int estimatedNumQueryThreads =
dispatchableSubPlan.getEstimatedNumQueryThreads();
+ int estimatedNumQueryThreads =
getEstimatedNumQueryThreads(dispatchableSubPlan);
try {
// It's fine to block in this thread because we use a separate thread
pool from the main Jersey server to process
// these requests.
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
index 13adc526656..df5c1c558cb 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -39,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.function.Function;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.collections4.CollectionUtils;
@@ -143,6 +145,14 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
@Nullable
private Consumer<ServerInstance> _serverReenableCallback;
+ // Providers of extra per-table segment ZK metadata fetch listeners, registered alongside the built-in segment
+ // pruners on every table's routing entry. A provider is invoked every time a table's routing entry is built
+ // (with the table name with type) and may return null to skip that table. Lets deployments attach routing-adjacent
+ // metadata caches that observe the same ZNRecords the routing manager already fetches, without extra ZK reads.
+ // CopyOnWriteArrayList because providers are registered at startup but iterated concurrently by routing builds.
+ private final List<Function<String, SegmentZkMetadataFetchListener>>
_extraFetchListenerProviders =
+ new CopyOnWriteArrayList<>();
+
// Global read-write lock for protecting the global data structures such as
_enabledServerInstanceMap,
// _excludedServers, and _routableServerInstanceMap. Write lock must be held
if any of these are modified, read lock
// must be held otherwise
@@ -209,6 +219,16 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
_serverReenableCallback = callback;
}
+ /**
+ * Registers a provider of an extra {@link SegmentZkMetadataFetchListener} that is attached to every table's routing
+ * entry, alongside the built-in segment pruners. The provider is invoked with the table name with type each time
+ * that table's routing entry is built, and may return {@code null} to skip the table. Must be called before
+ * routing is built (i.e. before the cluster change mediator starts) so that all tables pick it up.
+ */
+ public void addSegmentZkMetadataFetchListenerProvider(Function<String,
SegmentZkMetadataFetchListener> provider) {
+ _extraFetchListenerProviders.add(provider);
+ }
+
private Object getRoutingTableBuildLock(String tableNameWithType) {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
return _routingTableBuildLocks.computeIfAbsent(rawTableName, k -> new
Object());
@@ -833,6 +853,12 @@ public abstract class BaseBrokerRoutingManager implements
RoutingManager, Cluste
if (partitionMetadataManager != null) {
segmentZkMetadataFetcher.register(partitionMetadataManager);
}
+ for (Function<String, SegmentZkMetadataFetchListener> provider :
_extraFetchListenerProviders) {
+ SegmentZkMetadataFetchListener listener =
provider.apply(tableNameWithType);
+ if (listener != null) {
+ segmentZkMetadataFetcher.register(listener);
+ }
+ }
segmentZkMetadataFetcher.init(idealState, externalView,
preSelectedOnlineSegments);
// Build table sampler contexts keyed by normalized sampler name.
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
index 7e1ecb2fd80..1711b67abe9 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchableSubPlan.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.function.ToIntFunction;
import org.apache.calcite.runtime.PairList;
import org.apache.pinot.core.util.QueryMultiThreadingUtils;
@@ -147,6 +148,18 @@ public class DispatchableSubPlan {
* Get the estimated total number of threads that will be spawned for this
query (across all stages and servers).
*/
public int getEstimatedNumQueryThreads() {
+ return getEstimatedNumQueryThreads(segment -> 1);
+ }
+
+ /**
+ * Get the estimated total number of threads that will be spawned for this query (across all stages and servers),
+ * weighting each leaf-stage segment by the number of work units it represents.
+ *
+ * <p>{@code segmentWorkUnits} maps a leaf-stage segment name to its work-unit count (the no-arg overload uses 1
+ * for every segment). A caller can return a count that more accurately reflects the real work the server will
+ * perform, rather than the number of routed entries.
+ */
+ public int getEstimatedNumQueryThreads(ToIntFunction<String>
segmentWorkUnits) {
if (_allLeafStagesEmpty) {
return 0;
}
@@ -159,11 +172,12 @@ public class DispatchableSubPlan {
} else {
// Leaf stage
for (Map<String, List<String>> segmentsMap :
stage.getWorkerIdToSegmentsMap().values()) {
- int numSegments = segmentsMap
- .values()
- .stream()
- .mapToInt(List::size)
- .sum();
+ int numSegments = 0;
+ for (List<String> segments : segmentsMap.values()) {
+ for (String segment : segments) {
+ numSegments += segmentWorkUnits.applyAsInt(segment);
+ }
+ }
// The leaf stage operator itself spawns a thread for each server
query request
estimatedNumQueryThreads++;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]