This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cb25653f10 [core] Adjust 'dynamic-bucket.max-buckets' random pick logic
cb25653f10 is described below
commit cb25653f10c546eaa71c9fbafa6c0230832ae82c
Author: JingsongLi <[email protected]>
AuthorDate: Wed Feb 19 15:09:08 2025 +0800
[core] Adjust 'dynamic-bucket.max-buckets' random pick logic
---
.../java/org/apache/paimon/utils/ListUtils.java | 34 ++++++++++++++++++++++
.../apache/paimon/index/HashBucketAssigner.java | 3 +-
.../org/apache/paimon/index/PartitionIndex.java | 19 +++++++-----
.../paimon/index/SimpleHashBucketAssigner.java | 18 ++++++++----
.../paimon/table/sink/KeyAndBucketExtractor.java | 17 -----------
.../flink/sink/HashBucketAssignerOperator.java | 1 -
6 files changed, 59 insertions(+), 33 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
new file mode 100644
index 0000000000..6919ac15f4
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Utils for {@link List}. */
+public class ListUtils {
+
+ public static <T> T pickRandomly(List<T> list) {
+ checkArgument(!list.isEmpty(), "list is empty");
+ int index = ThreadLocalRandom.current().nextInt(list.size());
+ return list.get(index);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
index d549f5443b..ab3d125156 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -90,8 +90,7 @@ public class HashBucketAssigner implements BucketAssigner {
int assigned = index.assign(hash, this::isMyBucket, maxBucketsNum,
maxBucketId);
if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Assign " + assigned + " to the partition " + partition +
" key hash " + hash);
+ LOG.debug("Assign {} to the partition {} key hash {}", assigned,
partition, hash);
}
if (assigned > maxBucketId) {
maxBucketId = assigned;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index 14c5a9fa74..decc1f12f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -20,13 +20,14 @@ package org.apache.paimon.index;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.utils.Int2ShortHashMap;
import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.ListUtils;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -44,7 +45,8 @@ public class PartitionIndex {
public final Map<Integer, Long> nonFullBucketInformation;
- public final Set<Integer> totalBucket;
+ public final Set<Integer> totalBucketSet;
+ public final List<Integer> totalBucketArray;
private final long targetBucketRowNumber;
@@ -58,7 +60,8 @@ public class PartitionIndex {
long targetBucketRowNumber) {
this.hash2Bucket = hash2Bucket;
this.nonFullBucketInformation = bucketInformation;
- this.totalBucket = new LinkedHashSet<>(bucketInformation.keySet());
+ this.totalBucketSet = new LinkedHashSet<>(bucketInformation.keySet());
+ this.totalBucketArray = new ArrayList<>(totalBucketSet);
this.targetBucketRowNumber = targetBucketRowNumber;
this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
this.accessed = true;
@@ -88,14 +91,15 @@ public class PartitionIndex {
}
}
- if (-1 == maxBucketsNum || totalBucket.isEmpty() || maxBucketId <
maxBucketsNum - 1) {
+ if (-1 == maxBucketsNum || totalBucketSet.isEmpty() || maxBucketId <
maxBucketsNum - 1) {
// 3. create a new bucket
for (int i = 0; i < Short.MAX_VALUE; i++) {
- if (bucketFilter.test(i) && !totalBucket.contains(i)) {
+ if (bucketFilter.test(i) && !totalBucketSet.contains(i)) {
// The new bucketId may still be larger than the upper
bound
if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
nonFullBucketInformation.put(i, 1L);
- totalBucket.add(i);
+ totalBucketSet.add(i);
+ totalBucketArray.add(i);
hash2Bucket.put(hash, (short) i);
return i;
} else {
@@ -113,8 +117,7 @@ public class PartitionIndex {
}
// 4. exceed buckets upper bound
- int bucket =
- KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash,
totalBucket.size());
+ int bucket = ListUtils.pickRandomly(totalBucketArray);
hash2Bucket.put(hash, (short) bucket);
return bucket;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
index 4f49841f79..e5249bb0a1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
@@ -20,11 +20,13 @@ package org.apache.paimon.index;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.utils.Int2ShortHashMap;
+import org.apache.paimon.utils.ListUtils;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -78,10 +80,12 @@ public class SimpleHashBucketAssigner implements
BucketAssigner {
public final Int2ShortHashMap hash2Bucket = new Int2ShortHashMap();
private final Map<Integer, Long> bucketInformation;
+ private final List<Integer> bucketList;
private int currentBucket;
private SimplePartitionIndex() {
bucketInformation = new LinkedHashMap<>();
+ bucketList = new ArrayList<>();
loadNewBucket();
}
@@ -91,7 +95,13 @@ public class SimpleHashBucketAssigner implements
BucketAssigner {
return hash2Bucket.get(hash);
}
- Long num = bucketInformation.computeIfAbsent(currentBucket, i ->
0L);
+ Long num =
+ bucketInformation.computeIfAbsent(
+ currentBucket,
+ bucket -> {
+ bucketList.add(bucket);
+ return 0L;
+ });
if (num >= targetBucketRowNumber) {
if (-1 == maxBucketsNum
@@ -99,9 +109,7 @@ public class SimpleHashBucketAssigner implements
BucketAssigner {
|| maxBucketId < maxBucketsNum - 1) {
loadNewBucket();
} else {
- currentBucket =
- KeyAndBucketExtractor.bucketWithUpperBound(
- bucketInformation.keySet(), hash,
bucketInformation.size());
+ currentBucket = ListUtils.pickRandomly(bucketList);
}
}
bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L
: l + 1);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
index dcbd02508d..283aa773ee 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
@@ -24,10 +24,6 @@ import org.apache.paimon.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -59,17 +55,4 @@ public interface KeyAndBucketExtractor<T> {
checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
return Math.abs(hashcode % numBuckets);
}
-
- static int bucketWithUpperBound(Set<Integer> bucketsSet, int hashcode, int
maxBucketsNum) {
- checkArgument(maxBucketsNum > 0, "Num max-buckets is illegal: " +
maxBucketsNum);
- LOG.debug(
- "Assign record (hashcode '{}') to new bucket exceed upper
bound '{}' defined in '{}', Stop creating new buckets.",
- hashcode,
- maxBucketsNum,
- DYNAMIC_BUCKET_MAX_BUCKETS.key());
- return bucketsSet.stream()
- .skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
- .findFirst()
- .orElse(0);
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 5839fc98c2..5c8c1dfe4e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -49,7 +49,6 @@ public class HashBucketAssignerOperator<T> extends
AbstractStreamOperator<Tuple2
private final Integer numAssigners;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
extractorFunction;
private final boolean overwrite;
- private int[] maxBucketsArr;
private transient BucketAssigner assigner;
private transient PartitionKeyExtractor<T> extractor;