This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a33915822 [client] Fix NPE thrown by MetadataUpdater when getting bucket count (#2054)
a33915822 is described below
commit a339158220a13b6df236b410a70b1ee6dc5fce23
Author: yunhong <[email protected]>
AuthorDate: Sun Nov 30 15:21:12 2025 +0800
[client] Fix NPE thrown by MetadataUpdater when getting bucket count (#2054)
---
.../fluss/client/write/RoundRobinBucketAssigner.java | 7 ++++---
.../apache/fluss/client/write/StickyBucketAssigner.java | 6 ++++--
.../java/org/apache/fluss/client/write/WriterClient.java | 4 ++--
.../apache/fluss/client/write/RecordAccumulatorTest.java | 3 ++-
.../fluss/client/write/StickyStaticBucketAssignerTest.java | 14 +++++++-------
.../src/main/java/org/apache/fluss/cluster/Cluster.java | 4 ----
6 files changed, 19 insertions(+), 19 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java
index 69beb3751..d87bb2669 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/RoundRobinBucketAssigner.java
@@ -31,10 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
@Internal
public class RoundRobinBucketAssigner extends DynamicBucketAssigner {
private final PhysicalTablePath physicalTablePath;
+ private final int bucketNumber;
private final AtomicInteger counter = new AtomicInteger(new
Random().nextInt());
- public RoundRobinBucketAssigner(PhysicalTablePath physicalTablePath) {
+ public RoundRobinBucketAssigner(PhysicalTablePath physicalTablePath, int
bucketNumber) {
this.physicalTablePath = physicalTablePath;
+ this.bucketNumber = bucketNumber;
}
@Override
@@ -47,8 +49,7 @@ public class RoundRobinBucketAssigner extends
DynamicBucketAssigner {
return bucketsForTable.get(bucket).getBucketId();
} else {
// no buckets are available, give a non-available bucket.
- return MathUtils.toPositive(nextValue)
- % cluster.getBucketCount(physicalTablePath.getTablePath());
+ return MathUtils.toPositive(nextValue) % bucketNumber;
}
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java
index 4d427127b..12dda3567 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/write/StickyBucketAssigner.java
@@ -35,10 +35,12 @@ import java.util.concurrent.atomic.AtomicInteger;
public class StickyBucketAssigner extends DynamicBucketAssigner {
private final PhysicalTablePath physicalTablePath;
+ private final int bucketNumber;
private final AtomicInteger currentBucketId;
- public StickyBucketAssigner(PhysicalTablePath physicalTablePath) {
+ public StickyBucketAssigner(PhysicalTablePath physicalTablePath, int
bucketNumber) {
this.physicalTablePath = physicalTablePath;
+ this.bucketNumber = bucketNumber;
this.currentBucketId = new AtomicInteger(-1);
}
@@ -73,7 +75,7 @@ public class StickyBucketAssigner extends
DynamicBucketAssigner {
cluster.getAvailableBucketsForPhysicalTablePath(physicalTablePath);
if (availableBuckets.isEmpty()) {
int random =
MathUtils.toPositive(ThreadLocalRandom.current().nextInt());
- newBucket = random %
cluster.getBucketCount(physicalTablePath.getTablePath());
+ newBucket = random % bucketNumber;
} else if (availableBuckets.size() == 1) {
newBucket = availableBuckets.get(0).getBucketId();
} else {
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index 962369ef7..f61e6f544 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -354,9 +354,9 @@ public class WriterClient {
ConfigOptions.NoKeyAssigner noKeyAssigner =
conf.get(ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER);
if (noKeyAssigner == ROUND_ROBIN) {
- return new RoundRobinBucketAssigner(physicalTablePath);
+ return new RoundRobinBucketAssigner(physicalTablePath,
bucketNumber);
} else if (noKeyAssigner == STICKY) {
- return new StickyBucketAssigner(physicalTablePath);
+ return new StickyBucketAssigner(physicalTablePath,
bucketNumber);
} else {
throw new IllegalArgumentException(
"Unsupported append only row bucket assigner: " +
noKeyAssigner);
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index c721e974c..5aeee2d10 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -343,7 +343,8 @@ class RecordAccumulatorTest {
int batchSize = 100;
IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
- StickyBucketAssigner bucketAssigner = new
StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
+ StickyBucketAssigner bucketAssigner =
+ new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
RecordAccumulator accum =
createTestRecordAccumulator(
(int) Duration.ofMinutes(1).toMillis(),
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
index 467fb2af8..5ac05b0fb 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/write/StickyStaticBucketAssignerTest.java
@@ -65,7 +65,7 @@ class StickyStaticBucketAssignerTest {
// init cluster.
Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2,
bucket3));
StickyBucketAssigner stickyBucketAssigner =
- new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
+ new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
int bucketId = stickyBucketAssigner.assignBucket(cluster);
assertThat(bucketId >= 0 && bucketId < 3).isTrue();
@@ -94,7 +94,7 @@ class StickyStaticBucketAssignerTest {
// init cluster.
Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2,
bucket3));
StickyBucketAssigner stickyBucketAssigner =
- new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
+ new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
int bucketId = stickyBucketAssigner.assignBucket(cluster);
for (int i = 0; i < 3; i++) {
if (i != bucketId) {
@@ -111,7 +111,7 @@ class StickyStaticBucketAssignerTest {
// init cluster.
Cluster cluster = updateCluster(Collections.singletonList(bucket1));
StickyBucketAssigner stickyBucketAssigner =
- new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH);
+ new StickyBucketAssigner(DATA1_PHYSICAL_TABLE_PATH, 3);
int bucketId = stickyBucketAssigner.assignBucket(cluster);
for (int i = 0; i < 100; i++) {
@@ -136,7 +136,7 @@ class StickyStaticBucketAssignerTest {
Cluster cluster = updateCluster(allBuckets);
// Assure we never choose bucket 1 for tp1 because it is unavailable.
- StickyBucketAssigner stickyBucketAssigner = new
StickyBucketAssigner(tp1);
+ StickyBucketAssigner stickyBucketAssigner = new
StickyBucketAssigner(tp1, 3);
int bucketForTp1 = stickyBucketAssigner.assignBucket(cluster);
assertThat(bucketForTp1).isNotEqualTo(1);
for (int i = 0; i < 100; i++) {
@@ -145,7 +145,7 @@ class StickyStaticBucketAssignerTest {
}
// Assure we always choose bucket 1 for tp2.
- stickyBucketAssigner = new StickyBucketAssigner(tp2);
+ stickyBucketAssigner = new StickyBucketAssigner(tp2, 3);
int bucketForTp2 = stickyBucketAssigner.assignBucket(cluster);
assertThat(bucketForTp2).isEqualTo(1);
for (int i = 0; i < 100; i++) {
@@ -154,7 +154,7 @@ class StickyStaticBucketAssignerTest {
}
// Assure that we can still choose one bucket even if there are no
available buckets.
- stickyBucketAssigner = new StickyBucketAssigner(tp3);
+ stickyBucketAssigner = new StickyBucketAssigner(tp3, 3);
int bucketForTp3 = stickyBucketAssigner.assignBucket(cluster);
assertThat(bucketForTp3).isIn(0, 1, 2);
stickyBucketAssigner.onNewBatch(cluster, bucketForTp3);
@@ -165,7 +165,7 @@ class StickyStaticBucketAssignerTest {
void testMultiThreadToCallOnNewBatch() {
Cluster cluster = updateCluster(Arrays.asList(bucket1, bucket2,
bucket3));
StickyBucketAssigner stickyBucketAssigner =
- new
StickyBucketAssigner(PhysicalTablePath.of(DATA1_TABLE_PATH));
+ new
StickyBucketAssigner(PhysicalTablePath.of(DATA1_TABLE_PATH), 3);
int bucketId = stickyBucketAssigner.assignBucket(cluster);
Queue<Integer> bucketIds = new ConcurrentLinkedQueue<>();
Thread[] threads = new Thread[100];
diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
index ccd73e22c..1cec4540d 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
@@ -173,10 +173,6 @@ public final class Cluster {
+ " in cluster"));
}
- public int getBucketCount(TablePath tablePath) {
- return tableInfoByPath.get(tablePath).getNumBuckets();
- }
-
/** Get the bucket location for this table-bucket. */
public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket)
{
return Optional.ofNullable(availableLocationByBucket.get(tableBucket));