Davis-Zhang-Onehouse opened a new pull request, #18871: URL: https://github.com/apache/hudi/pull/18871
### Describe the issue this Pull Request addresses

`HoodieStreamerWriteStatusValidator.validate()` fires **two** separate Spark actions on the same persisted error-table `WriteStatus` RDD whenever `isErrorTableWriteUnificationEnabled` is on (`hudi-utilities/.../StreamSync.java:1450-1452`):

```java
totalRecords += errorTableWriteStatusRDDOpt.map(s -> s.mapToDouble(WriteStatus::getTotalRecords).sum().longValue()).orElse(0L);
totalErroredRecords += errorTableWriteStatusRDDOpt.map(s -> s.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue()).orElse(0L);
```

Each action re-deserializes every cached `WriteStatus` block via Kryo and submits its own stage. In real-world production workloads with memory-strained executors and multi-hundred-MB cached partitions, allocation profiling shows Kryo `ObjectField.read` / `ObjectArraySerializer.read` accounting for ~13–14% of allocated bytes during this code path. The second `mapToDouble().sum()` doubles that cost for two scalar projections of the same `WriteStatus`, contributing to user-memory churn and executor OOM kills in the commit-validation window.

### Summary and Changelog

Fold both counters into a **single** Spark action using `JavaRDD.aggregate(...)`, extracted into a `@VisibleForTesting static StreamSync.sumRecordAndErrorCounts(JavaRDD<WriteStatus>)` helper. The reduction is mathematically identical (commutative/associative long sums); behavior is preserved across all cases: unification on/off and RDD present/absent.

`aggregate` is used in preference to `mapPartitions(...).reduce(...)` so that a 0-partition RDD, which `BaseErrorTableWriter.upsert(...)` can return for an empty commit (`sc.emptyRDD()`), yields `(0L, 0L)` instead of throwing `UnsupportedOperationException` as `JavaRDD.reduce` would. The mutable `long[2]` accumulator keeps per-record allocations at zero.
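
The difference between a seeded `aggregate` and a seedless `reduce` can be sketched in plain Java without Spark or Hudi on the classpath. This is an illustrative model only, not the code in this PR: `ErrorTableCountSketch`, `Status`, `sumWithSeedlessReduce`, and `samplePartitions` are hypothetical names, partitions are modeled as nested lists, and `Status` stands in for `WriteStatus` with just the two counters. The sketch folds both counters in one pass over a `long[2]` accumulator and shows that zero partitions yield `{0, 0}` under the seeded variant but an exception under the seedless one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of the single-pass counting idea; no Spark or Hudi classes are used. */
final class ErrorTableCountSketch {

    /** Hypothetical stand-in for WriteStatus that carries only the two counters. */
    static final class Status {
        final long totalRecords;
        final long totalErrorRecords;

        Status(long totalRecords, long totalErrorRecords) {
            this.totalRecords = totalRecords;
            this.totalErrorRecords = totalErrorRecords;
        }
    }

    /** Aggregate-style pass: seeded with a zero value, so zero partitions yield {0, 0}. */
    static long[] sumRecordAndErrorCounts(List<List<Status>> partitions) {
        long[] total = new long[2];            // zero value: {records, errors}
        for (List<Status> partition : partitions) {
            long[] acc = new long[2];          // each partition starts from its own zero
            for (Status s : partition) {       // seqOp: fold one element into the accumulator
                acc[0] += s.totalRecords;
                acc[1] += s.totalErrorRecords;
            }
            total[0] += acc[0];                // combOp: merge the partition result
            total[1] += acc[1];
        }
        return total;
    }

    /** Seedless reduce over per-partition sums; models a reduce that rejects empty input. */
    static long[] sumWithSeedlessReduce(List<List<Status>> partitions) {
        List<long[]> partials = new ArrayList<>();
        for (List<Status> partition : partitions) {
            partials.add(sumRecordAndErrorCounts(Collections.singletonList(partition)));
        }
        if (partials.isEmpty()) {
            // Assumption: mirrors the failure mode the PR attributes to JavaRDD.reduce.
            throw new UnsupportedOperationException("empty collection");
        }
        long[] result = partials.get(0);
        for (int i = 1; i < partials.size(); i++) {
            result[0] += partials.get(i)[0];
            result[1] += partials.get(i)[1];
        }
        return result;
    }

    /** Three partitions, the middle one deliberately empty. */
    static List<List<Status>> samplePartitions() {
        return Arrays.asList(
            Arrays.asList(new Status(10, 1), new Status(5, 0)),
            Collections.<Status>emptyList(),
            Collections.singletonList(new Status(7, 2)));
    }

    public static void main(String[] args) {
        long[] counts = sumRecordAndErrorCounts(samplePartitions());
        System.out.println(counts[0] + " records, " + counts[1] + " errors"); // 22 records, 3 errors
        long[] empty = sumRecordAndErrorCounts(Collections.<List<Status>>emptyList());
        System.out.println(empty[0] + " records, " + empty[1] + " errors");   // 0 records, 0 errors
    }
}
```

An empty partition inside a non-empty list is harmless in both variants, because it still contributes a `{0, 0}` partial; only the zero-partition input separates them.
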
Adds `TestStreamSyncWriteStatusValidation` covering multi-partition (with a forced empty partition), empty-partition, and zero-partition (`sc.emptyRDD()`) cases. No code copied.

### Impact

- **Performance:** halves Kryo deserialization of the cached error-table `WriteStatus` RDD during commit validation (~7% of allocated bytes per commit cycle on the observed workloads) and removes one stage submission per error-table commit. Compounds across the many error-table commits a long-running streamer makes.
- **No public API change.** Single private method in `StreamSync.java`; the helper is package-private and `@VisibleForTesting`.
- **No behavioral change** in the happy path; strict superset of the old behavior in the 0-partition edge case (the previous code was already safe there because `DoubleRDDFunctions.sum` uses `fold(0.0, _+_)`; the naive `mapPartitions(...).reduce(...)` rewrite would *not* have been, hence `aggregate`).

### Risk Level

low: mathematically identical reduction; no public API surface affected; new test covers the empty/0-partition edge cases.

### Documentation Update

none: no new config or user-facing change.

### Contributor's checklist

- [x] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Enough context is provided in the sections above
- [x] Adequate tests were added if applicable
