This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new ea8f9258ec1 [HUDI-6863] Revert auto-tuning of dedup parallelism (#9722) ea8f9258ec1 is described below commit ea8f9258ec1b2c281d88e58855750a3aa229f6f3 Author: Y Ethan Guo <ethan.guoyi...@gmail.com> AuthorDate: Fri Sep 15 18:18:20 2023 -0700 [HUDI-6863] Revert auto-tuning of dedup parallelism (#9722) Before this PR, the auto-tuning logic for dedup parallelism capped the write parallelism at the number of input data partitions, so the user-configured `hoodie.upsert.shuffle.parallelism` was ignored whenever it was larger. This commit reverts #6802 to fix the issue. --- .../org/apache/hudi/table/action/commit/HoodieWriteHelper.java | 7 ++----- .../client/functional/TestHoodieClientOnCopyOnWriteStorage.java | 6 +++--- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java index d7640c28e50..b56ac08e16f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java @@ -60,9 +60,6 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) { boolean isIndexingGlobal = index.isGlobal(); final SerializableSchema schema = new SerializableSchema(schemaStr); - // Auto-tunes the parallelism for reduce transformation based on the number of data partitions - // in engine-specific representation - int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism)); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ 
in their partitionPath @@ -74,7 +71,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi }).reduceByKey((rec1, rec2) -> { HoodieRecord<T> reducedRecord; try { - reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft(); + reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft(); } catch (IOException e) { throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e); } @@ -82,6 +79,6 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey(); HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation(); return reducedRecord.newInstance(reducedKey, operation); - }, reduceParallelism).map(Pair::getRight); + }, parallelism).map(Pair::getRight); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 5fcc4c0adf3..764be044bc2 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -479,12 +479,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { // Global dedup should be done based on recordKey only HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(true); - int dedupParallelism = records.getNumPartitions() + 100; + int dedupParallelism = records.getNumPartitions() + 2; HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance() .deduplicateRecords(records, index, dedupParallelism, 
writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE); List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList(); - assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions()); + assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions()); assertEquals(1, dedupedRecs.size()); assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath()); assertNodupesWithinPartition(dedupedRecs); @@ -496,7 +496,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance() .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE); dedupedRecs = dedupedRecsRdd.collectAsList(); - assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions()); + assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions()); assertEquals(2, dedupedRecs.size()); assertNodupesWithinPartition(dedupedRecs);