This is an automated email from the ASF dual-hosted git repository.
nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7af8cdf2c20e perf(streamer): fold validate() error-table WriteStatus sums into one pass (#18871)
7af8cdf2c20e is described below
commit 7af8cdf2c20e8ac5c57adaaad9fe4279974a5877
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Thu May 28 12:06:10 2026 -0700
perf(streamer): fold validate() error-table WriteStatus sums into one pass (#18871)
* perf(streamer): fold validate() error-table WriteStatus sums into one pass
---
.../apache/hudi/utilities/streamer/StreamSync.java | 40 ++++++++++-
.../TestStreamSyncWriteStatusValidation.java | 83 ++++++++++++++++++++++
2 files changed, 120 insertions(+), 3 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 2b1406c7deab..edeafb2755e3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -1403,6 +1403,39 @@ public class StreamSync implements Serializable, Closeable {
}
}
+  /**
+   * Sums {@link WriteStatus#getTotalRecords()} and {@link WriteStatus#getTotalErrorRecords()} over the
+   * given RDD in a single Spark action, returned as a {@code (totalRecords, totalErroredRecords)} tuple.
+   *
+   * <p>Folding both counters into one {@code aggregate} pass avoids re-deserializing every cached
+   * {@link WriteStatus} block a second time. Issuing two separate {@code mapToDouble(...).sum()}
+   * actions on the persisted error-table {@code WriteStatus} RDD doubles Kryo deserialization of
+   * cached partitions during commit validation and adds gratuitous heap pressure on memory-strained
+   * executors.
+   *
+   * <p>{@code aggregate} is used (instead of {@code mapPartitions(...).reduce(...)}) so that a
+   * 0-partition RDD (e.g. {@code sc.emptyRDD()}, which {@link BaseErrorTableWriter#upsert} can
+   * return for an empty commit) returns {@code (0L, 0L)} rather than raising
+   * {@code UnsupportedOperationException} as {@code reduce} would. The mutable {@code long[2]}
+   * accumulator keeps per-record allocations at zero.
+   */
+  @VisibleForTesting
+  static Tuple2<Long, Long> sumRecordAndErrorCounts(JavaRDD<WriteStatus> writeStatuses) {
+    long[] counts = writeStatuses.aggregate(
+        new long[]{0L, 0L},
+        (acc, status) -> {
+          acc[0] += status.getTotalRecords();
+          acc[1] += status.getTotalErrorRecords();
+          return acc;
+        },
+        (left, right) -> {
+          left[0] += right[0];
+          left[1] += right[1];
+          return left;
+        });
+    return new Tuple2<>(counts[0], counts[1]);
+  }
+
/**
* WriteStatus Validator for commits to hoodie streamer data table.
* The writes to error table is taken care as well.
@@ -1447,9 +1480,10 @@ public class StreamSync implements Serializable, Closeable {
long totalRecords = tableTotalRecords;
long totalErroredRecords = tableTotalErroredRecords;
- if (isErrorTableWriteUnificationEnabled) {
- totalRecords += errorTableWriteStatusRDDOpt.map(status ->
status.mapToDouble(WriteStatus::getTotalRecords).sum().longValue()).orElse(0L);
- totalErroredRecords += errorTableWriteStatusRDDOpt.map(status ->
status.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue()).orElse(0L);
+ if (isErrorTableWriteUnificationEnabled &&
errorTableWriteStatusRDDOpt.isPresent()) {
+ Tuple2<Long, Long> errorTableCounts =
sumRecordAndErrorCounts(errorTableWriteStatusRDDOpt.get());
+ totalRecords += errorTableCounts._1;
+ totalErroredRecords += errorTableCounts._2;
}
long totalSuccessfulRecords = totalRecords - totalErroredRecords;
this.totalSuccessfulRecords.set(totalSuccessfulRecords);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncWriteStatusValidation.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncWriteStatusValidation.java
new file mode 100644
index 000000000000..793cfd5de8ba
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestStreamSyncWriteStatusValidation.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link StreamSync#sumRecordAndErrorCounts(JavaRDD)}, the single-pass fold that replaces two
+ * separate {@code mapToDouble(...).sum()} actions in error-table commit validation.
+ */
+class TestStreamSyncWriteStatusValidation extends SparkClientFunctionalTestHarness {
+
+  @Test
+  void sumsTotalAndErroredRecordsAcrossPartitions() {
+    List<WriteStatus> writeStatuses = new ArrayList<>();
+    writeStatuses.add(writeStatus(10L, 3L));
+    writeStatuses.add(writeStatus(20L, 5L));
+    writeStatuses.add(writeStatus(7L, 0L));
+    // More partitions than elements forces an empty partition, exercising the aggregate fold.
+    JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(writeStatuses, 4);
+
+    Tuple2<Long, Long> counts = StreamSync.sumRecordAndErrorCounts(writeStatusRDD);
+
+    assertEquals(37L, counts._1, "total records summed across all partitions");
+    assertEquals(8L, counts._2, "total errored records summed across all partitions");
+  }
+
+  @Test
+  void sumsToZeroWhenNoWriteStatusesPresent() {
+    JavaRDD<WriteStatus> emptyRDD = jsc().parallelize(new ArrayList<WriteStatus>(), 2);
+
+    Tuple2<Long, Long> counts = StreamSync.sumRecordAndErrorCounts(emptyRDD);
+
+    assertEquals(0L, counts._1);
+    assertEquals(0L, counts._2);
+  }
+
+  @Test
+  void sumsToZeroForZeroPartitionRDD() {
+    // BaseErrorTableWriter.upsert can return sc.emptyRDD() on an empty commit; JavaRDD.reduce throws
+    // UnsupportedOperationException on a 0-partition RDD, whereas aggregate returns the zero value.
+    JavaRDD<WriteStatus> emptyRDD = jsc().emptyRDD();
+
+    Tuple2<Long, Long> counts = StreamSync.sumRecordAndErrorCounts(emptyRDD);
+
+    assertEquals(0L, counts._1);
+    assertEquals(0L, counts._2);
+  }
+
+  private static WriteStatus writeStatus(long totalRecords, long totalErrorRecords) {
+    WriteStatus writeStatus = new WriteStatus();
+    writeStatus.setTotalRecords(totalRecords);
+    writeStatus.setTotalErrorRecords(totalErrorRecords);
+    return writeStatus;
+  }
+}