This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a818f9318a [flink] Rename to unordered in FlinkSourceBuilder
a818f9318a is described below
commit a818f9318a5f5c39671c4c5ba5010f52bbce2c4f
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jul 11 11:35:36 2025 +0800
[flink] Rename to unordered in FlinkSourceBuilder
---
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../source/ContinuousFileSplitEnumerator.java | 8 ++---
.../flink/source/ContinuousFileStoreSource.java | 8 ++---
.../paimon/flink/source/FlinkSourceBuilder.java | 38 +++++++++++++++-------
.../align/AlignedContinuousFileStoreSource.java | 6 ++--
.../flink/source/operator/MonitorSource.java | 15 ++++-----
.../flink/source/FlinkSourceBuilderTest.java | 12 +++----
7 files changed, 51 insertions(+), 38 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index fe6b9005e5..964116dbe7 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -123,7 +123,7 @@ public class CoreOptions implements Serializable {
+ "if there is no primary
key, the full row will be used.")
.build());
- public static final ConfigOption<Boolean> BUCKET_APPEND_ORDERD =
+ public static final ConfigOption<Boolean> BUCKET_APPEND_ORDERED =
key("bucket-append-ordered")
.booleanType()
.defaultValue(true)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 38c593e75d..46ca1a5c92 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -95,7 +95,7 @@ public class ContinuousFileSplitEnumerator
@Nullable Long nextSnapshotId,
long discoveryInterval,
StreamTableScan scan,
- boolean unawareBucket,
+ boolean unordered,
int splitMaxPerTask,
boolean shuffleBucketWithPartition,
int maxSnapshotCount) {
@@ -108,7 +108,7 @@ public class ContinuousFileSplitEnumerator
this.scan = scan;
this.splitMaxPerTask = splitMaxPerTask;
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
- this.splitAssigner = createSplitAssigner(unawareBucket);
+ this.splitAssigner = createSplitAssigner(unordered);
this.shuffleBucketWithPartition = shuffleBucketWithPartition;
addSplits(remainSplits);
@@ -311,8 +311,8 @@ public class ContinuousFileSplitEnumerator
return ChannelComputer.select(dataSplit.bucket(),
context.currentParallelism());
}
- protected SplitAssigner createSplitAssigner(boolean unawareBucket) {
- return unawareBucket
+ protected SplitAssigner createSplitAssigner(boolean unordered) {
+ return unordered
? new FIFOSplitAssigner(Collections.emptyList())
: new PreAssignSplitAssigner(
this.splitMaxPerTask, context,
Collections.emptyList());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 2c1273e811..f0194dc4cc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -44,7 +44,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 4L;
protected final Map<String, String> options;
- protected final boolean unawareBucket;
+ protected final boolean unordered;
public ContinuousFileStoreSource(
ReadBuilder readBuilder, Map<String, String> options, @Nullable
Long limit) {
@@ -55,11 +55,11 @@ public class ContinuousFileStoreSource extends FlinkSource {
ReadBuilder readBuilder,
Map<String, String> options,
@Nullable Long limit,
- boolean unawareBucket,
+ boolean unordered,
@Nullable NestedProjectedRowData rowData) {
super(readBuilder, limit, rowData);
this.options = options;
- this.unawareBucket = unawareBucket;
+ this.unordered = unordered;
}
@Override
@@ -109,7 +109,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
nextSnapshotId,
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
- unawareBucket,
+ unordered,
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 846f5f645e..b5ef9c469f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -84,7 +84,7 @@ public class FlinkSourceBuilder {
private final Table table;
private final Options conf;
- private final boolean unawareBucket;
+ private final boolean unordered;
private String sourceName;
private Boolean sourceBounded;
private StreamExecutionEnvironment env;
@@ -100,12 +100,26 @@ public class FlinkSourceBuilder {
this.table = table;
this.sourceName = table.name();
this.conf = Options.fromMap(table.options());
- this.unawareBucket =
- (table instanceof FileStoreTable
- && ((FileStoreTable) table).bucketMode()
- == BucketMode.BUCKET_UNAWARE)
- || (table.primaryKeys().isEmpty()
- &&
!this.conf.get(CoreOptions.BUCKET_APPEND_ORDERD));
+ this.unordered = unordered(table);
+ }
+
+ private static boolean unordered(Table table) {
+ if (!(table instanceof FileStoreTable)) {
+ return false;
+ }
+
+ if (!table.primaryKeys().isEmpty()) {
+ return false;
+ }
+
+ BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
+ if (bucketMode == BucketMode.BUCKET_UNAWARE) {
+ return true;
+ } else if (bucketMode == BucketMode.HASH_FIXED) {
+ return
!Options.fromMap(table.options()).get(CoreOptions.BUCKET_APPEND_ORDERED);
+ }
+
+ return false;
}
public FlinkSourceBuilder env(StreamExecutionEnvironment env) {
@@ -208,7 +222,7 @@ public class FlinkSourceBuilder {
createReadBuilder(projectedRowType()),
table.options(),
limit,
- unawareBucket,
+ unordered,
outerProject()));
}
@@ -219,7 +233,7 @@ public class FlinkSourceBuilder {
createReadBuilder(projectedRowType()),
table.options(),
limit,
- unawareBucket,
+ unordered,
outerProject()));
}
@@ -271,8 +285,8 @@ public class FlinkSourceBuilder {
}
@VisibleForTesting
- public boolean isUnawareBucket() {
- return unawareBucket;
+ public boolean isUnordered() {
+ return unordered;
}
/** Build source {@link DataStream} with {@link RowData}. */
@@ -360,7 +374,7 @@ public class FlinkSourceBuilder {
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
watermarkStrategy == null,
conf.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
- unawareBucket,
+ unordered,
outerProject(),
isBounded,
limit);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 59845901ff..076175b31f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -50,9 +50,9 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
ReadBuilder readBuilder,
Map<String, String> options,
@Nullable Long limit,
- boolean unawareBucket,
+ boolean unordered,
@Nullable NestedProjectedRowData rowData) {
- super(readBuilder, options, limit, unawareBucket, rowData);
+ super(readBuilder, options, limit, unordered, rowData);
}
@Override
@@ -81,7 +81,7 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
nextSnapshotId,
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
- unawareBucket,
+ unordered,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index d664b526ff..9833a10f86 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -223,11 +223,11 @@ public class MonitorSource extends
AbstractNonCoordinatedSource<Split> {
long monitorInterval,
boolean emitSnapshotWatermark,
boolean shuffleBucketWithPartition,
- boolean unawareBucket,
+ boolean unordered,
NestedProjectedRowData nestedProjectedRowData,
boolean isBounded,
@Nullable Long limit) {
- SingleOutputStreamOperator<Split> singleOutputStreamOperator =
+ SingleOutputStreamOperator<Split> operator =
env.fromSource(
new MonitorSource(
readBuilder,
@@ -240,10 +240,9 @@ public class MonitorSource extends
AbstractNonCoordinatedSource<Split> {
.forceNonParallel();
DataStream<Split> sourceDataStream =
- unawareBucket
- ? shuffleUnawareBucket(singleOutputStreamOperator)
- : shuffleNonUnawareBucket(
- singleOutputStreamOperator,
shuffleBucketWithPartition);
+ unordered
+ ? shuffleUnordered(operator)
+ : shuffleOrdered(operator, shuffleBucketWithPartition);
return sourceDataStream.transform(
name + "-Reader",
@@ -251,12 +250,12 @@ public class MonitorSource extends
AbstractNonCoordinatedSource<Split> {
new ReadOperator(readBuilder::newRead, nestedProjectedRowData,
limit));
}
- private static DataStream<Split> shuffleUnawareBucket(
+ private static DataStream<Split> shuffleUnordered(
SingleOutputStreamOperator<Split> singleOutputStreamOperator) {
return singleOutputStreamOperator.rebalance();
}
- private static DataStream<Split> shuffleNonUnawareBucket(
+ private static DataStream<Split> shuffleOrdered(
SingleOutputStreamOperator<Split> singleOutputStreamOperator,
boolean shuffleBucketWithPartition) {
return singleOutputStreamOperator.partitionCustom(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
index b19a1a330c..bc2ccb0fed 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
@@ -87,31 +87,31 @@ public class FlinkSourceBuilderTest {
// pk table && bucket-append-ordered is true
Table table = createTable("t1", true, 2, true);
FlinkSourceBuilder builder = new FlinkSourceBuilder(table);
- assertFalse(builder.isUnawareBucket());
+ assertFalse(builder.isUnordered());
// pk table && bucket-append-ordered is false
table = createTable("t2", true, 2, false);
builder = new FlinkSourceBuilder(table);
- assertFalse(builder.isUnawareBucket());
+ assertFalse(builder.isUnordered());
// pk table && bucket num == -1 && bucket-append-ordered is false
table = createTable("t3", true, -1, false);
builder = new FlinkSourceBuilder(table);
- assertFalse(builder.isUnawareBucket());
+ assertFalse(builder.isUnordered());
// append table && bucket num != -1 && bucket-append-ordered is true
table = createTable("t4", false, 2, true);
builder = new FlinkSourceBuilder(table);
- assertFalse(builder.isUnawareBucket());
+ assertFalse(builder.isUnordered());
// append table && bucket num == -1
table = createTable("t5", false, -1, true);
builder = new FlinkSourceBuilder(table);
- assertTrue(builder.isUnawareBucket());
+ assertTrue(builder.isUnordered());
// append table && bucket-append-ordered is false
table = createTable("t6", false, 2, false);
builder = new FlinkSourceBuilder(table);
- assertTrue(builder.isUnawareBucket());
+ assertTrue(builder.isUnordered());
}
}