This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 38f1903be [Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424)
38f1903be is described below
commit 38f1903be2baebc6a903012c3a3129168aec34c8
Author: Eric <[email protected]>
AuthorDate: Tue Mar 28 21:24:03 2023 +0800
[Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424)
---
.../seatunnel/api/table/factory/FactoryUtil.java | 4 ++
.../connectors/seatunnel/kafka/config/Config.java | 7 +++
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 37 +++--------
.../seatunnel/kafka/sink/KafkaSinkCommitter.java | 7 +--
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 10 +--
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 73 ++++++++++------------
.../seatunnel/cassandra/CassandraIT.java | 3 +
.../test/resources/extractTopic_fake_to_kafka.conf | 1 +
.../test/resources/kafkasink_fake_to_kafka.conf | 1 +
.../test/resources/kafkasource_canal_to_kafka.conf | 1 +
10 files changed, 69 insertions(+), 75 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index a330b4e37..6ac939149 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -114,6 +115,7 @@ public final class FactoryUtil {
ReadonlyConfig options,
ClassLoader classLoader) {
TableFactoryContext context = new TableFactoryContext(acceptedTables,
options, classLoader);
+
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
TableSource<T, SplitT, StateT> tableSource =
factory.createSource(context);
validateAndApplyMetadata(acceptedTables, tableSource);
return tableSource.createSource();
@@ -136,6 +138,7 @@ public final class FactoryUtil {
TableFactoryContext context =
new TableFactoryContext(
Collections.singletonList(catalogTable), options,
classLoader);
+
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createSink(context).createSink();
} catch (Throwable t) {
throw new FactoryException(
@@ -321,6 +324,7 @@ public final class FactoryUtil {
TableFactoryContext context =
new TableFactoryContext(
Collections.singletonList(catalogTable), options,
classLoader);
+
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createTransform(context).createTransform();
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index ff051b96d..2dffda4f4 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -164,4 +164,11 @@ public class Config {
"The processing method of data format error. The
default value is fail, and the optional value is (fail, skip). "
+ "When fail is selected, data format
error will block and an exception will be thrown. "
+ "When skip is selected, data format
error will skip this line data.");
+
+ public static final Option<KafkaSemantics> SEMANTICS =
+ Options.key("semantics")
+ .enumType(KafkaSemantics.class)
+ .defaultValue(KafkaSemantics.NON)
+ .withDescription(
+ "Semantics that can be chosen
EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index e09eb08e8..cbd409f99 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -20,7 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -29,60 +30,40 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-
/**
* Kafka Sink implementation by using SeaTunnel sink API. This class contains
the method to create
* {@link KafkaSinkWriter} and {@link KafkaSinkCommitter}.
*/
@AutoService(SeaTunnelSink.class)
+@NoArgsConstructor
public class KafkaSink
implements SeaTunnelSink<
SeaTunnelRow, KafkaSinkState, KafkaCommitInfo,
KafkaAggregatedCommitInfo> {
- private Config pluginConfig;
+ private ReadonlyConfig pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
- public KafkaSink() {}
-
- public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
BOOTSTRAP_SERVERS.key());
- if (!result.isSuccess()) {
- throw new KafkaConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
+ public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
this.pluginConfig = pluginConfig;
this.seaTunnelRowType = rowType;
}
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
BOOTSTRAP_SERVERS.key());
- if (!result.isSuccess()) {
- throw new KafkaConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- this.pluginConfig = pluginConfig;
+ ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+ .validate(new KafkaSinkFactory().optionRule());
+ this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index 4d62c00c6..ed4e28080 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
@@ -33,11 +32,11 @@ import java.util.Properties;
@Slf4j
public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
- private final Config pluginConfig;
+ private final ReadonlyConfig pluginConfig;
private KafkaInternalProducer<?, ?> kafkaProducer;
- public KafkaSinkCommitter(Config pluginConfig) {
+ public KafkaSinkCommitter(ReadonlyConfig pluginConfig) {
this.pluginConfig = pluginConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index b0cadf736..a9a0315ab 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -47,7 +45,11 @@ public class KafkaSinkFactory implements TableSinkFactory {
Arrays.asList(
MessageFormat.JSON, MessageFormat.CANAL_JSON,
MessageFormat.TEXT),
Config.TOPIC)
- .optional(Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS,
Config.TRANSACTION_PREFIX)
+ .optional(
+ Config.KAFKA_CONFIG,
+ Config.ASSIGN_PARTITIONS,
+ Config.TRANSACTION_PREFIX,
+ Config.SEMANTICS)
.exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
.build();
}
@@ -56,7 +58,7 @@ public class KafkaSinkFactory implements TableSinkFactory {
public TableSink createSink(TableFactoryContext context) {
return () ->
new KafkaSink(
- ConfigFactory.parseMap(context.getOptions().toMap()),
+ context.getOptions(),
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 6ed287f2d..c7fdb4cee 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,13 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
@@ -33,6 +30,7 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSer
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -45,12 +43,14 @@ import java.util.Properties;
import java.util.Random;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SEMANTICS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
@@ -71,20 +71,22 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
public KafkaSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
- Config pluginConfig,
+ ReadonlyConfig pluginConfig,
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.seaTunnelRowType = seaTunnelRowType;
- if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
- MessageContentPartitioner.setAssignPartitions(
- pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
+ if (pluginConfig.get(ASSIGN_PARTITIONS) != null
+ &&
!CollectionUtils.isEmpty(pluginConfig.get(ASSIGN_PARTITIONS))) {
+
MessageContentPartitioner.setAssignPartitions(pluginConfig.get(ASSIGN_PARTITIONS));
}
- if (pluginConfig.hasPath(TRANSACTION_PREFIX.key())) {
- this.transactionPrefix =
pluginConfig.getString(TRANSACTION_PREFIX.key());
+
+ if (pluginConfig.get(TRANSACTION_PREFIX) != null) {
+ this.transactionPrefix = pluginConfig.get(TRANSACTION_PREFIX);
} else {
Random random = new Random();
this.transactionPrefix = String.format("SeaTunnel%04d",
random.nextInt(PREFIX_RANGE));
}
+
restoreState(kafkaStates);
this.seaTunnelRowSerializer = getSerializer(pluginConfig,
seaTunnelRowType);
if
(KafkaSemantics.EXACTLY_ONCE.equals(getKafkaSemantics(pluginConfig))) {
@@ -141,21 +143,20 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
}
- private Properties getKafkaProperties(Config pluginConfig) {
+ private Properties getKafkaProperties(ReadonlyConfig pluginConfig) {
Properties kafkaProperties = new Properties();
- if (CheckConfigUtil.isValidParam(pluginConfig, KAFKA_CONFIG.key())) {
- pluginConfig
- .getObject(KAFKA_CONFIG.key())
- .forEach((key, value) -> kafkaProperties.put(key,
value.unwrapped()));
+ if (pluginConfig.get(KAFKA_CONFIG) != null) {
+ pluginConfig.get(KAFKA_CONFIG).forEach((key, value) ->
kafkaProperties.put(key, value));
}
- if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
+
+ if (pluginConfig.get(ASSIGN_PARTITIONS) != null) {
kafkaProperties.put(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
"org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
}
+
kafkaProperties.put(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-
pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
pluginConfig.get(BOOTSTRAP_SERVERS));
kafkaProperties.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
kafkaProperties.put(
@@ -164,24 +165,18 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
- Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(pluginConfig);
- MessageFormat messageFormat = readonlyConfig.get(FORMAT);
+ ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+ MessageFormat messageFormat = pluginConfig.get(FORMAT);
String delimiter = DEFAULT_FIELD_DELIMITER;
- if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
- delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
- }
- String topic = null;
- if (pluginConfig.hasPath(TOPIC.key())) {
- topic = pluginConfig.getString(TOPIC.key());
+
+ if (pluginConfig.get(FIELD_DELIMITER) != null) {
+ delimiter = pluginConfig.get(FIELD_DELIMITER);
}
- if (pluginConfig.hasPath(PARTITION.key())) {
+
+ String topic = pluginConfig.get(TOPIC);
+ if (pluginConfig.get(PARTITION) != null) {
return DefaultSeaTunnelRowSerializer.create(
- topic,
- pluginConfig.getInt(PARTITION.key()),
- seaTunnelRowType,
- messageFormat,
- delimiter);
+ topic, pluginConfig.get(PARTITION), seaTunnelRowType,
messageFormat, delimiter);
} else {
return DefaultSeaTunnelRowSerializer.create(
topic,
@@ -192,9 +187,9 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
}
- private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
- if (pluginConfig.hasPath("semantics")) {
- return pluginConfig.getEnum(KafkaSemantics.class, "semantics");
+ private KafkaSemantics getKafkaSemantics(ReadonlyConfig pluginConfig) {
+ if (pluginConfig.get(SEMANTICS) != null) {
+ return pluginConfig.get(SEMANTICS);
}
return KafkaSemantics.NON;
}
@@ -211,10 +206,10 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
private List<String> getPartitionKeyFields(
- Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
- List<String> partitionKeyFields =
- pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
+ ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+
+ if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
+ List<String> partitionKeyFields =
pluginConfig.get(PARTITION_KEY_FIELDS);
List<String> rowTypeFieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
for (String partitionKeyField : partitionKeyFields) {
if (!rowTypeFieldNames.contains(partitionKeyField)) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
index d991cea1b..ad548295b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
@@ -68,6 +68,7 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
@@ -141,10 +142,12 @@ public class CassandraIT extends TestSuiteBase implements
TestResource {
session.execute(
SimpleStatement.builder(config.getString(SOURCE_TABLE))
.setKeyspace(KEYSPACE)
+ .setTimeout(Duration.ofSeconds(10))
.build());
session.execute(
SimpleStatement.builder(config.getString(SINK_TABLE))
.setKeyspace(KEYSPACE)
+ .setTimeout(Duration.ofSeconds(10))
.build());
} catch (Exception e) {
throw new RuntimeException("Initializing Cassandra table failed!",
e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
index 25f0bd75e..59bd7e9e8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
@@ -73,6 +73,7 @@ sink {
source_table_name = "fake1"
bootstrap.servers = "kafkaCluster:9092"
topic = "${c_string}"
+ format = json
partition_key_fields = ["c_map","c_string"]
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
index 086136bf5..a4ad964d3 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
@@ -63,6 +63,7 @@ sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic"
+ format = json
partition_key_fields = ["c_map","c_string"]
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
index 2b8e219aa..367b13c0f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
@@ -53,5 +53,6 @@ sink {
bootstrap.servers = "kafkaCluster:9092"
topic = "test-canal-sink"
format = canal_json
+ partition = 0
}
}
\ No newline at end of file