This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 694bbdd7 [Api-draft] Add seatunnel kafka source connectors (#1949)
694bbdd7 is described below
commit 694bbdd709110e647628378a44bdf3347754dfe9
Author: TrickyZerg <[email protected]>
AuthorDate: Thu May 26 16:04:51 2022 +0800
[Api-draft] Add seatunnel kafka source connectors (#1949)
* add seatunnel kafka connector
* fix ParallelSource can't stop when the thread pool has an error.
---
.../api/source/SourceSplitEnumerator.java | 2 +-
.../apache/seatunnel/flink/FlinkEnvironment.java | 13 ++-
.../connectors/seatunnel/kafka/config/Config.java | 10 ++
.../kafka/sink/KafkaNoTransactionSender.java | 6 +-
.../seatunnel/kafka/sink/KafkaProduceSender.java | 6 +-
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 10 +-
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 8 +-
.../kafka/sink/KafkaTransactionSender.java | 10 +-
.../seatunnel/kafka/source/ConsumerMetadata.java | 73 ++++++++++++
.../seatunnel/kafka/source/KafkaSource.java | 57 +++++++---
.../seatunnel/kafka/source/KafkaSourceReader.java | 117 +++++++++++++++++--
.../seatunnel/kafka/source/KafkaSourceSplit.java | 46 +++++++-
.../kafka/source/KafkaSourceSplitEnumerator.java | 126 +++++++++++++++------
.../state/{KafkaState.java => KafkaSinkState.java} | 2 +-
.../{KafkaState.java => KafkaSourceState.java} | 23 ++--
.../core/starter/flink/env/FlinkEnvironment.java | 13 ++-
.../flink/execution/FlinkTaskExecution.java | 27 +----
.../spark/execution/SparkTaskExecution.java | 21 +---
.../translation/source/ParallelSource.java | 22 +++-
.../translation/spark/sink/SparkDataWriter.java | 2 +-
20 files changed, 442 insertions(+), 152 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 4b97cee6..3b1917bd 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -37,7 +37,7 @@ public interface SourceSplitEnumerator<SplitT extends
SourceSplit, StateT> exten
/**
* The method is executed by the engine only once.
*/
- void run();
+ void run() throws Exception;
/**
* Called to close the enumerator, in case it holds on to any resources,
like threads or network
diff --git
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index a2d57234..eaf6780d 100644
---
a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++
b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.flink.util.EnvironmentUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
@@ -87,10 +88,10 @@ public class FlinkEnvironment implements RuntimeEnv {
@Override
public FlinkEnvironment prepare() {
- if (isStreaming()) {
- createStreamEnvironment();
- createStreamTableEnvironment();
- } else {
+ // Batch/Streaming both use data stream api in SeaTunnel New API
+ createStreamEnvironment();
+ createStreamTableEnvironment();
+ if (!isStreaming()) {
createExecutionEnvironment();
createBatchTableEnvironment();
}
@@ -201,6 +202,10 @@ public class FlinkEnvironment implements RuntimeEnv {
int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
environment.setMaxParallelism(max);
}
+
+ if (this.jobMode.equals(JobMode.BATCH)) {
+ environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ }
}
public ExecutionEnvironment getBatchEnvironment() {
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 1eebcdb5..923348f4 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -23,10 +23,20 @@ public class Config {
*/
public static final String TOPIC = "topic";
+ /**
+ * Whether the kafka topic is specified as a java regex pattern rather than a topic list.
+ */
+ public static final String PATTERN = "pattern";
+
/**
* The server address of kafka cluster.
*/
public static final String BOOTSTRAP_SERVER = "bootstrap.server";
public static final String KAFKA_CONFIG_PREFIX = "kafka.";
+
+ /**
+ * The consumer group used by the kafka client when consuming messages.
+ */
+ public static final String CONSUMER_GROUP = "consumer.group";
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
index 3691acd9..4760830f 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -63,12 +63,12 @@ public class KafkaNoTransactionSender<K, V> implements
KafkaProduceSender<K, V>
}
@Override
- public void abortTransaction(List<KafkaState> kafkaStates) {
+ public void abortTransaction(List<KafkaSinkState> kafkaStates) {
// no-op
}
@Override
- public List<KafkaState> snapshotState() {
+ public List<KafkaSinkState> snapshotState() {
kafkaProducer.flush();
return Collections.emptyList();
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
index 9836cb9e..f1d5ce51 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -52,13 +52,13 @@ public interface KafkaProduceSender<K, V> extends
AutoCloseable {
*
* @param kafkaStates kafka states about the transaction info.
*/
- void abortTransaction(List<KafkaState> kafkaStates);
+ void abortTransaction(List<KafkaSinkState> kafkaStates);
/**
* Get the current kafka state of the sender.
*
* @return kafka state List, or empty if no state is available.
*/
- List<KafkaState> snapshotState();
+ List<KafkaSinkState> snapshotState();
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 5e63d79c..70cfacdb 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -27,7 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -42,7 +42,7 @@ import java.util.Optional;
* This class contains the method to create {@link KafkaSinkWriter} and {@link
KafkaSinkCommitter}.
*/
@AutoService(SeaTunnelSink.class)
-public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaState,
KafkaCommitInfo, KafkaAggregatedCommitInfo> {
+public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, KafkaSinkState,
KafkaCommitInfo, KafkaAggregatedCommitInfo> {
private Config pluginConfig;
private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
@@ -58,17 +58,17 @@ public class KafkaSink implements
SeaTunnelSink<SeaTunnelRow, KafkaState, KafkaC
}
@Override
- public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaState>
createWriter(SinkWriter.Context context) {
+ public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState>
createWriter(SinkWriter.Context context) {
return new KafkaSinkWriter(context, seaTunnelRowTypeInfo,
pluginConfig, Collections.emptyList());
}
@Override
- public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaState>
restoreWriter(SinkWriter.Context context, List<KafkaState> states) {
+ public SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState>
restoreWriter(SinkWriter.Context context, List<KafkaSinkState> states) {
return new KafkaSinkWriter(context, seaTunnelRowTypeInfo,
pluginConfig, states);
}
@Override
- public Optional<Serializer<KafkaState>> getWriterStateSerializer() {
+ public Optional<Serializer<KafkaSinkState>> getWriterStateSerializer() {
return Optional.of(new DefaultSerializer<>());
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 6d3a122c..6605f5bc 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -25,7 +25,7 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -40,7 +40,7 @@ import java.util.Properties;
/**
* KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to
Kafka.
*/
-public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow,
KafkaCommitInfo, KafkaState> {
+public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow,
KafkaCommitInfo, KafkaSinkState> {
private final SinkWriter.Context context;
private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
@@ -61,7 +61,7 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
SinkWriter.Context context,
SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
Config pluginConfig,
- List<KafkaState> kafkaStates) {
+ List<KafkaSinkState> kafkaStates) {
this.context = context;
this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
this.pluginConfig = pluginConfig;
@@ -77,7 +77,7 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
@Override
- public List<KafkaState> snapshotState() {
+ public List<KafkaSinkState> snapshotState() {
return kafkaProducerSender.snapshotState();
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index 766313c4..cf7da852 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import com.google.common.collect.Lists;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -73,11 +73,11 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
}
@Override
- public void abortTransaction(List<KafkaState> kafkaStates) {
+ public void abortTransaction(List<KafkaSinkState> kafkaStates) {
if (kafkaStates.isEmpty()) {
return;
}
- for (KafkaState kafkaState : kafkaStates) {
+ for (KafkaSinkState kafkaState : kafkaStates) {
// create the transaction producer
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Abort kafka transaction: {}",
kafkaState.getTransactionId());
@@ -89,8 +89,8 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
}
@Override
- public List<KafkaState> snapshotState() {
- return Lists.newArrayList(new KafkaState(transactionId,
kafkaProperties));
+ public List<KafkaSinkState> snapshotState() {
+ return Lists.newArrayList(new KafkaSinkState(transactionId,
kafkaProperties));
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
new file mode 100644
index 00000000..f52b6146
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Kafka consumer metadata, including topic, bootstrap server, etc.
+ */
+public class ConsumerMetadata implements Serializable {
+
+ private String topic;
+ private boolean isPattern = false;
+ private String bootstrapServer;
+ private Properties properties;
+ private String consumerGroup;
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public boolean isPattern() {
+ return isPattern;
+ }
+
+ public void setPattern(boolean pattern) {
+ isPattern = pattern;
+ }
+
+ public String getBootstrapServer() {
+ return bootstrapServer;
+ }
+
+ public void setBootstrapServer(String bootstrapServer) {
+ this.bootstrapServer = bootstrapServer;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 82db59e0..5d3f98b9 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVER;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import org.apache.seatunnel.api.common.PrepareFailException;
@@ -32,19 +34,23 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import com.google.auto.service.AutoService;
+
import java.util.Properties;
-public class KafkaSource implements SeaTunnelSource<SeaTunnelRow,
KafkaSourceSplit, KafkaState> {
+@AutoService(SeaTunnelSource.class)
+public class KafkaSource implements SeaTunnelSource<SeaTunnelRow,
KafkaSourceSplit, KafkaSourceState> {
+ private static final String DEFAULT_CONSUMER_GROUP =
"SeaTunnel-Consumer-Group";
- private String topic;
- private String bootstrapServer;
- private Properties properties;
+ private final ConsumerMetadata metadata = new ConsumerMetadata();
+ private SeaTunnelRowTypeInfo typeInfo;
@Override
public String getPluginName() {
@@ -57,36 +63,51 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
}
- this.topic = config.getString(TOPIC);
- this.bootstrapServer = config.getString(BOOTSTRAP_SERVER);
- // TODO add user custom properties
- this.properties = new Properties();
+ this.metadata.setTopic(config.getString(TOPIC));
+ if (config.hasPath(PATTERN)) {
+ this.metadata.setPattern(config.getBoolean(PATTERN));
+ }
+ this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER));
+ this.metadata.setProperties(new Properties());
+
+ if (config.hasPath(CONSUMER_GROUP)) {
+ this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP));
+ } else {
+ this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
+ }
+
+ TypesafeConfigUtils.extractSubConfig(config, "kafka.",
false).entrySet().forEach(e -> {
+ this.metadata.getProperties().put(e.getKey(),
String.valueOf(e.getValue().unwrapped()));
+ });
+
+ // TODO support user custom row type
+ this.typeInfo = new SeaTunnelRowTypeInfo(new String[]{"topic",
"raw_message"},
+ new SeaTunnelDataType[]{BasicType.STRING, BasicType.STRING});
+
}
@Override
public SeaTunnelRowTypeInfo getRowTypeInfo() {
- return new SeaTunnelRowTypeInfo(new String[]{"topic", "raw_message"},
- new SeaTunnelDataType[]{BasicType.STRING, BasicType.STRING});
+ return this.typeInfo;
}
@Override
public SourceReader<SeaTunnelRow, KafkaSourceSplit>
createReader(SourceReader.Context readerContext) throws Exception {
- return new KafkaSourceReader(this.topic, this.bootstrapServer);
+ return new KafkaSourceReader(this.metadata, this.typeInfo,
readerContext);
}
@Override
- public SourceSplitEnumerator<KafkaSourceSplit, KafkaState>
createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit>
enumeratorContext) throws Exception {
- return new KafkaSourceSplitEnumerator(this.topic,
this.bootstrapServer, this.properties);
+ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState>
createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit>
enumeratorContext) throws Exception {
+ return new KafkaSourceSplitEnumerator(this.metadata,
enumeratorContext);
}
@Override
- public SourceSplitEnumerator<KafkaSourceSplit, KafkaState>
restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit>
enumeratorContext, KafkaState checkpointState) throws Exception {
- // TODO support state restore
- return new KafkaSourceSplitEnumerator(this.topic,
this.bootstrapServer, this.properties);
+ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState>
restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit>
enumeratorContext, KafkaSourceState checkpointState) throws Exception {
+ return new KafkaSourceSplitEnumerator(this.metadata,
enumeratorContext, checkpointState);
}
@Override
- public Serializer<KafkaState> getEnumeratorStateSerializer() {
+ public Serializer<KafkaSourceState> getEnumeratorStateSerializer() {
return new DefaultSerializer<>();
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index cb4bc889..3158858c 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -17,55 +17,148 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+
+import com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
public class KafkaSourceReader implements SourceReader<SeaTunnelRow,
KafkaSourceSplit> {
- private final String topic;
- private final String bootstrapServer;
-
- KafkaSourceReader(String topic, String bootstrapServer) {
- this.topic = topic;
- this.bootstrapServer = bootstrapServer;
+ private static final long THREAD_WAIT_TIME = 500L;
+ private static final long POLL_TIMEOUT = 10000L;
+ private static final String CLIENT_ID_PREFIX = "seatunnel";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaSourceReader.class);
+
+ private final SourceReader.Context context;
+ private KafkaConsumer<byte[], byte[]> consumer;
+ private final ConsumerMetadata metadata;
+ private final Set<KafkaSourceSplit> sourceSplits;
+ private final Map<TopicPartition, Long> endOffset;
+ // TODO support user custom type
+ private SeaTunnelRowTypeInfo typeInfo;
+
+ KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowTypeInfo typeInfo,
+ SourceReader.Context context) {
+ this.metadata = metadata;
+ this.context = context;
+ this.typeInfo = typeInfo;
+ this.sourceSplits = new HashSet<>();
+ this.endOffset = new HashMap<>();
}
@Override
public void open() {
-
+ this.consumer = initConsumer(this.metadata.getBootstrapServer(),
this.metadata.getConsumerGroup(),
+ this.metadata.getProperties());
}
@Override
public void close() throws IOException {
-
+ if (consumer != null) {
+ consumer.close();
+ }
}
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-
+ if (sourceSplits.isEmpty() || sourceSplits.size() !=
this.endOffset.size()) {
+ Thread.sleep(THREAD_WAIT_TIME);
+ return;
+ }
+ Set<TopicPartition> partitions = convertToPartition(sourceSplits);
+ StringDeserializer stringDeserializer = new StringDeserializer();
+
stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()),
false);
+ consumer.assign(partitions);
+ while (true) {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+ for (TopicPartition partition : partitions) {
+ for (ConsumerRecord<byte[], byte[]> record :
records.records(partition)) {
+
+ String v =
stringDeserializer.deserialize(partition.topic(), record.value());
+ String t = partition.topic();
+ output.collect(new SeaTunnelRow(new Object[]{t, v}));
+
+ if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
+ record.offset() >= endOffset.get(partition)) {
+ break;
+ }
+ }
+ }
+
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the
data.
+ context.signalNoMoreElement();
+ break;
+ }
+ }
}
@Override
public List<KafkaSourceSplit> snapshotState(long checkpointId) throws
Exception {
- return null;
+ return new ArrayList<>(sourceSplits);
}
@Override
public void addSplits(List<KafkaSourceSplit> splits) {
-
+ sourceSplits.addAll(splits);
+ sourceSplits.forEach(partition -> {
+ endOffset.put(partition.getTopicPartition(),
partition.getEndOffset());
+ });
}
@Override
public void handleNoMoreSplits() {
-
+ LOGGER.info("receive no more splits message, this reader will not add
new split.");
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // TODO commit offset
+ }
+ private KafkaConsumer<byte[], byte[]> initConsumer(String bootstrapServer,
String consumerGroup,
+ Properties properties) {
+ Properties props = new Properties(properties);
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-enumerator-consumer");
+
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+
+ // Disable auto create topics feature
+ props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
"false");
+ return new KafkaConsumer<>(props);
}
+
+ private Set<TopicPartition>
convertToPartition(Collection<KafkaSourceSplit> sourceSplits) {
+ return
sourceSplits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toSet());
+ }
+
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
index ebaa72ec..be2348e9 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
@@ -19,10 +19,54 @@ package
org.apache.seatunnel.connectors.seatunnel.kafka.source;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+
public class KafkaSourceSplit implements SourceSplit {
+ private TopicPartition topicPartition;
+ private long endOffset = -1L;
+
+ public KafkaSourceSplit(TopicPartition topicPartition) {
+ this.topicPartition = topicPartition;
+ }
+
+ public long getEndOffset() {
+ return endOffset;
+ }
+
+ public void setEndOffset(long endOffset) {
+ this.endOffset = endOffset;
+ }
+
+ public TopicPartition getTopicPartition() {
+ return topicPartition;
+ }
+
+ public void setTopicPartition(TopicPartition topicPartition) {
+ this.topicPartition = topicPartition;
+ }
+
@Override
public String splitId() {
- return null;
+ return topicPartition.topic() + "-" + topicPartition.partition();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KafkaSourceSplit that = (KafkaSourceSplit) o;
+ return Objects.equals(topicPartition, that.topicPartition);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicPartition, endOffset);
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 663be8f1..fe652d25 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -18,97 +18,151 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaState;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
-public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSourceSplit, KafkaState> {
+public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> {
private static final String CLIENT_ID_PREFIX = "seatunnel";
- private final String topic;
- private final String bootstrapServer;
- private KafkaConsumer<byte[], byte[]> consumer;
+ private final ConsumerMetadata metadata;
+ private final Context<KafkaSourceSplit> context;
private AdminClient adminClient;
- KafkaSourceSplitEnumerator(String topic, String bootstrapServer,
Properties properties) {
- this.topic = topic;
- this.bootstrapServer = bootstrapServer;
+ private Set<KafkaSourceSplit> pendingSplit;
+ private Set<KafkaSourceSplit> assignedSplit;
+
+ KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context) {
+ this.metadata = metadata;
+ this.context = context;
+ }
+
+ KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context,
+ KafkaSourceState sourceState) {
+ this(metadata, context);
+ this.assignedSplit = sourceState.getAssignedSplit();
}
@Override
public void open() {
- this.consumer = initConsumer();
- this.adminClient = initAdminClient();
+ this.adminClient = initAdminClient(this.metadata.getProperties());
+ this.assignedSplit = new HashSet<>();
+ this.pendingSplit = new HashSet<>();
}
@Override
- public void run() {
-
+ public void run() throws ExecutionException, InterruptedException {
+ pendingSplit = getTopicInfo();
+ assignSplit(context.registeredReaders());
}
@Override
public void close() throws IOException {
-
+ if (this.adminClient != null) {
+ adminClient.close();
+ }
}
@Override
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
-
+ if (!splits.isEmpty()) {
+ pendingSplit.addAll(splits);
+ assignSplit(Collections.singletonList(subtaskId));
+ }
}
@Override
public int currentUnassignedSplitSize() {
- return 0;
+ return pendingSplit.size();
}
@Override
public void handleSplitRequest(int subtaskId) {
-
+ // Do nothing because the Kafka source pushes splits to readers.
}
@Override
public void registerReader(int subtaskId) {
-
+ if (!pendingSplit.isEmpty()) {
+ assignSplit(Collections.singletonList(subtaskId));
+ }
}
@Override
- public KafkaState snapshotState(long checkpointId) throws Exception {
- return null;
+ public KafkaSourceState snapshotState(long checkpointId) throws Exception {
+ return new KafkaSourceState(assignedSplit);
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ // Do nothing
+ }
+ private AdminClient initAdminClient(Properties properties) {
+ Properties props = new Properties(properties);
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.metadata.getBootstrapServer());
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-enumerator-admin-client");
+ return AdminClient.create(props);
}
- private KafkaConsumer<byte[], byte[]> initConsumer() {
- Properties props = new Properties();
+ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException,
InterruptedException {
+ Collection<String> topics;
+ if (this.metadata.isPattern()) {
+ Pattern pattern = Pattern.compile(this.metadata.getTopic());
+ topics = this.adminClient.listTopics().names().get().stream()
+ .filter(t ->
pattern.matcher(t).matches()).collect(Collectors.toSet());
+ } else {
+ topics = Arrays.asList(this.metadata.getTopic().split(","));
+ }
+ Collection<TopicPartition> partitions =
+
adminClient.describeTopics(topics).allTopicNames().get().values().stream().flatMap(t
-> t.partitions().stream()
+ .map(p -> new TopicPartition(t.name(),
p.partition()))).collect(Collectors.toSet());
+ return
adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p
-> OffsetSpec.latest())))
+ .all().get().entrySet().stream().map(partition -> {
+ KafkaSourceSplit split = new
KafkaSourceSplit(partition.getKey());
+ split.setEndOffset(partition.getValue().offset());
+ return split;
+ }).collect(Collectors.toSet());
+ }
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-enumerator-consumer");
+ private void assignSplit(Collection<Integer> taskIDList) {
+ Map<Integer, List<KafkaSourceSplit>> readySplit = new
HashMap<>(Common.COLLECTION_SIZE);
+ for (int taskID : taskIDList) {
+ readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+ }
+ pendingSplit.forEach(s ->
readySplit.get(getSplitOwner(s.getTopicPartition(), taskIDList.size()))
+ .add(s));
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
+ readySplit.forEach(context::assignSplit);
- // Disable auto create topics feature
- props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
"false");
- return new KafkaConsumer<>(props);
+ assignedSplit.addAll(pendingSplit);
+ pendingSplit.clear();
}
- private AdminClient initAdminClient() {
- Properties props = new Properties();
-
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-enumerator-admin-client");
- return AdminClient.create(props);
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private static int getSplitOwner(TopicPartition tp, int numReaders) {
+ int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) %
numReaders;
+ return (startIndex + tp.partition()) % numReaders;
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
similarity index 95%
copy from
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
copy to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
index 7ab11ddf..311d1e29 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
@@ -25,7 +25,7 @@ import java.util.Properties;
@Data
@AllArgsConstructor
-public class KafkaState implements Serializable {
+public class KafkaSinkState implements Serializable {
private final String transactionId;
private final Properties kafkaProperties;
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSourceState.java
similarity index 62%
rename from
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
rename to
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSourceState.java
index 7ab11ddf..e6ba535b 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaState.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSourceState.java
@@ -17,17 +17,24 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.state;
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import java.io.Serializable;
-import java.util.Properties;
+import java.util.Set;
-@Data
-@AllArgsConstructor
-public class KafkaState implements Serializable {
+public class KafkaSourceState implements Serializable {
- private final String transactionId;
- private final Properties kafkaProperties;
+ private Set<KafkaSourceSplit> assignedSplit;
+ public KafkaSourceState(Set<KafkaSourceSplit> assignedSplit) {
+ this.assignedSplit = assignedSplit;
+ }
+
+ public Set<KafkaSourceSplit> getAssignedSplit() {
+ return assignedSplit;
+ }
+
+ public void setAssignedSplit(Set<KafkaSourceSplit> assignedSplit) {
+ this.assignedSplit = assignedSplit;
+ }
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
index 337e6c8d..bd216dfa 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.flink.util.EnvironmentUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
@@ -87,10 +88,10 @@ public class FlinkEnvironment implements RuntimeEnv {
@Override
public FlinkEnvironment prepare() {
- if (isStreaming()) {
- createStreamEnvironment();
- createStreamTableEnvironment();
- } else {
+ // Batch/Streaming both use the DataStream API in the SeaTunnel new API
+ createStreamEnvironment();
+ createStreamTableEnvironment();
+ if (!isStreaming()) {
createExecutionEnvironment();
createBatchTableEnvironment();
}
@@ -201,6 +202,10 @@ public class FlinkEnvironment implements RuntimeEnv {
int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
environment.setMaxParallelism(max);
}
+
+ if (this.jobMode.equals(JobMode.BATCH)) {
+ environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ }
}
public ExecutionEnvironment getBatchEnvironment() {
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
index a2fcd10f..c7e8759b 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
@@ -18,7 +18,8 @@
package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.starter.config.EngineType;
+import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -46,7 +47,8 @@ public class FlinkTaskExecution {
public FlinkTaskExecution(Config config) {
this.config = config;
- this.flinkEnvironment = createFlinkEnvironment();
+ this.flinkEnvironment = (FlinkEnvironment) new
EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
+
SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(flinkEnvironment, config.getConfigList("source"));
this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(flinkEnvironment, config.getConfigList("transform"));
this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
@@ -61,25 +63,4 @@ public class FlinkTaskExecution {
LOGGER.info("Flink Execution Plan:{}",
flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
}
-
- private FlinkEnvironment createFlinkEnvironment() {
- // todo: we need to split the new api into a separate module.
- // we override the environment here, since we need to create
StreamExecutionEnvironment.
- FlinkEnvironment flinkEnvironment = new FlinkEnvironment() {
- @Override
- public boolean isStreaming() {
- return true;
- }
- };
- Config envConfig = config.getConfig("env");
- JobMode jobMode = JobMode.STREAMING;
- if (envConfig.hasPath("job.mode")) {
- jobMode = envConfig.getEnum(JobMode.class, "job.mode");
- }
- SeaTunnelContext.getContext().setJobMode(jobMode);
- flinkEnvironment.setConfig(envConfig)
- .setJobMode(jobMode)
- .prepare();
- return flinkEnvironment;
- }
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
index 1e2b5aa4..5ddd0474 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.SeaTunnelContext;
-import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.config.EngineType;
import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.spark.SparkEnvironment;
@@ -45,7 +44,8 @@ public class SparkTaskExecution {
public SparkTaskExecution(Config config) {
this.config = config;
- this.sparkEnvironment = getSparkEnvironment(config);
+ this.sparkEnvironment = (SparkEnvironment) new
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
+
SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
this.sourcePluginExecuteProcessor = new
SourceExecuteProcessor(sparkEnvironment, config.getConfigList("source"));
this.transformPluginExecuteProcessor = new
TransformExecuteProcessor(sparkEnvironment, config.getConfigList("transform"));
this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(sparkEnvironment, config.getConfigList("sink"));
@@ -59,21 +59,4 @@ public class SparkTaskExecution {
LOGGER.info("Spark Execution started");
}
-
- private SparkEnvironment getSparkEnvironment(Config config) {
- SparkEnvironment sparkEnvironment = (SparkEnvironment) new
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
-
- Config envConfig = config.getConfig("env");
- JobMode jobMode = JobMode.STREAMING;
- if (envConfig.hasPath("job.mode")) {
- jobMode = envConfig.getEnum(JobMode.class, "job.mode");
- }
- SeaTunnelContext.getContext().setJobMode(jobMode);
-
- sparkEnvironment.setJobMode(JobMode.STREAMING)
- .setConfig(config)
- .prepare();
-
- return sparkEnvironment;
- }
}
diff --git
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index 2e55e84c..83e61f78 100644
---
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
public class ParallelSource<T, SplitT extends SourceSplit, StateT> implements
AutoCloseable, CheckpointListener {
@@ -101,8 +102,18 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Au
}
public void run(Collector<T> collector) throws Exception {
- executorService.execute(() -> splitEnumerator.run());
+ Future<?> future = executorService.submit(() -> {
+ try {
+ splitEnumerator.run();
+ } catch (Exception e) {
+ throw new RuntimeException("SourceSplitEnumerator run
failed.", e);
+ }
+ });
+
while (running) {
+ if (future.isDone()) {
+ future.get();
+ }
reader.pollNext(collector);
}
}
@@ -117,9 +128,12 @@ public class ParallelSource<T, SplitT extends SourceSplit,
StateT> implements Au
executorService.shutdown();
}
- try (SourceSplitEnumerator<SplitT, StateT> closed = splitEnumerator;
- SourceReader<T, SplitT> closedReader = reader) {
- // just close the resources
+ if (splitEnumerator != null) {
+ splitEnumerator.close();
+ }
+
+ if (reader != null) {
+ reader.close();
}
}
diff --git
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
index 02272ae0..8aab52db 100644
---
a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java
@@ -44,7 +44,7 @@ public class SparkDataWriter<CommitInfoT, StateT> implements
DataWriter<Internal
private CommitInfoT latestCommitInfoT;
SparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter,
- SinkCommitter<CommitInfoT> sinkCommitter,
+ @Nullable SinkCommitter<CommitInfoT> sinkCommitter,
StructType schema) {
this.sinkWriter = sinkWriter;
this.sinkCommitter = sinkCommitter;