This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ea8f9258ec1 [HUDI-6863] Revert auto-tuning of dedup parallelism (#9722)
ea8f9258ec1 is described below

commit ea8f9258ec1b2c281d88e58855750a3aa229f6f3
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Fri Sep 15 18:18:20 2023 -0700

    [HUDI-6863] Revert auto-tuning of dedup parallelism (#9722)
    
    Before this PR, the auto-tuning logic for dedup parallelism dictated the
    write parallelism, so the user-configured `hoodie.upsert.shuffle.parallelism`
    was ignored. This commit reverts #6802 to fix the issue.
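    For context, a minimal sketch (not part of this commit; the variable names
    and values below are illustrative) of why the reverted cap hid the user
    setting:

        // Hypothetical values: a user-configured shuffle parallelism and the
        // number of partitions in the incoming records.
        int configuredParallelism = 200;  // hoodie.upsert.shuffle.parallelism
        int numInputPartitions = 8;       // records.getNumPartitions()
        // The removed auto-tuning capped the reduce parallelism at the partition count.
        int reduceParallelism = Math.max(1, Math.min(numInputPartitions, configuredParallelism));
        // reduceParallelism == 8, not 200; after the revert, the configured value
        // is passed to reduceByKey directly.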
---
 .../org/apache/hudi/table/action/commit/HoodieWriteHelper.java     | 7 ++-----
 .../client/functional/TestHoodieClientOnCopyOnWriteStorage.java    | 6 +++---
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index d7640c28e50..b56ac08e16f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -60,9 +60,6 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
       HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism, String schemaStr, TypedProperties props, HoodieRecordMerger merger) {
     boolean isIndexingGlobal = index.isGlobal();
     final SerializableSchema schema = new SerializableSchema(schemaStr);
-    // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
-    // in engine-specific representation
-    int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
@@ -74,7 +71,7 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
     }).reduceByKey((rec1, rec2) -> {
       HoodieRecord<T> reducedRecord;
       try {
-        reducedRecord =  merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
+        reducedRecord = merger.merge(rec1, schema.get(), rec2, schema.get(), props).get().getLeft();
       } catch (IOException e) {
         throw new HoodieException(String.format("Error to merge two records, %s, %s", rec1, rec2), e);
       }
@@ -82,6 +79,6 @@ public class HoodieWriteHelper<T, R> extends BaseWriteHelper<T, HoodieData<Hoodi
       HoodieKey reducedKey = choosePrev ? rec1.getKey() : rec2.getKey();
       HoodieOperation operation = choosePrev ? rec1.getOperation() : rec2.getOperation();
       return reducedRecord.newInstance(reducedKey, operation);
-    }, reduceParallelism).map(Pair::getRight);
+    }, parallelism).map(Pair::getRight);
   }
 }
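
For reference, a minimal usage sketch of supplying the now-honored setting
through the Spark datasource writer (the table name, the value 200, and the
helper class below are illustrative assumptions, not part of this change;
other required write options such as record key and precombine field are
omitted for brevity):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class UpsertParallelismExample {
      // Upserts a DataFrame into a Hudi table with an explicit shuffle parallelism,
      // which this revert makes authoritative again for the dedup/reduce step.
      public static void upsert(Dataset<Row> df, String basePath) {
        df.write()
            .format("hudi")
            .option("hoodie.table.name", "example_table")          // hypothetical table name
            .option("hoodie.datasource.write.operation", "upsert")
            .option("hoodie.upsert.shuffle.parallelism", "200")    // honored again after this revert
            .mode(SaveMode.Append)
            .save(basePath);
      }
    }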
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 5fcc4c0adf3..764be044bc2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -479,12 +479,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    int dedupParallelism = records.getNumPartitions() + 100;
+    int dedupParallelism = records.getNumPartitions() + 2;
     HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd =
         (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
             .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
-    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
+    assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);
@@ -496,7 +496,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         (HoodieData<HoodieRecord<RawTripTestPayload>>) HoodieWriteHelper.newInstance()
             .deduplicateRecords(records, index, dedupParallelism, writeConfig.getSchema(), writeConfig.getProps(), HoodiePreCombineAvroRecordMerger.INSTANCE);
     dedupedRecs = dedupedRecsRdd.collectAsList();
-    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
+    assertEquals(dedupParallelism, dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
