This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5e3f196e2 [Improve] [Connector-V2] Fix Kafka sink can't run EXACTLY_ONCE semantics (#3724)
5e3f196e2 is described below

commit 5e3f196e2962d6db43ca5029ffcb97e72ebe7ca0
Author: Hisoka <[email protected]>
AuthorDate: Fri Dec 16 12:24:08 2022 +0800

    [Improve] [Connector-V2] Fix Kafka sink can't run EXACTLY_ONCE semantics (#3724)
    
    * [Improve] [Connector-V2] Fix Kafka sink can't run EXACTLY_ONCE semantics
---
 .../file/sink/commit/FileSinkAggregatedCommitter.java         |  2 +-
 .../seatunnel/kafka/sink/KafkaNoTransactionSender.java        |  4 +---
 .../connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java   | 10 ++++++++++
 .../connectors/seatunnel/kafka/sink/KafkaSinkWriter.java      |  6 +++---
 .../seatunnel/kafka/sink/KafkaTransactionSender.java          |  7 +++----
 .../seatunnel/kafka/source/KafkaConsumerThread.java           |  2 +-
 .../seatunnel/kafka/state/KafkaAggregatedCommitInfo.java      | 11 +++++++----
 .../checkpoint/operation/CheckpointFinishedOperation.java     |  8 ++++++--
 8 files changed, 32 insertions(+), 18 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index 3dbe562e1..04c79a835 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -53,7 +53,7 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
                     FileSystemUtils.deleteFile(entry.getKey());
                 }
             } catch (Exception e) {
-                log.error("commit aggregatedCommitInfo error ", e);
+                log.error("commit aggregatedCommitInfo error, 
aggregatedCommitInfo = {} ", aggregatedCommitInfo, e);
                 errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
             }
         });
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
index 59a16ef0a..854c5d848 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -76,8 +76,6 @@ public class KafkaNoTransactionSender<K, V> implements KafkaProduceSender<K, V>
     @Override
     public void close() {
         kafkaProducer.flush();
-        try (KafkaProducer<?, ?> closedKafkaProducer = kafkaProducer) {
-            // close the producer
-        }
+        kafkaProducer.close();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index 8e6740e03..5cdcd0b5a 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
@@ -54,6 +55,10 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
             producer.commitTransaction();
             producer.flush();
         }
+        if (this.kafkaProducer != null) {
+            kafkaProducer.close();
+            kafkaProducer = null;
+        }
         return commitInfos;
     }
 
@@ -66,6 +71,10 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
             KafkaProducer<?, ?> producer = getProducer(commitInfo);
             producer.abortTransaction();
         }
+        if (this.kafkaProducer != null) {
+            kafkaProducer.close();
+            kafkaProducer = null;
+        }
     }
 
     private KafkaInternalProducer<?, ?> getProducer(KafkaCommitInfo 
commitInfo) {
@@ -73,6 +82,7 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
             
this.kafkaProducer.setTransactionalId(commitInfo.getTransactionId());
         } else {
             Properties kafkaProperties = commitInfo.getKafkaProperties();
+            kafkaProperties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
"sink-committer-" + this.hashCode());
             
kafkaProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
commitInfo.getTransactionId());
             kafkaProducer =
                     new 
KafkaInternalProducer<>(commitInfo.getKafkaProperties(), 
commitInfo.getTransactionId());
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 41b1ae3aa..4cbdf10d5 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -127,11 +127,11 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     @Override
     public void close() {
-        try (KafkaProduceSender<?, ?> kafkaProduceSender = 
kafkaProducerSender) {
-            // no-opt
+        try {
+            kafkaProducerSender.close();
         } catch (Exception e) {
             throw new 
KafkaConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED,
-                    "Close kafka sink writer error", e);
+                "Close kafka sink writer error", e);
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index a3eaba00b..2ce8b9a46 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
@@ -107,9 +106,9 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
 
     @Override
     public void close() {
-        kafkaProducer.flush();
-        try (KafkaProducer<?, ?> closedProducer = kafkaProducer) {
-            // no-op
+        if (kafkaProducer != null) {
+            kafkaProducer.flush();
+            kafkaProducer.close();
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
index b118c5674..5d2c40979 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
@@ -68,7 +68,7 @@ public class KafkaConsumerThread implements Runnable {
         properties.forEach((key, value) -> 
props.setProperty(String.valueOf(key), String.valueOf(value)));
         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-consumer-" + this.hashCode());
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-consumer-" + this.hashCode());
 
         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
             ByteArrayDeserializer.class.getName());
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
index c55c3efd1..9ae5e88ba 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaAggregatedCommitInfo.java
@@ -17,11 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.state;
 
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
 import java.io.Serializable;
+import java.util.List;
 
-/**
- * Right now, we don't need aggregated commit in kafka.
- * Todo: we need to add a default implementation of this state.
- */
+@Data
+@AllArgsConstructor
 public class KafkaAggregatedCommitInfo implements Serializable {
+    List<KafkaCommitInfo> commitInfos;
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index ff5c92300..7eef0f7e8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroupContext;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import 
org.apache.seatunnel.engine.server.serializable.CheckpointDataSerializerHook;
 import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
@@ -77,13 +78,16 @@ public class CheckpointFinishedOperation extends TaskOperation {
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
             try {
-                Task task = 
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
-                    .getTaskGroup().getTask(taskLocation.getTaskID());
+                TaskGroupContext groupContext = 
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation());
+                Task task = 
groupContext.getTaskGroup().getTask(taskLocation.getTaskID());
+                ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+                
Thread.currentThread().setContextClassLoader(groupContext.getClassLoader());
                 if (successful) {
                     task.notifyCheckpointComplete(checkpointId);
                 } else {
                     task.notifyCheckpointAborted(checkpointId);
                 }
+                Thread.currentThread().setContextClassLoader(classLoader);
             } catch (Exception e) {
                 sneakyThrow(e);
             }

Reply via email to