This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new af6ac7bb7a [flink] Fix postpone bucket without changelog producer cannot read all records with multiple streaming readers (#6142)
af6ac7bb7a is described below
commit af6ac7bb7ab8c75383c25d196f7e27599e63fcf4
Author: tsreaper <[email protected]>
AuthorDate: Mon Aug 25 20:19:01 2025 +0800
    [flink] Fix postpone bucket without changelog producer cannot read all records with multiple streaming readers (#6142)
---
.../postpone/PostponeBucketFileStoreWrite.java | 13 ++++++++++++
.../postpone/PostponeBucketCompactSplitSource.java | 20 +++++-------------
.../source/ContinuousFileSplitEnumerator.java | 19 ++++++++++++++---
.../paimon/flink/PostponeBucketTableITCase.java | 24 ++++++++++++++++------
4 files changed, 52 insertions(+), 24 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index ac9a851cb2..4978ec6b3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -216,4 +216,17 @@ public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue>
     protected Function<WriterContainer<KeyValue>, Boolean> createWriterCleanChecker() {
         return createNoConflictAwareWriterCleanChecker();
     }
+
+    public static int getWriteId(String fileName) {
+        try {
+            String[] parts = fileName.split("-s-");
+            return Integer.parseInt(parts[1].substring(0, parts[1].indexOf('-')));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Data file name "
+                            + fileName
+                            + " does not match the pattern. This is unexpected.",
+                    e);
+        }
+    }
 }
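Note: the new helper recovers the writer id that is embedded between "-s-" and the following "-" in the data file name (the same token the regex "-s-(\d+?)-" removed in the next file used to match). A minimal sketch of the parsing, using a made-up file name purely for illustration:

    // Illustrative only: the file name below is hypothetical, not the exact naming
    // scheme; only the "-s-<writeId>-" fragment matters for getWriteId.
    String fileName = "bucket-postpone-u1-s-3-0.parquet";
    String[] parts = fileName.split("-s-");                  // ["bucket-postpone-u1", "3-0.parquet"]
    int writeId = Integer.parseInt(parts[1].substring(0, parts[1].indexOf('-')));  // 3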
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index ab53f8f824..3e0cd7cf23 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -28,13 +28,13 @@ import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.flink.source.operator.ReadOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -55,8 +55,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Source for compacting postpone bucket tables. This source scans all files from {@code bucket =
@@ -163,27 +161,19 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
 
     private static class SplitChannelComputer implements ChannelComputer<Split> {
 
         private transient int numChannels;
-        private transient Pattern pattern;
 
         @Override
         public void setup(int numChannels) {
             this.numChannels = numChannels;
-            // see PostponeBucketTableWriteOperator
-            this.pattern = Pattern.compile("-s-(\\d+?)-");
         }
 
         @Override
         public int channel(Split record) {
             DataSplit dataSplit = (DataSplit) record;
-            String fileName = dataSplit.dataFiles().get(0).fileName();
-
-            Matcher matcher = pattern.matcher(fileName);
-            Preconditions.checkState(
-                    matcher.find(),
-                    "Data file name %s does not match the pattern. This is unexpected.",
-                    fileName);
-            return ChannelComputer.select(
-                    dataSplit.partition(), Integer.parseInt(matcher.group(1)), numChannels);
+            int bucketId =
+                    PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
+                            % numChannels;
+            return ChannelComputer.select(dataSplit.partition(), bucketId, numChannels);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 46ca1a5c92..668bd7d847 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -22,6 +22,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
 import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
 import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -304,11 +306,22 @@ public class ContinuousFileSplitEnumerator
 
     protected int assignSuggestedTask(FileStoreSourceSplit split) {
         DataSplit dataSplit = ((DataSplit) split.split());
+        int parallelism = context.currentParallelism();
+
+        int bucketId;
+        if (dataSplit.bucket() == BucketMode.POSTPONE_BUCKET) {
+            bucketId =
+                    PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
+                            % parallelism;
+        } else {
+            bucketId = dataSplit.bucket();
+        }
+
         if (shuffleBucketWithPartition) {
-            return ChannelComputer.select(
-                    dataSplit.partition(), dataSplit.bucket(), context.currentParallelism());
+            return ChannelComputer.select(dataSplit.partition(), bucketId, parallelism);
+        } else {
+            return ChannelComputer.select(bucketId, parallelism);
         }
-        return ChannelComputer.select(dataSplit.bucket(), context.currentParallelism());
     }
 
     protected SplitAssigner createSplitAssigner(boolean unordered) {
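The enumerator now mirrors the compact split source: for postpone-bucket splits (bucket == BucketMode.POSTPONE_BUCKET) the routing key is derived from the writer id in the first data file name rather than from the bucket value, so the splits are spread over the streaming readers the same way on every assignment. A rough sketch of the decision, assuming a hypothetical parallelism of 2 and reusing the made-up file name from the note above:

    // Illustrative only; mirrors the new assignSuggestedTask branch for postpone buckets.
    int parallelism = 2;                                             // assumed reader parallelism
    int writeId =
            PostponeBucketFileStoreWrite.getWriteId("bucket-postpone-u1-s-3-0.parquet");  // hypothetical name -> 3
    int bucketId = writeId % parallelism;                            // 3 % 2 = 1
    int task = ChannelComputer.select(bucketId, parallelism);        // reader subtask for this split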
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index 4a449e240a..be19cdff42 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -714,7 +714,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         TableEnvironment sEnv =
                 tableEnvironmentBuilder()
                         .streamingMode()
-                        .parallelism(1)
+                        .parallelism(2)
                         .checkpointIntervalMs(500)
                         .build();
         String createCatalog =
@@ -750,15 +750,27 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         bEnv.executeSql("INSERT INTO T VALUES (1, 101), (3, 31)").await();
         bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
 
+        TableResult streamingSelect = sEnv.executeSql("SELECT * FROM T");
+        Thread.sleep(1000);
+
+        bEnv.executeSql("INSERT INTO T VALUES (1, 102), (4, 42)").await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 103), (5, 53)").await();
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
         assertThat(collect(bEnv.executeSql("SELECT * FROM T")))
-                .containsExactlyInAnyOrder("+I[1, 101]", "+I[2, 20]", "+I[3, 31]");
+                .containsExactlyInAnyOrder(
+                        "+I[1, 103]", "+I[2, 20]", "+I[3, 31]", "+I[4, 42]", "+I[5, 53]");
 
-        TableResult streamingSelect =
-                sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id' = '1') */");
         JobClient client = streamingSelect.getJobClient().get();
         CloseableIterator<Row> it = streamingSelect.collect();
-        assertThat(collect(client, it, 5))
+        assertThat(collect(client, it, 7))
                 .containsExactlyInAnyOrder(
-                        "+I[1, 10]", "+I[2, 20]", "+I[1, 100]", "+I[1, 101]", "+I[3, 31]");
+                        "+I[1, 101]",
+                        "+I[2, 20]",
+                        "+I[3, 31]",
+                        "+I[1, 102]",
+                        "+I[4, 42]",
+                        "+I[1, 103]",
+                        "+I[5, 53]");
     }
 
     private List<String> collect(TableResult result) throws Exception {