This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e3f196e2 [Improve] [Connector-V2] Fix Kafka sink being unable to run
with EXACTLY_ONCE semantics (#3724)
5e3f196e2 is described below
commit 5e3f196e2962d6db43ca5029ffcb97e72ebe7ca0
Author: Hisoka <[email protected]>
AuthorDate: Fri Dec 16 12:24:08 2022 +0800
[Improve] [Connector-V2] Fix Kafka sink being unable to run with EXACTLY_ONCE semantics
(#3724)
* [Improve] [Connector-V2] Fix Kafka sink being unable to run with EXACTLY_ONCE semantics
---
.../file/sink/commit/FileSinkAggregatedCommitter.java | 2 +-
.../seatunnel/kafka/sink/KafkaNoTransactionSender.java | 4 +---
.../connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java | 10 ++++++++++
.../connectors/seatunnel/kafka/sink/KafkaSinkWriter.java | 6 +++---
.../seatunnel/kafka/sink/KafkaTransactionSender.java | 7 +++----
.../seatunnel/kafka/source/KafkaConsumerThread.java | 2 +-
.../seatunnel/kafka/state/KafkaAggregatedCommitInfo.java | 11 +++++++----
.../checkpoint/operation/CheckpointFinishedOperation.java | 8 ++++++--
8 files changed, 32 insertions(+), 18 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index 3dbe562e1..04c79a835 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -53,7 +53,7 @@ public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<File
FileSystemUtils.deleteFile(entry.getKey());
}
} catch (Exception e) {
- log.error("commit aggregatedCommitInfo error ", e);
+ log.error("commit aggregatedCommitInfo error,
aggregatedCommitInfo = {} ", aggregatedCommitInfo, e);
errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
}
});
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
index 59a16ef0a..854c5d848 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -76,8 +76,6 @@ public class KafkaNoTransactionSender<K, V> implements
KafkaProduceSender<K, V>
@Override
public void close() {
kafkaProducer.flush();
- try (KafkaProducer<?, ?> closedKafkaProducer = kafkaProducer) {
- // close the producer
- }
+ kafkaProducer.close();
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index 8e6740e03..5cdcd0b5a 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -54,6 +55,10 @@ public class KafkaSinkCommitter implements
SinkCommitter<KafkaCommitInfo> {
producer.commitTransaction();
producer.flush();
}
+ if (this.kafkaProducer != null) {
+ kafkaProducer.close();
+ kafkaProducer = null;
+ }
return commitInfos;
}
@@ -66,6 +71,10 @@ public class KafkaSinkCommitter implements
SinkCommitter<KafkaCommitInfo> {
KafkaProducer<?, ?> producer = getProducer(commitInfo);
producer.abortTransaction();
}
+ if (this.kafkaProducer != null) {
+ kafkaProducer.close();
+ kafkaProducer = null;
+ }
}
private KafkaInternalProducer<?, ?> getProducer(KafkaCommitInfo
commitInfo) {
@@ -73,6 +82,7 @@ public class KafkaSinkCommitter implements
SinkCommitter<KafkaCommitInfo> {
this.kafkaProducer.setTransactionalId(commitInfo.getTransactionId());
} else {
Properties kafkaProperties = commitInfo.getKafkaProperties();
+ kafkaProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
"sink-committer-" + this.hashCode());
kafkaProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
commitInfo.getTransactionId());
kafkaProducer =
new
KafkaInternalProducer<>(commitInfo.getKafkaProperties(),
commitInfo.getTransactionId());
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 41b1ae3aa..4cbdf10d5 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -127,11 +127,11 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
@Override
public void close() {
- try (KafkaProduceSender<?, ?> kafkaProduceSender =
kafkaProducerSender) {
- // no-opt
+ try {
+ kafkaProducerSender.close();
} catch (Exception e) {
throw new
KafkaConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED,
- "Close kafka sink writer error", e);
+ "Close kafka sink writer error", e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index a3eaba00b..2ce8b9a46 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -24,7 +24,6 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -107,9 +106,9 @@ public class KafkaTransactionSender<K, V> implements
KafkaProduceSender<K, V> {
@Override
public void close() {
- kafkaProducer.flush();
- try (KafkaProducer<?, ?> closedProducer = kafkaProducer) {
- // no-op
+ if (kafkaProducer != null) {
+ kafkaProducer.flush();
+ kafkaProducer.close();
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
index b118c5674..5d2c40979 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
@@ -68,7 +68,7 @@ public class KafkaConsumerThread implements Runnable {
properties.forEach((key, value) ->
props.setProperty(String.valueOf(key), String.valueOf(value)));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-enumerator-consumer-" + this.hashCode());
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-consumer-" + this.hashCode());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
index c55c3efd1..9ae5e88ba 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
@@ -17,11 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.state;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
import java.io.Serializable;
+import java.util.List;
-/**
- * Right now, we don't need aggregated commit in kafka.
- * Todo: we need to add a default implementation of this state.
- */
+@Data
+@AllArgsConstructor
public class KafkaAggregatedCommitInfo implements Serializable {
+ List<KafkaCommitInfo> commitInfos;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index ff5c92300..7eef0f7e8 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import
org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
@@ -77,13 +78,16 @@ public class CheckpointFinishedOperation extends
TaskOperation {
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
try {
- Task task =
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
- .getTaskGroup().getTask(taskLocation.getTaskID());
+ TaskGroupContext groupContext =
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation());
+ Task task =
groupContext.getTaskGroup().getTask(taskLocation.getTaskID());
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+
Thread.currentThread().setContextClassLoader(groupContext.getClassLoader());
if (successful) {
task.notifyCheckpointComplete(checkpointId);
} else {
task.notifyCheckpointAborted(checkpointId);
}
+ Thread.currentThread().setContextClassLoader(classLoader);
} catch (Exception e) {
sneakyThrow(e);
}