nsivabalan opened a new pull request, #18860:
URL: https://github.com/apache/hudi/pull/18860

   ### Change Logs
   
   Adds a dynamic per-task memory ceiling on `HoodieAppendHandle`'s in-memory 
record buffer, applied as an upper bound on top of 
`hoodie.logfile.data.block.max.size`. Closes #18859.
   
   The flush gate in `flushToDiskIfRequired` now uses
   ```
   effectiveBlockSize = min(hoodie.logfile.data.block.max.size, dynamicCeiling)
   ```
   where `dynamicCeiling = executorMemory * (1 - spark.memory.fraction) / 
task_slots * hoodie.memory.logfile.append.fraction`, floored at 
`HoodieMemoryConfig.MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES` (16 MB — smaller 
than the 100 MB spillable-map floor because many append handles can be active 
concurrently in a single task).
   
   When the engine does not expose memory/cores (Flink's 
`TaskContextSupplier.getProperty` returns `Option.empty()` unconditionally; 
Spark also returns empty if `SparkEnv` is absent), the new sibling `IOUtils` 
helper returns `Option.empty()` and `effectiveBlockSize` collapses to 
`maxBlockSize` — no behavior change on those engines.
   
   **Stacked on #18843.** That PR makes `averageRecordSize` accurate by sizing 
the buffered post-`prepareRecord` record; without that, the dynamic cap is 
computed against an under-counted estimate and the gate still misses. Please 
merge #18843 first.
   
   ### New config
   
   - `hoodie.memory.logfile.append.fraction` (default `0.6`, advanced) — 
fraction of per-task user memory available to the append-handle buffer. Default 
matches `hoodie.memory.merge.fraction` and `hoodie.memory.compaction.fraction`. 
Users who never set it get sensible behavior.
   
   ### Implementation
   
   - **`HoodieMemoryConfig`**: new `MAX_MEMORY_FRACTION_FOR_LOG_APPEND` 
ConfigProperty + `MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES` constant (16 MB).
   - **`IOUtils`**: new `getMaxMemoryAllowedForLogAppend(supplier, fraction, 
minFloor): Option<Long>`. Distinct from the existing 
`getMaxMemoryAllowedForMerge` in two ways: returns `Option` (lets callers fall 
back to a static cap rather than the spillable-map 1GB default when engine 
properties are absent), and accepts the floor as a parameter (so the 100 MB 
spillable-map floor and the smaller 16 MB log-append floor cannot collide on a 
shared constant). Honors `EngineProperty.SINGLE_TASK_CORES` like the existing 
helper. Existing merge/compaction callers untouched.
   - **`HoodieAppendHandle`**: `maxBlockSize` moves to constructor init 
alongside a new `effectiveBlockSize` field; both `maxBlockSize` references in 
`flushToDiskIfRequired` swap to `effectiveBlockSize`. INFO log line when the 
dynamic cap is below the configured ceiling. `@VisibleForTesting` getters for 
`effectiveBlockSize` and `maxBlockSize`.
   
   ### Behavior
   
   - **Avro engine writes**: unaffected — the heap pressure addressed here is 
most acute on Spark engines where `prepareRecord` materializes the Avro graph.
   - **Spark engine writes**: gate trips earlier on tight executors, buffer 
stays bounded by per-task heap, downstream serialization OOMs prevented.
   - **Flink**: unchanged.
   - `canWrite` / `estimatedNumberOfBytesWritten` still use `maxBlockSize` 
indirectly to roll log-file groups; this change is intentionally scoped to the 
flush gate only. Lowering log-file-group rolling thresholds is a follow-up if 
warranted.
   - Small Spark executors will see smaller, more numerous log blocks — 
intentional tradeoff vs. OOM. Read-path compaction merges blocks, so on-disk 
file count is unaffected.
   
   ### Impact
   
   **On Spark**: a job running with `hoodie.logfile.data.block.max.size=256m` 
(default) on a 2 GB / 4 core executor with `spark.memory.fraction=0.6` and the 
default `hoodie.memory.logfile.append.fraction=0.6` will now cap the buffer at 
`2GB * 0.4 / 4 * 0.6 = 120 MB` instead of 256 MB. On a generous 16 GB / 4 core 
executor, the dynamic ceiling (~960 MB) exceeds the 256 MB configured ceiling, 
so the configured value wins. **No regression on appropriately-sized 
executors.**
   
   **On Flink**: zero behavior change. The cap depends on engine-property 
exposure that Flink does not provide.
   
   ### Risk level (write none, low medium or high below)
   
   low
   
   ### Documentation Update
   
   - New advanced config `hoodie.memory.logfile.append.fraction` will appear in 
the auto-generated config docs (via `markAdvanced()` + 
`withDocumentation(...)`).
   - No migration steps required. Existing deployments get the new behavior 
automatically on Spark and continue to behave identically on Flink.
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Change Logs and Impact were stated clearly consistent with the 
[contributor's guide](https://hudi.apache.org/contribute/how-to-contribute).
   - [x] Adequate tests were added if applicable. See `TestHoodieAppendHandle` 
(5 new tests covering fallback, cap, floor, ceiling-wins, gate-fires-early) and 
new `TestIOUtils` (5 tests covering empty-when-any-prop-missing, formula, 
parameterized floor, `SINGLE_TASK_CORES` accounting).
   - [x] CI passed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to