shauryachats opened a new pull request, #18855:
URL: https://github.com/apache/pinot/pull/18855
---
## Motivation
When `enableMultiClusterRouting=true`, a broker (e.g. in DCA) builds a full routing table for each remote cluster (e.g. PHX), including a `TimeSegmentPruner` per table. The pruner is supposed to skip segments whose time range does not overlap the query's time filter, the same pruning that works correctly for local segments.

In practice, time pruning was silently broken for **all** remote segments. Every remote segment was mapped to `DEFAULT_INTERVAL = [0, Long.MAX_VALUE]`, which matches every query, so nothing was pruned.

On a federated table spanning two clusters in internal testing, `numSegmentsQueried` with `enableMultiClusterRouting=true` was about 6× higher than two equivalent local-only queries combined (684 vs ~108).
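
To make the failure mode concrete, here is a minimal sketch of an overlap-based prune check. The `Interval` class and `canPrune` helper are hypothetical stand-ins written for this illustration, not Pinot's actual classes; only the `DEFAULT_INTERVAL` bounds come from the description above.

```java
class TimePruneSketch {
    // Hypothetical closed interval [start, end] in epoch millis.
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Two closed intervals overlap when each one starts before the other ends.
        boolean overlaps(Interval other) {
            return start <= other.end && other.start <= end;
        }
    }

    // Same bounds as the DEFAULT_INTERVAL described above.
    static final Interval DEFAULT_INTERVAL = new Interval(0L, Long.MAX_VALUE);

    // A segment can be pruned only if it does not overlap the query's time filter.
    static boolean canPrune(Interval segment, Interval queryFilter) {
        return !segment.overlaps(queryFilter);
    }

    public static void main(String[] args) {
        Interval query = new Interval(1_700_000_000_000L, 1_700_003_600_000L);
        Interval oldSegment = new Interval(1_600_000_000_000L, 1_600_003_600_000L);
        System.out.println(canPrune(oldSegment, query));       // true
        System.out.println(canPrune(DEFAULT_INTERVAL, query)); // false
    }
}
```

Any non-negative query range overlaps `[0, Long.MAX_VALUE]`, so a segment left at the default can never be skipped.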
---
## Root cause
Two bugs in the routing update pipeline conspired to produce this outcome.
### Bug 1: `computeIfAbsent` in `TimeSegmentPruner.onAssignmentChange` prevented committed segments from being updated
`TimeSegmentPruner` maintains a map of `segment → time interval`. REALTIME
segments first appear in CONSUMING state with `startTime = -1`, so they are
mapped to `DEFAULT_INTERVAL`. When the segment later commits and ZooKeeper is
updated with a valid `startTime`/`endTime`, the pruner should replace
`DEFAULT_INTERVAL` with the real interval.
The old code used `computeIfAbsent`, which only writes if the key is
absent. Since the CONSUMING entry already exists, the update is silently
skipped. The segment stays at `DEFAULT_INTERVAL` permanently.
```java
// Before
_intervalMap.computeIfAbsent(segment, k -> extractInterval(k, znRecord));
// After: re-evaluate any segment still at DEFAULT_INTERVAL
Interval existing = _intervalMap.get(segment);
if (existing == null || existing == DEFAULT_INTERVAL) {
  _intervalMap.put(segment, extractInterval(segment, znRecord));
}
```
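
The difference between the two update strategies can be reproduced in isolation. The sketch below is a simplified illustration, assuming a plain `HashMap`, a `long[]` standing in for the interval type, and a hypothetical `extractInterval` that returns the sentinel when the start time is negative; none of these are the pruner's real types.

```java
import java.util.HashMap;
import java.util.Map;

class IntervalUpdateSketch {
    // Sentinel compared by reference, mirroring the `existing == DEFAULT_INTERVAL` check.
    static final long[] DEFAULT_INTERVAL = {0L, Long.MAX_VALUE};

    // Hypothetical stand-in for extractInterval: a negative start time means "still consuming".
    static long[] extractInterval(long startTime, long endTime) {
        return startTime < 0 ? DEFAULT_INTERVAL : new long[] {startTime, endTime};
    }

    // Old behavior: writes only when the key is absent.
    static void updateOld(Map<String, long[]> map, String segment, long start, long end) {
        map.computeIfAbsent(segment, k -> extractInterval(start, end));
    }

    // New behavior: also overwrites an entry that is still the default sentinel.
    static void updateNew(Map<String, long[]> map, String segment, long start, long end) {
        long[] existing = map.get(segment);
        if (existing == null || existing == DEFAULT_INTERVAL) {
            map.put(segment, extractInterval(start, end));
        }
    }

    public static void main(String[] args) {
        Map<String, long[]> oldMap = new HashMap<>();
        Map<String, long[]> newMap = new HashMap<>();
        // Segment first seen while CONSUMING (startTime = -1).
        updateOld(oldMap, "seg_0", -1L, -1L);
        updateNew(newMap, "seg_0", -1L, -1L);
        // Same segment seen again after commit, now with a real time range.
        updateOld(oldMap, "seg_0", 1000L, 2000L);
        updateNew(newMap, "seg_0", 1000L, 2000L);
        System.out.println(oldMap.get("seg_0") == DEFAULT_INTERVAL); // true: stuck at the default
        System.out.println(newMap.get("seg_0")[0]);                  // 1000
    }
}
```

Note that the conditional put leaves an already-resolved interval untouched, so a committed segment's entry is written at most once after it leaves the default.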
### Bug 2: `SegmentZkMetadataFetcher.onAssignmentChange` cached consuming segments and never re-fetched them
`SegmentZkMetadataFetcher` maintains `_onlineSegmentsCached` to avoid
redundant ZK reads. Once a segment's ZNRecord is fetched, it is added to the
cache and skipped on all subsequent `onAssignmentChange` calls.
The original caching condition was `znRecord != null`. A CONSUMING segment
**does** have a non-null ZNRecord (it just has `startTime = -1`), so it was
immediately cached. From that point on it was never re-fetched — even after it
committed and ZK was updated with a valid time range. `TimeSegmentPruner` never
received the committed ZNRecord, so Bug 1's fix never had a chance to run.
For **local** routing this is masked: when a server commits a segment, it sends a Helix user-defined message (UDM) directly to the local broker, which calls `refreshSegment()`, bypassing the cache entirely. Remote brokers never receive these UDMs (**because remote brokers are spectators, not participants**), so `onAssignmentChange` is the sole update path for remote segments.
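
The eager-caching behavior described in this section can be sketched as follows. This is a toy model under stated assumptions: `zkStartTimes` is a hypothetical in-memory stand-in for ZooKeeper, the record is reduced to a single start time, and the method only mirrors the `znRecord != null` caching condition; it is not the fetcher's real code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class EagerCacheSketch {
    // Hypothetical stand-in for ZooKeeper: segment name -> start time (-1 while consuming).
    final Map<String, Long> zkStartTimes = new HashMap<>();
    final Set<String> onlineSegmentsCached = new HashSet<>();

    // Old behavior: any segment with a non-null record is cached on first sight.
    // Returns the start times delivered to downstream listeners during this call.
    Map<String, Long> onAssignmentChange(Set<String> onlineSegments) {
        Map<String, Long> delivered = new HashMap<>();
        for (String segment : onlineSegments) {
            if (onlineSegmentsCached.contains(segment)) {
                continue; // cached, never read again
            }
            Long record = zkStartTimes.get(segment);
            if (record != null) {
                onlineSegmentsCached.add(segment);
                delivered.put(segment, record);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        EagerCacheSketch fetcher = new EagerCacheSketch();
        Set<String> online = new HashSet<>();
        online.add("seg_0");

        fetcher.zkStartTimes.put("seg_0", -1L);                 // CONSUMING
        System.out.println(fetcher.onAssignmentChange(online)); // {seg_0=-1}

        fetcher.zkStartTimes.put("seg_0", 1000L);               // committed
        System.out.println(fetcher.onAssignmentChange(online)); // {}
    }
}
```

The second call delivers nothing, which is why a downstream listener in this model never sees the committed time range.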
---
## Fix
### `SegmentZkMetadataFetcher`: use ExternalView state instead of caching consuming segments
Rather than caching a segment as soon as its ZNRecord is non-null, we
consult the ExternalView that is already delivered
with every `onAssignmentChange` call:
```java
for (String segment : onlineSegments) {
  if (_onlineSegmentsCached.contains(segment)) continue;          // committed, already cached
  if (isConsumingInExternalView(externalView, segment)) continue; // still consuming, skip
  segments.add(segment);                                          // fetch from ZK
}

private static boolean isConsumingInExternalView(ExternalView externalView, String segment) {
  Map<String, String> stateMap = externalView.getStateMap(segment);
  return stateMap != null
      && stateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
}
```
A segment goes through exactly three states in this logic:
| State | In cache? | CONSUMING in EV? | Action |
|---|---|---|---|
| CONSUMING | No | Yes | Skip — nothing useful to fetch yet |
| Just committed (CONSUMING → ONLINE) | No | No | **Fetch once** to get the committed ZNRecord |
| Committed, previously seen | Yes | No | Skip — already cached |
The ExternalView change that fires `onAssignmentChange` is the same event
that flips the segment's state from CONSUMING to ONLINE. We use that state,
already in memory, to decide what to fetch — no additional ZK reads required to
make the decision.
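
The three-state table can be exercised with a small self-contained model. In this sketch the ExternalView is approximated by a nested `Map` (segment to instance-to-state), the helper `view` is hypothetical, and every fetch is assumed to succeed and be cached immediately; the real fetcher works against Helix types and may differ in those details.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class FetchDecisionSketch {
    static final String CONSUMING = "CONSUMING";
    static final String ONLINE = "ONLINE";

    private final Set<String> cached = new HashSet<>();

    // True if any replica of the segment is still CONSUMING in the (simplified) view.
    static boolean isConsuming(Map<String, Map<String, String>> externalView, String segment) {
        Map<String, String> stateMap = externalView.get(segment);
        return stateMap != null && stateMap.containsValue(CONSUMING);
    }

    // Returns the segments that need a ZK read in this call and marks them cached
    // (assumption: the fetch always succeeds).
    List<String> segmentsToFetch(Set<String> onlineSegments,
                                 Map<String, Map<String, String>> externalView) {
        List<String> toFetch = new ArrayList<>();
        for (String segment : onlineSegments) {
            if (cached.contains(segment)) continue;              // committed, already cached
            if (isConsuming(externalView, segment)) continue;    // still consuming, skip
            toFetch.add(segment);                                // just committed, fetch once
        }
        cached.addAll(toFetch);
        return toFetch;
    }

    // Hypothetical helper: a view with one segment hosted on one server.
    static Map<String, Map<String, String>> view(String segment, String state) {
        return Collections.singletonMap(segment, Collections.singletonMap("server_1", state));
    }

    public static void main(String[] args) {
        FetchDecisionSketch fetcher = new FetchDecisionSketch();
        Set<String> online = Collections.singleton("seg_0");
        System.out.println(fetcher.segmentsToFetch(online, view("seg_0", CONSUMING))); // []
        System.out.println(fetcher.segmentsToFetch(online, view("seg_0", ONLINE)));    // [seg_0]
        System.out.println(fetcher.segmentsToFetch(online, view("seg_0", ONLINE)));    // []
    }
}
```

Each call corresponds to one row of the table: skipped while consuming, fetched once on the transition, then skipped again from the cache.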
### `TimeSegmentPruner` — re-evaluate segments at `DEFAULT_INTERVAL`
With the above fix ensuring the committed ZNRecord now flows through on
each transition, the `computeIfAbsent` in
`TimeSegmentPruner` is replaced with a conditional put that updates any
segment still at `DEFAULT_INTERVAL`:
```java
Interval existing = _intervalMap.get(segment);
if (existing == null || existing == DEFAULT_INTERVAL) {
  _intervalMap.put(segment, extractInterval(segment, znRecord));
}
```
---
## Why this does not degrade performance
`onAssignmentChange` is called on a background thread in response to Helix
ExternalView changes — it is never on the
query path.
The original code fetched a segment's ZNRecord once (when first seen as
CONSUMING), cached it, and skipped it forever.
The new code does the same for committed segments. The key difference is
what happens to CONSUMING segments: instead of fetching and caching them
eagerly, we skip them entirely using the ExternalView state already in memory.
No ZK read is issued for a CONSUMING segment on any `onAssignmentChange` call.
The number of ZK reads per `onAssignmentChange` is now **O(segments that
just committed in this batch)** — typically 1–5 per ExternalView change during
active ingestion — compared to O(all currently-consuming segments) that a naive
re-fetch approach would require. In steady state between commits, zero extra
ZK reads are performed beyond what the original code did.
The `IntervalTree` rebuild in `TimeSegmentPruner` at the end of every
`onAssignmentChange` is O(N log N) in total
segments and was already happening before this change — unaffected.
---
## Verification
Tested on a staging cluster:
```
useMultiClusterRouting=false (local only):     queried=54  processed=30  ✓ (baseline, unchanged)
useMultiClusterRouting=true  (local + remote): queried=108 processed=60  ✓ (was 684, now = 54+54)
```
`numSegmentsQueried` with multi-cluster routing now equals the sum of both
clusters' local-only counts.
`numSegmentsProcessed` is unchanged — correct results, no data regression.