shauryachats opened a new pull request, #18855:
URL: https://github.com/apache/pinot/pull/18855

     ---
   
     ## Motivation
   
     When `enableMultiClusterRouting=true`, a broker (e.g. in DCA) builds a 
full routing table for each remote cluster (e.g.
     PHX), including a `TimeSegmentPruner` per table. The pruner is supposed to 
skip segments whose time range does not
     overlap the query's time filter — the same pruning that works correctly 
for local segments.
   
     In practice, time pruning was silently broken for **all** remote segments. 
Every remote segment was mapped to
     `DEFAULT_INTERVAL = [0, Long.MAX_VALUE]`, which matches every query, so 
nothing was pruned. 
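
To illustrate why a segment mapped to `DEFAULT_INTERVAL` can never be pruned, here is a minimal sketch of a closed-interval overlap check. The `OverlapSketch` class, its `Interval` type, and the `prune` helper are hypothetical stand-ins, not Pinot's actual classes, and the sketch assumes time values are non-negative epoch millis.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch; Pinot's TimeSegmentPruner uses its own types and an interval tree.
final class OverlapSketch {
    static final class Interval {
        final long min;
        final long max;

        Interval(long min, long max) {
            this.min = min;
            this.max = max;
        }

        // Closed intervals overlap when each one starts no later than the other ends.
        boolean overlaps(Interval other) {
            return min <= other.max && other.min <= max;
        }
    }

    static final Interval DEFAULT_INTERVAL = new Interval(0L, Long.MAX_VALUE);

    // Keep only the segment intervals that the query's time filter can touch.
    static List<Interval> prune(List<Interval> segments, Interval query) {
        return segments.stream().filter(s -> s.overlaps(query)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Interval query = new Interval(1_000L, 2_000L);
        List<Interval> segments = List.of(
            new Interval(0L, 500L),        // ends before the query starts: pruned
            new Interval(1_500L, 3_000L),  // overlaps the query: kept
            DEFAULT_INTERVAL);             // overlaps every non-negative range: always kept
        System.out.println(prune(segments, query).size()); // prints 2
    }
}
```

Because `[0, Long.MAX_VALUE]` overlaps any non-negative query range, a segment stuck at that interval survives every pruning pass.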
   
   On a federated table spanning two clusters in internal testing, 
`numSegmentsQueried` with `enableMultiClusterRouting=true` was roughly 6× higher 
than the combined count of two equivalent local-only queries (684 vs ~108).
   
     ---
   
     ## Root cause
   
     Two bugs in the routing update pipeline conspired to produce this outcome.
   
     ### Bug 1 — `TimeSegmentPruner.onAssignmentChange`: `computeIfAbsent` 
prevented committed segments from being updated
   
     `TimeSegmentPruner` maintains a map of `segment → time interval`. REALTIME 
segments first appear in CONSUMING state with `startTime = -1`, so they are 
mapped to `DEFAULT_INTERVAL`. When the segment later commits and ZooKeeper is 
updated with  a valid `startTime`/`endTime`, the pruner should replace 
`DEFAULT_INTERVAL` with the real interval.
   
     The old code used `computeIfAbsent`, which only writes if the key is 
absent. Since the CONSUMING entry already exists,  the update is silently 
skipped. The segment stays at `DEFAULT_INTERVAL` permanently.
   
     ```java
     // Before
     _intervalMap.computeIfAbsent(segment, k -> extractInterval(k, znRecord));
   
     // After — re-evaluate any segment still at DEFAULT_INTERVAL
     Interval existing = _intervalMap.get(segment);
     if (existing == null || existing == DEFAULT_INTERVAL) {
         _intervalMap.put(segment, extractInterval(segment, znRecord));
     }
     ```
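
The difference between the two variants can be reproduced in isolation. The following is a minimal sketch using a plain `HashMap`; the `IntervalUpdateSketch` class, the `long[]` interval representation, and the segment name are illustrative assumptions, not the actual Pinot code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: intervals are modeled as long[]{start, end} for brevity.
final class IntervalUpdateSketch {
    // Sentinel compared by reference, mirroring the `existing == DEFAULT_INTERVAL` check.
    static final long[] DEFAULT_INTERVAL = {0L, Long.MAX_VALUE};

    // Old behavior: computeIfAbsent runs the mapping function only when the key is absent.
    static long[] updateOld(Map<String, long[]> map, String segment, long[] extracted) {
        map.computeIfAbsent(segment, k -> extracted);
        return map.get(segment);
    }

    // New behavior: also overwrite an entry that still holds the sentinel.
    static long[] updateNew(Map<String, long[]> map, String segment, long[] extracted) {
        long[] existing = map.get(segment);
        if (existing == null || existing == DEFAULT_INTERVAL) {
            map.put(segment, extracted);
        }
        return map.get(segment);
    }

    public static void main(String[] args) {
        Map<String, long[]> intervals = new HashMap<>();
        intervals.put("seg_0", DEFAULT_INTERVAL);  // first seen while CONSUMING
        long[] committed = {1_000L, 2_000L};       // interval available after commit

        System.out.println(updateOld(intervals, "seg_0", committed) == DEFAULT_INTERVAL); // true
        System.out.println(updateNew(intervals, "seg_0", committed) == committed);        // true
    }
}
```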
   
     ### Bug 2 — `SegmentZkMetadataFetcher.onAssignmentChange`: consuming 
segments were cached and never re-fetched
   
     `SegmentZkMetadataFetcher` maintains `_onlineSegmentsCached` to avoid 
redundant ZK reads. Once a segment's ZNRecord is  fetched, it is added to the 
cache and skipped on all subsequent `onAssignmentChange` calls.
   
     The original caching condition was `znRecord != null`. A CONSUMING segment 
**does** have a non-null ZNRecord (it just has `startTime = -1`), so it was 
cached immediately. From that point on it was never re-fetched, even after it 
committed and ZK was updated with a valid time range. `TimeSegmentPruner` never 
received the committed ZNRecord, so fixing Bug 1 alone would have had no effect.
   
     For **local** routing this is masked: when a server commits a segment, it 
sends a Helix user-defined message (UDM)
     directly to the local broker, which calls `refreshSegment()` — bypassing 
the cache entirely. Remote brokers never receive
     these UDMs (**because remote brokers are spectators, not participants**), 
so `onAssignmentChange` is the sole update path for remote segments.
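
The stale-cache behavior described above can be sketched with in-memory maps. Everything here is a hypothetical stand-in: `StaleCacheSketch`, the `zk` map (segment to `startTime`), and `seenByPruner` are illustrative names, not the real `SegmentZkMetadataFetcher` internals.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the old caching condition (znRecord != null).
final class StaleCacheSketch {
    // In-memory stand-in for ZooKeeper: segment -> startTime from its ZNRecord.
    final Map<String, Long> zk = new HashMap<>();
    final Set<String> cached = new HashSet<>();
    // What the downstream pruner last received for each segment.
    final Map<String, Long> seenByPruner = new HashMap<>();

    void onAssignmentChange(Set<String> onlineSegments) {
        for (String segment : onlineSegments) {
            if (cached.contains(segment)) {
                continue; // never re-read after the first successful fetch
            }
            Long startTime = zk.get(segment);
            if (startTime != null) {
                // Cached regardless of whether the record holds a valid time range.
                cached.add(segment);
                seenByPruner.put(segment, startTime);
            }
        }
    }

    public static void main(String[] args) {
        StaleCacheSketch fetcher = new StaleCacheSketch();
        fetcher.zk.put("seg_0", -1L);                 // CONSUMING: record exists, startTime = -1
        fetcher.onAssignmentChange(Set.of("seg_0"));
        fetcher.zk.put("seg_0", 1_700_000_000_000L);  // segment commits, ZK now has a real time
        fetcher.onAssignmentChange(Set.of("seg_0"));
        System.out.println(fetcher.seenByPruner.get("seg_0")); // prints -1
    }
}
```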
   
     ---
   
     ## Fix
   
     ### `SegmentZkMetadataFetcher` — use ExternalView state instead of caching 
consuming segments
   
     Rather than caching a segment as soon as its ZNRecord is non-null, we 
consult the ExternalView that is already delivered
     with every `onAssignmentChange` call:
   
     ```java
     for (String segment : onlineSegments) {
         if (_onlineSegmentsCached.contains(segment)) {
             continue;  // committed, already cached
         }
         if (isConsumingInExternalView(externalView, segment)) {
             continue;  // still consuming, skip
         }
         segments.add(segment);  // fetch from ZK
     }
   
     // True if any replica of the segment is still CONSUMING in the ExternalView.
     private static boolean isConsumingInExternalView(ExternalView externalView, String segment) {
         Map<String, String> stateMap = externalView.getStateMap(segment);
         return stateMap != null
             && stateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
     }
     ```
   
     A segment goes through exactly three states in this logic:
   
     | State | In cache? | CONSUMING in EV? | Action |
     |---|---|---|---|
     | CONSUMING | No | Yes | Skip — nothing useful to fetch yet |
     | Just committed (CONSUMING → ONLINE) | No | No | **Fetch once** to get the committed ZNRecord |
     | Committed, previously seen | Yes | No | Skip — already cached |
   
     The ExternalView change that fires `onAssignmentChange` is the same event 
that flips the segment's state from CONSUMING  to ONLINE. We use that state, 
already in memory, to decide what to fetch — no additional ZK reads required to 
make the decision.
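
The three-state table can be walked through with a small simulation. This is a sketch under stated assumptions: the ExternalView is modeled as a plain nested map (segment to instance to state), `FetchDecisionSketch` and the instance and segment names are hypothetical, and a segment is assumed to enter the cache as soon as it is selected for fetching.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the fetch decision; not the real SegmentZkMetadataFetcher.
final class FetchDecisionSketch {
    static final String CONSUMING = "CONSUMING";
    static final String ONLINE = "ONLINE";

    private final Set<String> cached = new HashSet<>();

    // externalView: segment -> (server instance -> state), a stand-in for Helix's ExternalView.
    List<String> segmentsToFetch(Set<String> onlineSegments,
                                 Map<String, Map<String, String>> externalView) {
        List<String> toFetch = new ArrayList<>();
        for (String segment : onlineSegments) {
            if (cached.contains(segment)) {
                continue; // committed, previously seen
            }
            Map<String, String> stateMap = externalView.get(segment);
            if (stateMap != null && stateMap.containsValue(CONSUMING)) {
                continue; // still consuming on at least one replica
            }
            toFetch.add(segment);
            cached.add(segment); // assumption: the fetch succeeds, so cache right away
        }
        return toFetch;
    }

    public static void main(String[] args) {
        FetchDecisionSketch fetcher = new FetchDecisionSketch();
        Set<String> segments = Set.of("seg_0");
        Map<String, Map<String, String>> consuming = Map.of("seg_0", Map.of("server_1", CONSUMING));
        Map<String, Map<String, String>> online = Map.of("seg_0", Map.of("server_1", ONLINE));

        System.out.println(fetcher.segmentsToFetch(segments, consuming)); // prints []
        System.out.println(fetcher.segmentsToFetch(segments, online));    // prints [seg_0]
        System.out.println(fetcher.segmentsToFetch(segments, online));    // prints []
    }
}
```

Only the middle call, the one that observes the CONSUMING → ONLINE transition, selects the segment for a ZK read.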
   
     ### `TimeSegmentPruner` — re-evaluate segments at `DEFAULT_INTERVAL`
   
     With the above fix ensuring the committed ZNRecord now flows through on 
each transition, the `computeIfAbsent` in 
     `TimeSegmentPruner` is replaced with a conditional put that updates any 
segment still at `DEFAULT_INTERVAL`:
   
     ```java
     Interval existing = _intervalMap.get(segment);
     if (existing == null || existing == DEFAULT_INTERVAL) {
         _intervalMap.put(segment, extractInterval(segment, znRecord));
     }
     ```
     ---
   
     ## Why this does not degrade performance
   
     `onAssignmentChange` is called on a background thread in response to Helix 
ExternalView changes — it is never on the
     query path.
     
     The original code fetched a segment's ZNRecord once (when first seen as 
CONSUMING), cached it, and skipped it forever.  
   The new code does the same for committed segments. The key difference is 
what happens to CONSUMING segments: instead of  fetching and caching them 
eagerly, we skip them entirely using the ExternalView state already in memory. 
No ZK read is  issued for a CONSUMING segment on any `onAssignmentChange` call.
     
     The number of ZK reads per `onAssignmentChange` is now **O(segments that 
just committed in this batch)** — typically 1–5  per ExternalView change during 
active ingestion — compared to O(all currently-consuming segments) that a naive 
re-fetch  approach would require. In steady state between commits, zero extra 
ZK reads are performed beyond what the original code  did.
     
     The `IntervalTree` rebuild in `TimeSegmentPruner` at the end of every 
`onAssignmentChange` is O(N log N) in the total number of segments. It already 
ran before this change, so its cost is unaffected.
     
     ---
     
     ## Verification
   
     Tested on a staging cluster:
   
     ```
     useMultiClusterRouting=false (local only):       queried=54   processed=30   ✓ (baseline, unchanged)
     useMultiClusterRouting=true  (local + remote):   queried=108  processed=60   ✓ (was 684, now = 54+54)
     ```
   
     `numSegmentsQueried` with multi-cluster routing now equals the sum of both 
clusters' local-only counts.
     `numSegmentsProcessed` is unchanged — correct results, no data regression.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

