This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0344765bdf3a65a4fe2ce37a2d5ec3f5b3561429
Author: luokey <854194...@qq.com>
AuthorDate: Fri Feb 3 00:56:07 2023 -0500

    [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)

    (cherry picked from commit 3282caa22420f3012698ec8cce376ad986034300)
---
 .../java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java | 5 +++--
 .../org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java    | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 7af12487587..72f99422a8e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -135,8 +135,9 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    * (partition + curBucket) % numPartitions == this taskID belongs to this task.
    */
   public boolean isBucketToLoad(int bucketNumber, String partition) {
-    int globalHash = ((partition + bucketNumber).hashCode()) & Integer.MAX_VALUE;
-    return BucketIdentifier.mod(globalHash, parallelism) == taskID;
+    final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism;
+    int globalIndex = partitionIndex + bucketNumber;
+    return BucketIdentifier.mod(globalIndex, parallelism) == taskID;
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 5fa3d1ab9a0..4e0c08b1046 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -42,7 +42,8 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
   @Override
   public int partition(HoodieKey key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
-    int globalHash = (key.getPartitionPath() + curBucket).hashCode() & Integer.MAX_VALUE;
-    return BucketIdentifier.mod(globalHash, numPartitions);
+    int partitionIndex = (key.getPartitionPath().hashCode() & Integer.MAX_VALUE) % numPartitions;
+    int globalIndex = partitionIndex + curBucket;
+    return BucketIdentifier.mod(globalIndex, numPartitions);
   }
 }