This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cf22950c16 [core] Support upper bound in dynamic bucket mode (#4974)
cf22950c16 is described below

commit cf22950c1670b2167994c6d50193b09745a0ddc3
Author: Yubin Li <[email protected]>
AuthorDate: Wed Feb 19 14:49:19 2025 +0800

    [core] Support upper bound in dynamic bucket mode (#4974)
---
 .../content/primary-key-table/data-distribution.md |   1 +
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  12 ++
 .../apache/paimon/index/HashBucketAssigner.java    |  11 +-
 .../org/apache/paimon/index/PartitionIndex.java    |  48 +++++---
 .../paimon/index/SimpleHashBucketAssigner.java     |  34 +++++-
 .../paimon/table/sink/KeyAndBucketExtractor.java   |  21 ++++
 .../paimon/index/HashBucketAssignerTest.java       | 136 ++++++++++++++++++++-
 .../paimon/index/SimpleHashBucketAssignerTest.java |  79 +++++++++++-
 .../flink/sink/HashBucketAssignerOperator.java     |  12 +-
 .../paimon/spark/commands/BucketProcessor.scala    |   5 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |   6 +-
 12 files changed, 330 insertions(+), 41 deletions(-)

diff --git a/docs/content/primary-key-table/data-distribution.md b/docs/content/primary-key-table/data-distribution.md
index 3a066031d4..baf3327ed9 100644
--- a/docs/content/primary-key-table/data-distribution.md
+++ b/docs/content/primary-key-table/data-distribution.md
@@ -48,6 +48,7 @@ Paimon will automatically expand the number of buckets.
 
 - Option1: `'dynamic-bucket.target-row-num'`: controls the target row number for one bucket.
 - Option2: `'dynamic-bucket.initial-buckets'`: controls the number of initialized bucket.
+- Option3: `'dynamic-bucket.max-buckets'`: controls the number of max buckets.
 
 {{< hint info >}}
 Dynamic Bucket only support single write job. Please do not start multiple jobs to write to the same partition
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d650670bf8..e3e1c0f673 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -290,6 +290,12 @@ under the License.
             <td>Integer</td>
             <td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
         </tr>
+        <tr>
+            <td><h5>dynamic-bucket.max-buckets</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Integer</td>
+            <td>Max buckets for a partition in dynamic bucket mode, It should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).</td>
+        </tr>
         <tr>
             <td><h5>dynamic-bucket.target-row-num</h5></td>
             <td style="word-wrap: break-word;">2000000</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index ab84aed36f..380cf84744 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1083,6 +1083,14 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Initial buckets for a partition in assigner 
operator for dynamic bucket mode.");
 
+    public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS =
+            key("dynamic-bucket.max-buckets")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Max buckets for a partition in dynamic bucket 
mode, It should "
+                                    + "either be equal to -1 (unlimited), or 
it must be greater than 0 (fixed upper bound).");
+
     public static final ConfigOption<Integer> 
DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
             key("dynamic-bucket.assigner-parallelism")
                     .intType()
@@ -2226,6 +2234,10 @@ public class CoreOptions implements Serializable {
         return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
     }
 
+    public Integer dynamicBucketMaxBuckets() {
+        return options.get(DYNAMIC_BUCKET_MAX_BUCKETS);
+    }
+
     public Integer dynamicBucketAssignerParallelism() {
         return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
index 60bade8177..d549f5443b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashBucketAssigner.java
@@ -45,6 +45,8 @@ public class HashBucketAssigner implements BucketAssigner {
     private final int numAssigners;
     private final int assignId;
     private final long targetBucketRowNumber;
+    private final int maxBucketsNum;
+    private int maxBucketId;
 
     private final Map<BinaryRow, PartitionIndex> partitionIndex;
 
@@ -55,7 +57,8 @@ public class HashBucketAssigner implements BucketAssigner {
             int numChannels,
             int numAssigners,
             int assignId,
-            long targetBucketRowNumber) {
+            long targetBucketRowNumber,
+            int maxBucketsNum) {
         this.snapshotManager = snapshotManager;
         this.commitUser = commitUser;
         this.indexFileHandler = indexFileHandler;
@@ -64,6 +67,7 @@ public class HashBucketAssigner implements BucketAssigner {
         this.assignId = assignId;
         this.targetBucketRowNumber = targetBucketRowNumber;
         this.partitionIndex = new HashMap<>();
+        this.maxBucketsNum = maxBucketsNum;
     }
 
     /** Assign a bucket for key hash of a record. */
@@ -84,11 +88,14 @@ public class HashBucketAssigner implements BucketAssigner {
             this.partitionIndex.put(partition, index);
         }
 
-        int assigned = index.assign(hash, this::isMyBucket);
+        int assigned = index.assign(hash, this::isMyBucket, maxBucketsNum, 
maxBucketId);
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Assign " + assigned + " to the partition " + partition + 
" key hash " + hash);
         }
+        if (assigned > maxBucketId) {
+            maxBucketId = assigned;
+        }
         return assigned;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index bace2c1ac1..14c5a9fa74 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -20,6 +20,7 @@ package org.apache.paimon.index;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 import org.apache.paimon.utils.Int2ShortHashMap;
 import org.apache.paimon.utils.IntIterator;
 
@@ -27,8 +28,8 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -57,13 +58,13 @@ public class PartitionIndex {
             long targetBucketRowNumber) {
         this.hash2Bucket = hash2Bucket;
         this.nonFullBucketInformation = bucketInformation;
-        this.totalBucket = new HashSet<>(bucketInformation.keySet());
+        this.totalBucket = new LinkedHashSet<>(bucketInformation.keySet());
         this.targetBucketRowNumber = targetBucketRowNumber;
         this.lastAccessedCommitIdentifier = Long.MIN_VALUE;
         this.accessed = true;
     }
 
-    public int assign(int hash, IntPredicate bucketFilter) {
+    public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum, 
int maxBucketId) {
         accessed = true;
 
         // 1. is it a key that has appeared before
@@ -80,29 +81,42 @@ public class PartitionIndex {
             Long number = entry.getValue();
             if (number < targetBucketRowNumber) {
                 entry.setValue(number + 1);
-                hash2Bucket.put(hash, bucket.shortValue());
+                hash2Bucket.put(hash, (short) bucket.intValue());
                 return bucket;
             } else {
                 iterator.remove();
             }
         }
 
-        // 3. create a new bucket
-        for (int i = 0; i < Short.MAX_VALUE; i++) {
-            if (bucketFilter.test(i) && !totalBucket.contains(i)) {
-                hash2Bucket.put(hash, (short) i);
-                nonFullBucketInformation.put(i, 1L);
-                totalBucket.add(i);
-                return i;
+        if (-1 == maxBucketsNum || totalBucket.isEmpty() || maxBucketId < 
maxBucketsNum - 1) {
+            // 3. create a new bucket
+            for (int i = 0; i < Short.MAX_VALUE; i++) {
+                if (bucketFilter.test(i) && !totalBucket.contains(i)) {
+                    // The new bucketId may still be larger than the upper 
bound
+                    if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
+                        nonFullBucketInformation.put(i, 1L);
+                        totalBucket.add(i);
+                        hash2Bucket.put(hash, (short) i);
+                        return i;
+                    } else {
+                        // No need to enter the next iteration when upper 
bound exceeded
+                        break;
+                    }
+                }
+            }
+            if (-1 == maxBucketsNum) {
+                throw new RuntimeException(
+                        String.format(
+                                "Too more bucket %s, you should increase 
target bucket row number %s.",
+                                maxBucketId, targetBucketRowNumber));
             }
         }
 
-        @SuppressWarnings("OptionalGetWithoutIsPresent")
-        int maxBucket = 
totalBucket.stream().mapToInt(Integer::intValue).max().getAsInt();
-        throw new RuntimeException(
-                String.format(
-                        "Too more bucket %s, you should increase target bucket 
row number %s.",
-                        maxBucket, targetBucketRowNumber));
+        // 4. exceed buckets upper bound
+        int bucket =
+                KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, 
totalBucket.size());
+        hash2Bucket.put(hash, (short) bucket);
+        return bucket;
     }
 
     public static PartitionIndex loadIndex(
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
index 5f7599370d..4f49841f79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java
@@ -20,9 +20,11 @@ package org.apache.paimon.index;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 import org.apache.paimon.utils.Int2ShortHashMap;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -32,14 +34,18 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
     private final int numAssigners;
     private final int assignId;
     private final long targetBucketRowNumber;
+    private final int maxBucketsNum;
+    private int maxBucketId;
 
     private final Map<BinaryRow, SimplePartitionIndex> partitionIndex;
 
-    public SimpleHashBucketAssigner(int numAssigners, int assignId, long 
targetBucketRowNumber) {
+    public SimpleHashBucketAssigner(
+            int numAssigners, int assignId, long targetBucketRowNumber, int 
maxBucketsNum) {
         this.numAssigners = numAssigners;
         this.assignId = assignId;
         this.targetBucketRowNumber = targetBucketRowNumber;
         this.partitionIndex = new HashMap<>();
+        this.maxBucketsNum = maxBucketsNum;
     }
 
     @Override
@@ -50,7 +56,11 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
             index = new SimplePartitionIndex();
             this.partitionIndex.put(partition, index);
         }
-        return index.assign(hash);
+        int assigned = index.assign(hash);
+        if (assigned > maxBucketId) {
+            maxBucketId = assigned;
+        }
+        return assigned;
     }
 
     @Override
@@ -71,7 +81,7 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
         private int currentBucket;
 
         private SimplePartitionIndex() {
-            bucketInformation = new HashMap<>();
+            bucketInformation = new LinkedHashMap<>();
             loadNewBucket();
         }
 
@@ -82,8 +92,17 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
             }
 
             Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 
0L);
+
             if (num >= targetBucketRowNumber) {
-                loadNewBucket();
+                if (-1 == maxBucketsNum
+                        || bucketInformation.isEmpty()
+                        || maxBucketId < maxBucketsNum - 1) {
+                    loadNewBucket();
+                } else {
+                    currentBucket =
+                            KeyAndBucketExtractor.bucketWithUpperBound(
+                                    bucketInformation.keySet(), hash, 
bucketInformation.size());
+                }
             }
             bucketInformation.compute(currentBucket, (i, l) -> l == null ? 1L 
: l + 1);
             hash2Bucket.put(hash, (short) currentBucket);
@@ -93,7 +112,12 @@ public class SimpleHashBucketAssigner implements BucketAssigner {
         private void loadNewBucket() {
             for (int i = 0; i < Short.MAX_VALUE; i++) {
                 if (isMyBucket(i) && !bucketInformation.containsKey(i)) {
-                    currentBucket = i;
+                    // The new bucketId may still be larger than the upper 
bound
+                    if (-1 == maxBucketsNum || i <= maxBucketsNum - 1) {
+                        currentBucket = i;
+                        return;
+                    }
+                    // No need to enter the next iteration when upper bound 
exceeded
                     return;
                 }
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
index 0b0b1a154b..dcbd02508d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
@@ -21,6 +21,13 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.types.RowKind;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -31,6 +38,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
  * @param <T> type of record
  */
 public interface KeyAndBucketExtractor<T> {
+    Logger LOG = LoggerFactory.getLogger(KeyAndBucketExtractor.class);
 
     void setRecord(T record);
 
@@ -51,4 +59,17 @@ public interface KeyAndBucketExtractor<T> {
         checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
         return Math.abs(hashcode % numBuckets);
     }
+
+    static int bucketWithUpperBound(Set<Integer> bucketsSet, int hashcode, int 
maxBucketsNum) {
+        checkArgument(maxBucketsNum > 0, "Num max-buckets is illegal: " + 
maxBucketsNum);
+        LOG.debug(
+                "Assign record (hashcode '{}') to new bucket exceed upper 
bound '{}' defined in '{}', Stop creating new buckets.",
+                hashcode,
+                maxBucketsNum,
+                DYNAMIC_BUCKET_MAX_BUCKETS.key());
+        return bucketsSet.stream()
+                .skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
+                .findFirst()
+                .orElse(0);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 1b4a7b1be5..b9c6a28378 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -27,9 +27,12 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableCommit;
 
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -63,7 +66,21 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
                 numChannels,
                 numAssigners,
                 assignId,
-                5);
+                5,
+                -1);
+    }
+
+    private HashBucketAssigner createAssigner(
+            int numChannels, int numAssigners, int assignId, int 
maxBucketsNum) {
+        return new HashBucketAssigner(
+                table.snapshotManager(),
+                commitUser,
+                fileHandler,
+                numChannels,
+                numAssigners,
+                assignId,
+                5,
+                maxBucketsNum);
     }
 
     @Test
@@ -92,8 +109,93 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
     }
 
     @Test
-    public void testPartitionCopy() {
-        HashBucketAssigner assigner = createAssigner(1, 1, 0);
+    public void testAssignWithUpperBound() {
+        HashBucketAssigner assigner = createAssigner(2, 2, 0, 2);
+
+        // assign
+        assertThat(assigner.assign(row(1), 0)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 2)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 4)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 8)).isEqualTo(0);
+
+        // full
+        assertThat(assigner.assign(row(1), 10)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 12)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 14)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 16)).isEqualTo(0);
+        assertThat(assigner.assign(row(1), 18)).isEqualTo(0);
+
+        // another partition
+        assertThat(assigner.assign(row(2), 12)).isEqualTo(0);
+
+        // read assigned
+        assertThat(assigner.assign(row(1), 6)).isEqualTo(0);
+
+        // not mine
+        assertThatThrownBy(() -> assigner.assign(row(1), 1))
+                .hasMessageContaining("This is a bug, record assign id");
+
+        // exceed buckets upper bound
+        // partition 1
+        int hash = 18;
+        for (int i = 0; i < 200; i++) {
+            int bucket = assigner.assign(row(1), hash += 2);
+            Assertions.assertThat(bucket).isIn(0, 2);
+        }
+        // partition 2
+        hash = 12;
+        for (int i = 0; i < 200; i++) {
+            int bucket = assigner.assign(row(2), hash += 2);
+            Assertions.assertThat(bucket).isIn(0, 2);
+        }
+    }
+
+    @Test
+    public void testAssignWithUpperBoundMultiAssigners() {
+        HashBucketAssigner assigner0 = createAssigner(2, 2, 0, 3);
+        HashBucketAssigner assigner1 = createAssigner(2, 2, 1, 3);
+
+        // assigner0: assign
+        assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 4)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 6)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);
+
+        // assigner0: full
+        assertThat(assigner0.assign(row(1), 10)).isEqualTo(2);
+        assertThat(assigner0.assign(row(1), 12)).isEqualTo(2);
+        assertThat(assigner0.assign(row(1), 14)).isEqualTo(2);
+        assertThat(assigner0.assign(row(1), 16)).isEqualTo(2);
+        assertThat(assigner0.assign(row(1), 18)).isEqualTo(2);
+
+        // assigner0: exceed buckets upper bound
+        int hash = 18;
+        for (int i = 0; i < 200; i++) {
+            int bucket = assigner0.assign(row(2), hash += 2);
+            Assertions.assertThat(bucket).isIn(0, 2);
+        }
+
+        // assigner1: assign
+        assertThat(assigner1.assign(row(1), 1)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 3)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 5)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 7)).isEqualTo(1);
+        assertThat(assigner1.assign(row(1), 9)).isEqualTo(1);
+
+        // assigner1: exceed buckets upper bound
+        hash = 9;
+        for (int i = 0; i < 200; i++) {
+            int bucket = assigner1.assign(row(2), hash += 2);
+            Assertions.assertThat(bucket).isIn(1);
+        }
+    }
+
+    @ParameterizedTest(name = "maxBuckets: {0}")
+    @ValueSource(ints = {-1, 1, 2})
+    public void testPartitionCopy(int maxBucketsNum) {
+        HashBucketAssigner assigner = createAssigner(1, 1, 0, maxBucketsNum);
 
         BinaryRow partition = row(1);
         assertThat(assigner.assign(partition, 0)).isEqualTo(0);
@@ -144,6 +246,34 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
         assertThat(assigner0.assign(row(1), 17)).isEqualTo(3);
     }
 
+    @Test
+    public void testAssignRestoreWithUpperBound() {
+        IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
+        IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
+        commit.commit(
+                0,
+                Arrays.asList(
+                        createCommitMessage(row(1), 0, bucket0),
+                        createCommitMessage(row(1), 2, bucket2)));
+
+        HashBucketAssigner assigner0 = createAssigner(3, 3, 0, 1);
+        HashBucketAssigner assigner2 = createAssigner(3, 3, 2, 1);
+
+        // read assigned
+        assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 4)).isEqualTo(2);
+        assertThat(assigner0.assign(row(1), 5)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 7)).isEqualTo(2);
+
+        // new assign
+        assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 11)).isEqualTo(0);
+        assertThat(assigner0.assign(row(1), 14)).isEqualTo(0);
+        assertThat(assigner2.assign(row(1), 16)).isEqualTo(2);
+        // exceed buckets upper bound
+        assertThat(assigner0.assign(row(1), 17)).isEqualTo(0);
+    }
+
     @Test
     public void testAssignDecoupled() {
         HashBucketAssigner assigner1 = createAssigner(3, 2, 1);
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
index d1b26019fc..2e2e53b7ef 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java
@@ -22,6 +22,8 @@ import org.apache.paimon.data.BinaryRow;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -31,7 +33,8 @@ public class SimpleHashBucketAssignerTest {
 
     @Test
     public void testAssign() {
-        SimpleHashBucketAssigner simpleHashBucketAssigner = new 
SimpleHashBucketAssigner(2, 0, 100);
+        SimpleHashBucketAssigner simpleHashBucketAssigner =
+                new SimpleHashBucketAssigner(2, 0, 100, -1);
 
         BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
         int hash = 0;
@@ -51,8 +54,71 @@ public class SimpleHashBucketAssignerTest {
     }
 
     @Test
-    public void testAssignWithSameHash() {
-        SimpleHashBucketAssigner simpleHashBucketAssigner = new 
SimpleHashBucketAssigner(2, 0, 100);
+    public void testAssignWithUpperBound() {
+        SimpleHashBucketAssigner simpleHashBucketAssigner =
+                new SimpleHashBucketAssigner(2, 0, 100, 3);
+
+        BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
+        int hash = 0;
+
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(0);
+        }
+
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(2);
+        }
+
+        // exceed upper bound
+        for (int i = 0; i < 200; i++) {
+            int bucket = simpleHashBucketAssigner.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isIn(0, 2);
+        }
+    }
+
+    @Test
+    public void testAssignWithUpperBoundMultiAssigners() {
+        SimpleHashBucketAssigner simpleHashBucketAssigner0 =
+                new SimpleHashBucketAssigner(2, 0, 100, 3);
+        SimpleHashBucketAssigner simpleHashBucketAssigner1 =
+                new SimpleHashBucketAssigner(2, 1, 100, 3);
+
+        BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
+        int hash = 0;
+
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(0);
+        }
+
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(1);
+        }
+
+        for (int i = 0; i < 100; i++) {
+            int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isEqualTo(2);
+        }
+
+        // exceed upper bound
+        for (int i = 0; i < 200; i++) {
+            int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isIn(0, 2);
+        }
+        for (int i = 0; i < 200; i++) {
+            int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++);
+            Assertions.assertThat(bucket).isIn(1);
+        }
+    }
+
+    @ParameterizedTest(name = "maxBuckets: {0}")
+    @ValueSource(ints = {-1, 1, 2})
+    public void testAssignWithSameHash(int maxBucketsNum) {
+        SimpleHashBucketAssigner simpleHashBucketAssigner =
+                new SimpleHashBucketAssigner(2, 0, 100, maxBucketsNum);
 
         BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
         int hash = 0;
@@ -70,9 +136,10 @@ public class SimpleHashBucketAssignerTest {
         }
     }
 
-    @Test
-    public void testPartitionCopy() {
-        SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 
5);
+    @ParameterizedTest(name = "maxBuckets: {0}")
+    @ValueSource(ints = {-1, 1, 2})
+    public void testPartitionCopy(int maxBucketsNum) {
+        SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 
5, maxBucketsNum);
 
         BinaryRow partition = row(1);
         assertThat(assigner.assign(partition, 0)).isEqualTo(0);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
index 0c101c6d1e..5839fc98c2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java
@@ -49,6 +49,7 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
     private final Integer numAssigners;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> 
extractorFunction;
     private final boolean overwrite;
+    private int[] maxBucketsArr;
 
     private transient BucketAssigner assigner;
     private transient PartitionKeyExtractor<T> extractor;
@@ -70,8 +71,8 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
 
-        // Each job can only have one user name and this name must be 
consistent across restarts.
-        // We cannot use job id as commit user name here because user may 
change job id by creating
+        // Each job can only have one username and this name must be 
consistent across restarts.
+        // We cannot use job id as commit username here because user may 
change job id by creating
         // a savepoint, stop the job and then resume from savepoint.
         String commitUser =
                 StateUtils.getSingleValueFromState(
@@ -80,9 +81,11 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
         int numberTasks = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
         int taskId = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
         long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
+        Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
         this.assigner =
                 overwrite
-                        ? new SimpleHashBucketAssigner(numberTasks, taskId, 
targetRowNum)
+                        ? new SimpleHashBucketAssigner(
+                                numberTasks, taskId, targetRowNum, 
maxBucketsNum)
                         : new HashBucketAssigner(
                                 table.snapshotManager(),
                                 commitUser,
@@ -90,7 +93,8 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
                                 numberTasks,
                                 MathUtils.min(numAssigners, numberTasks),
                                 taskId,
-                                targetRowNum);
+                                targetRowNum,
+                                maxBucketsNum);
         this.extractor = extractorFunction.apply(table.schema());
     }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
index 57a8a8e4ab..19494fc88d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.commands
 import org.apache.paimon.crosspartition.{GlobalIndexAssigner, KeyPartOrRow}
 import org.apache.paimon.data.{BinaryRow, GenericRow, InternalRow => 
PaimonInternalRow, JoinedRow}
 import org.apache.paimon.disk.IOManager
-import org.apache.paimon.index.HashBucketAssigner
+import org.apache.paimon.index.{HashBucketAssigner, PartitionIndex}
 import org.apache.paimon.spark.{DataConverter, SparkRow}
 import org.apache.paimon.spark.SparkUtils.createIOManager
 import org.apache.paimon.spark.util.EncoderUtils
@@ -111,7 +111,8 @@ case class DynamicBucketProcessor(
       numSparkPartitions,
       numAssigners,
       TaskContext.getPartitionId(),
-      targetBucketRowNumber
+      targetBucketRowNumber,
+      fileStoreTable.coreOptions.dynamicBucketMaxBuckets
     )
 
     new Iterator[Row]() {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 7d56fe867a..061337b56f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
 import org.apache.paimon.data.serializer.InternalSerializers
 import org.apache.paimon.deletionvectors.DeletionVector
 import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer
-import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
+import org.apache.paimon.index.{BucketAssigner, PartitionIndex, 
SimpleHashBucketAssigner}
 import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
 import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
 import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
@@ -196,7 +196,9 @@ case class PaimonSparkWriter(table: FileStoreTable) {
                 new SimpleHashBucketAssigner(
                   numAssigners,
                   TaskContext.getPartitionId(),
-                  table.coreOptions.dynamicBucketTargetRowNum)
+                  table.coreOptions.dynamicBucketTargetRowNum,
+                  table.coreOptions.dynamicBucketMaxBuckets
+                )
               row => {
                 val sparkRow = new SparkRow(rowType, row)
                 assigner.assign(

Reply via email to