This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a818f9318a [flink] Rename to unordered in FlinkSourceBuilder
a818f9318a is described below

commit a818f9318a5f5c39671c4c5ba5010f52bbce2c4f
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jul 11 11:35:36 2025 +0800

    [flink] Rename to unordered in FlinkSourceBuilder
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  2 +-
 .../source/ContinuousFileSplitEnumerator.java      |  8 ++---
 .../flink/source/ContinuousFileStoreSource.java    |  8 ++---
 .../paimon/flink/source/FlinkSourceBuilder.java    | 38 +++++++++++++++-------
 .../align/AlignedContinuousFileStoreSource.java    |  6 ++--
 .../flink/source/operator/MonitorSource.java       | 15 ++++-----
 .../flink/source/FlinkSourceBuilderTest.java       | 12 +++----
 7 files changed, 51 insertions(+), 38 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index fe6b9005e5..964116dbe7 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -123,7 +123,7 @@ public class CoreOptions implements Serializable {
                                                     + "if there is no primary key, the full row will be used.")
                                     .build());
 
-    public static final ConfigOption<Boolean> BUCKET_APPEND_ORDERD =
+    public static final ConfigOption<Boolean> BUCKET_APPEND_ORDERED =
             key("bucket-append-ordered")
                     .booleanType()
                     .defaultValue(true)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 38c593e75d..46ca1a5c92 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -95,7 +95,7 @@ public class ContinuousFileSplitEnumerator
             @Nullable Long nextSnapshotId,
             long discoveryInterval,
             StreamTableScan scan,
-            boolean unawareBucket,
+            boolean unordered,
             int splitMaxPerTask,
             boolean shuffleBucketWithPartition,
             int maxSnapshotCount) {
@@ -108,7 +108,7 @@ public class ContinuousFileSplitEnumerator
         this.scan = scan;
         this.splitMaxPerTask = splitMaxPerTask;
         this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
-        this.splitAssigner = createSplitAssigner(unawareBucket);
+        this.splitAssigner = createSplitAssigner(unordered);
         this.shuffleBucketWithPartition = shuffleBucketWithPartition;
         addSplits(remainSplits);
 
@@ -311,8 +311,8 @@ public class ContinuousFileSplitEnumerator
         return ChannelComputer.select(dataSplit.bucket(), context.currentParallelism());
     }
 
-    protected SplitAssigner createSplitAssigner(boolean unawareBucket) {
-        return unawareBucket
+    protected SplitAssigner createSplitAssigner(boolean unordered) {
+        return unordered
                 ? new FIFOSplitAssigner(Collections.emptyList())
                 : new PreAssignSplitAssigner(
                         this.splitMaxPerTask, context, Collections.emptyList());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 2c1273e811..f0194dc4cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -44,7 +44,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
     private static final long serialVersionUID = 4L;
 
     protected final Map<String, String> options;
-    protected final boolean unawareBucket;
+    protected final boolean unordered;
 
     public ContinuousFileStoreSource(
             ReadBuilder readBuilder, Map<String, String> options, @Nullable Long limit) {
@@ -55,11 +55,11 @@ public class ContinuousFileStoreSource extends FlinkSource {
             ReadBuilder readBuilder,
             Map<String, String> options,
             @Nullable Long limit,
-            boolean unawareBucket,
+            boolean unordered,
             @Nullable NestedProjectedRowData rowData) {
         super(readBuilder, limit, rowData);
         this.options = options;
-        this.unawareBucket = unawareBucket;
+        this.unordered = unordered;
     }
 
     @Override
@@ -109,7 +109,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 nextSnapshotId,
                 options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                 scan,
-                unawareBucket,
+                unordered,
                 options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
                 options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
                 options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 846f5f645e..b5ef9c469f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -84,7 +84,7 @@ public class FlinkSourceBuilder {
 
     private final Table table;
     private final Options conf;
-    private final boolean unawareBucket;
+    private final boolean unordered;
     private String sourceName;
     private Boolean sourceBounded;
     private StreamExecutionEnvironment env;
@@ -100,12 +100,26 @@ public class FlinkSourceBuilder {
         this.table = table;
         this.sourceName = table.name();
         this.conf = Options.fromMap(table.options());
-        this.unawareBucket =
-                (table instanceof FileStoreTable
-                                && ((FileStoreTable) table).bucketMode()
-                                        == BucketMode.BUCKET_UNAWARE)
-                        || (table.primaryKeys().isEmpty()
-                                && !this.conf.get(CoreOptions.BUCKET_APPEND_ORDERD));
+        this.unordered = unordered(table);
+    }
+
+    private static boolean unordered(Table table) {
+        if (!(table instanceof FileStoreTable)) {
+            return false;
+        }
+
+        if (!table.primaryKeys().isEmpty()) {
+            return false;
+        }
+
+        BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
+        if (bucketMode == BucketMode.BUCKET_UNAWARE) {
+            return true;
+        } else if (bucketMode == BucketMode.HASH_FIXED) {
+            return !Options.fromMap(table.options()).get(CoreOptions.BUCKET_APPEND_ORDERED);
+        }
+
+        return false;
     }
 
     public FlinkSourceBuilder env(StreamExecutionEnvironment env) {
@@ -208,7 +222,7 @@ public class FlinkSourceBuilder {
                         createReadBuilder(projectedRowType()),
                         table.options(),
                         limit,
-                        unawareBucket,
+                        unordered,
                         outerProject()));
     }
 
@@ -219,7 +233,7 @@ public class FlinkSourceBuilder {
                         createReadBuilder(projectedRowType()),
                         table.options(),
                         limit,
-                        unawareBucket,
+                        unordered,
                         outerProject()));
     }
 
@@ -271,8 +285,8 @@ public class FlinkSourceBuilder {
     }
 
     @VisibleForTesting
-    public boolean isUnawareBucket() {
-        return unawareBucket;
+    public boolean isUnordered() {
+        return unordered;
     }
 
     /** Build source {@link DataStream} with {@link RowData}. */
@@ -360,7 +374,7 @@ public class FlinkSourceBuilder {
                         conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                         watermarkStrategy == null,
                         conf.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
-                        unawareBucket,
+                        unordered,
                         outerProject(),
                         isBounded,
                         limit);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 59845901ff..076175b31f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -50,9 +50,9 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
             ReadBuilder readBuilder,
             Map<String, String> options,
             @Nullable Long limit,
-            boolean unawareBucket,
+            boolean unordered,
             @Nullable NestedProjectedRowData rowData) {
-        super(readBuilder, options, limit, unawareBucket, rowData);
+        super(readBuilder, options, limit, unordered, rowData);
     }
 
     @Override
@@ -81,7 +81,7 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
                 nextSnapshotId,
                 options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                 scan,
-                unawareBucket,
+                unordered,
                 options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
                 options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
                 options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index d664b526ff..9833a10f86 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -223,11 +223,11 @@ public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
             long monitorInterval,
             boolean emitSnapshotWatermark,
             boolean shuffleBucketWithPartition,
-            boolean unawareBucket,
+            boolean unordered,
             NestedProjectedRowData nestedProjectedRowData,
             boolean isBounded,
             @Nullable Long limit) {
-        SingleOutputStreamOperator<Split> singleOutputStreamOperator =
+        SingleOutputStreamOperator<Split> operator =
                 env.fromSource(
                                 new MonitorSource(
                                         readBuilder,
@@ -240,10 +240,9 @@ public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
                         .forceNonParallel();
 
         DataStream<Split> sourceDataStream =
-                unawareBucket
-                        ? shuffleUnawareBucket(singleOutputStreamOperator)
-                        : shuffleNonUnawareBucket(
-                                singleOutputStreamOperator, shuffleBucketWithPartition);
+                unordered
+                        ? shuffleUnordered(operator)
+                        : shuffleOrdered(operator, shuffleBucketWithPartition);
 
         return sourceDataStream.transform(
                 name + "-Reader",
@@ -251,12 +250,12 @@ public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
                 new ReadOperator(readBuilder::newRead, nestedProjectedRowData, limit));
     }
 
-    private static DataStream<Split> shuffleUnawareBucket(
+    private static DataStream<Split> shuffleUnordered(
             SingleOutputStreamOperator<Split> singleOutputStreamOperator) {
         return singleOutputStreamOperator.rebalance();
     }
 
-    private static DataStream<Split> shuffleNonUnawareBucket(
+    private static DataStream<Split> shuffleOrdered(
             SingleOutputStreamOperator<Split> singleOutputStreamOperator,
             boolean shuffleBucketWithPartition) {
         return singleOutputStreamOperator.partitionCustom(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
index b19a1a330c..bc2ccb0fed 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkSourceBuilderTest.java
@@ -87,31 +87,31 @@ public class FlinkSourceBuilderTest {
         // pk table && bucket-append-ordered is true
         Table table = createTable("t1", true, 2, true);
         FlinkSourceBuilder builder = new FlinkSourceBuilder(table);
-        assertFalse(builder.isUnawareBucket());
+        assertFalse(builder.isUnordered());
 
         // pk table && bucket-append-ordered is false
         table = createTable("t2", true, 2, false);
         builder = new FlinkSourceBuilder(table);
-        assertFalse(builder.isUnawareBucket());
+        assertFalse(builder.isUnordered());
 
         // pk table && bucket num == -1 && bucket-append-ordered is false
         table = createTable("t3", true, -1, false);
         builder = new FlinkSourceBuilder(table);
-        assertFalse(builder.isUnawareBucket());
+        assertFalse(builder.isUnordered());
 
         // append table && bucket num != 1 && bucket-append-ordered is true
         table = createTable("t4", false, 2, true);
         builder = new FlinkSourceBuilder(table);
-        assertFalse(builder.isUnawareBucket());
+        assertFalse(builder.isUnordered());
 
         // append table && bucket num == -1
         table = createTable("t5", false, -1, true);
         builder = new FlinkSourceBuilder(table);
-        assertTrue(builder.isUnawareBucket());
+        assertTrue(builder.isUnordered());
 
         // append table && bucket-append-ordered is false
         table = createTable("t6", false, 2, false);
         builder = new FlinkSourceBuilder(table);
-        assertTrue(builder.isUnawareBucket());
+        assertTrue(builder.isUnordered());
     }
 }

Reply via email to