This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e87366513 [flink] Refactor CDC related classes to support record types other than CdcRecord (#2876)
e87366513 is described below
commit e87366513beeed7b8b687dff90dd42432f782cc3
Author: tsreaper <[email protected]>
AuthorDate: Wed Feb 21 11:41:38 2024 +0800
[flink] Refactor CDC related classes to support record types other than CdcRecord (#2876)
---
.../table/sink/FixedBucketRowKeyExtractor.java | 3 -
.../paimon/table/sink/KeyAndBucketExtractor.java | 3 +
.../table/sink/FixedBucketRowKeyExtractorTest.java | 3 +-
.../flink/sink/cdc/CdcDynamicBucketSink.java | 44 ++-----
.../flink/sink/cdc/CdcDynamicBucketSinkBase.java | 128 +++++++++++++++++++++
...java => CdcFixedBucketChannelComputerBase.java} | 40 ++++---
.../flink/sink/cdc/CdcRecordChannelComputer.java | 35 +-----
.../sink/cdc/CdcWithBucketChannelComputer.java | 56 ---------
.../cdc/CdcMultiplexRecordChannelComputerTest.java | 3 +-
.../sink/cdc/CdcRecordChannelComputerTest.java | 16 ++-
10 files changed, 181 insertions(+), 150 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
index 62b5dc215..95a2c023a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
@@ -25,8 +25,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** {@link KeyAndBucketExtractor} for {@link InternalRow}. */
public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
@@ -40,7 +38,6 @@ public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
public FixedBucketRowKeyExtractor(TableSchema schema) {
super(schema);
numBuckets = new CoreOptions(schema.options()).bucket();
- checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
sameBucketKeyAndTrimmedPrimaryKey =
schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
bucketKeyProjection =
CodeGenUtils.newProjection(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
index 66378b873..0b0b1a154 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.types.RowKind;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/**
* Utility interface to extract partition keys, bucket id, primary keys for file store ({@code
* trimmedPrimaryKey}) and primary keys for external log system ({@code logPrimaryKey}) from the
@@ -46,6 +48,7 @@ public interface KeyAndBucketExtractor<T> {
}
static int bucket(int hashcode, int numBuckets) {
+ checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
return Math.abs(hashcode % numBuckets);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
index 08f9d5e00..99f9cc63d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractorTest.java
@@ -67,7 +67,8 @@ public class FixedBucketRowKeyExtractorTest {
@Test
public void testIllegalBucket() {
- assertThatThrownBy(() -> extractor("", "", "a", -1))
+ GenericRow row = GenericRow.of(5, 6, 7);
+ assertThatThrownBy(() -> bucket(extractor("", "", "a", -1), row))
.hasMessageContaining("Num bucket is illegal");
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
index ec72405b0..574ff685f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -18,57 +18,27 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.DynamicBucketSink;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import org.apache.paimon.utils.SerializableFunction;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-/** Sink for dynamic bucket table. */
-public class CdcDynamicBucketSink extends DynamicBucketSink<CdcRecord> {
+/** {@link CdcDynamicBucketSinkBase} for {@link CdcRecord}. */
+public class CdcDynamicBucketSink extends CdcDynamicBucketSinkBase<CdcRecord> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
public CdcDynamicBucketSink(FileStoreTable table) {
- super(table, null);
+ super(table);
}
@Override
- protected ChannelComputer<CdcRecord> assignerChannelComputer(Integer numAssigners) {
- return new CdcAssignerChannelComputer(table.schema(), numAssigners);
- }
-
- @Override
- protected ChannelComputer<Tuple2<CdcRecord, Integer>> channelComputer2() {
- return new CdcWithBucketChannelComputer(table.schema());
- }
-
- @Override
- protected SerializableFunction<TableSchema, PartitionKeyExtractor<CdcRecord>>
- extractorFunction() {
- return schema -> {
- CdcRecordKeyAndBucketExtractor extractor = new CdcRecordKeyAndBucketExtractor(schema);
- return new PartitionKeyExtractor<CdcRecord>() {
- @Override
- public BinaryRow partition(CdcRecord record) {
- extractor.setRecord(record);
- return extractor.partition();
- }
-
- @Override
- public BinaryRow trimmedPrimaryKey(CdcRecord record) {
- extractor.setRecord(record);
- return extractor.trimmedPrimaryKey();
- }
- };
- };
+ protected KeyAndBucketExtractor<CdcRecord> createExtractor(TableSchema schema) {
+ return new CdcRecordKeyAndBucketExtractor(schema);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSinkBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSinkBase.java
new file mode 100644
index 000000000..30a262819
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSinkBase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.DynamicBucketSink;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import static org.apache.paimon.index.BucketAssigner.computeAssigner;
+
+/** CDC sink for dynamic bucket table. */
+public abstract class CdcDynamicBucketSinkBase<T> extends DynamicBucketSink<T> {
+
+ public CdcDynamicBucketSinkBase(FileStoreTable table) {
+ super(table, null);
+ }
+
+ @Override
+ protected ChannelComputer<T> assignerChannelComputer(Integer numAssigners) {
+ return new AssignerChannelComputer(numAssigners);
+ }
+
+ @Override
+ protected ChannelComputer<Tuple2<T, Integer>> channelComputer2() {
+ return new RecordWithBucketChannelComputer();
+ }
+
+ @Override
+ protected SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction() {
+ return schema -> {
+ KeyAndBucketExtractor<T> extractor = createExtractor(schema);
+ return new PartitionKeyExtractor<T>() {
+ @Override
+ public BinaryRow partition(T record) {
+ extractor.setRecord(record);
+ return extractor.partition();
+ }
+
+ @Override
+ public BinaryRow trimmedPrimaryKey(T record) {
+ extractor.setRecord(record);
+ return extractor.trimmedPrimaryKey();
+ }
+ };
+ };
+ }
+
+ protected abstract KeyAndBucketExtractor<T> createExtractor(TableSchema schema);
+
+ private class AssignerChannelComputer implements ChannelComputer<T> {
+
+ private Integer numAssigners;
+
+ private transient int numChannels;
+ private transient KeyAndBucketExtractor<T> extractor;
+
+ public AssignerChannelComputer(Integer numAssigners) {
+ this.numAssigners = numAssigners;
+ }
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ this.numAssigners = MathUtils.min(numAssigners, numChannels);
+ this.extractor = createExtractor(table.schema());
+ }
+
+ @Override
+ public int channel(T record) {
+ extractor.setRecord(record);
+ int partitionHash = extractor.partition().hashCode();
+ int keyHash = extractor.trimmedPrimaryKey().hashCode();
+ return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by key hash";
+ }
+ }
+
+ private class RecordWithBucketChannelComputer implements ChannelComputer<Tuple2<T, Integer>> {
+
+ private transient int numChannels;
+ private transient KeyAndBucketExtractor<T> extractor;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ this.extractor = createExtractor(table.schema());
+ }
+
+ @Override
+ public int channel(Tuple2<T, Integer> record) {
+ extractor.setRecord(record.f0);
+ return ChannelComputer.select(extractor.partition(), record.f1, numChannels);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by partition & bucket";
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketChannelComputerBase.java
similarity index 56%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketChannelComputerBase.java
index c93bd52be..685f02dc1 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAssignerChannelComputer.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcFixedBucketChannelComputerBase.java
@@ -18,45 +18,47 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.utils.MathUtils;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-import static org.apache.paimon.index.BucketAssigner.computeAssigner;
-
-/** Hash key of a {@link CdcRecord}. */
-public class CdcAssignerChannelComputer implements ChannelComputer<CdcRecord> {
-
- private static final long serialVersionUID = 1L;
+/**
+ * {@link ChannelComputer} for distributing CDC records into writers for fixed-bucket mode tables.
+ */
+public abstract class CdcFixedBucketChannelComputerBase<T> implements ChannelComputer<T> {
- private final TableSchema schema;
- private Integer numAssigners;
+ protected final TableSchema schema;
private transient int numChannels;
- private transient CdcRecordKeyAndBucketExtractor extractor;
+ private transient KeyAndBucketExtractor<T> extractor;
- public CdcAssignerChannelComputer(TableSchema schema, Integer numAssigners) {
+ public CdcFixedBucketChannelComputerBase(TableSchema schema) {
this.schema = schema;
- this.numAssigners = numAssigners;
}
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
- this.numAssigners = MathUtils.min(numAssigners, numChannels);
- this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
+ this.extractor = createExtractor();
}
+ protected abstract KeyAndBucketExtractor<T> createExtractor();
+
@Override
- public int channel(CdcRecord record) {
+ public int channel(T record) {
extractor.setRecord(record);
- int partitionHash = extractor.partition().hashCode();
- int keyHash = extractor.trimmedPrimaryKey().hashCode();
- return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
+ return channel(extractor.partition(), extractor.bucket());
+ }
+
+ @VisibleForTesting
+ int channel(BinaryRow partition, int bucket) {
+ return ChannelComputer.select(partition, bucket, numChannels);
}
@Override
public String toString() {
- return "shuffle by key hash";
+ return "shuffle by bucket";
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
index f90cb4ad8..630d0e223 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
@@ -18,43 +18,20 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-/** {@link ChannelComputer} for {@link CdcRecord}. */
-public class CdcRecordChannelComputer implements ChannelComputer<CdcRecord> {
+/** {@link CdcFixedBucketChannelComputerBase} for {@link CdcRecord}. */
+public class CdcRecordChannelComputer extends CdcFixedBucketChannelComputerBase<CdcRecord> {
- private static final long serialVersionUID = 1L;
-
- private final TableSchema schema;
-
- private transient int numChannels;
- private transient KeyAndBucketExtractor<CdcRecord> extractor;
+ private static final long serialVersionUID = 2L;
public CdcRecordChannelComputer(TableSchema schema) {
- this.schema = schema;
- }
-
- @Override
- public void setup(int numChannels) {
- this.numChannels = numChannels;
- this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
- }
-
- @Override
- public int channel(CdcRecord record) {
- extractor.setRecord(record);
- return channel(extractor.partition(), extractor.bucket());
- }
-
- public int channel(BinaryRow partition, int bucket) {
- return ChannelComputer.select(partition, bucket, numChannels);
+ super(schema);
}
@Override
- public String toString() {
- return "shuffle by bucket";
+ protected KeyAndBucketExtractor<CdcRecord> createExtractor() {
+ return new CdcRecordKeyAndBucketExtractor(schema);
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
deleted file mode 100644
index 198a3b0e0..000000000
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcWithBucketChannelComputer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.cdc;
-
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.ChannelComputer;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/** Hash key of a {@link CdcRecord} with bucket. */
-public class CdcWithBucketChannelComputer implements ChannelComputer<Tuple2<CdcRecord, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- private final TableSchema schema;
-
- private transient int numChannels;
- private transient CdcRecordKeyAndBucketExtractor extractor;
-
- public CdcWithBucketChannelComputer(TableSchema schema) {
- this.schema = schema;
- }
-
- @Override
- public void setup(int numChannels) {
- this.numChannels = numChannels;
- this.extractor = new CdcRecordKeyAndBucketExtractor(schema);
- }
-
- @Override
- public int channel(Tuple2<CdcRecord, Integer> record) {
- extractor.setRecord(record.f0);
- return ChannelComputer.select(extractor.partition(), record.f1, numChannels);
- }
-
- @Override
- public String toString() {
- return "shuffle by partition & bucket";
- }
-}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index 2ae627552..658f1abf1 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
@@ -75,6 +76,7 @@ public class CdcMultiplexRecordChannelComputerTest {
catalog.createDatabase(databaseName, true);
Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, ThreadLocalRandom.current().nextInt(1, 5));
conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME,
Duration.ofMillis(10));
RowType rowTypeWithPartition =
@@ -130,7 +132,6 @@ public class CdcMultiplexRecordChannelComputerTest {
@Test
public void testSchemaNoPartition() {
-
ThreadLocalRandom random = ThreadLocalRandom.current();
int numInputs = random.nextInt(1000) + 1;
List<Map<String, String>> input = new ArrayList<>();
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
index ccfc4adad..9a19013e2 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
@@ -18,9 +18,11 @@
package org.apache.paimon.flink.sink.cdc;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -49,11 +51,15 @@ public class CdcRecordChannelComputerTest {
@Test
public void testSchemaWithPartition() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.BIGINT(),
DataTypes.DOUBLE()},
new String[] {"pt", "k", "v"});
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, random.nextInt(1, 5));
+
SchemaManager schemaManager =
new SchemaManager(LocalFileIO.create(), new
Path(tempDir.toString()));
TableSchema schema =
@@ -62,10 +68,9 @@ public class CdcRecordChannelComputerTest {
rowType.getFields(),
Collections.singletonList("pt"),
Arrays.asList("pt", "k"),
- new HashMap<>(),
+ options.toMap(),
""));
- ThreadLocalRandom random = ThreadLocalRandom.current();
int numInputs = random.nextInt(1000) + 1;
List<Map<String, String>> input = new ArrayList<>();
for (int i = 0; i < numInputs; i++) {
@@ -81,11 +86,15 @@ public class CdcRecordChannelComputerTest {
@Test
public void testSchemaNoPartition() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(),
DataTypes.DOUBLE()},
new String[] {"k", "v"});
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, random.nextInt(1, 5));
+
SchemaManager schemaManager =
new SchemaManager(LocalFileIO.create(), new
Path(tempDir.toString()));
TableSchema schema =
@@ -94,10 +103,9 @@ public class CdcRecordChannelComputerTest {
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("k"),
- new HashMap<>(),
+ options.toMap(),
""));
- ThreadLocalRandom random = ThreadLocalRandom.current();
int numInputs = random.nextInt(1000) + 1;
List<Map<String, String>> input = new ArrayList<>();
for (int i = 0; i < numInputs; i++) {