Davis-Zhang-Onehouse opened a new pull request, #18871:
URL: https://github.com/apache/hudi/pull/18871

   ### Describe the issue this Pull Request addresses
   
   `HoodieStreamerWriteStatusValidator.validate()` fires **two** separate Spark actions on the same persisted error-table `WriteStatus` RDD whenever `isErrorTableWriteUnificationEnabled` is on (`hudi-utilities/.../StreamSync.java:1450-1452`):
   
   ```java
   // Two independent Spark actions over the same cached error-table RDD
   totalRecords        += errorTableWriteStatusRDDOpt.map(s -> s.mapToDouble(WriteStatus::getTotalRecords).sum().longValue()).orElse(0L);
   totalErroredRecords += errorTableWriteStatusRDDOpt.map(s -> s.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue()).orElse(0L);
   ```
   
   Each action re-deserializes every cached `WriteStatus` block via Kryo and submits its own stage. In production workloads with memory-strained executors and cached partitions of several hundred MB, allocation profiling shows Kryo `ObjectField.read` / `ObjectArraySerializer.read` accounting for ~13–14% of allocated bytes on this code path. The second `mapToDouble().sum()` pays that deserialization cost a second time only to read another scalar field from the same `WriteStatus` objects, adding to user-memory churn and executor OOM kills during the commit-validation window.
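   To make the cost argument concrete, here is a minimal, dependency-free sketch. It is not Spark code and not the PR's implementation; it assumes each Spark action deserializes every cached element exactly once. `Status`, `runAction`, and the `decodes` counter are hypothetical stand-ins for `WriteStatus`, a Spark action, and Kryo reads. Two single-counter actions decode every element twice, while one fused action decodes each element once and produces the same totals.

   ```java
   import java.util.List;
   import java.util.function.Consumer;

   public class TwoPassVsOnePass {
     // Hypothetical stand-in for WriteStatus: only the two counters used here.
     record Status(long totalRecords, long totalErrorRecords) {}

     // Counts simulated deserializations of cached elements.
     static long decodes = 0;

     // Models one Spark action: every element of every cached partition is
     // "deserialized" once and handed to the per-record consumer.
     static void runAction(List<List<Status>> partitions, Consumer<Status> perRecord) {
       for (List<Status> partition : partitions) {
         for (Status s : partition) {
           decodes++; // one simulated Kryo read per element per action
           perRecord.accept(s);
         }
       }
     }

     // Old shape: two actions, one per counter.
     static long[] twoActions(List<List<Status>> partitions) {
       long[] totals = new long[2];
       runAction(partitions, s -> totals[0] += s.totalRecords());
       runAction(partitions, s -> totals[1] += s.totalErrorRecords());
       return totals;
     }

     // New shape: one action that updates both counters per element.
     static long[] oneAction(List<List<Status>> partitions) {
       long[] totals = new long[2];
       runAction(partitions, s -> {
         totals[0] += s.totalRecords();
         totals[1] += s.totalErrorRecords();
       });
       return totals;
     }

     public static void main(String[] args) {
       List<List<Status>> parts = List.of(
           List.of(new Status(100, 2), new Status(50, 0)),
           List.of(), // empty partition
           List.of(new Status(10, 1)));
       decodes = 0;
       long[] a = twoActions(parts);
       long twoDecodes = decodes;
       decodes = 0;
       long[] b = oneAction(parts);
       long oneDecodes = decodes;
       System.out.println(a[0] + " " + a[1] + " decodes=" + twoDecodes); // 160 3 decodes=6
       System.out.println(b[0] + " " + b[1] + " decodes=" + oneDecodes); // 160 3 decodes=3
     }
   }
   ```

   Both shapes yield identical totals; only the number of simulated decodes differs, which is the overhead the fused action removes.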
   
   ### Summary and Changelog
   
   Fold both counters into a **single** Spark action using `JavaRDD.aggregate(...)`, extracted into a `@VisibleForTesting static StreamSync.sumRecordAndErrorCounts(JavaRDD<WriteStatus>)` helper. The reduction is mathematically identical (sums of `long` values are commutative and associative), so behavior is preserved in all cases: unification on/off and RDD present/absent.
   
   `aggregate` is preferred over `mapPartitions(...).reduce(...)` so that a 0-partition RDD, which `BaseErrorTableWriter.upsert(...)` can return for an empty commit (`sc.emptyRDD()`), yields `(0L, 0L)` instead of throwing `UnsupportedOperationException` as `JavaRDD.reduce` would. The mutable `long[2]` accumulator keeps per-record allocations at zero.
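   The difference between the two reduction shapes can be sketched in plain Java. This is a hypothetical model, not Spark's source and not the PR's helper: `aggregate` mimics `RDD.aggregate` (each partition folds from a fresh zero value, and the driver folds partition results into another fresh zero value), while `reduce` mimics `RDD.reduce` (empty partitions contribute nothing, and the action fails if nothing was contributed). `Status`, `sumCounts`, and `sumCountsViaReduce` are illustrative names. In Spark the fused call would take the shape `rdd.aggregate(new long[2], seqOp, combOp)`; Spark permits both functions to mutate and return their first argument, which is what makes the mutable `long[2]` accumulator safe.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;
   import java.util.function.BiFunction;
   import java.util.function.BinaryOperator;
   import java.util.function.Supplier;

   public class AggregateVsReduce {
     // Hypothetical stand-in for WriteStatus: only the two counters used here.
     record Status(long totalRecords, long totalErrorRecords) {}

     // Models RDD.aggregate: each partition folds from its own fresh zero value,
     // then the driver folds partition results into another fresh zero value.
     // With zero partitions, the driver-side zero value is returned unchanged.
     static <T, U> U aggregate(List<List<T>> partitions, Supplier<U> zero,
                               BiFunction<U, T, U> seqOp, BinaryOperator<U> combOp) {
       U result = zero.get();
       for (List<T> partition : partitions) {
         U acc = zero.get();
         for (T element : partition) {
           acc = seqOp.apply(acc, element);
         }
         result = combOp.apply(result, acc);
       }
       return result;
     }

     // Models RDD.reduce: empty partitions contribute nothing, and the action
     // fails when no partition contributed a value.
     static <T> T reduce(List<List<T>> partitions, BinaryOperator<T> op) {
       Optional<T> result = Optional.empty();
       for (List<T> partition : partitions) {
         Optional<T> partial = partition.stream().reduce(op);
         if (partial.isPresent()) {
           result = result.isPresent()
               ? Optional.of(op.apply(result.get(), partial.get()))
               : partial;
         }
       }
       return result.orElseThrow(() -> new UnsupportedOperationException("empty collection"));
     }

     // Fused shape: one pass, mutable long[2] = {totalRecords, totalErrorRecords}.
     static long[] sumCounts(List<List<Status>> partitions) {
       return aggregate(partitions, () -> new long[2],
           (acc, s) -> {
             acc[0] += s.totalRecords();
             acc[1] += s.totalErrorRecords();
             return acc;
           },
           (a, b) -> {
             a[0] += b[0];
             a[1] += b[1];
             return a;
           });
     }

     // Naive shape: one long[2] per partition (like mapPartitions), then reduce.
     static long[] sumCountsViaReduce(List<List<Status>> partitions) {
       List<List<long[]>> perPartition = new ArrayList<>();
       for (List<Status> partition : partitions) {
         long[] acc = new long[2];
         for (Status s : partition) {
           acc[0] += s.totalRecords();
           acc[1] += s.totalErrorRecords();
         }
         perPartition.add(List.of(acc)); // always exactly one element per partition
       }
       return reduce(perPartition, (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]});
     }

     public static void main(String[] args) {
       List<List<Status>> parts = List.of(
           List.of(new Status(100, 2), new Status(50, 0)),
           List.of(), // forced empty partition
           List.of(new Status(10, 1)));
       long[] fused = sumCounts(parts);
       System.out.println(fused[0] + " " + fused[1]); // 160 3
       long[] none = sumCounts(List.of());
       System.out.println(none[0] + " " + none[1]); // 0 0
       try {
         sumCountsViaReduce(List.of());
       } catch (UnsupportedOperationException e) {
         System.out.println("reduce failed: " + e.getMessage()); // reduce failed: empty collection
       }
     }
   }
   ```

   With zero partitions, `sumCounts` returns `{0, 0}` while `sumCountsViaReduce` throws; once at least one partition exists (even an empty one), both agree, because the per-partition step always emits one `long[2]`.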
   
   Adds `TestStreamSyncWriteStatusValidation`, covering multi-partition (with a forced empty partition), empty-partition, and zero-partition (`sc.emptyRDD()`) cases.
   
   No code copied.
   
   ### Impact
   
   - **Performance:** halves Kryo deserialization of the cached error-table `WriteStatus` RDD during commit validation (~7% of allocated bytes per commit cycle on the observed workloads) and removes one stage submission per error-table commit. The savings compound across the many error-table commits a long-running streamer makes.
   - **No public API change.** The change is confined to a single private method in `StreamSync.java`; the new helper is package-private and `@VisibleForTesting`.
   - **No behavioral change** in the happy path, and none in the 0-partition edge case either. The previous code was already safe there because `DoubleRDDFunctions.sum` is implemented as `fold(0.0)(_ + _)`; a naive `mapPartitions(...).reduce(...)` rewrite would *not* have been, hence `aggregate`.
   
   ### Risk Level
   
   low: mathematically identical reduction; no public API surface affected; the new test covers the empty and 0-partition edge cases.
   
   ### Documentation Update
   
   none — no new config or user-facing change.
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Enough context is provided in the sections above
   - [x] Adequate tests were added if applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.