anuragrai16 opened a new issue, #18867:
URL: https://github.com/apache/pinot/issues/18867

   ## Summary
   
   For realtime tables whose 
`[ParallelSegmentConsumptionPolicy](https://github.com/apache/pinot/blob/master/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/ParallelSegmentConsumptionPolicy.java)`
 is `DISALLOW_ALWAYS` (e.g. partial-upsert, non-pauseless), each partition uses 
a single-permit consumer semaphore (`ConsumerCoordinator`, `Semaphore(1)`) so 
that only one segment per partition consumes at a time. The permit is acquired 
when a consumer thread starts and is released **only** through the offload path 
(`doOffload → closeStreamConsumer → releaseConsumerSemaphore`).
   
   Under certain transition orderings, a `RealtimeSegmentDataManager` (RSDM) 
can be left **running in memory holding the semaphore** while Helix's 
IdealState shows its segment as `OFFLINE`. No state transition is then able to 
reach it, so the **next** segment blocks forever on 
`ConsumerCoordinator.acquire()` and the partition stops ingesting until the 
stuck segment is manually deleted. We have seen this in production a few times, 
triggered by memory pressure on server nodes that caused ZooKeeper session resets. 
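
To make the stall concrete, here is a minimal sketch of the permit lifecycle described above. It is not Pinot code: `GhostConsumerStall` and `Consumer` are hypothetical stand-ins, and a bounded `tryAcquire` replaces the real blocking `acquire()` so that the example terminates.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class GhostConsumerStall {
  /** Hypothetical stand-in for one segment's consumer; not the real RSDM. */
  static final class Consumer {
    private final Semaphore partitionPermit;
    private final AtomicBoolean holdsPermit = new AtomicBoolean(false);

    Consumer(Semaphore partitionPermit) {
      this.partitionPermit = partitionPermit;
    }

    /** Tries to start consuming; returns false if the permit is not free within the timeout. */
    boolean start(long timeoutMs) {
      try {
        if (partitionPermit.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
          holdsPermit.set(true);
          return true;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return false;
    }

    /** The only release path in this model, standing in for the offload path. */
    void offload() {
      // CAS makes a repeated offload a no-op instead of a double release.
      if (holdsPermit.compareAndSet(true, false)) {
        partitionPermit.release();
      }
    }
  }

  public static void main(String[] args) {
    Semaphore permit = new Semaphore(1);       // one consuming segment per partition
    Consumer ghost = new Consumer(permit);     // segment S, never offloaded
    Consumer successor = new Consumer(permit); // segment S+1

    System.out.println("S started: " + ghost.start(100));                        // true
    System.out.println("S+1 started while S holds permit: " + successor.start(100)); // false
    ghost.offload(); // what the manual delete eventually triggers
    System.out.println("S+1 started after S offloaded: " + successor.start(100));   // true
  }
}
```

In this model the successor only makes progress once something calls `offload()` on the holder, which is exactly the call that never arrives for a ghost consumer.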
   
   ## How it happens
   
   - **T — Initial attempt fails during construction.** An `OFFLINE→CONSUMING` 
for segment `S` throws while building the consuming segment (a transient 
resource error, e.g. a failed mmap/allocation). The constructor's catch block 
schedules a background "janitor" thread that sleeps ~30s and then calls 
`segmentStoppedConsuming` to ask the controller to mark `S` `OFFLINE` in 
IdealState.
   - **T+1 — A retry succeeds.** Before the janitor thread fires, a retried 
`OFFLINE→CONSUMING` for the **same** segment `S` succeeds. A new RSDM is 
registered and acquires the semaphore, and consumption is healthy.
   - **T+2 — The stale janitor fires.** The janitor from the **failed** attempt 
still calls `segmentStoppedConsuming` for `S`. The controller flips IdealState 
for `S` to `OFFLINE`, even though a healthy consumer is now running.
   - **T+3 — Cleanup is lost.** Normally the server would receive 
`CONSUMING→OFFLINE` and offload (releasing the semaphore). If that cleanup 
transition is not delivered/applied — e.g. the participant's ZooKeeper session 
changes around this moment, destroying the session-scoped `CurrentState` before 
the transition is generated — the in-memory RSDM survives (the consumer thread 
and semaphore are **not** tied to the ZK session) while Helix considers `S` 
settled at `OFFLINE`. The consumer is now a "ghost": running, holding the 
semaphore, invisible to Helix.
   - **Result.** The successor segment `S+1` gets `OFFLINE→CONSUMING`, but its 
consumer blocks indefinitely on `acquire()`. Ingestion for that partition 
stalls until `S` is manually deleted (`OFFLINE→DROPPED`), which offloads the 
in-memory RSDM and releases the semaphore.
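
The ordering above can be replayed in a small, single-threaded model. This is only a sketch under stated assumptions: the two maps are hypothetical stand-ins for IdealState and the server's in-memory segment managers, and the 30s janitor delay is modelled simply by running the janitor after the retry.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleJanitorTimeline {
  // Hypothetical stand-ins for controller-side IdealState and server-side in-memory managers.
  static final Map<String, String> idealState = new HashMap<>();
  static final Map<String, Object> inMemoryManagers = new HashMap<>();

  /** Models the effect of segmentStoppedConsuming: the controller marks the segment OFFLINE. */
  static void segmentStoppedConsuming(String segment) {
    idealState.put(segment, "OFFLINE");
  }

  /** A ghost is live in memory while IdealState says OFFLINE. */
  static boolean isGhost(String segment) {
    return inMemoryManagers.containsKey(segment) && "OFFLINE".equals(idealState.get(segment));
  }

  /** Replays T..T+3 and reports whether segment S ends up as a ghost. */
  static boolean replay(boolean cleanupDelivered) {
    idealState.clear();
    inMemoryManagers.clear();
    String s = "S";
    idealState.put(s, "CONSUMING");

    // T: the first attempt fails and schedules a janitor (not yet run).
    Runnable staleJanitor = () -> segmentStoppedConsuming(s);

    // T+1: the retry succeeds and registers a healthy manager.
    inMemoryManagers.put(s, new Object());

    // T+2: the janitor from the failed attempt fires anyway.
    staleJanitor.run();

    // T+3: CONSUMING->OFFLINE either reaches the server (offload) or is lost.
    if (cleanupDelivered) {
      inMemoryManagers.remove(s);
    }
    return isGhost(s);
  }

  public static void main(String[] args) {
    System.out.println("ghost when cleanup delivered: " + replay(true));  // false
    System.out.println("ghost when cleanup lost: " + replay(false));      // true
  }
}
```

The model shows that the stale janitor alone is recoverable; the ghost only appears when the follow-up cleanup transition is also lost.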
   
   A related, independent gap: even when an `ERROR→OFFLINE` transition **is** 
delivered, the server's `onBecomeOfflineFromError()` only logs — it does 
**not** offload the segment. So a semaphore held by a failed `CONSUMING→ONLINE` 
path can also leak there.
   
   ## Impact
   
   - A single partition's realtime ingestion can stall indefinitely (successor 
segments never acquire the semaphore).
   - For upsert tables, the orphaned consumer can also retain primary-key 
ownership, which can cause missing/incorrect query results until the stuck 
segment is removed.
   - Recovery currently requires manual intervention (deleting the segment).
   
   ## Proposed changes
   
   Note: These fixes shrink the window in which the above can happen to near 
zero, but they do not close it completely. A complete fix would introduce a 
reconciliation loop in the server to catch these ghost segments, or override 
the `reset()` logic to clean up properly on a ZK session change. Both are 
high-risk changes that need a design and review.
   
   ### 1. `ERROR→OFFLINE` cleanup
   
   Make `SegmentOnlineOfflineStateModel.onBecomeOfflineFromError()` call 
`offloadSegment(...)` (wrapped in try/catch, not rethrowing), mirroring 
`onBecomeOfflineFromConsuming()`.
   
   **Why this fixes it:** `offloadSegment()` runs `doOffload → 
closeStreamConsumer → releaseConsumerSemaphore`, so any RSDM/semaphore left 
behind by a failed transition is cleaned up when the segment moves 
`ERROR→OFFLINE`. It is safe to call unconditionally because `offloadSegment()` 
is null-safe and `releaseConsumerSemaphore()` uses an idempotent CAS.
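
A rough sketch of the shape of this change is below. The method names follow the ones in this issue, but the class, the `managers` map and the `SegmentManager` type are simplified, hypothetical stand-ins rather than the real Pinot classes or signatures.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class ErrorToOfflineCleanup {
  /** Hypothetical, simplified segment manager that holds the partition permit. */
  static final class SegmentManager {
    private final Semaphore permit;
    private final AtomicBoolean acquired = new AtomicBoolean(true);

    private SegmentManager(Semaphore permit) {
      this.permit = permit;
    }

    /** Acquires the permit and returns a manager that owns it. */
    static SegmentManager consuming(Semaphore permit) {
      permit.acquireUninterruptibly();
      return new SegmentManager(permit);
    }

    /** Idempotent: only the first call actually releases the permit. */
    void releaseConsumerSemaphore() {
      if (acquired.compareAndSet(true, false)) {
        permit.release();
      }
    }
  }

  static final Map<String, SegmentManager> managers = new ConcurrentHashMap<>();

  /** Null-safe: does nothing when no manager is registered for the segment. */
  static void offloadSegment(String segmentName) {
    SegmentManager manager = managers.remove(segmentName);
    if (manager != null) {
      manager.releaseConsumerSemaphore();
    }
  }

  /** Proposed behaviour: offload instead of only logging, and never rethrow. */
  static void onBecomeOfflineFromError(String segmentName) {
    try {
      offloadSegment(segmentName);
    } catch (RuntimeException e) {
      System.err.println("Failed to offload " + segmentName + ": " + e);
    }
  }

  public static void main(String[] args) {
    Semaphore permit = new Semaphore(1);
    managers.put("S", SegmentManager.consuming(permit));

    onBecomeOfflineFromError("S");       // releases the leaked permit
    onBecomeOfflineFromError("S");       // repeated transition: no-op
    onBecomeOfflineFromError("unknown"); // nothing registered: no-op
    System.out.println("available permits: " + permit.availablePermits()); // 1
  }
}
```

Because the release is guarded by a CAS and the lookup tolerates a missing manager, repeated or spurious `ERROR→OFFLINE` transitions leave exactly one permit available.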
   
   ### 2. Janitor thread guard before the 30s event fires
   
   In the init-error janitor path (`postStopConsumedMsg`), before sending 
`segmentStoppedConsuming`, skip the notification if a segment data manager for 
the same segment is **already registered and is not `this` (failed) instance**, 
i.e. a retry has already succeeded.
   
   **Why this fixes it:** a failed attempt never registers itself in the 
segment map, so a registered manager for the same segment name means a newer 
attempt is live; suppressing the stale "stopped consuming" prevents the 
wrongful `IdealState→OFFLINE` flip at its source. 
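
The guard itself is a small identity check. The sketch below assumes a map from segment name to the registered manager; `JanitorGuard`, `registeredManagers` and `shouldNotifyStoppedConsuming` are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class JanitorGuard {
  // Hypothetical stand-in for the server's segment-name -> data-manager map.
  static final Map<String, Object> registeredManagers = new ConcurrentHashMap<>();

  /**
   * Returns true if the janitor of {@code failedInstance} should still send
   * segmentStoppedConsuming, i.e. no different manager is registered for the segment.
   */
  static boolean shouldNotifyStoppedConsuming(String segmentName, Object failedInstance) {
    Object registered = registeredManagers.get(segmentName);
    // Identity comparison: a different registered instance means a retry has succeeded.
    return registered == null || registered == failedInstance;
  }

  public static void main(String[] args) {
    Object failedAttempt = new Object();

    // No retry yet: the notification is still valid.
    System.out.println(shouldNotifyStoppedConsuming("S", failedAttempt)); // true

    // A retry succeeded and registered a new manager: the stale notification is suppressed.
    registeredManagers.put("S", new Object());
    System.out.println(shouldNotifyStoppedConsuming("S", failedAttempt)); // false
  }
}
```

The check and the notification are not atomic in this sketch, so a retry that registers between the two can still slip through, which is consistent with the note above that these changes narrow the window rather than close it.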


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

