This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f65f44f44 [Improve][Connector-V2][Kafka] Support to specify multiple
partition keys (#3230)
f65f44f44 is described below
commit f65f44f44c4ca8bc43760b8be0666412f3284cf8
Author: Harvey Yue <[email protected]>
AuthorDate: Thu Nov 17 15:22:59 2022 +0800
[Improve][Connector-V2][Kafka] Support to specify multiple partition keys
(#3230)
---
docs/en/connector-v2/sink/Kafka.md | 35 +--
.../connectors/seatunnel/kafka/config/Config.java | 6 +-
.../serialize/DefaultSeaTunnelRowSerializer.java | 68 +++--
.../kafka/serialize/SeaTunnelRowSerializer.java | 9 -
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 2 +-
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 78 ++----
.../connector-kafka-e2e}/pom.xml | 17 +-
.../e2e/connector}/kafka/KafkaContainer.java | 2 +-
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 295 +++++++++++++++++++++
.../kafka/kafkasource_earliest_to_console.conf | 5 +-
.../kafka/kafkasource_group_offset_to_console.conf | 4 +-
.../kafka/kafkasource_latest_to_console.conf | 4 +-
.../kafkasource_specific_offsets_to_console.conf | 6 +-
.../kafka/kafkasource_timestamp_to_console.conf | 6 +-
.../test/resources/kafkasink_fake_to_kafka.conf | 68 +++++
.../resources}/kafkasource_json_to_console.conf | 16 +-
.../resources}/kafkasource_text_to_console.conf | 16 +-
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
.../e2e/flink/v2/kafka/KafkaContainer.java | 151 -----------
.../kafka/KafkaSourceStartConfigToConsoleIT.java | 98 -------
.../e2e/flink/v2/kafka/KafkaTestBaseIT.java | 90 -------
.../kafka/kafkasource_json_to_console.conf | 91 -------
.../kafka/kafkasource_text_to_console.conf | 92 -------
.../seatunnel-flink-connector-v2-e2e/pom.xml | 1 -
.../connector-kafka-spark-e2e/pom.xml | 53 ----
.../spark/v2/kafka/KafkaSourceJsonToConsoleIT.java | 122 ---------
.../kafka/KafkaSourceStartConfigToConsoleIT.java | 98 -------
.../spark/v2/kafka/KafkaSourceTextToConsoleIT.java | 125 ---------
.../e2e/spark/v2/kafka/KafkaTestBaseIT.java | 90 -------
.../kafka/kafkasource_earliest_to_console.conf | 75 ------
.../kafka/kafkasource_group_offset_to_console.conf | 75 ------
.../kafka/kafkasource_latest_to_console.conf | 75 ------
.../kafkasource_specific_offsets_to_console.conf | 79 ------
.../kafka/kafkasource_timestamp_to_console.conf | 76 ------
.../seatunnel-spark-connector-v2-e2e/pom.xml | 1 -
35 files changed, 508 insertions(+), 1522 deletions(-)
diff --git a/docs/en/connector-v2/sink/Kafka.md
b/docs/en/connector-v2/sink/Kafka.md
index 74de8a57a..d480d5ab5 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -15,17 +15,17 @@ By default, we will use 2pc to guarantee the message is
sent to kafka exactly on
## Options
-| name | type | required | default value |
-| ------------------ | ---------------------- | -------- | ------------- |
-| topic | string | yes | - |
-| bootstrap.servers | string | yes | - |
-| kafka.* | kafka producer config | no | - |
-| semantic | string | no | NON |
-| partition_key | string | no | - |
-| partition | int | no | - |
-| assign_partitions | list | no | - |
-| transaction_prefix | string | no | - |
-| common-options | config | no | - |
+| name | type | required | default value |
+|----------------------|-----------------------| -------- | ------------- |
+| topic | string | yes | - |
+| bootstrap.servers | string | yes | - |
+| kafka.* | kafka producer config | no | - |
+| semantic | string | no | NON |
+| partition_key_fields | array | no | - |
+| partition | int | no | - |
+| assign_partitions | array | no | - |
+| transaction_prefix | string | no | - |
+| common-options | config | no | - |
### topic [string]
@@ -51,11 +51,11 @@ In AT_LEAST_ONCE, producer will wait for all outstanding
messages in the Kafka b
NON does not provide any guarantees: messages may be lost in case of issues on
the Kafka broker and messages may be duplicated.
-### partition_key [string]
+### partition_key_fields [array]
-Configure which field is used as the key of the kafka message.
+Configure which fields are used as the key of the kafka message.
-For example, if you want to use value of a field from upstream data as key,
you can assign it to the field name.
+For example, if you want to use the values of fields from upstream data as
the key, you can assign those field names to this property.
Upstream data is the following:
@@ -66,13 +66,13 @@ Upstream data is the following:
If name is set as the key, then the hash value of the name column will
determine which partition the message is sent to.
-If the field name does not exist in the upstream data, the configured
parameter will be used as the key.
+If partition key fields are not set, messages are sent with a null key.
### partition [int]
We can specify the partition, all messages will be sent to this partition.
-### assign_partitions [list]
+### assign_partitions [array]
We can decide which partition to send based on the content of the message. The
function of this parameter is to distribute information.
@@ -113,3 +113,6 @@ sink {
### 2.3.0-beta 2022-10-20
- Add Kafka Sink Connector
+### next version
+
+- [Feature] Support to specify multiple partition keys
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
\ No newline at end of file
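To make the renamed option concrete, a sink block using it could look like the
following. This is an illustrative sketch only: the topic, bootstrap address,
and field names (c_map, c_string) are made up for the example and are not taken
from this commit.

```hocon
sink {
  Kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # Renamed from partition_key (string) to partition_key_fields (array);
    # each entry must be a field name present in the upstream row type.
    partition_key_fields = ["c_map", "c_string"]
    semantic = "NON"
  }
}
```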
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 8cf6726fe..6030bfa39 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -101,10 +101,10 @@ public class Config {
.withDescription("We can decide which partition to send based on
the content of the message. " +
"The function of this parameter is to distribute
information.");
- public static final Option<String> PARTITION_KEY =
Options.key("partition_key")
- .stringType()
+ public static final Option<List<String>> PARTITION_KEY_FIELDS =
Options.key("partition_key_fields")
+ .listType()
.noDefaultValue()
- .withDescription("Configure which field is used as the key of the
kafka message.");
+ .withDescription("Configure which fields are used as the key of
the kafka message.");
public static final Option<StartMode> START_MODE =
Options.key("start_mode")
.objectType(StartMode.class)
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 24a2b242f..b7c43acbb 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -17,42 +17,78 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
+import java.util.List;
+import java.util.function.Function;
+
public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer<byte[], byte[]> {
- private int partation = -1;
+ private Integer partition;
private final String topic;
- private final JsonSerializationSchema jsonSerializationSchema;
+ private final SerializationSchema keySerialization;
+ private final SerializationSchema valueSerialization;
public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType
seaTunnelRowType) {
- this.topic = topic;
- this.jsonSerializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
+ this(topic, element -> null,
createSerializationSchema(seaTunnelRowType));
}
- public DefaultSeaTunnelRowSerializer(String topic, int partation,
SeaTunnelRowType seaTunnelRowType) {
+ public DefaultSeaTunnelRowSerializer(String topic, Integer partition,
SeaTunnelRowType seaTunnelRowType) {
this(topic, seaTunnelRowType);
- this.partation = partation;
+ this.partition = partition;
+ }
+
+ public DefaultSeaTunnelRowSerializer(String topic,
+ List<String> keyFieldNames,
+ SeaTunnelRowType seaTunnelRowType) {
+ this(topic, createKeySerializationSchema(keyFieldNames,
seaTunnelRowType),
+ createSerializationSchema(seaTunnelRowType));
+ }
+
+ public DefaultSeaTunnelRowSerializer(String topic,
+ SerializationSchema keySerialization,
+ SerializationSchema
valueSerialization) {
+ this.topic = topic;
+ this.keySerialization = keySerialization;
+ this.valueSerialization = valueSerialization;
}
@Override
public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
- if (this.partation != -1) {
- return new ProducerRecord<>(topic, this.partation, null,
jsonSerializationSchema.serialize(row));
- }
- else {
- return new ProducerRecord<>(topic, null,
jsonSerializationSchema.serialize(row));
- }
+ return new ProducerRecord<>(topic, partition,
+ keySerialization.serialize(row),
valueSerialization.serialize(row));
}
- @Override
- public ProducerRecord<byte[], byte[]> serializeRowByKey(String key,
SeaTunnelRow row) {
- //if the key is null, kafka will send message to a random partition
- return new ProducerRecord<>(topic, key == null ? null :
key.getBytes(), jsonSerializationSchema.serialize(row));
+ private static SerializationSchema
createSerializationSchema(SeaTunnelRowType rowType) {
+ return new JsonSerializationSchema(rowType);
}
+ private static SerializationSchema
createKeySerializationSchema(List<String> keyFieldNames,
+
SeaTunnelRowType seaTunnelRowType) {
+ int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+ SeaTunnelDataType[] keyFieldDataTypeArr = new
SeaTunnelDataType[keyFieldNames.size()];
+ for (int i = 0; i < keyFieldNames.size(); i++) {
+ String keyFieldName = keyFieldNames.get(i);
+ int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+ keyFieldIndexArr[i] = rowFieldIndex;
+ keyFieldDataTypeArr[i] =
seaTunnelRowType.getFieldType(rowFieldIndex);
+ }
+ SeaTunnelRowType keyType = new
SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+ SerializationSchema keySerializationSchema =
createSerializationSchema(keyType);
+
+ Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
+ Object[] keyFields = new Object[keyFieldIndexArr.length];
+ for (int i = 0; i < keyFieldIndexArr.length; i++) {
+ keyFields[i] = row.getField(keyFieldIndexArr[i]);
+ }
+ return new SeaTunnelRow(keyFields);
+ };
+ return row ->
keySerializationSchema.serialize(keyDataExtractor.apply(row));
+ }
}
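The core idea of the new serializer is to project the configured key fields out
of each row and serialize only that projection. The following is a standalone
sketch of that projection step, not the committed class: rows are represented
as plain Object[] here (an assumption for self-containment), whereas the real
code uses SeaTunnelRow and a JsonSerializationSchema.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class KeyExtractorSketch {
    // Build a function that picks the configured key fields out of a row,
    // preserving the order in which the key fields were configured.
    static Function<Object[], Object[]> keyExtractor(List<String> fieldNames,
                                                     List<String> keyFields) {
        int[] indices = keyFields.stream()
                .mapToInt(fieldNames::indexOf)
                .toArray();
        return row -> {
            Object[] key = new Object[indices.length];
            for (int i = 0; i < indices.length; i++) {
                key[i] = row[indices[i]];
            }
            return key;
        };
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("id", "name", "age");
        Function<Object[], Object[]> extractor =
                keyExtractor(fields, Arrays.asList("name", "id"));
        Object[] key = extractor.apply(new Object[]{1, "alice", 30});
        System.out.println(Arrays.toString(key)); // [alice, 1]
    }
}
```

Resolving field indices once up front, then reusing them per row, avoids a
name lookup on every record.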
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
index d96753155..9f12591ea 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java
@@ -30,13 +30,4 @@ public interface SeaTunnelRowSerializer<K, V> {
* @return kafka record.
*/
ProducerRecord<K, V> serializeRow(SeaTunnelRow row);
-
- /**
- * Use Key serialize the {@link SeaTunnelRow} to a Kafka {@link
ProducerRecord}.
- *
- * @param key String
- * @param row seatunnel row
- * @return kafka record.
- */
- ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index fb9b1e5c4..fef40310e 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -36,7 +36,7 @@ public class KafkaSinkFactory implements TableSinkFactory {
return OptionRule.builder()
.required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
.optional(Config.KAFKA_CONFIG_PREFIX,
Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
- .exclusive(Config.PARTITION, Config.PARTITION_KEY)
+ .exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 8a7598fc1..dc143ec60 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -18,8 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
@@ -40,11 +41,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
-import java.util.function.Function;
/**
* KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to
Kafka.
@@ -52,44 +53,21 @@ import java.util.function.Function;
public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow,
KafkaCommitInfo, KafkaSinkState> {
private final SinkWriter.Context context;
- private final Config pluginConfig;
- private final Function<SeaTunnelRow, String> partitionExtractor;
private String transactionPrefix;
private long lastCheckpointId = 0;
- private int partition;
private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
private final SeaTunnelRowSerializer<byte[], byte[]>
seaTunnelRowSerializer;
private static final int PREFIX_RANGE = 10000;
- // check config
- @Override
- public void write(SeaTunnelRow element) {
- ProducerRecord<byte[], byte[]> producerRecord = null;
- //Determine the partition of the kafka send message based on the field
name
- if (pluginConfig.hasPath(PARTITION_KEY.key())){
- String key = partitionExtractor.apply(element);
- producerRecord = seaTunnelRowSerializer.serializeRowByKey(key,
element);
- }
- else {
- producerRecord = seaTunnelRowSerializer.serializeRow(element);
- }
- kafkaProducerSender.send(producerRecord);
- }
-
public KafkaSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
Config pluginConfig,
List<KafkaSinkState> kafkaStates) {
this.context = context;
- this.pluginConfig = pluginConfig;
- this.partitionExtractor = createPartitionExtractor(pluginConfig,
seaTunnelRowType);
- if (pluginConfig.hasPath(PARTITION.key())) {
- this.partition = pluginConfig.getInt(PARTITION.key());
- }
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
}
@@ -116,6 +94,12 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
}
+ @Override
+ public void write(SeaTunnelRow element) {
+ ProducerRecord<byte[], byte[]> producerRecord =
seaTunnelRowSerializer.serializeRow(element);
+ kafkaProducerSender.send(producerRecord);
+ }
+
@Override
public List<KafkaSinkState> snapshotState(long checkpointId) {
List<KafkaSinkState> states =
kafkaProducerSender.snapshotState(checkpointId);
@@ -145,8 +129,7 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
private Properties getKafkaProperties(Config pluginConfig) {
- Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
-
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX.key(),
false);
+ Config kafkaConfig =
TypesafeConfigUtils.extractSubConfig(pluginConfig, KAFKA_CONFIG_PREFIX.key(),
false);
Properties kafkaProperties = new Properties();
kafkaConfig.entrySet().forEach(entry -> {
kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
@@ -160,13 +143,13 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
return kafkaProperties;
}
- // todo: parse the target field from config
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- if (pluginConfig.hasPath(PARTITION.key())){
- return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
this.partition, seaTunnelRowType);
- }
- else {
- return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
seaTunnelRowType);
+ if (pluginConfig.hasPath(PARTITION.key())) {
+ return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+ pluginConfig.getInt(PARTITION.key()), seaTunnelRowType);
+ } else {
+ return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
+ getPartitionKeyFields(pluginConfig, seaTunnelRowType),
seaTunnelRowType);
}
}
@@ -188,23 +171,18 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
}
- private Function<SeaTunnelRow, String> createPartitionExtractor(Config
pluginConfig,
-
SeaTunnelRowType seaTunnelRowType) {
- if (!pluginConfig.hasPath(PARTITION_KEY.key())){
- return row -> null;
- }
- String partitionKey = pluginConfig.getString(PARTITION_KEY.key());
- List<String> fieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
- if (!fieldNames.contains(partitionKey)) {
- return row -> partitionKey;
- }
- int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
- return row -> {
- Object partitionFieldValue = row.getField(partitionFieldIndex);
- if (partitionFieldValue != null) {
- return partitionFieldValue.toString();
+ private List<String> getPartitionKeyFields(Config pluginConfig,
SeaTunnelRowType seaTunnelRowType) {
+ if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
+ List<String> partitionKeyFields =
pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
+ List<String> rowTypeFieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
+ for (String partitionKeyField : partitionKeyFields) {
+ if (!rowTypeFieldNames.contains(partitionKeyField)) {
+ throw new IllegalArgumentException(String.format(
+ "Partition key field not found: %s, rowType: %s",
partitionKeyField, rowTypeFieldNames));
+ }
}
- return null;
- };
+ return partitionKeyFields;
+ }
+ return Collections.emptyList();
}
}
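The validation in getPartitionKeyFields fails fast when a configured key field
does not exist in the upstream row type. A standalone sketch of that check,
with the row type reduced to a plain String[] (an assumption for
self-containment):

```java
import java.util.Arrays;
import java.util.List;

public class PartitionKeyValidationSketch {
    // Reject any configured key field that is absent from the row's fields,
    // mirroring the IllegalArgumentException thrown in KafkaSinkWriter.
    static List<String> validate(List<String> keyFields, String[] rowFields) {
        List<String> names = Arrays.asList(rowFields);
        for (String field : keyFields) {
            if (!names.contains(field)) {
                throw new IllegalArgumentException(String.format(
                        "Partition key field not found: %s, rowType: %s",
                        field, names));
            }
        }
        return keyFields;
    }

    public static void main(String[] args) {
        String[] rowFields = {"id", "name"};
        System.out.println(validate(Arrays.asList("name"), rowFields));
        try {
            validate(Arrays.asList("missing"), rowFields);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```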
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
similarity index 88%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
index 2eccfa9c5..d2ee1f526 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/pom.xml
@@ -18,39 +18,36 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-kafka-flink-e2e</artifactId>
+ <artifactId>connector-kafka-e2e</artifactId>
<dependencies>
+ <!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-flink-e2e-base</artifactId>
+ <artifactId>connector-kafka</artifactId>
<version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
<scope>test</scope>
</dependency>
-
- <!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-kafka</artifactId>
+ <artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-console</artifactId>
+ <artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-assert</artifactId>
+ <artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaContainer.java
similarity index 99%
rename from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaContainer.java
index 117daf78e..c3fa2cb3b 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaContainer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.spark.v2.kafka;
+package org.apache.seatunnel.e2e.connector.kafka;
import com.github.dockerjava.api.command.InspectContainerResponse;
import lombok.SneakyThrows;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
new file mode 100644
index 000000000..6d907cd09
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class KafkaIT extends TestSuiteBase implements TestResource {
+ private static final String KAFKA_IMAGE_NAME =
"confluentinc/cp-kafka:6.2.1";
+
+ private static final int KAFKA_PORT = 9093;
+
+ private static final String KAFKA_HOST = "kafkaCluster";
+
+ private KafkaProducer<byte[], byte[]> producer;
+
+ private KafkaContainer kafkaContainer;
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ kafkaContainer = new
KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+ kafkaContainer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
+ Awaitility.given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(() -> initKafkaProducer());
+
+ log.info("Write 100 records to topic test_topic_source");
+ DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_source", SEATUNNEL_ROW_TYPE);
+ generateTestData(row -> serializer.serializeRow(row), 0, 100);
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+
+ @TestTemplate
+ public void testSinkKafka(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/kafkasink_fake_to_kafka.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
+ String topicName = "test_topic";
+ Map<String, String> data = new HashMap<>();
+ ObjectMapper objectMapper = new ObjectMapper();
+ try (KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(kafkaConsumerConfig())) {
+ consumer.subscribe(Arrays.asList(topicName));
+ Map<TopicPartition, Long> offsets =
consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
+ Long endOffset = offsets.entrySet().iterator().next().getValue();
+ Long lastProcessedOffset = -1L;
+
+ do {
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord<String, String> record : records) {
+ if (lastProcessedOffset < record.offset()) {
+
+ data.put(record.key(), record.value());
+ }
+ lastProcessedOffset = record.offset();
+ }
+ } while (lastProcessedOffset < endOffset - 1);
+ }
+ String key = data.keySet().iterator().next();
+ ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
+ Assertions.assertTrue(objectNode.has("c_map"));
+ Assertions.assertTrue(objectNode.has("c_string"));
+ Assertions.assertEquals(10, data.size());
+ }
+
+ @TestTemplate
+ public void testSourceKafkaTextToConsole(TestContainer container) throws
IOException, InterruptedException {
+ TextSerializationSchema serializer = TextSerializationSchema.builder()
+ .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+ .delimiter(",")
+ .build();
+ generateTestData(row -> new ProducerRecord<>("test_topic_text", null,
serializer.serialize(row)), 0, 100);
+ Container.ExecResult execResult =
container.executeJob("/kafkasource_text_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
+ @TestTemplate
+ public void testSourceKafkaJsonToConsole(TestContainer container) throws
IOException, InterruptedException {
+ DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE);
+ generateTestData(row -> serializer.serializeRow(row), 0, 100);
+ Container.ExecResult execResult =
container.executeJob("/kafkasource_json_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
+ @TestTemplate
+ public void testSourceKafka(TestContainer container) throws IOException,
InterruptedException {
+ testKafkaLatestToConsole(container);
+ testKafkaEarliestToConsole(container);
+ testKafkaSpecificOffsetsToConsole(container);
+ testKafkaTimestampToConsole(container);
+ }
+
+ @TestTemplate
+ public void testSourceKafkaStartConfig(TestContainer container) throws
IOException, InterruptedException {
+ DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic_group", SEATUNNEL_ROW_TYPE);
+ generateTestData(row -> serializer.serializeRow(row), 100, 150);
+ testKafkaGroupOffsetsToConsole(container);
+ }
+
+ public void testKafkaLatestToConsole(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/kafka/kafkasource_latest_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ public void testKafkaEarliestToConsole(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/kafka/kafkasource_earliest_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ public void testKafkaSpecificOffsetsToConsole(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/kafka/kafkasource_specific_offsets_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ public void testKafkaGroupOffsetsToConsole(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/kafka/kafkasource_group_offset_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ public void testKafkaTimestampToConsole(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/kafka/kafkasource_timestamp_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ private void initKafkaProducer() {
+ Properties props = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producer = new KafkaProducer<>(props);
+ }
+
+ private Properties kafkaConsumerConfig() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group");
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return props;
+ }
+
+ @SuppressWarnings("checkstyle:Indentation")
+ private void generateTestData(ProducerRecordConverter converter, int start, int end) {
+ for (int i = start; i < end; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i),
+ Collections.singletonMap("key", Short.parseShort("1")),
+ new Byte[]{Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+ ProducerRecord<byte[], byte[]> producerRecord = converter.convert(row);
+ producer.send(producerRecord);
+ }
+ }
+
+ private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE = new SeaTunnelRowType(
+ new String[]{
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE,
+ new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ }
+ );
+
+ interface ProducerRecordConverter {
+ ProducerRecord<byte[], byte[]> convert(SeaTunnelRow row);
+ }
+}
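The headline feature of this commit is the `partition_key_fields = ["c_map","c_string"]` sink option exercised by the test above. The actual key-building logic lives in `DefaultSeaTunnelRowSerializer`, which is not shown in full in this diff, so the following is only a minimal sketch of the idea, assuming the key is derived by joining the values of the configured fields in declaration order; the class and method names (`PartitionKeySketch`, `buildKey`) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class PartitionKeySketch {

    // Hypothetical helper: join the values of the configured partition key
    // fields into a single record key, in the order the fields are listed.
    // Rows with identical values in those fields get identical keys, so
    // Kafka's key-based partitioning sends them to the same partition.
    public static String buildKey(List<String> fieldNames, Object[] rowValues, List<String> partitionKeyFields) {
        StringJoiner joiner = new StringJoiner(",");
        for (String field : partitionKeyFields) {
            int idx = fieldNames.indexOf(field);
            joiner.add(String.valueOf(rowValues[idx]));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("id", "c_map", "c_string");
        Object[] row = {1L, "{key=1}", "string"};
        // Mirrors partition_key_fields = ["c_map","c_string"] from the sink config
        System.out.println(buildKey(fields, row, Arrays.asList("c_map", "c_string")));
    }
}
```

This is why the e2e assertion checks `objectNode.has("c_map")` and `objectNode.has("c_string")`: the test deserializes the record keys and verifies the configured fields are present.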
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
similarity index 95%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index 97f9ddb5d..8e52fedfc 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -20,6 +20,7 @@
env {
# You can set flink configuration here
+ job.mode = "BATCH"
execution.parallelism = 1
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
@@ -27,8 +28,8 @@ env {
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
- topic = "test_topic"
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_source"
result_table_name = "kafka_table"
# The default format is json, which is optional
format = json
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
similarity index 96%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 2a7f9828c..d3ce8e6b3 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -27,8 +27,8 @@ env {
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
- topic = "test_topic"
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_group"
result_table_name = "kafka_table"
# The default format is json, which is optional
format = json
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
similarity index 96%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
index 736104ea0..cfc992e5b 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
@@ -27,8 +27,8 @@ env {
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
- topic = "test_topic"
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_source"
result_table_name = "kafka_table"
# The default format is json, which is optional
format = json
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
similarity index 94%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index b75756327..512119851 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -27,8 +27,8 @@ env {
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
- topic = "test_topic"
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_source"
result_table_name = "kafka_table"
# The default format is json, which is optional
format = json
@@ -40,7 +40,7 @@ source {
}
start_mode.offsets = {
- test_topic-0 = 50
+ test_topic_source-0 = 50
}
}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
similarity index 94%
rename from seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index b491c6546..431205f3c 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -27,8 +27,8 @@ env {
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
- topic = "test_topic"
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_source"
result_table_name = "kafka_table"
# The default format is json, which is optional
format = json
@@ -66,7 +66,7 @@ sink {
},
{
rule_type = MAX
- rule_value = 149
+ rule_value = 99
}
]
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
new file mode 100644
index 000000000..086136bf5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, smallint>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic"
+ partition_key_fields = ["c_map","c_string"]
+ }
+}
\ No newline at end of file
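The new config above routes all FakeSource rows through a key built from `c_map` and `c_string`. Once a record carries a key, Kafka's default partitioner maps that key deterministically to a partition. The sketch below illustrates that mapping only in simplified form: Kafka actually hashes the serialized key bytes with murmur2, not `String.hashCode`, and the class name `PartitionFromKey` is made up for this example.

```java
public class PartitionFromKey {

    // Simplified stand-in for Kafka's default partitioner: mask off the sign
    // bit and take the hash modulo the partition count, so the same key
    // always maps to the same partition. (Real Kafka uses murmur2 over the
    // serialized key bytes.)
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("{key=1},string", 4);
        int p2 = partitionFor("{key=1},string", 4);
        // Same key -> same partition, which is what makes
        // partition_key_fields useful for ordering-sensitive consumers.
        System.out.println(p1 == p2);
    }
}
```

The practical consequence for the config: rows sharing `c_map` and `c_string` values preserve their relative order within a partition.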
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
similarity index 89%
rename from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
index cf5c67743..129149623 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
@@ -19,17 +19,21 @@
######
env {
- # You can set spark configuration here
- job.name = "SeaTunnel"
- source.parallelism = 1
+ execution.parallelism = 1
job.mode = "BATCH"
-}
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9094"
- topic = "test_topic"
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_json"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
schema = {
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
similarity index 90%
rename from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
index 94af38e64..67879791b 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
@@ -19,17 +19,21 @@
######
env {
- # You can set spark configuration here
- job.name = "SeaTunnel"
- source.parallelism = 1
+ execution.parallelism = 1
job.mode = "BATCH"
-}
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9094"
- topic = "test_topic"
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_topic_text"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
schema = {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index eb8614495..6d3b7703c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
<module>connector-file-local-e2e</module>
<module>connector-cassandra-e2e</module>
<module>connector-http-e2e</module>
+ <module>connector-kafka-e2e</module>
</modules>
<artifactId>seatunnel-connector-v2-e2e</artifactId>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
deleted file mode 100644
index 7d7fe1920..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaContainer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.kafka;
-
-import com.github.dockerjava.api.command.InspectContainerResponse;
-import lombok.SneakyThrows;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.utility.DockerImageName;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
-
-/**
- * This container wraps Confluent Kafka and Zookeeper (optionally)
- */
-public class KafkaContainer extends GenericContainer<KafkaContainer> {
-
- private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
-
- public static final int KAFKA_PORT = 9093;
-
- public static final int ZOOKEEPER_PORT = 2181;
-
- private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
-
- protected String externalZookeeperConnect = null;
-
- public KafkaContainer(final DockerImageName dockerImageName) {
- super(dockerImageName);
- dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
-
- withExposedPorts(KAFKA_PORT);
-
- // Use two listeners with different names, it will force Kafka to communicate with itself via internal
- // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will try to use the advertised listener
- withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092");
- withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
- withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
-
- withEnv("KAFKA_BROKER_ID", "1");
- withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
- withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
- withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
- withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
- withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
- withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
- }
-
- public KafkaContainer withEmbeddedZookeeper() {
- externalZookeeperConnect = null;
- return self();
- }
-
- public KafkaContainer withExternalZookeeper(String connectString) {
- externalZookeeperConnect = connectString;
- return self();
- }
-
- public String getBootstrapServers() {
- return String.format("PLAINTEXT://%s:%s", getLinuxLocalIp(), getMappedPort(KAFKA_PORT));
- }
-
- @Override
- protected void configure() {
- withEnv(
- "KAFKA_ADVERTISED_LISTENERS",
- String.format("BROKER://%s:9092", getNetwork() != null ? getNetworkAliases().get(1) : "localhost")
- );
-
- String command = "#!/bin/bash\n";
- if (externalZookeeperConnect != null) {
- withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect);
- } else {
- addExposedPort(ZOOKEEPER_PORT);
- withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT);
- command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n";
- command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n";
- command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n";
- command += "zookeeper-server-start zookeeper.properties &\n";
- }
-
- // Optimization: skip the checks
- command += "echo '' > /etc/confluent/docker/ensure \n";
- // Run the original command
- command += "/etc/confluent/docker/run \n";
- withCommand("sh", "-c", command);
- }
-
- @Override
- @SneakyThrows
- protected void containerIsStarted(InspectContainerResponse containerInfo) {
- String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo);
- ExecResult result = execInContainer(
- "kafka-configs",
- "--alter",
- "--bootstrap-server",
- brokerAdvertisedListener,
- "--entity-type",
- "brokers",
- "--entity-name",
- getEnvMap().get("KAFKA_BROKER_ID"),
- "--add-config",
- "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]"
- );
- if (result.getExitCode() != 0) {
- throw new IllegalStateException(result.toString());
- }
- }
-
- protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) {
- return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092");
- }
-
- public String getLinuxLocalIp() {
- String ip = "";
- try {
- Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
- while (networkInterfaces.hasMoreElements()) {
- NetworkInterface networkInterface = networkInterfaces.nextElement();
- Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
- while (inetAddresses.hasMoreElements()) {
- InetAddress inetAddress = inetAddresses.nextElement();
- if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
- ip = inetAddress.getHostAddress();
- }
- }
- }
- } catch (SocketException ex) {
- ex.printStackTrace();
- }
- return ip;
- }
-}
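The deleted `KafkaContainer` above (now replaced by a shared copy under `e2e.connector.kafka`) wires two listeners so the broker talks to itself over an internal `BROKER` listener while tests reach it over an external `PLAINTEXT` one. As a self-contained illustration of just the env-var wiring (the class name `ListenerEnvSketch` and the method are hypothetical; the container itself also needs Zookeeper and advertised-listener setup):

```java
import java.util.HashMap;
import java.util.Map;

public class ListenerEnvSketch {

    // Rebuilds the listener-related env vars the container class sets: an
    // external PLAINTEXT listener for the mapped test port and an internal
    // BROKER listener for broker-to-broker traffic, with
    // KAFKA_INTER_BROKER_LISTENER_NAME forcing inter-broker use of the latter.
    public static Map<String, String> listenerEnv(int externalPort, int internalPort) {
        Map<String, String> env = new HashMap<>();
        env.put("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + externalPort + ",BROKER://0.0.0.0:" + internalPort);
        env.put("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
        env.put("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
        return env;
    }

    public static void main(String[] args) {
        // The deleted class used 9093 externally and 9092 internally.
        System.out.println(listenerEnv(9093, 9092).get("KAFKA_LISTENERS"));
    }
}
```

Without the internal listener, the broker would try to reach itself through the advertised external address, which is only resolvable from the test host.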
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
deleted file mode 100644
index 1d7225ae8..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.kafka;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-
-@Slf4j
-public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT {
- @Override
- protected void generateTestData() {
- generateStepTestData(0, 100);
- }
-
- private void generateStepTestData(int start, int end) {
-
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE
- }
- );
-
- DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
- for (int i = start; i < end; i++) {
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(i)
- });
- ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
- producer.send(producerRecord);
- }
- }
-
- @Test
- public void testKafka() throws IOException, InterruptedException {
- testKafkaLatestToConsole();
- testKafkaEarliestToConsole();
- testKafkaSpecificOffsetsToConsole();
- testKafkaGroupOffsetsToConsole();
- testKafkaTimestampToConsole();
- }
-
- public void testKafkaLatestToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_latest_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
- public void testKafkaEarliestToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_earliest_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
- public void testKafkaSpecificOffsetsToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_specific_offsets_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
- public void testKafkaGroupOffsetsToConsole() throws IOException, InterruptedException {
- generateStepTestData(100, 150);
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_group_offset_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
- public void testKafkaTimestampToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_timestamp_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
deleted file mode 100644
index 0c339cccc..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.flink.v2.kafka;
-
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-@Slf4j
-public class KafkaTestBaseIT extends FlinkContainer {
- protected static final int KAFKA_PORT = 9093;
-
- protected static final String KAFKA_HOST = "kafkaCluster";
-
- protected KafkaProducer<byte[], byte[]> producer;
-
- protected KafkaContainer kafkaContainer;
-
- @BeforeEach
- public void startKafkaContainer() {
- kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
- .withNetwork(NETWORK)
- .withNetworkAliases(KAFKA_HOST)
- .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
- kafkaContainer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
- Startables.deepStart(Stream.of(kafkaContainer)).join();
- log.info("Kafka container started");
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(() -> initKafkaProducer());
- generateTestData();
- }
-
- protected void initKafkaProducer() {
- Properties props = new Properties();
- String bootstrapServers = kafkaContainer.getBootstrapServers();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- producer = new KafkaProducer<>(props);
- }
-
- @SuppressWarnings("checkstyle:Indentation")
- protected void generateTestData() {
-
- }
-
- @AfterEach
- public void close() {
- if (producer != null) {
- producer.close();
- }
- if (kafkaContainer != null) {
- kafkaContainer.close();
- }
- }
-}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
deleted file mode 100644
index 62a1dc967..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++ /dev/null
@@ -1,91 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set flink configuration here
- execution.parallelism = 1
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
- Kafka {
- bootstrap.servers = "kafkaCluster:9093"
- topic = "test_topic"
- result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
- schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
- # The default format is json, which is optional
- format = json
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
-}
-
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
-
- }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
deleted file mode 100644
index c1b3c0d47..000000000
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
+++ /dev/null
@@ -1,92 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set flink configuration here
- execution.parallelism = 1
- #execution.checkpoint.interval = 10000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
-}
-
-source {
- Kafka {
- bootstrap.servers = "kafkaCluster:9093"
- topic = "test_topic"
- result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
- schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
- format = text
- # The default field delimiter is ","
- field_delimiter = ","
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
-}
-
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
-
- }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 6d356eb54..adc3f3610 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -36,7 +36,6 @@
<module>connector-mongodb-flink-e2e</module>
<module>connector-iceberg-flink-e2e</module>
<module>connector-neo4j-flink-e2e</module>
- <module>connector-kafka-flink-e2e</module>
</modules>
<dependencies>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
deleted file mode 100644
index 5f92a2599..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/pom.xml
+++ /dev/null
@@ -1,53 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
- <version>${revision}</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>connector-kafka-spark-e2e</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-spark-e2e-base</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <!-- SeaTunnel connectors -->
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-kafka</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-console</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
deleted file mode 100644
index de6abf829..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.kafka;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.Collections;
-/**
- * This test case is used to verify that the kafka source is able to send data to the console.
- * Make sure the SeaTunnel job can submit successfully on spark engine.
- */
-@SuppressWarnings("checkstyle:EmptyLineSeparator")
-@Slf4j
-public class KafkaSourceJsonToConsoleIT extends KafkaTestBaseIT {
-
- @SuppressWarnings("checkstyle:Indentation")
- protected void generateTestData() {
-
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id",
- "c_map",
- "c_array",
- "c_string",
- "c_boolean",
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_bytes",
- "c_date",
- "c_timestamp"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE,
- new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
- ArrayType.BYTE_ARRAY_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- new DecimalType(2, 1),
- PrimitiveByteArrayType.INSTANCE,
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE
- }
- );
-
- DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
-
- for (int i = 0; i < 100; i++) {
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(i),
- Collections.singletonMap("key", Short.parseShort("1")),
- new Byte[]{Byte.parseByte("1")},
- "string",
- Boolean.FALSE,
- Byte.parseByte("1"),
- Short.parseShort("1"),
- Integer.parseInt("1"),
- Long.parseLong("1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- BigDecimal.valueOf(11, 1),
- "test".getBytes(),
- LocalDate.now(),
- LocalDateTime.now()
- });
- ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
- producer.send(producerRecord);
- }
- }
-
- @Test
- public void testKafkaSource() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_json_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
-}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
deleted file mode 100644
index 3ff1faff3..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.kafka;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-
-@Slf4j
-public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT {
- @Override
- protected void generateTestData() {
- generateStepTestData(0, 100);
- }
-
- private void generateStepTestData(int start, int end) {
-
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE
- }
- );
-
- DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
- for (int i = start; i < end; i++) {
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(i)
- });
- ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
- producer.send(producerRecord);
- }
- }
-
- @Test
- public void testKafka() throws IOException, InterruptedException {
- testKafkaLatestToConsole();
- testKafkaEarliestToConsole();
- testKafkaSpecificOffsetsToConsole();
- testKafkaGroupOffsetsToConsole();
- testKafkaTimestampToConsole();
- }
-
- public void testKafkaLatestToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_latest_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
- public void testKafkaEarliestToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_earliest_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
- public void testKafkaSpecificOffsetsToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_specific_offsets_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
- public void testKafkaGroupOffsetsToConsole() throws IOException, InterruptedException {
- generateStepTestData(100, 150);
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_group_offset_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
- public void testKafkaTimestampToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_timestamp_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
-}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
deleted file mode 100644
index f4575f4e8..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.kafka;
-
-import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.format.text.TextSerializationSchema;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.util.Collections;
-
-/**
- * This test case is used to verify that the kafka source is able to send data to the console.
- * Make sure the SeaTunnel job can submit successfully on spark engine.
- */
-@Slf4j
-public class KafkaSourceTextToConsoleIT extends KafkaTestBaseIT {
-
- @SuppressWarnings("checkstyle:Indentation")
- protected void generateTestData() {
-
- SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
- new String[]{
- "id",
- "c_map",
- "c_array",
- "c_string",
- "c_boolean",
- "c_tinyint",
- "c_smallint",
- "c_int",
- "c_bigint",
- "c_float",
- "c_double",
- "c_decimal",
- "c_bytes",
- "c_date",
- "c_timestamp"
- },
- new SeaTunnelDataType[]{
- BasicType.LONG_TYPE,
- new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
- ArrayType.BYTE_ARRAY_TYPE,
- BasicType.STRING_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- new DecimalType(2, 1),
- PrimitiveByteArrayType.INSTANCE,
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_DATE_TIME_TYPE
- });
-
- TextSerializationSchema serializationSchema = TextSerializationSchema.builder()
- .seaTunnelRowType(seatunnelRowType)
- .delimiter(",")
- .build();
-
- for (int i = 0; i < 100; i++) {
- SeaTunnelRow row = new SeaTunnelRow(
- new Object[]{
- Long.valueOf(i),
- Collections.singletonMap("key", Short.parseShort("1")),
- new Byte[]{Byte.parseByte("1")},
- "string",
- Boolean.FALSE,
- Byte.parseByte("1"),
- Short.parseShort("1"),
- Integer.parseInt("1"),
- Long.parseLong("1"),
- Float.parseFloat("1.1"),
- Double.parseDouble("1.1"),
- BigDecimal.valueOf(11, 1),
- "test".getBytes(),
- LocalDate.now(),
- LocalDateTime.now()
- });
-
- ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>("test_topic", null, serializationSchema.serialize(row));
- producer.send(producerRecord);
- }
- }
-
- @Test
- public void testKafkaSourceTextToConsole() throws IOException, InterruptedException {
- Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_text_to_console.conf");
- Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
- }
-
-}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
deleted file mode 100644
index b6d5354a9..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.kafka;
-
-import org.apache.seatunnel.e2e.spark.SparkContainer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-
-@Slf4j
-public class KafkaTestBaseIT extends SparkContainer {
- protected static final int KAFKA_PORT = 9094;
-
- protected static final String KAFKA_HOST = "kafkaCluster";
-
- protected KafkaProducer<byte[], byte[]> producer;
-
- protected KafkaContainer kafkaContainer;
-
- @BeforeEach
- public void startKafkaContainer() {
- kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
- .withNetwork(NETWORK)
- .withNetworkAliases(KAFKA_HOST)
- .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
- kafkaContainer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
- Startables.deepStart(Stream.of(kafkaContainer)).join();
- log.info("Kafka container started");
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(() -> initKafkaProducer());
- generateTestData();
- }
-
- protected void initKafkaProducer() {
- Properties props = new Properties();
- String bootstrapServers = kafkaContainer.getBootstrapServers();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
- producer = new KafkaProducer<>(props);
- }
-
- @SuppressWarnings("checkstyle:Indentation")
- protected void generateTestData() {
-
- }
-
- @AfterEach
- public void close() {
- if (producer != null) {
- producer.close();
- }
- if (kafkaContainer != null) {
- kafkaContainer.close();
- }
- }
-}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
deleted file mode 100644
index c1674e192..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set spark configuration here
- job.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
-}
-
-source {
- Kafka {
- bootstrap.servers = "kafkaCluster:9094"
- topic = "test_topic"
- result_table_name = "kafka_table"
- # The default format is json, which is optional
- format = json
- start_mode = earliest
- schema = {
- fields {
- id = bigint
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink {
- Console {}
-
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
-
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
- }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
deleted file mode 100644
index 336d364c0..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set spark configuration here
- job.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
-}
-
-source {
- Kafka {
- bootstrap.servers = "kafkaCluster:9094"
- topic = "test_topic"
- result_table_name = "kafka_table"
- # The default format is json, which is optional
- format = json
- start_mode = group_offsets
- schema = {
- fields {
- id = bigint
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink {
- Console {}
-
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
-
- {
- rule_type = MIN
- rule_value = 100
- },
- {
- rule_type = MAX
- rule_value = 149
- }
- ]
- }
- ]
- }
- }
- }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
deleted file mode 100644
index 0c20ebc9a..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set spark configuration here
- job.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
-}
-
-source {
- Kafka {
- bootstrap.servers = "kafkaCluster:9094"
- topic = "test_topic"
- result_table_name = "kafka_table"
- # The default format is json, which is optional
- format = json
- start_mode = latest
- schema = {
- fields {
- id = bigint
- }
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink {
- Console {}
-
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
-
- {
- rule_type = MIN
- rule_value = 99
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
- }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
deleted file mode 100644
index 8f6f00a68..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set spark configuration here
- job.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
-}
-
-source {
- Kafka {
- bootstrap.servers = "kafkaCluster:9094"
- topic = "test_topic"
- result_table_name = "kafka_table"
- # The default format is json, which is optional
- format = json
- start_mode = specific_offsets
- schema = {
- fields {
- id = bigint
- }
- }
-
- start_mode.offsets = {
- test_topic-0 = 50
- }
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink {
- Console {}
-
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
-
- {
- rule_type = MIN
- rule_value = 50
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
- }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
deleted file mode 100644
index e7ec35c7d..000000000
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
+++ /dev/null
@@ -1,76 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
-
-env {
- # You can set spark configuration here
- job.app.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
-}
-
-source {
- Kafka {
- bootstrap.servers = "kafkaCluster:9094"
- topic = "test_topic"
- result_table_name = "kafka_table"
- # The default format is json, which is optional
- format = json
- start_mode = timestamp
- schema = {
- fields {
- id = bigint
- }
- }
- start_mode.timestamp = 1667179890315
- }
-
- # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
- # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
- }
-
-sink {
- Console {}
-
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
-
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 149
- }
- ]
- }
- ]
- }
- }
- }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index c0289e789..7f882095b 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -35,7 +35,6 @@
<module>connector-jdbc-spark-e2e</module>
<module>connector-mongodb-spark-e2e</module>
<module>connector-neo4j-spark-e2e</module>
- <module>connector-kafka-spark-e2e</module>
<module>connector-iceberg-spark-e2e</module>
</modules>
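
The deletions above remove the Spark-specific Kafka e2e module whose configs were consolidated into the shared connector-kafka-e2e module listed in the commit header. For context on the feature this commit adds (multiple partition keys for the Kafka sink), a minimal sink config sketch follows; the `partition_key_fields` option name and the field names are assumptions for illustration, not copied from the commit body, so check docs/en/connector-v2/sink/Kafka.md in this commit for the authoritative option:

```
sink {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "test_topic"
    format = json
    # Hypothetical example: derive the record key from several columns
    # instead of a single one, so rows with the same (c_string, c_int)
    # pair land in the same partition.
    partition_key_fields = ["c_string", "c_int"]
  }
}
```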