This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c825b37dfb [refactor] Refactor flink source to get rid of confusing BucketMode argument (#5704)
c825b37dfb is described below

commit c825b37dfbf12a16b9fd5ecd04fa021769e28be5
Author: yuzelin <[email protected]>
AuthorDate: Fri Jun 6 22:46:28 2025 +0800

    [refactor] Refactor flink source to get rid of confusing BucketMode argument (#5704)
---
 .../source/ContinuousFileSplitEnumerator.java      |  9 ++++-----
 .../flink/source/ContinuousFileStoreSource.java    | 11 +++++------
 .../paimon/flink/source/FlinkSourceBuilder.java    | 13 ++++++------
 .../paimon/flink/source/SystemTableSource.java     |  3 +--
 .../AlignedContinuousFileSplitEnumerator.java      |  7 +++----
 .../align/AlignedContinuousFileStoreSource.java    |  7 +++----
 .../flink/source/operator/MonitorSource.java       |  7 ++-----
 .../source/ContinuousFileSplitEnumeratorTest.java  | 23 +++++++++++-----------
 .../AlignedContinuousFileSplitEnumeratorTest.java  |  9 +--------
 .../source/align/AlignedSourceReaderTest.java      |  7 +------
 10 files changed, 37 insertions(+), 59 deletions(-)
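
The net effect: every call site only ever checked whether the bucket mode was
BUCKET_UNAWARE, so the BucketMode parameter is replaced by a plain boolean
"unawareBucket" flag. Below is a minimal sketch of the new calling convention,
assuming only the constructor and types shown in the diff; the helper class and
method are hypothetical and purely illustrative:

    import org.apache.paimon.flink.NestedProjectedRowData;
    import org.apache.paimon.flink.source.ContinuousFileStoreSource;
    import org.apache.paimon.table.BucketMode;
    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.source.ReadBuilder;

    // Hypothetical helper, not part of this commit.
    public class UnawareBucketExample {

        static ContinuousFileStoreSource buildSource(
                Table table,
                ReadBuilder readBuilder,
                Long limit,
                NestedProjectedRowData rowData) {
            // Derive the flag once (mirrors FlinkSourceBuilder in the diff)
            // instead of threading a BucketMode through the source stack.
            boolean unawareBucket =
                    table instanceof FileStoreTable
                            && ((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE;
            // New five-argument constructor: a boolean instead of a BucketMode.
            return new ContinuousFileStoreSource(
                    readBuilder, table.options(), limit, unawareBucket, rowData);
        }
    }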

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 79597d0cd2..4db86da117 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
 import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
 import org.apache.paimon.flink.source.assigners.SplitAssigner;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -94,7 +93,7 @@ public class ContinuousFileSplitEnumerator
             @Nullable Long nextSnapshotId,
             long discoveryInterval,
             StreamTableScan scan,
-            BucketMode bucketMode,
+            boolean unawareBucket,
             int splitMaxPerTask,
             boolean shuffleBucketWithPartition,
             int maxSnapshotCount) {
@@ -105,7 +104,7 @@ public class ContinuousFileSplitEnumerator
         this.readersAwaitingSplit = new LinkedHashSet<>();
         this.splitGenerator = new FileStoreSourceSplitGenerator();
         this.scan = scan;
-        this.splitAssigner = createSplitAssigner(bucketMode);
+        this.splitAssigner = createSplitAssigner(unawareBucket);
         this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
         this.shuffleBucketWithPartition = shuffleBucketWithPartition;
         addSplits(remainSplits);
@@ -309,8 +308,8 @@ public class ContinuousFileSplitEnumerator
         return ChannelComputer.select(dataSplit.bucket(), context.currentParallelism());
     }
 
-    protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
-        return bucketMode == BucketMode.BUCKET_UNAWARE
+    protected SplitAssigner createSplitAssigner(boolean unawareBucket) {
+        return unawareBucket
                 ? new FIFOSplitAssigner(Collections.emptyList())
                 : new PreAssignSplitAssigner(1, context, Collections.emptyList());
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 879f262bbd..3587313748 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.NestedProjectedRowData;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -45,22 +44,22 @@ public class ContinuousFileStoreSource extends FlinkSource {
     private static final long serialVersionUID = 4L;
 
     protected final Map<String, String> options;
-    protected final BucketMode bucketMode;
+    protected final boolean unawareBucket;
 
     public ContinuousFileStoreSource(
             ReadBuilder readBuilder, Map<String, String> options, @Nullable Long limit) {
-        this(readBuilder, options, limit, BucketMode.HASH_FIXED, null);
+        this(readBuilder, options, limit, false, null);
     }
 
     public ContinuousFileStoreSource(
             ReadBuilder readBuilder,
             Map<String, String> options,
             @Nullable Long limit,
-            BucketMode bucketMode,
+            boolean unawareBucket,
             @Nullable NestedProjectedRowData rowData) {
         super(readBuilder, limit, rowData);
         this.options = options;
-        this.bucketMode = bucketMode;
+        this.unawareBucket = unawareBucket;
     }
 
     @Override
@@ -110,7 +109,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 nextSnapshotId,
                 options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                 scan,
-                bucketMode,
+                unawareBucket,
                 options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
                 options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
                 options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 542480cd19..feb9176c85 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -82,7 +82,7 @@ public class FlinkSourceBuilder {
 
     private final Table table;
     private final Options conf;
-    private final BucketMode bucketMode;
+    private final boolean unawareBucket;
     private String sourceName;
     private Boolean sourceBounded;
     private StreamExecutionEnvironment env;
@@ -96,10 +96,9 @@ public class FlinkSourceBuilder {
 
     public FlinkSourceBuilder(Table table) {
         this.table = table;
-        this.bucketMode =
+        this.unawareBucket =
                 table instanceof FileStoreTable
-                        ? ((FileStoreTable) table).bucketMode()
-                        : BucketMode.HASH_FIXED;
+                        && ((FileStoreTable) table).bucketMode() == BucketMode.BUCKET_UNAWARE;
         this.sourceName = table.name();
         this.conf = Options.fromMap(table.options());
     }
@@ -204,7 +203,7 @@ public class FlinkSourceBuilder {
                         createReadBuilder(projectedRowType()),
                         table.options(),
                         limit,
-                        bucketMode,
+                        unawareBucket,
                         outerProject()));
     }
 
@@ -215,7 +214,7 @@ public class FlinkSourceBuilder {
                         createReadBuilder(projectedRowType()),
                         table.options(),
                         limit,
-                        bucketMode,
+                        unawareBucket,
                         outerProject()));
     }
 
@@ -349,7 +348,7 @@ public class FlinkSourceBuilder {
                         watermarkStrategy == null,
                         conf.get(
                                 FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
-                        bucketMode,
+                        unawareBucket,
                         outerProject());
         if (parallelism != null) {
             dataStream.getTransformation().setParallelism(parallelism);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index ce0af0d450..e9331414f5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -24,7 +24,6 @@ import org.apache.paimon.flink.PaimonDataStreamScanProvider;
 import org.apache.paimon.flink.Projection;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -98,7 +97,7 @@ public class SystemTableSource extends FlinkTableSource {
         if (unbounded && table instanceof DataTable) {
             source =
                     new ContinuousFileStoreSource(
-                            readBuilder, table.options(), limit, BucketMode.HASH_FIXED, rowData);
+                            readBuilder, table.options(), limit, false, rowData);
         } else {
             source =
                     new StaticFileStoreSource(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 857c1b7ab7..d1652dfa9e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.source.FileStoreSourceSplit;
 import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
 import org.apache.paimon.flink.source.assigners.AlignedSplitAssigner;
 import org.apache.paimon.flink.source.assigners.SplitAssigner;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.SnapshotNotExistPlan;
@@ -92,7 +91,7 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
             @Nullable Long nextSnapshotId,
             long discoveryInterval,
             StreamTableScan scan,
-            BucketMode bucketMode,
+            boolean unawareBucket,
             long alignTimeout,
             int splitPerTaskMax,
             boolean shuffleBucketWithPartition,
@@ -103,7 +102,7 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
                 nextSnapshotId,
                 discoveryInterval,
                 scan,
-                bucketMode,
+                unawareBucket,
                 splitPerTaskMax,
                 shuffleBucketWithPartition,
                 maxSnapshotCount);
@@ -267,7 +266,7 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
     }
 
     @Override
-    protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
+    protected SplitAssigner createSplitAssigner(boolean unawareBucket) {
         return new AlignedSplitAssigner();
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 8904720c90..29ff63a31c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -27,7 +27,6 @@ import org.apache.paimon.flink.source.FileStoreSourceSplit;
 import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.StreamTableScan;
 
@@ -51,9 +50,9 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
             ReadBuilder readBuilder,
             Map<String, String> options,
             @Nullable Long limit,
-            BucketMode bucketMode,
+            boolean unawareBucket,
             @Nullable NestedProjectedRowData rowData) {
-        super(readBuilder, options, limit, bucketMode, rowData);
+        super(readBuilder, options, limit, unawareBucket, rowData);
     }
 
     @Override
@@ -82,7 +81,7 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
                 nextSnapshotId,
                 options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                 scan,
-                bucketMode,
+                unawareBucket,
                 options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
                 options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
                 options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index e5063172ca..228d0d5a44 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -24,7 +24,6 @@ import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
 import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.flink.source.SplitListState;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -55,8 +54,6 @@ import java.util.NavigableMap;
 import java.util.OptionalLong;
 import java.util.TreeMap;
 
-import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
-
 /**
  * This is the single (non-parallel) monitoring task, it is responsible for:
  *
@@ -211,7 +208,7 @@ public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
             long monitorInterval,
             boolean emitSnapshotWatermark,
             boolean shuffleBucketWithPartition,
-            BucketMode bucketMode,
+            boolean unawareBucket,
             NestedProjectedRowData nestedProjectedRowData) {
         SingleOutputStreamOperator<Split> singleOutputStreamOperator =
                 env.fromSource(
@@ -223,7 +220,7 @@ public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
                         .forceNonParallel();
 
         DataStream<Split> sourceDataStream =
-                bucketMode == BUCKET_UNAWARE
+                unawareBucket
                         ? shuffleUnawareBucket(singleOutputStreamOperator)
                         : shuffleNonUnawareBucket(
                                 singleOutputStreamOperator, shuffleBucketWithPartition);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index e3912cc707..05e3d3b0b6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -274,7 +273,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
-                        .withBucketMode(BucketMode.BUCKET_UNAWARE)
+                        .unawareBucket(true)
                         .build();
         enumerator.start();
 
@@ -316,7 +315,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
-                        .withBucketMode(BucketMode.BUCKET_UNAWARE)
+                        .unawareBucket(true)
                         .build();
         enumerator.start();
 
@@ -375,7 +374,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
-                        .withBucketMode(BucketMode.BUCKET_UNAWARE)
+                        .unawareBucket(true)
                         .build();
         enumerator.start();
 
@@ -431,7 +430,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
-                        .withBucketMode(BucketMode.BUCKET_UNAWARE)
+                        .unawareBucket(true)
                         .build();
         enumerator.start();
 
@@ -470,7 +469,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
-                        .withBucketMode(BucketMode.BUCKET_UNAWARE)
+                        .unawareBucket(true)
                         .build();
         enumerator.start();
         enumerator.handleSplitRequest(1, "test-host");
@@ -766,7 +765,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
-                        .withBucketMode(BucketMode.BUCKET_UNAWARE)
+                        .unawareBucket(true)
                         .build();
         enumerator.start();
 
@@ -815,7 +814,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                         .setInitialSplits(Collections.emptyList())
                         .setDiscoveryInterval(1)
                         .setScan(scan)
-                        .withBucketMode(BucketMode.BUCKET_UNAWARE)
+                        .unawareBucket(true)
                         .withMaxSnapshotCount(1)
                         .build();
         enumerator.start();
@@ -899,7 +898,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
         private long discoveryInterval = Long.MAX_VALUE;
 
         private StreamTableScan scan;
-        private BucketMode bucketMode = BucketMode.HASH_FIXED;
+        private boolean unawareBucket = false;
         private int maxSnapshotCount = -1;
 
         public Builder setSplitEnumeratorContext(
@@ -923,8 +922,8 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
             return this;
         }
 
-        public Builder withBucketMode(BucketMode bucketMode) {
-            this.bucketMode = bucketMode;
+        public Builder unawareBucket(boolean unawareBucket) {
+            this.unawareBucket = unawareBucket;
             return this;
         }
 
@@ -940,7 +939,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
                     null,
                     discoveryInterval,
                     scan,
-                    bucketMode,
+                    unawareBucket,
                     10,
                     false,
                     maxSnapshotCount);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index a3c26ce80d..43dd09682b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -30,7 +30,6 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -208,7 +207,6 @@ public class AlignedContinuousFileSplitEnumeratorTest extends FileSplitEnumerato
         private long discoveryInterval = Long.MAX_VALUE;
 
         private StreamTableScan scan;
-        private BucketMode bucketMode = BucketMode.HASH_FIXED;
 
         private long timeout = 30000L;
 
@@ -233,11 +231,6 @@ public class AlignedContinuousFileSplitEnumeratorTest extends FileSplitEnumerato
             return this;
         }
 
-        public Builder withBucketMode(BucketMode bucketMode) {
-            this.bucketMode = bucketMode;
-            return this;
-        }
-
         public Builder setAlignedTimeout(long timeout) {
             this.timeout = timeout;
             return this;
@@ -250,7 +243,7 @@ public class AlignedContinuousFileSplitEnumeratorTest extends FileSplitEnumerato
                     null,
                     discoveryInterval,
                     scan,
-                    bucketMode,
+                    false,
                     timeout,
                     10,
                     false,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index f243a6a095..6ea389c161 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -31,7 +31,6 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.StreamTableCommit;
@@ -127,11 +126,7 @@ public class AlignedSourceReaderTest extends FileStoreSourceReaderTest {
 
         AlignedContinuousFileStoreSource alignedSource =
                 new AlignedContinuousFileStoreSource(
-                        table.newReadBuilder(),
-                        table.options(),
-                        null,
-                        BucketMode.HASH_DYNAMIC,
-                        null);
+                        table.newReadBuilder(), table.options(), null, false, null);
         SourceOperatorFactory<RowData> sourceOperatorFactory =
                 new SourceOperatorFactory<>(alignedSource, WatermarkStrategy.noWatermarks());
         StreamTaskMailboxTestHarnessBuilder<RowData> builder =
