nsivabalan opened a new pull request, #18860:

URL: https://github.com/apache/hudi/pull/18860
### Change Logs

Adds a dynamic per-task memory ceiling on `HoodieAppendHandle`'s in-memory record buffer. The ceiling is applied as an upper bound on top of `hoodie.logfile.data.block.max.size`. Closes #18859.

The flush gate in `flushToDiskIfRequired` now uses

```
effectiveBlockSize = min(hoodie.logfile.data.block.max.size, dynamicCeiling)
```

where `dynamicCeiling = executorMemory * (1 - spark.memory.fraction) / task_slots * hoodie.memory.logfile.append.fraction`, floored at `HoodieMemoryConfig.MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES` (16 MB). This floor is smaller than the 100 MB spillable-map floor because many append handles can be active concurrently in a single task.

When the engine does not expose memory/cores (Flink's `TaskContextSupplier.getProperty` returns `Option.empty()` unconditionally; Spark also returns empty if `SparkEnv` is absent), the new sibling `IOUtils` helper returns `Option.empty()` and `effectiveBlockSize` collapses to `maxBlockSize`. There is no behavior change on those engines.

**Stacked on #18843.** That PR makes `averageRecordSize` accurate by sizing the buffered post-`prepareRecord` record. Without it, the dynamic cap is computed against an under-counted estimate and the gate still misses. Please merge #18843 first.

### New config

- `hoodie.memory.logfile.append.fraction` (default `0.6`, advanced): fraction of per-task user memory available to the append-handle buffer. The default matches `hoodie.memory.merge.fraction` and `hoodie.memory.compaction.fraction`, so users who never set it get sensible behavior.

### Implementation

- **`HoodieMemoryConfig`**: new `MAX_MEMORY_FRACTION_FOR_LOG_APPEND` ConfigProperty + `MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES` constant (16 MB).
- **`IOUtils`**: new `getMaxMemoryAllowedForLogAppend(supplier, fraction, minFloor): Option<Long>`.
  It differs from the existing `getMaxMemoryAllowedForMerge` in two ways. First, it returns `Option`, which lets callers fall back to a static cap rather than the spillable-map 1 GB default when engine properties are absent. Second, it accepts the floor as a parameter, so the 100 MB spillable-map floor and the smaller 16 MB log-append floor cannot collide on a shared constant. Like the existing helper, it honors `EngineProperty.SINGLE_TASK_CORES`. Existing merge/compaction callers are untouched.
- **`HoodieAppendHandle`**: `maxBlockSize` moves to constructor init alongside a new `effectiveBlockSize` field, and both `maxBlockSize` references in `flushToDiskIfRequired` switch to `effectiveBlockSize`. An INFO log line is emitted when the dynamic cap is below the configured ceiling. `@VisibleForTesting` getters are added for `effectiveBlockSize` and `maxBlockSize`.

### Behavior

- **Avro engine writes**: unaffected. The heap pressure addressed here is most acute on Spark engines where `prepareRecord` materializes the Avro graph.
- **Spark engine writes**: the gate trips earlier on tight executors, the buffer stays bounded by per-task heap, and downstream serialization OOMs are prevented.
- **Flink**: unchanged.
- `canWrite` / `estimatedNumberOfBytesWritten` still use `maxBlockSize` indirectly to roll log-file groups. This change is intentionally scoped to the flush gate only; lowering log-file-group rolling thresholds is a follow-up if warranted.
- Small Spark executors will see smaller, more numerous log blocks. This is an intentional tradeoff against OOM. Read-path compaction merges blocks, so on-disk file count is unaffected.

### Impact

**On Spark**: a job running with `hoodie.logfile.data.block.max.size=256m` (default) on a 2 GB / 4 core executor with `spark.memory.fraction=0.6` and the default `hoodie.memory.logfile.append.fraction=0.6` will now cap the buffer at `2GB * 0.4 / 4 * 0.6 = 120 MB` instead of 256 MB. On a generous 16 GB / 4 core executor, the dynamic ceiling (~960 MB) exceeds the 256 MB configured ceiling, so the configured value wins.
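The sizing rule from the Change Logs and the Spark numbers above can be checked with a minimal Java sketch of the arithmetic. It assumes the engine properties have already been read and parsed, and it is not the code in this PR: `LogAppendCeilingSketch`, `dynamicCeiling`, and `effectiveBlockSize(long, Optional)` are hypothetical names, `java.util.Optional` stands in for Hudi's `Option`, and `taskSlots` is assumed to already account for cores per task (the `SINGLE_TASK_CORES` handling is not modeled).

```java
import java.util.Optional;

// Hypothetical sketch, not Hudi code: java.util.Optional stands in for Hudi's Option,
// and every class/method name here is illustrative only.
final class LogAppendCeilingSketch {

  // Mirrors the documented 16 MB MIN_MEMORY_FOR_LOG_APPEND_BUFFER_IN_BYTES floor.
  static final long MIN_FLOOR_BYTES = 16L * 1024 * 1024;

  private LogAppendCeilingSketch() {}

  /**
   * ceiling = executorMemory * (1 - sparkMemoryFraction) / taskSlots * appendFraction,
   * floored at minFloorBytes. Empty when any engine property is missing (the Flink case).
   * taskSlots is assumed to already account for cores per task.
   */
  static Optional<Long> dynamicCeiling(Optional<Long> executorMemoryBytes,
                                       Optional<Double> sparkMemoryFraction,
                                       Optional<Integer> taskSlots,
                                       double appendFraction,
                                       long minFloorBytes) {
    if (!executorMemoryBytes.isPresent()
        || !sparkMemoryFraction.isPresent()
        || !taskSlots.isPresent()) {
      return Optional.empty();
    }
    // User (non-Spark-managed) memory available to one task slot.
    double userMemoryPerTask =
        executorMemoryBytes.get() * (1 - sparkMemoryFraction.get()) / taskSlots.get();
    long ceiling = (long) Math.floor(userMemoryPerTask * appendFraction);
    return Optional.of(Math.max(minFloorBytes, ceiling));
  }

  /** min(maxBlockSize, ceiling), or maxBlockSize unchanged when no ceiling is available. */
  static long effectiveBlockSize(long maxBlockSize, Optional<Long> ceiling) {
    return ceiling.map(c -> Math.min(maxBlockSize, c)).orElse(maxBlockSize);
  }
}
```

Returning an empty `Optional` instead of a default number is what lets the caller keep `maxBlockSize` unchanged when engine properties are missing. With binary units (2 GiB), the tight-executor case works out to about 123 MiB rather than the 120 MB quoted above, which uses decimal units.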
**No regression on appropriately-sized executors.**

**On Flink**: zero behavior change. The cap depends on engine-property exposure that Flink does not provide.

### Risk level (write none, low, medium or high below)

low

### Documentation Update

- New advanced config `hoodie.memory.logfile.append.fraction` will appear in the auto-generated config docs (via `markAdvanced()` + `withDocumentation(...)`).
- No migration steps required. Existing deployments get the new behavior automatically on Spark and continue to behave identically on Flink.

### Contributor's checklist

- [x] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Change Logs and Impact were stated clearly consistent with the [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute).
- [x] Adequate tests were added if applicable. See `TestHoodieAppendHandle` (5 new tests covering fallback, cap, floor, ceiling-wins, gate-fires-early) and new `TestIOUtils` (5 tests covering empty-when-any-prop-missing, formula, parameterized floor, `SINGLE_TASK_CORES` accounting).
- [x] CI passed
