This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a7b584d6f6 [flink] Produce real random id in SourceSplitGenerator (#6441)
a7b584d6f6 is described below
commit a7b584d6f6d7ea3c47bb313edb51eb791e7c9061
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 21 13:08:34 2025 +0800
[flink] Produce real random id in SourceSplitGenerator (#6441)
---
.../org/apache/paimon/table/source/DataSplit.java | 16 ++++++++++
.../paimon/flink/source/FileStoreSourceSplit.java | 13 ++++++++
.../source/FileStoreSourceSplitGenerator.java | 37 +++-------------------
.../source/FileStoreSourceSplitGeneratorTest.java | 32 +++++++++++--------
4 files changed, 52 insertions(+), 46 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 079477b001..3a4c112a95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -328,6 +328,22 @@ public class DataSplit implements Split {
rawConvertible);
}
+ @Override
+ public String toString() {
+ return "{"
+ + "snapshotId="
+ + snapshotId
+ + ", partition=hash-"
+ + partition.hashCode()
+ + ", bucket="
+ + bucket
+ + ", rawConvertible="
+ + rawConvertible
+ + '}'
+ + "@"
+ + Integer.toHexString(hashCode());
+ }
+
private void writeObject(ObjectOutputStream out) throws IOException {
serialize(new DataOutputViewStreamWrapper(out));
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java
index 769154b196..b9ce94c413 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java
@@ -79,4 +79,17 @@ public class FileStoreSourceSplit implements SourceSplit {
public int hashCode() {
return Objects.hash(id, split, recordsToSkip);
}
+
+ @Override
+ public String toString() {
+ return "{"
+ + "id='"
+ + id
+ + '\''
+ + ", split="
+ + split
+ + ", recordsToSkip="
+ + recordsToSkip
+ + '}';
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
index f19ec828e0..2d89bbbe52 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
@@ -18,10 +18,11 @@
package org.apache.paimon.flink.source;
-import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableScan;
import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@@ -30,11 +31,8 @@ import java.util.stream.Collectors;
*/
public class FileStoreSourceSplitGenerator {
- /**
- * The current Id as a mutable string representation. This covers more values than the integer
- * value range, so we should never overflow.
- */
- private final char[] currentId = "0000000000".toCharArray();
+ private final String uuid = UUID.randomUUID().toString();
+ private final AtomicInteger idCounter = new AtomicInteger(1);
public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) {
return plan.splits().stream()
@@ -42,32 +40,7 @@ public class FileStoreSourceSplitGenerator {
.collect(Collectors.toList());
}
- public List<FileStoreSourceSplit> createSplits(List<Split> splits) {
- return splits.stream()
- .map(s -> new FileStoreSourceSplit(getNextId(), s))
- .collect(Collectors.toList());
- }
-
protected final String getNextId() {
- // because we just increment numbers, we increment the char representation directly,
- // rather than incrementing an integer and converting it to a string representation
- // every time again (requires quite some expensive conversion logic).
- incrementCharArrayByOne(currentId, currentId.length - 1);
- return new String(currentId);
- }
-
- private static void incrementCharArrayByOne(char[] array, int pos) {
- if (pos < 0) {
- throw new RuntimeException("Produce too many splits.");
- }
-
- char c = array[pos];
- c++;
-
- if (c > '9') {
- c = '0';
- incrementCharArrayByOne(array, pos - 1);
- }
- array[pos] = c;
+ return uuid + "-" + idCounter.getAndIncrement();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
index 932b7ee951..cf63274169 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java
@@ -70,23 +70,27 @@ public class FileStoreSourceSplitGeneratorTest {
o -> ((DataSplit) ((FileStoreSourceSplit)
o).split()).bucket()));
// splitId should be the input order!
- assertSplit(splits.get(0), "0000000001", 1, 0, Arrays.asList("f0",
"f1"));
- assertSplit(splits.get(1), "0000000002", 1, 1,
Collections.singletonList("f2"));
- assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3",
"f4", "f5"));
- assertSplit(splits.get(3), "0000000004", 2, 1,
Collections.singletonList("f6"));
- assertSplit(splits.get(4), "0000000005", 3, 0,
Collections.singletonList("f7"));
- assertSplit(splits.get(5), "0000000006", 3, 1,
Collections.singletonList("f8"));
- assertSplit(splits.get(6), "0000000007", 4, 0,
Collections.singletonList("f9"));
- assertSplit(splits.get(7), "0000000008", 4, 1,
Collections.singletonList("f10"));
- assertSplit(splits.get(8), "0000000009", 5, 0,
Collections.singletonList("f11"));
- assertSplit(splits.get(9), "0000000010", 5, 1,
Collections.singletonList("f12"));
- assertSplit(splits.get(10), "0000000011", 6, 0,
Collections.singletonList("f13"));
- assertSplit(splits.get(11), "0000000012", 6, 1,
Collections.singletonList("f14"));
+ assertSplit(splits.get(0), "-1", 1, 0, Arrays.asList("f0", "f1"));
+ assertSplit(splits.get(1), "-2", 1, 1,
Collections.singletonList("f2"));
+ assertSplit(splits.get(2), "-3", 2, 0, Arrays.asList("f3", "f4",
"f5"));
+ assertSplit(splits.get(3), "-4", 2, 1,
Collections.singletonList("f6"));
+ assertSplit(splits.get(4), "-5", 3, 0,
Collections.singletonList("f7"));
+ assertSplit(splits.get(5), "-6", 3, 1,
Collections.singletonList("f8"));
+ assertSplit(splits.get(6), "-7", 4, 0,
Collections.singletonList("f9"));
+ assertSplit(splits.get(7), "-8", 4, 1,
Collections.singletonList("f10"));
+ assertSplit(splits.get(8), "-9", 5, 0,
Collections.singletonList("f11"));
+ assertSplit(splits.get(9), "-10", 5, 1,
Collections.singletonList("f12"));
+ assertSplit(splits.get(10), "-11", 6, 0,
Collections.singletonList("f13"));
+ assertSplit(splits.get(11), "-12", 6, 1,
Collections.singletonList("f14"));
}
private void assertSplit(
- FileStoreSourceSplit split, String splitId, int part, int bucket,
List<String> files) {
- assertThat(split.splitId()).isEqualTo(splitId);
+ FileStoreSourceSplit split,
+ String splitIdSuffix,
+ int part,
+ int bucket,
+ List<String> files) {
+ assertThat(split.splitId()).endsWith(splitIdSuffix);
assertThat(((DataSplit)
split.split()).partition().getInt(0)).isEqualTo(part);
assertThat(((DataSplit) split.split()).bucket()).isEqualTo(bucket);
assertThat(