This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c825b37dfb [refactor] Refactor flink source to get rid of confusing
BucketMode argument (#5704)
c825b37dfb is described below
commit c825b37dfbf12a16b9fd5ecd04fa021769e28be5
Author: yuzelin <[email protected]>
AuthorDate: Fri Jun 6 22:46:28 2025 +0800
[refactor] Refactor flink source to get rid of confusing BucketMode
argument (#5704)
---
.../source/ContinuousFileSplitEnumerator.java | 9 ++++-----
.../flink/source/ContinuousFileStoreSource.java | 11 +++++------
.../paimon/flink/source/FlinkSourceBuilder.java | 13 ++++++------
.../paimon/flink/source/SystemTableSource.java | 3 +--
.../AlignedContinuousFileSplitEnumerator.java | 7 +++----
.../align/AlignedContinuousFileStoreSource.java | 7 +++----
.../flink/source/operator/MonitorSource.java | 7 ++-----
.../source/ContinuousFileSplitEnumeratorTest.java | 23 +++++++++++-----------
.../AlignedContinuousFileSplitEnumeratorTest.java | 9 +--------
.../source/align/AlignedSourceReaderTest.java | 7 +------
10 files changed, 37 insertions(+), 59 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 79597d0cd2..4db86da117 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -94,7 +93,7 @@ public class ContinuousFileSplitEnumerator
@Nullable Long nextSnapshotId,
long discoveryInterval,
StreamTableScan scan,
- BucketMode bucketMode,
+ boolean unawareBucket,
int splitMaxPerTask,
boolean shuffleBucketWithPartition,
int maxSnapshotCount) {
@@ -105,7 +104,7 @@ public class ContinuousFileSplitEnumerator
this.readersAwaitingSplit = new LinkedHashSet<>();
this.splitGenerator = new FileStoreSourceSplitGenerator();
this.scan = scan;
- this.splitAssigner = createSplitAssigner(bucketMode);
+ this.splitAssigner = createSplitAssigner(unawareBucket);
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
this.shuffleBucketWithPartition = shuffleBucketWithPartition;
addSplits(remainSplits);
@@ -309,8 +308,8 @@ public class ContinuousFileSplitEnumerator
return ChannelComputer.select(dataSplit.bucket(),
context.currentParallelism());
}
- protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
- return bucketMode == BucketMode.BUCKET_UNAWARE
+ protected SplitAssigner createSplitAssigner(boolean unawareBucket) {
+ return unawareBucket
? new FIFOSplitAssigner(Collections.emptyList())
: new PreAssignSplitAssigner(1, context,
Collections.emptyList());
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 879f262bbd..3587313748 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.NestedProjectedRowData;
import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.StreamTableScan;
@@ -45,22 +44,22 @@ public class ContinuousFileStoreSource extends FlinkSource {
private static final long serialVersionUID = 4L;
protected final Map<String, String> options;
- protected final BucketMode bucketMode;
+ protected final boolean unawareBucket;
public ContinuousFileStoreSource(
ReadBuilder readBuilder, Map<String, String> options, @Nullable
Long limit) {
- this(readBuilder, options, limit, BucketMode.HASH_FIXED, null);
+ this(readBuilder, options, limit, false, null);
}
public ContinuousFileStoreSource(
ReadBuilder readBuilder,
Map<String, String> options,
@Nullable Long limit,
- BucketMode bucketMode,
+ boolean unawareBucket,
@Nullable NestedProjectedRowData rowData) {
super(readBuilder, limit, rowData);
this.options = options;
- this.bucketMode = bucketMode;
+ this.unawareBucket = unawareBucket;
}
@Override
@@ -110,7 +109,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
nextSnapshotId,
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
- bucketMode,
+ unawareBucket,
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 542480cd19..feb9176c85 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -82,7 +82,7 @@ public class FlinkSourceBuilder {
private final Table table;
private final Options conf;
- private final BucketMode bucketMode;
+ private final boolean unawareBucket;
private String sourceName;
private Boolean sourceBounded;
private StreamExecutionEnvironment env;
@@ -96,10 +96,9 @@ public class FlinkSourceBuilder {
public FlinkSourceBuilder(Table table) {
this.table = table;
- this.bucketMode =
+ this.unawareBucket =
table instanceof FileStoreTable
- ? ((FileStoreTable) table).bucketMode()
- : BucketMode.HASH_FIXED;
+ && ((FileStoreTable) table).bucketMode() ==
BucketMode.BUCKET_UNAWARE;
this.sourceName = table.name();
this.conf = Options.fromMap(table.options());
}
@@ -204,7 +203,7 @@ public class FlinkSourceBuilder {
createReadBuilder(projectedRowType()),
table.options(),
limit,
- bucketMode,
+ unawareBucket,
outerProject()));
}
@@ -215,7 +214,7 @@ public class FlinkSourceBuilder {
createReadBuilder(projectedRowType()),
table.options(),
limit,
- bucketMode,
+ unawareBucket,
outerProject()));
}
@@ -349,7 +348,7 @@ public class FlinkSourceBuilder {
watermarkStrategy == null,
conf.get(
FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
- bucketMode,
+ unawareBucket,
outerProject());
if (parallelism != null) {
dataStream.getTransformation().setParallelism(parallelism);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index ce0af0d450..e9331414f5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -24,7 +24,6 @@ import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.Projection;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
@@ -98,7 +97,7 @@ public class SystemTableSource extends FlinkTableSource {
if (unbounded && table instanceof DataTable) {
source =
new ContinuousFileStoreSource(
- readBuilder, table.options(), limit,
BucketMode.HASH_FIXED, rowData);
+ readBuilder, table.options(), limit, false,
rowData);
} else {
source =
new StaticFileStoreSource(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 857c1b7ab7..d1652dfa9e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.assigners.AlignedSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
@@ -92,7 +91,7 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
@Nullable Long nextSnapshotId,
long discoveryInterval,
StreamTableScan scan,
- BucketMode bucketMode,
+ boolean unawareBucket,
long alignTimeout,
int splitPerTaskMax,
boolean shuffleBucketWithPartition,
@@ -103,7 +102,7 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
nextSnapshotId,
discoveryInterval,
scan,
- bucketMode,
+ unawareBucket,
splitPerTaskMax,
shuffleBucketWithPartition,
maxSnapshotCount);
@@ -267,7 +266,7 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
}
@Override
- protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
+ protected SplitAssigner createSplitAssigner(boolean unawareBucket) {
return new AlignedSplitAssigner();
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 8904720c90..29ff63a31c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -27,7 +27,6 @@ import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
@@ -51,9 +50,9 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
ReadBuilder readBuilder,
Map<String, String> options,
@Nullable Long limit,
- BucketMode bucketMode,
+ boolean unawareBucket,
@Nullable NestedProjectedRowData rowData) {
- super(readBuilder, options, limit, bucketMode, rowData);
+ super(readBuilder, options, limit, unawareBucket, rowData);
}
@Override
@@ -82,7 +81,7 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
nextSnapshotId,
options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
scan,
- bucketMode,
+ unawareBucket,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index e5063172ca..228d0d5a44 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -24,7 +24,6 @@ import
org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SplitListState;
import org.apache.paimon.flink.utils.JavaTypeInfo;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -55,8 +54,6 @@ import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;
-import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
-
/**
* This is the single (non-parallel) monitoring task, it is responsible for:
*
@@ -211,7 +208,7 @@ public class MonitorSource extends
AbstractNonCoordinatedSource<Split> {
long monitorInterval,
boolean emitSnapshotWatermark,
boolean shuffleBucketWithPartition,
- BucketMode bucketMode,
+ boolean unawareBucket,
NestedProjectedRowData nestedProjectedRowData) {
SingleOutputStreamOperator<Split> singleOutputStreamOperator =
env.fromSource(
@@ -223,7 +220,7 @@ public class MonitorSource extends
AbstractNonCoordinatedSource<Split> {
.forceNonParallel();
DataStream<Split> sourceDataStream =
- bucketMode == BUCKET_UNAWARE
+ unawareBucket
? shuffleUnawareBucket(singleOutputStreamOperator)
: shuffleNonUnawareBucket(
singleOutputStreamOperator,
shuffleBucketWithPartition);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index e3912cc707..05e3d3b0b6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
@@ -274,7 +273,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
- .withBucketMode(BucketMode.BUCKET_UNAWARE)
+ .unawareBucket(true)
.build();
enumerator.start();
@@ -316,7 +315,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
- .withBucketMode(BucketMode.BUCKET_UNAWARE)
+ .unawareBucket(true)
.build();
enumerator.start();
@@ -375,7 +374,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
- .withBucketMode(BucketMode.BUCKET_UNAWARE)
+ .unawareBucket(true)
.build();
enumerator.start();
@@ -431,7 +430,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
- .withBucketMode(BucketMode.BUCKET_UNAWARE)
+ .unawareBucket(true)
.build();
enumerator.start();
@@ -470,7 +469,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
- .withBucketMode(BucketMode.BUCKET_UNAWARE)
+ .unawareBucket(true)
.build();
enumerator.start();
enumerator.handleSplitRequest(1, "test-host");
@@ -766,7 +765,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
- .withBucketMode(BucketMode.BUCKET_UNAWARE)
+ .unawareBucket(true)
.build();
enumerator.start();
@@ -815,7 +814,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
- .withBucketMode(BucketMode.BUCKET_UNAWARE)
+ .unawareBucket(true)
.withMaxSnapshotCount(1)
.build();
enumerator.start();
@@ -899,7 +898,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
private long discoveryInterval = Long.MAX_VALUE;
private StreamTableScan scan;
- private BucketMode bucketMode = BucketMode.HASH_FIXED;
+ private boolean unawareBucket = false;
private int maxSnapshotCount = -1;
public Builder setSplitEnumeratorContext(
@@ -923,8 +922,8 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
return this;
}
- public Builder withBucketMode(BucketMode bucketMode) {
- this.bucketMode = bucketMode;
+ public Builder unawareBucket(boolean unawareBucket) {
+ this.unawareBucket = unawareBucket;
return this;
}
@@ -940,7 +939,7 @@ public class ContinuousFileSplitEnumeratorTest extends
FileSplitEnumeratorTestBa
null,
discoveryInterval,
scan,
- bucketMode,
+ unawareBucket,
10,
false,
maxSnapshotCount);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index a3c26ce80d..43dd09682b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -30,7 +30,6 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.source.StreamTableScan;
@@ -208,7 +207,6 @@ public class AlignedContinuousFileSplitEnumeratorTest
extends FileSplitEnumerato
private long discoveryInterval = Long.MAX_VALUE;
private StreamTableScan scan;
- private BucketMode bucketMode = BucketMode.HASH_FIXED;
private long timeout = 30000L;
@@ -233,11 +231,6 @@ public class AlignedContinuousFileSplitEnumeratorTest
extends FileSplitEnumerato
return this;
}
- public Builder withBucketMode(BucketMode bucketMode) {
- this.bucketMode = bucketMode;
- return this;
- }
-
public Builder setAlignedTimeout(long timeout) {
this.timeout = timeout;
return this;
@@ -250,7 +243,7 @@ public class AlignedContinuousFileSplitEnumeratorTest
extends FileSplitEnumerato
null,
discoveryInterval,
scan,
- bucketMode,
+ false,
timeout,
10,
false,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index f243a6a095..6ea389c161 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -31,7 +31,6 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.StreamTableCommit;
@@ -127,11 +126,7 @@ public class AlignedSourceReaderTest extends
FileStoreSourceReaderTest {
AlignedContinuousFileStoreSource alignedSource =
new AlignedContinuousFileStoreSource(
- table.newReadBuilder(),
- table.options(),
- null,
- BucketMode.HASH_DYNAMIC,
- null);
+ table.newReadBuilder(), table.options(), null, false,
null);
SourceOperatorFactory<RowData> sourceOperatorFactory =
new SourceOperatorFactory<>(alignedSource,
WatermarkStrategy.noWatermarks());
StreamTaskMailboxTestHarnessBuilder<RowData> builder =