This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e87366513 [flink] Refactor CDC related classes to support record types 
other than CdcRecord (#2876)
e87366513 is described below

commit e87366513beeed7b8b687dff90dd42432f782cc3
Author: tsreaper <[email protected]>
AuthorDate: Wed Feb 21 11:41:38 2024 +0800

    [flink] Refactor CDC related classes to support record types other than 
CdcRecord (#2876)
---
 .../table/sink/FixedBucketRowKeyExtractor.java     |   3 -
 .../paimon/table/sink/KeyAndBucketExtractor.java   |   3 +
 .../table/sink/FixedBucketRowKeyExtractorTest.java |   3 +-
 .../flink/sink/cdc/CdcDynamicBucketSink.java       |  44 ++-----
 .../flink/sink/cdc/CdcDynamicBucketSinkBase.java   | 128 +++++++++++++++++++++
 ...java => CdcFixedBucketChannelComputerBase.java} |  40 ++++---
 .../flink/sink/cdc/CdcRecordChannelComputer.java   |  35 +-----
 .../sink/cdc/CdcWithBucketChannelComputer.java     |  56 ---------
 .../cdc/CdcMultiplexRecordChannelComputerTest.java |   3 +-
 .../sink/cdc/CdcRecordChannelComputerTest.java     |  16 ++-
 10 files changed, 181 insertions(+), 150 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
index 62b5dc215..95a2c023a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
@@ -25,8 +25,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** {@link KeyAndBucketExtractor} for {@link InternalRow}. */
 public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
 
@@ -40,7 +38,6 @@ public class FixedBucketRowKeyExtractor extends 
RowKeyExtractor {
     public FixedBucketRowKeyExtractor(TableSchema schema) {
         super(schema);
         numBuckets = new CoreOptions(schema.options()).bucket();
-        checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
         sameBucketKeyAndTrimmedPrimaryKey = 
schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
         bucketKeyProjection =
                 CodeGenUtils.newProjection(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
index 66378b873..0b0b1a154 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.sink;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.types.RowKind;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /**
  * Utility interface to extract partition keys, bucket id, primary keys for 
file store ({@code
  * trimmedPrimaryKey}) and primary keys for external log system ({@code 
logPrimaryKey}) from the
@@ -46,6 +48,7 @@ public interface KeyAndBucketExtractor<T> {
     }
 
     static int bucket(int hashcode, int numBuckets) {
+        checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
         return Math.abs(hashcode % numBuckets);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
index 08f9d5e00..99f9cc63d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
@@ -67,7 +67,8 @@ public class FixedBucketRowKeyExtractorTest {
 
     @Test
     public void testIllegalBucket() {
-        assertThatThrownBy(() -> extractor("", "", "a", -1))
+        GenericRow row = GenericRow.of(5, 6, 7);
+        assertThatThrownBy(() -> bucket(extractor("", "", "a", -1), row))
                 .hasMessageContaining("Num bucket is illegal");
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
index ec72405b0..574ff685f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -18,57 +18,27 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.DynamicBucketSink;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import org.apache.paimon.utils.SerializableFunction;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
-/** Sink for dynamic bucket table. */
-public class CdcDynamicBucketSink extends DynamicBucketSink<CdcRecord> {
+/** {@link CdcDynamicBucketSinkBase} for {@link CdcRecord}. */
+public class CdcDynamicBucketSink extends CdcDynamicBucketSinkBase<CdcRecord> {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     public CdcDynamicBucketSink(FileStoreTable table) {
-        super(table, null);
+        super(table);
     }
 
     @Override
-    protected ChannelComputer<CdcRecord> assignerChannelComputer(Integer 
numAssigners) {
-        return new CdcAssignerChannelComputer(table.schema(), numAssigners);
-    }
-
-    @Override
-    protected ChannelComputer<Tuple2<CdcRecord, Integer>> channelComputer2() {
-        return new CdcWithBucketChannelComputer(table.schema());
-    }
-
-    @Override
-    protected SerializableFunction<TableSchema, 
PartitionKeyExtractor<CdcRecord>>
-            extractorFunction() {
-        return schema -> {
-            CdcRecordKeyAndBucketExtractor extractor = new 
CdcRecordKeyAndBucketExtractor(schema);
-            return new PartitionKeyExtractor<CdcRecord>() {
-                @Override
-                public BinaryRow partition(CdcRecord record) {
-                    extractor.setRecord(record);
-                    return extractor.partition();
-                }
-
-                @Override
-                public BinaryRow trimmedPrimaryKey(CdcRecord record) {
-                    extractor.setRecord(record);
-                    return extractor.trimmedPrimaryKey();
-                }
-            };
-        };
+    protected KeyAndBucketExtractor<CdcRecord> createExtractor(TableSchema 
schema) {
+        return new CdcRecordKeyAndBucketExtractor(schema);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSinkBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSinkBase.java
new file mode 100644
index 000000000..30a262819
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSinkBase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.DynamicBucketSink;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
+
+/** CDC sink for dynamic bucket table. */
+public abstract class CdcDynamicBucketSinkBase<T> extends DynamicBucketSink<T> 
{
+
+    public CdcDynamicBucketSinkBase(FileStoreTable table) {
+        super(table, null);
+    }
+
+    @Override
+    protected ChannelComputer<T> assignerChannelComputer(Integer numAssigners) 
{
+        return new AssignerChannelComputer(numAssigners);
+    }
+
+    @Override
+    protected ChannelComputer<Tuple2<T, Integer>> channelComputer2() {
+        return new RecordWithBucketChannelComputer();
+    }
+
+    @Override
+    protected SerializableFunction<TableSchema, PartitionKeyExtractor<T>> 
extractorFunction() {
+        return schema -> {
+            KeyAndBucketExtractor<T> extractor = createExtractor(schema);
+            return new PartitionKeyExtractor<T>() {
+                @Override
+                public BinaryRow partition(T record) {
+                    extractor.setRecord(record);
+                    return extractor.partition();
+                }
+
+                @Override
+                public BinaryRow trimmedPrimaryKey(T record) {
+                    extractor.setRecord(record);
+                    return extractor.trimmedPrimaryKey();
+                }
+            };
+        };
+    }
+
+    protected abstract KeyAndBucketExtractor<T> createExtractor(TableSchema 
schema);
+
+    private class AssignerChannelComputer implements ChannelComputer<T> {
+
+        private Integer numAssigners;
+
+        private transient int numChannels;
+        private transient KeyAndBucketExtractor<T> extractor;
+
+        public AssignerChannelComputer(Integer numAssigners) {
+            this.numAssigners = numAssigners;
+        }
+
+        @Override
+        public void setup(int numChannels) {
+            this.numChannels = numChannels;
+            this.numAssigners = MathUtils.min(numAssigners, numChannels);
+            this.extractor = createExtractor(table.schema());
+        }
+
+        @Override
+        public int channel(T record) {
+            extractor.setRecord(record);
+            int partitionHash = extractor.partition().hashCode();
+            int keyHash = extractor.trimmedPrimaryKey().hashCode();
+            return computeAssigner(partitionHash, keyHash, numChannels, 
numAssigners);
+        }
+
+        @Override
+        public String toString() {
+            return "shuffle by key hash";
+        }
+    }
+
+    private class RecordWithBucketChannelComputer implements 
ChannelComputer<Tuple2<T, Integer>> {
+
+        private transient int numChannels;
+        private transient KeyAndBucketExtractor<T> extractor;
+
+        @Override
+        public void setup(int numChannels) {
+            this.numChannels = numChannels;
+            this.extractor = createExtractor(table.schema());
+        }
+
+        @Override
+        public int channel(Tuple2<T, Integer> record) {
+            extractor.setRecord(record.f0);
+            return ChannelComputer.select(extractor.partition(), record.f1, 
numChannels);
+        }
+
+        @Override
+        public String toString() {
+            return "shuffle by partition & bucket";
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketChannelComputerBase.java
similarity index 56%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketChannelComputerBase.java
index c93bd52be..685f02dc1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketChannelComputerBase.java
@@ -18,45 +18,47 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.utils.MathUtils;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 
-import static org.apache.paimon.index.BucketAssigner.computeAssigner;
-
-/** Hash key of a {@link CdcRecord}. */
-public class CdcAssignerChannelComputer implements ChannelComputer<CdcRecord> {
-
-    private static final long serialVersionUID = 1L;
+/**
+ * {@link ChannelComputer} for distributing CDC records into writers for 
fixed-bucket mode tables.
+ */
+public abstract class CdcFixedBucketChannelComputerBase<T> implements 
ChannelComputer<T> {
 
-    private final TableSchema schema;
-    private Integer numAssigners;
+    protected final TableSchema schema;
 
     private transient int numChannels;
-    private transient CdcRecordKeyAndBucketExtractor extractor;
+    private transient KeyAndBucketExtractor<T> extractor;
 
-    public CdcAssignerChannelComputer(TableSchema schema, Integer 
numAssigners) {
+    public CdcFixedBucketChannelComputerBase(TableSchema schema) {
         this.schema = schema;
-        this.numAssigners = numAssigners;
     }
 
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.numAssigners = MathUtils.min(numAssigners, numChannels);
-        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+        this.extractor = createExtractor();
     }
 
+    protected abstract KeyAndBucketExtractor<T> createExtractor();
+
     @Override
-    public int channel(CdcRecord record) {
+    public int channel(T record) {
         extractor.setRecord(record);
-        int partitionHash = extractor.partition().hashCode();
-        int keyHash = extractor.trimmedPrimaryKey().hashCode();
-        return computeAssigner(partitionHash, keyHash, numChannels, 
numAssigners);
+        return channel(extractor.partition(), extractor.bucket());
+    }
+
+    @VisibleForTesting
+    int channel(BinaryRow partition, int bucket) {
+        return ChannelComputer.select(partition, bucket, numChannels);
     }
 
     @Override
     public String toString() {
-        return "shuffle by key hash";
+        return "shuffle by bucket";
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
index f90cb4ad8..630d0e223 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
@@ -18,43 +18,20 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 
-/** {@link ChannelComputer} for {@link CdcRecord}. */
-public class CdcRecordChannelComputer implements ChannelComputer<CdcRecord> {
+/** {@link CdcFixedBucketChannelComputerBase} for {@link CdcRecord}. */
+public class CdcRecordChannelComputer extends 
CdcFixedBucketChannelComputerBase<CdcRecord> {
 
-    private static final long serialVersionUID = 1L;
-
-    private final TableSchema schema;
-
-    private transient int numChannels;
-    private transient KeyAndBucketExtractor<CdcRecord> extractor;
+    private static final long serialVersionUID = 2L;
 
     public CdcRecordChannelComputer(TableSchema schema) {
-        this.schema = schema;
-    }
-
-    @Override
-    public void setup(int numChannels) {
-        this.numChannels = numChannels;
-        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
-    }
-
-    @Override
-    public int channel(CdcRecord record) {
-        extractor.setRecord(record);
-        return channel(extractor.partition(), extractor.bucket());
-    }
-
-    public int channel(BinaryRow partition, int bucket) {
-        return ChannelComputer.select(partition, bucket, numChannels);
+        super(schema);
     }
 
     @Override
-    public String toString() {
-        return "shuffle by bucket";
+    protected KeyAndBucketExtractor<CdcRecord> createExtractor() {
+        return new CdcRecordKeyAndBucketExtractor(schema);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
deleted file mode 100644
index 198a3b0e0..000000000
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.ChannelComputer;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/** Hash key of a {@link CdcRecord} with bucket. */
-public class CdcWithBucketChannelComputer implements 
ChannelComputer<Tuple2<CdcRecord, Integer>> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final TableSchema schema;
-
-    private transient int numChannels;
-    private transient CdcRecordKeyAndBucketExtractor extractor;
-
-    public CdcWithBucketChannelComputer(TableSchema schema) {
-        this.schema = schema;
-    }
-
-    @Override
-    public void setup(int numChannels) {
-        this.numChannels = numChannels;
-        this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
-    }
-
-    @Override
-    public int channel(Tuple2<CdcRecord, Integer> record) {
-        extractor.setRecord(record.f0);
-        return ChannelComputer.select(extractor.partition(), record.f1, 
numChannels);
-    }
-
-    @Override
-    public String toString() {
-        return "shuffle by partition & bucket";
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index 2ae627552..658f1abf1 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -75,6 +76,7 @@ public class CdcMultiplexRecordChannelComputerTest {
         catalog.createDatabase(databaseName, true);
 
         Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, ThreadLocalRandom.current().nextInt(1, 
5));
         conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, 
Duration.ofMillis(10));
 
         RowType rowTypeWithPartition =
@@ -130,7 +132,6 @@ public class CdcMultiplexRecordChannelComputerTest {
 
     @Test
     public void testSchemaNoPartition() {
-
         ThreadLocalRandom random = ThreadLocalRandom.current();
         int numInputs = random.nextInt(1000) + 1;
         List<Map<String, String>> input = new ArrayList<>();
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
index ccfc4adad..9a19013e2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -49,11 +51,15 @@ public class CdcRecordChannelComputerTest {
 
     @Test
     public void testSchemaWithPartition() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
         RowType rowType =
                 RowType.of(
                         new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), 
DataTypes.DOUBLE()},
                         new String[] {"pt", "k", "v"});
 
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, random.nextInt(1, 5));
+
         SchemaManager schemaManager =
                 new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
         TableSchema schema =
@@ -62,10 +68,9 @@ public class CdcRecordChannelComputerTest {
                                 rowType.getFields(),
                                 Collections.singletonList("pt"),
                                 Arrays.asList("pt", "k"),
-                                new HashMap<>(),
+                                options.toMap(),
                                 ""));
 
-        ThreadLocalRandom random = ThreadLocalRandom.current();
         int numInputs = random.nextInt(1000) + 1;
         List<Map<String, String>> input = new ArrayList<>();
         for (int i = 0; i < numInputs; i++) {
@@ -81,11 +86,15 @@ public class CdcRecordChannelComputerTest {
 
     @Test
     public void testSchemaNoPartition() throws Exception {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
         RowType rowType =
                 RowType.of(
                         new DataType[] {DataTypes.BIGINT(), 
DataTypes.DOUBLE()},
                         new String[] {"k", "v"});
 
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET, random.nextInt(1, 5));
+
         SchemaManager schemaManager =
                 new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
         TableSchema schema =
@@ -94,10 +103,9 @@ public class CdcRecordChannelComputerTest {
                                 rowType.getFields(),
                                 Collections.emptyList(),
                                 Collections.singletonList("k"),
-                                new HashMap<>(),
+                                options.toMap(),
                                 ""));
 
-        ThreadLocalRandom random = ThreadLocalRandom.current();
         int numInputs = random.nextInt(1000) + 1;
         List<Map<String, String>> input = new ArrayList<>();
         for (int i = 0; i < numInputs; i++) {

Reply via email to