jineshparakh opened a new pull request, #18460:
URL: https://github.com/apache/pinot/pull/18460

   ## Summary
   
   For dedup tables with tier configurations, the segment-assignment path can 
incorrectly place a new CONSUMING segment on cold-tier servers, breaking the 
dedup invariant (all segments for a partition on the same hot-tier servers). 
This PR fixes that by positively identifying cold-tier servers via ZK and 
skipping cold-tier segments when looking up the "existing assignment" reference 
for a partition.
   
   This fix prevents new CONSUMING segments from being placed on cold-tier 
servers. It does NOT retroactively repair segments that the bug already 
misplaced. Tables that hit the bug before this fix was deployed will need 
operator-driven recovery; the specific recovery procedure is out of scope for 
this PR.
   
   Affects only `MultiTierStrictRealtimeSegmentAssignment` (dedup tables with 
tier configs). `SingleTierStrictRealtimeSegmentAssignment` (upsert) and plain 
`RealtimeSegmentAssignment` are unaffected.
   
   ---
   
   ## The Bug
   
   ### Symptom
   
   When a dedup table has a tier configuration (e.g. cold tier) and 
`SegmentRelocator` has moved older COMPLETED segments to cold-tier servers, 
subsequent new CONSUMING segments for the same partition could be placed on 
cold-tier servers instead of hot-tier (consuming) servers.
   
   This breaks the dedup invariant that **all segments for a partition must be 
on the same hot-tier servers**, since dedup metadata is co-located with the 
CONSUMING-tagged servers.
   
   ### Root Cause
   
   In `BaseStrictRealtimeSegmentAssignment.getExistingAssignment(partitionId, 
currentAssignment)`:
   
   ```java
   for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
       if (isOfflineSegment(entry.getValue())) {
           continue;
       }
       LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
       if (llcSegmentName == null) {
           uploadedSegments.add(entry.getKey());
           continue;
       }
       if (llcSegmentName.getPartitionGroupId() == partitionId) {
           return entry.getValue().keySet();  // <-- first match returned
       }
   }
   ```
   
   **The fundamental issue**: the method returns the assignment of the first 
matching segment it encounters, without verifying that the segment is a valid 
hot-tier reference. Cold-tier assignments are not a valid source of truth for 
the dedup invariant: they describe where the segment was *relocated to*, not 
where new CONSUMING segments for that partition should land.
   
   **Why the bug triggers reliably in practice**: `currentAssignment` is a 
`TreeMap` keyed by segment name (LLC segment names have the form 
`table__partition__seqNum__timestamp`). Iteration is in lexicographic order, 
which coincides with numeric order whenever the seqNums being compared have the 
same number of digits (across digit counts it does not: `10` sorts before `9`). 
For a given partition, older segments (lower seqNum) are therefore usually 
encountered first, and older completed segments are exactly the ones 
`SegmentRelocator` moves to cold tier first. The first match is therefore very 
likely to be a cold-tier segment whenever any segment for the partition has 
been relocated.
   
   ### Example
   
   ```
   Partition 3 state in idealState (TreeMap, iterates alphabetically by segment 
name):
   
     testTable__3__874__T  →  {coldTier_server_12: ONLINE, coldTier_server_13: 
ONLINE}
                              ↑ older COMPLETED segment, relocated to cold by 
SegmentRelocator
   
     testTable__3__875__T  →  {hot_server_10: ONLINE, hot_server_12: ONLINE, 
...}
                              ↑ newer COMPLETED segment, still on hot tier
   
     testTable__3__876__T  →  {hot_server_10: CONSUMING, hot_server_12: 
CONSUMING, ...}
                              ↑ actively consuming on hot tier
   
   assignSegment(testTable__3__877__T, ...):
     Pre-fix:  __874 is the first match for partition 3
               → returns {coldTier_server_12, coldTier_server_13}
               → seg__877 (new CONSUMING) gets assigned to cold-tier servers → 
BUG
   
     Post-fix: __874's servers are all in the tier-instance set → filter skips 
it
               → __875 is the next match → returns hot-tier servers
               → seg__877 assigned to hot-tier servers ✓
   ```
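
The pre-fix behavior in this example can be reproduced with a small standalone sketch. It is not the Pinot code: `partitionOf` is a hypothetical stand-in for `LLCSegmentName` parsing, the OFFLINE and uploaded-segment checks are omitted, and `T` is kept as the placeholder timestamp from the example.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class FirstMatchOrderSketch {
  // Hypothetical stand-in for LLCSegmentName parsing (table__partition__seqNum__timestamp).
  static int partitionOf(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[1]);
  }

  // Mirrors the pre-fix loop: return the servers of the first segment found for the partition.
  static Set<String> firstMatch(TreeMap<String, Map<String, String>> currentAssignment, int partitionId) {
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      if (partitionOf(entry.getKey()) == partitionId) {
        return entry.getValue().keySet();
      }
    }
    return null;
  }

  // Partition 3 state from the example above (server lists shortened to two entries).
  static TreeMap<String, Map<String, String>> exampleAssignment() {
    TreeMap<String, Map<String, String>> assignment = new TreeMap<>();
    assignment.put("testTable__3__876__T", Map.of("hot_server_10", "CONSUMING", "hot_server_12", "CONSUMING"));
    assignment.put("testTable__3__875__T", Map.of("hot_server_10", "ONLINE", "hot_server_12", "ONLINE"));
    assignment.put("testTable__3__874__T", Map.of("coldTier_server_12", "ONLINE", "coldTier_server_13", "ONLINE"));
    return assignment;
  }

  public static void main(String[] args) {
    // Insertion order is irrelevant: the TreeMap iterates by key, so __874 is visited first
    // and its cold-tier servers are returned.
    System.out.println(firstMatch(exampleAssignment(), 3));
  }
}
```

Because the map is sorted by key, the result depends only on which segment name sorts first for the partition, not on which segment is newest or still on the hot tier.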
   
   ---
   
   ## The Fix
   
   ### Chosen Approach: ZK-based positive tier identification
   
   Add a `protected Set<String> getTierInstances()` template method to 
`BaseStrictRealtimeSegmentAssignment`. Inside `getExistingAssignment`, skip any 
segment whose servers are **entirely** in this set:
   
   ```java
   if (!tierInstances.isEmpty() && 
tierInstances.containsAll(entry.getValue().keySet())) {
       continue;
   }
   ```
   
   - Default implementation: returns `Set.of()` — no filtering, identical 
behavior for upsert.
   - Override in `MultiTierStrictRealtimeSegmentAssignment`: fetches each 
tier's `InstancePartitions` from ZK and aggregates all tier-server identities 
into a single set.
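
A minimal standalone sketch of the filtered lookup, assuming the tier-instance set has already been fetched. The class name, `partitionOf`, and the server names are hypothetical, and the OFFLINE and uploaded-segment handling of the real method is left out for brevity.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class TierFilterSketch {
  // Hypothetical stand-in for LLCSegmentName parsing (table__partition__seqNum__timestamp).
  static int partitionOf(String segmentName) {
    return Integer.parseInt(segmentName.split("__")[1]);
  }

  // Returns the servers of the first segment of the partition that is NOT entirely on tier servers,
  // or null when every candidate was skipped.
  static Set<String> existingAssignment(TreeMap<String, Map<String, String>> currentAssignment,
      int partitionId, Set<String> tierInstances) {
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      Set<String> servers = entry.getValue().keySet();
      // The new filter: an empty tier set disables it, which models the default (upsert) behavior.
      if (!tierInstances.isEmpty() && tierInstances.containsAll(servers)) {
        continue;
      }
      if (partitionOf(entry.getKey()) == partitionId) {
        return servers;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    TreeMap<String, Map<String, String>> assignment = new TreeMap<>();
    assignment.put("t__0__0__T", Map.of("cold_1", "ONLINE", "cold_2", "ONLINE"));
    assignment.put("t__0__1__T", Map.of("hot_1", "ONLINE", "hot_2", "ONLINE"));
    Set<String> tierInstances = Set.of("cold_1", "cold_2", "cold_3");

    System.out.println(existingAssignment(assignment, 0, tierInstances)); // the hot servers
    System.out.println(existingAssignment(assignment, 0, Set.of()));      // the cold servers (no filtering)
  }
}
```

The `containsAll` check skips a segment only when every one of its servers is a tier server, so a segment with at least one non-tier server is still treated as a valid reference.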
   
   
   ### Why this works for every scenario
   
   | Scenario | Behavior |
   |---|---|
   | Normal: all segments on hot-tier | Tier filter never matches → first hot 
segment returned ✓ |
   | Cold tier exists, mix of cold + hot segments | Cold ones skipped → first 
hot returned ✓ |
   | Full cascade: all segments on cold-tier (bug already fired) | All skipped 
→ returns null → falls back to computed hot-tier assignment → **self-healing** 
✓ |
   | Instance-partitions (IP) change, no cold tier | Old consuming servers NOT in tier pool → not 
skipped → dedup invariant preserved ✓ |
   | IP change + cold tier (mixed) | Cold skipped, old hot returned → dedup 
invariant preserved ✓ |
   | Upsert (`SingleTier`) | Inherits empty default → filter never fires → 
behavior identical to pre-change ✓ |
   | Tier config but no tier IPs in ZK yet | `fetchInstancePartitions` returns 
null → empty tier set → no filtering. Correct because if tier IPs don't exist, 
no segment could have been moved to that tier ✓ |
   
   ### Trade-off: ZK read per `assignSegment`
   
   `getTierInstances` issues one ZK property store read per configured tier on 
every `assignSegment` call. Mitigations:
   
   - `ZkHelixPropertyStore` caches reads locally; only cache-miss/expiry hits 
the actual ZK ensemble
   - `assignSegment` is not on the query hot path (called once per 
CONSUMING-segment commit)
   - Pinot's controller is already heavily ZK-dependent for the same code path
   
   If ZK is unavailable, the fetch will throw and propagate up. This is 
**intentional** — silently falling back to an empty set would re-introduce the 
bug during ZK flakiness. Documented in code.
   
   ---
   
   ## How we guarantee new CONSUMING segments go to hot servers
   
   Tracing the post-fix `assignSegment` for the typical bug scenario:
   
   1. **Compute candidate from instance partitions**:
      `assignConsumingSegment(partitionId, consumingInstancePartitions)` 
returns hot-tier servers by construction.
   
   2. **Look up existing assignment** via `getExistingAssignment(partitionId, 
currentAssignment)`:
      - Fetch tier server set from ZK once
      - Iterate `currentAssignment` (TreeMap, alphabetical order by segment 
name)
      - For each segment:
        - Skip if all servers are OFFLINE
        - **Skip if all servers are in the tier-server set** (the new filter)
        - Match partition → return its keySet (which contains only hot-tier 
servers, since cold-tier segments were skipped)
      - If no match found → return `null`
   
   3. **Reconcile**:
      - If existing == null → use computed hot-tier assignment (case: first 
segment for partition, or full cascade)
      - If existing != null and matches computed → use computed hot-tier
      - If existing != null and differs from computed → use existing (still 
hot-tier, because cold-tier was filtered)
   
   In every branch, the final `instancesAssigned` contains only hot-tier 
servers.
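
The reconcile step can be sketched as below. This is an illustration under assumptions, not the actual `assignSegment` body: the class and method shapes are hypothetical, and `isSameAssignment` is assumed to be a size check followed by a content check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class ReconcileSketch {
  // Assumed shape of the comparison: same size and same members.
  static boolean isSameAssignment(List<String> computed, Set<String> existing) {
    return existing.size() == computed.size() && existing.containsAll(computed);
  }

  // computed: candidate derived from the consuming instance partitions (hot tier by construction).
  // existing: result of the filtered lookup, or null if no valid reference segment was found.
  static List<String> reconcile(List<String> computed, Set<String> existing) {
    if (existing == null || isSameAssignment(computed, existing)) {
      return computed;
    }
    // Existing differs from computed: keep the existing servers to preserve the dedup invariant.
    return new ArrayList<>(existing);
  }

  public static void main(String[] args) {
    List<String> computed = List.of("hot_1", "hot_2");
    System.out.println(reconcile(computed, null));                     // computed is used
    System.out.println(reconcile(computed, Set.of("hot_3", "hot_4"))); // existing is kept
  }
}
```

Since the lookup never returns a cold-tier-only assignment after the fix, both branches yield hot-tier servers, which matches the three cases listed above.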
   
   ---
   
   ## Tested Scenarios
   
   All tests in `StrictRealtimeSegmentAssignmentTest`:
   
   ### Existing tests (regression coverage)
   
   - `testAssignSegment(upsert)` — confirms upsert path is unchanged
   - `testAssignSegment(dedup)` — confirms dedup path without tier config is 
unchanged
   - `testAssignSegmentWithOfflineSegment(upsert / dedup)` — OFFLINE segments 
skipped
   - `testRebalanceDedupTableWithTiers` — rebalance path unchanged
   - `testRebalanceUpsertTableWithTiers` — upsert rejects tiers
   - `testAssignSegmentToCompletedServers(upsert / dedup)` — precondition checks
   - `testRebalanceTableToCompletedServers(upsert / dedup)` — precondition 
checks
   
   ### New tests (bug coverage)
   
   | Test | Scenario | What it verifies |
   |---|---|---|
   | `testAssignSegmentIgnoresColdTierSegments` | Single cold segment for p0, 
no other p0 segment, same IPs | Cold skipped → null → computed hot-tier used |
   | `testAssignSegmentAllColdTierFallsBackToComputed` | Single cold segment 
for p0, new IPs | Cold skipped → null → new computed hot-tier used |
   | `testAssignSegmentPrefersSamePartitionOnConsumingTier` | Cold seg_0 + hot 
seg_4 for p0, new IPs | Cold skipped, hot returned → dedup invariant preserved 
(uses old hot servers, not new) |
   | `testAssignSegmentAfterCommitWithColdTier` (parameterized × 2 cold-tier 
replications) | **Production-realistic flow**: previous CONSUMING transitioned 
to ONLINE before `assignSegment`, with cold-tier segment present | Mirrors what 
`PinotLLCRealtimeSegmentManager` does at commit time |
   | `testCascadeColdTierCorruptionSelfHeals` (parameterized × 2 cold-tier 
replications) | **All p0 segments on cold-tier (bug already fired)** | 
Self-healing: all cold skipped → null → falls back to computed hot-tier → 
**breaks the cascade** |
   | `testIPChangeWithTierConfigPreservesDedupInvariant` | Instance-partition 
change, no cold-tier segments | Tier filter does NOT interfere with the normal 
IP-change path |
   
   ### Cold-tier replication coverage
   
   Cold tier may legitimately have a different replication factor than hot 
tier. Two of the most critical tests are parameterized via 
`@DataProvider("coldTierReplications")` to run with:
   
   - **2 cold-tier servers** (cold replication `<` `NUM_REPLICAS=3`)
   - **3 cold-tier servers** (cold replication `==` `NUM_REPLICAS=3`)
   
   The 3-server case is the more rigorous coverage: without the fix, 
`isSameAssignment` would pass the size check (3 == 3) but fail the content 
check, and the new CONSUMING segment would then be assigned to cold-tier 
servers, which is the exact production bug. The 2-server case fails earlier, on 
the size mismatch. So the parameterization directly exercises the 
cold-tier-assignment failure mode.
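
To make the two parameterized cases concrete, here is a hedged sketch that reports which check a pre-fix cold-tier reference would trip. The enum and method are hypothetical; only the size-then-content order is taken from the description above.

```java
import java.util.List;
import java.util.Set;

class MismatchKindSketch {
  enum Result { SAME, SIZE_MISMATCH, CONTENT_MISMATCH }

  // Assumed order of checks: size first, then membership.
  static Result compare(List<String> computed, Set<String> existing) {
    if (existing.size() != computed.size()) {
      return Result.SIZE_MISMATCH;
    }
    return existing.containsAll(computed) ? Result.SAME : Result.CONTENT_MISMATCH;
  }

  public static void main(String[] args) {
    List<String> hot = List.of("hot_1", "hot_2", "hot_3"); // NUM_REPLICAS = 3
    System.out.println(compare(hot, Set.of("cold_1", "cold_2")));           // SIZE_MISMATCH
    System.out.println(compare(hot, Set.of("cold_1", "cold_2", "cold_3"))); // CONTENT_MISMATCH
  }
}
```

Either mismatch makes the pre-fix code fall through to the existing (cold-tier) assignment; the two cases differ only in which check detects the difference.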
   
   
   
   ## Alternative Approaches Considered & Discarded
   
   ### Approach A: "Highest sequence number" heuristic
   
   Return the assignment of the segment with the highest sequence number for 
the partition, on the assumption that "highest seq = most recently committed = 
always on hot-tier" (since `SegmentRelocator` moves oldest segments first).
   
   **Discarded because**: Doesn't break the cascade. If a previous occurrence 
of the bug already placed the latest segment on cold tier, the highest-seq 
segment IS on cold tier, and the heuristic perpetuates the misplacement:  
   
   ```
   Cascade-corrupted state:
     seg__0__0  → cold-tier  (moved by SegmentRelocator)
     seg__0__1  → cold-tier  (placed there by previous bug occurrence)
     seg__0__2  → cold-tier  (placed there by previous bug occurrence)
   
   assignSegment(seg__0__3) with "highest seq" heuristic:
     → returns seg__0__2's cold-tier assignment
     → seg__0__3 assigned to cold-tier
     → cascade continues forever ✗
   ```
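
A small sketch of the discarded heuristic, run against the cascade-corrupted state above, shows why it cannot recover. The class and method names are hypothetical, and the segment names are parsed with a simplified `__` split rather than `LLCSegmentName`.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class HighestSeqHeuristicSketch {
  // Returns the servers of the segment with the numerically highest seqNum for the partition.
  static Set<String> highestSeqAssignment(Map<String, Map<String, String>> currentAssignment, int partitionId) {
    int bestSeq = -1;
    Set<String> best = null;
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String[] parts = entry.getKey().split("__"); // name, partition, seqNum
      int partition = Integer.parseInt(parts[1]);
      int seq = Integer.parseInt(parts[2]);
      if (partition == partitionId && seq > bestSeq) {
        bestSeq = seq;
        best = entry.getValue().keySet();
      }
    }
    return best;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> corrupted = new TreeMap<>();
    corrupted.put("seg__0__0", Map.of("cold_1", "ONLINE", "cold_2", "ONLINE"));
    corrupted.put("seg__0__1", Map.of("cold_1", "ONLINE", "cold_2", "ONLINE"));
    corrupted.put("seg__0__2", Map.of("cold_1", "ONLINE", "cold_2", "ONLINE"));
    // Every candidate is on cold tier, so the heuristic can only return cold-tier servers.
    System.out.println(highestSeqAssignment(corrupted, 0));
  }
}
```

The heuristic has no notion of which servers are cold, so it never returns `null` in this state and the computed hot-tier fallback is never reached.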
   
   The fix needs to **stop the bleeding**: once deployed, even tables that 
were corrupted by the bug should route *new* segments back to hot tier. (Note: 
this does NOT retroactively repair already-misplaced segments; those remain on 
cold tier until an operator runs a table rebalance.)
   
   ### Approach B: Override `assignSegment` entirely in the subclass (a 
different way to implement the same fix)
   
   Duplicate all of `assignSegment`'s orchestration (preconditions, partition 
ID, candidate calculation, mismatch comparison, metrics, logging) in 
`MultiTierStrictRealtimeSegmentAssignment` so the base class needs no changes.
   
   **Discarded because**:
   - Duplicated orchestration logic
   - Requires making `getPartitionId`, `isSameAssignment`, 
`getExistingAssignment` protected (widening their visibility)
   - Risk of silent divergence when base logic evolves (new metrics, new log 
fields, new edge case handling)
   
   ### Approach C: Skip segments with sequence number lower than the highest 
hot-tier segment
   
   Iterate to find the highest-seq segment on hot-tier; use that as the 
reference.
   
   **Discarded because**: Same cascade problem as Approach A. In the corrupted 
state, there is no hot-tier segment for the partition, and we'd return some 
arbitrary cold-tier segment. Adding "if no hot-tier segment, return null" is 
equivalent to Approach B for the cascade case.
   
   ---
   
   ## Backwards Compatibility
   
   | Path | Behavior |
   |---|---|
   | Plain realtime tables (`RealtimeSegmentAssignment`) | Unaffected — not in 
the changed class hierarchy |
   | Upsert tables (`SingleTierStrictRealtimeSegmentAssignment`) | Identical to 
pre-change (default `getTierInstances` returns empty set; filter never fires) |
   | Dedup tables without tier config 
(`MultiTierStrictRealtimeSegmentAssignment`, no tiers) | Identical to 
pre-change (`getTierConfigsList()` returns null → empty set returned without ZK 
calls) |
   | Dedup tables with tier config but no tier IPs persisted in ZK yet | 
Identical to pre-change (no segments could have been moved to non-existent 
tiers, so empty tier set is correct) |
   | Dedup tables with tier config + tier IPs in ZK | New behavior: cold-tier 
segments excluded from existing-assignment lookup (the fix) |
   

