xiangfu0 opened a new pull request, #18785:
URL: https://github.com/apache/pinot/pull/18785

## Summary

Realtime servers consuming **upsert/dedup** tables hold large **on-heap** primary-key metadata (the key→record-location maps in `ConcurrentMapPartitionUpsertMetadataManager`). Under sustained ingestion these maps grow without bound and eventually OOM the server JVM, which takes down every table on that host.

Today the only throttles are:
- controller-driven: the disk-based `ResourceUtilizationManager` plus whole-table `PauseState`, which are slow and ZK-mediated; and
- the per-query OOM accountant, which is off by default and kills queries, not ingestion.

There is no fast, **server-local** mechanism that halts ingestion when the heap is about to run out.

This PR adds **`RealtimeIngestionMemoryGuard`**, a server-local, self-healing guard, modeled on the existing `RealtimeConsumptionRateManager` singleton. A daemon thread samples JVM heap usage. When usage crosses a tunable ratio, realtime consumers **park** inside their consume loop (they stop fetching from the stream, so memory stops growing) and **resume automatically** once the heap recovers. No controller or ZK involvement is needed.

## Design

- **Two independent triggers:** a global heap-usage ratio with hysteresis (pause when usage ≥ `pauseRatio`, resume when usage ≤ `resumeRatio`), plus an optional per-(table, partition) cap on the upsert primary-key count.
- **Mode:** `ALL` / `UPSERT_DEDUP_ONLY` / `DISABLED` controls which tables are guarded.
- **Pause point:** consumers pause only while a segment is in `INITIAL_CONSUMING`. Catch-up states (`CATCHING_UP` / `CONSUMING_TO_ONLINE`) are intentionally **not** paused, because stalling them would wedge the Helix CONSUMING→ONLINE transition.
- **Safety:**
  - the park sleep sits above the fetch and is interrupt-safe, so `stop()` takes effect promptly;
  - end criteria are still evaluated on every iteration, so force-commit, time limit, and end-of-partition keep precedence;
  - the idle timer is reset on resume; and
  - the heap-pressure trigger **fails open** if the sampler thread ever dies, so it can never wedge ingestion.

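The hysteresis, mode, primary-key-cap, and fail-open rules above can be illustrated with a small, self-contained Java sketch. It is not the PR's `RealtimeIngestionMemoryGuard`: the class `HeapPauseSketch` and its methods `fromConfig`, `onSample`, `sampleJvm`, `startSampler`, and `shouldPause` are hypothetical. It reads the server keys listed in the Configuration section, and it assumes three details the description does not spell out: invalid ratios (anything other than 0 < resume < pause ≤ 1) fall back to the defaults, an undefined max heap (`MemoryUsage.getMax()` returning -1) counts as no pressure, and the sampler heartbeat is treated as stale after three check intervals.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.Map;

// Hypothetical sketch of the guard's decision logic; NOT Pinot's RealtimeIngestionMemoryGuard.
final class HeapPauseSketch {
  enum Mode { ALL, UPSERT_DEDUP_ONLY, DISABLED }

  static final String PREFIX = "pinot.server.consumption.memory.";
  static final double DEFAULT_PAUSE = 0.85;
  static final double DEFAULT_RESUME = 0.75;

  final Mode mode;
  final double pauseRatio;
  final double resumeRatio;
  final long checkIntervalMs;
  final long primaryKeyCap; // per-(table, partition); 0 = off

  private volatile boolean heapPaused;   // hysteresis state, updated by onSample
  private volatile long lastHeartbeatMs; // 0 until the first sample, so it starts out stale

  HeapPauseSketch(Mode mode, double pauseRatio, double resumeRatio, long checkIntervalMs, long primaryKeyCap) {
    this.mode = mode;
    this.pauseRatio = pauseRatio;
    this.resumeRatio = resumeRatio;
    this.checkIntervalMs = checkIntervalMs;
    this.primaryKeyCap = primaryKeyCap;
  }

  static HeapPauseSketch fromConfig(Map<String, String> cfg) {
    Mode mode = Mode.valueOf(cfg.getOrDefault(PREFIX + "pause.mode", "UPSERT_DEDUP_ONLY"));
    double pause = Double.parseDouble(cfg.getOrDefault(PREFIX + "pause.heap.usage.ratio", "0.85"));
    double resume = Double.parseDouble(cfg.getOrDefault(PREFIX + "resume.heap.usage.ratio", "0.75"));
    long interval = Long.parseLong(cfg.getOrDefault(PREFIX + "check.interval.ms", "1000"));
    long pkCap = Long.parseLong(cfg.getOrDefault(PREFIX + "pause.primary.key.cap", "0"));
    // ASSUMED validation rule: require 0 < resume < pause <= 1, otherwise use the defaults.
    if (!(resume > 0 && resume < pause && pause <= 1.0)) {
      pause = DEFAULT_PAUSE;
      resume = DEFAULT_RESUME;
    }
    return new HeapPauseSketch(mode, pause, resume, interval > 0 ? interval : 1000, pkCap);
  }

  /** Feeds one heap sample into the hysteresis state machine and returns the paused flag. */
  boolean onSample(long usedBytes, long maxBytes, long nowMs) {
    lastHeartbeatMs = nowMs;
    if (maxBytes <= 0) { // getMax() is -1 when undefined; ASSUMED to mean "no pressure"
      heapPaused = false;
      return false;
    }
    double ratio = (double) usedBytes / maxBytes;
    if (!heapPaused && ratio >= pauseRatio) {
      heapPaused = true;   // crossed the upper threshold
    } else if (heapPaused && ratio <= resumeRatio) {
      heapPaused = false;  // fell back through the lower threshold
    }
    return heapPaused;     // between the thresholds the previous state is kept
  }

  /** Samples the real JVM heap through the platform MemoryMXBean. */
  boolean sampleJvm() {
    MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
    return onSample(heap.getUsed(), heap.getMax(), System.currentTimeMillis());
  }

  /** Starts the daemon sampler; if it ever stops, the heartbeat goes stale and the guard fails open. */
  Thread startSampler() {
    Thread t = new Thread(() -> {
      while (true) {
        sampleJvm();
        try {
          Thread.sleep(checkIntervalMs);
        } catch (InterruptedException e) {
          return;
        }
      }
    }, "heap-pause-sampler");
    t.setDaemon(true);
    t.start();
    return t;
  }

  /** Consulted by a consumer before each fetch; numPrimaryKeys is for its own (table, partition). */
  boolean shouldPause(boolean upsertOrDedupTable, long numPrimaryKeys, long nowMs) {
    if (mode == Mode.DISABLED || (mode == Mode.UPSERT_DEDUP_ONLY && !upsertOrDedupTable)) {
      return false;
    }
    // Fail open: ignore heap pressure if the sampler missed 3 intervals (ASSUMED threshold).
    boolean heapFresh = nowMs - lastHeartbeatMs <= 3 * checkIntervalMs;
    boolean keyCapHit = primaryKeyCap > 0 && numPrimaryKeys >= primaryKeyCap;
    return (heapFresh && heapPaused) || keyCapHit;
  }
}
```

Consumers only read a volatile flag and a timestamp, so the per-fetch check stays cheap. The heartbeat comparison is what turns a dead sampler into "no heap trigger" rather than leaving a stale "paused" value in force.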
### Configuration

Server-level keys, prefixed with `pinot.server.consumption.memory.`:

| Key | Default | Meaning |
|---|---|---|
| `pause.mode` | `UPSERT_DEDUP_ONLY` | `ALL` / `UPSERT_DEDUP_ONLY` / `DISABLED` |
| `pause.heap.usage.ratio` | `0.85` | Pause when usedHeap / maxHeap ≥ this value |
| `resume.heap.usage.ratio` | `0.75` | Resume when usedHeap / maxHeap ≤ this value (hysteresis) |
| `check.interval.ms` | `1000` | Heap sampling and pause re-check interval, in milliseconds |
| `pause.primary.key.cap` | `0` (off) | Per-(table, partition) upsert primary-key cap |

### Metrics

- `ServerGauge.REALTIME_INGESTION_MEMORY_PAUSED`: global, 0 or 1.
- `ServerMeter.REALTIME_CONSUMPTION_PAUSED_MEMORY`: per table, counts pause cycles.

Together these let dashboards and alerts tell "deliberately paused" apart from "wedged".

## Limitation (by design)

Pausing halts memory **growth** and buys time for GC, segment commits, and TTL eviction; it does **not** by itself reclaim upsert primary-key metadata that is already allocated (committing a consuming segment does not free it). This is the right primitive for "stop ingestion to prevent OOM", and the limitation is documented in the guard's Javadoc.

## Backward compatibility / release note

**This feature is ON by default** (mode `UPSERT_DEDUP_ONLY`): after upgrading, servers will pause upsert/dedup realtime consumption when heap usage reaches 0.85 and resume once it drops to 0.75 or below. Set `pinot.server.consumption.memory.pause.mode=DISABLED` to turn it off, or `ALL` to cover every realtime table on the server.

The new `getNumberOfPrimaryKeys()` method on the `PartitionUpsertMetadataManager` interface is a `default` method, so existing out-of-tree implementations remain binary- and source-compatible.

## Testing

- `RealtimeIngestionMemoryGuardTest` (10 tests): hysteresis including the exact boundaries, the mode/eligibility matrix, the primary-key-cap trigger, fail-open on a stale heartbeat, unknown max heap, and the fallback path for invalid configuration.
- `RealtimeSegmentDataManagerTest` (2 new tests): the consume loop pauses and then resumes under simulated pressure in `INITIAL_CONSUMING`, and the guard is **not** consulted while in `CATCHING_UP`.
- Spotless, checkstyle, and license checks pass on all touched modules.
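The consume-loop behavior exercised by the `RealtimeSegmentDataManagerTest` cases, together with the Safety rules in the Design section, can be sketched as follows. This is a hypothetical illustration, not Pinot's `RealtimeSegmentDataManager`: `ConsumeLoopSketch`, its `State` enum, and the `fetches` / `pauseCycles` counters stand in for the real segment states, the stream fetch, and `ServerMeter.REALTIME_CONSUMPTION_PAUSED_MEMORY`, and the guard is modeled as a plain `BooleanSupplier`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a consume loop gated by a memory guard; names are illustrative only.
final class ConsumeLoopSketch {
  enum State { INITIAL_CONSUMING, CATCHING_UP, CONSUMING_TO_ONLINE }

  private final BooleanSupplier guardSaysPause; // stand-in for the guard's pause check
  private final long checkIntervalMs;
  volatile State state = State.INITIAL_CONSUMING;
  volatile boolean stopRequested;
  long lastConsumedMs = System.currentTimeMillis(); // idle-timer baseline
  int pauseCycles; // stand-in for the per-table pause meter
  int fetches;     // stand-in for "fetch a batch from the stream and index it"

  ConsumeLoopSketch(BooleanSupplier guardSaysPause, long checkIntervalMs) {
    this.guardSaysPause = guardSaysPause;
    this.checkIntervalMs = checkIntervalMs;
  }

  /** Loops until stop is requested or the end criteria are reached. */
  void consume(BooleanSupplier endCriteriaReached) {
    boolean parked = false;
    // End criteria are evaluated first on every iteration, so they take precedence over a pause.
    while (!stopRequested && !endCriteriaReached.getAsBoolean()) {
      // Only INITIAL_CONSUMING consults the guard; catch-up states never park.
      if (state == State.INITIAL_CONSUMING && guardSaysPause.getAsBoolean()) {
        if (!parked) {
          parked = true;
          pauseCycles++; // one count per pause cycle, not per sleep
        }
        try {
          Thread.sleep(checkIntervalMs); // park above the fetch: nothing new is read
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the flag and exit promptly
          return;
        }
        continue;
      }
      if (parked) {
        parked = false;
        // Reset the idle timer so time spent parked is not mistaken for an idle stream.
        lastConsumedMs = System.currentTimeMillis();
      }
      fetches++;
    }
  }

  /** Requests a stop and wakes a parked consumer immediately. */
  void stop(Thread consumerThread) {
    stopRequested = true;
    consumerThread.interrupt();
  }
}
```

Parking with an interruptible `Thread.sleep` instead of a busy wait is what lets `stop()` take effect promptly, and gating on the segment state is what keeps catch-up, and therefore the CONSUMING→ONLINE transition, from ever stalling.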
