This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3b573798d [Improve][Connector-V2][Kafka]Unified exception for Kafka 
source and sink connector (#3574)
3b573798d is described below

commit 3b573798db97d9aa0b41cb2828dbfc897658ad6f
Author: TaoZex <[email protected]>
AuthorDate: Sat Nov 26 20:40:01 2022 +0800

    [Improve][Connector-V2][Kafka]Unified exception for Kafka source and sink 
connector (#3574)
    
    * [Improve][Connector-V2][Kafka]Unified exception for Kafka source and sink 
connector
    
    * [Improve][Connector-V2][Kafka]Remove redundant exception code
---
 .../connector-v2/Error-Quick-Reference-Manual.md   | 11 +++++
 .../kafka/exception/KafkaConnectorErrorCode.java   | 52 ++++++++++++++++++++++
 .../kafka/exception/KafkaConnectorException.java   | 35 +++++++++++++++
 .../kafka/sink/KafkaInternalProducer.java          | 11 +++--
 .../connectors/seatunnel/kafka/sink/KafkaSink.java |  6 ++-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      |  9 ++--
 .../kafka/source/KafkaConsumerThread.java          |  5 ++-
 .../seatunnel/kafka/source/KafkaSource.java        |  6 ++-
 .../seatunnel/kafka/source/KafkaSourceReader.java  |  6 ++-
 .../kafka/source/KafkaSourceSplitEnumerator.java   |  4 +-
 10 files changed, 133 insertions(+), 12 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index cf47f0cbc..502905018 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -97,6 +97,17 @@ This document records some common error codes and 
corresponding solutions of Sea
 | HIVE-02 | Initialize hive metastore client failed                       | 
When users encounter this error code, it means that connect to hive metastore 
service failed, please check it whether is work |
 | HIVE-03 | Get hive table information from hive metastore service failed | 
When users encounter this error code, it means that hive metastore service has 
some problems, please check it whether is work |
 
+## Kafka Connector Error Codes
+
+| code     | description                                                       
                        | solution                                              
                                                                            |
+|----------|-------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------|
+| KAFKA-01 | Incompatible KafkaProducer version                                
                        | When users encounter this error code, it means that 
KafkaProducer version is incompatible, please check it                        |
+| KAFKA-02 | Get transactionManager in KafkaProducer failed                    
                        | When users encounter this error code, it means that 
can not get transactionManager in KafkaProducer, please check it              |
+| KAFKA-03 | Add the split checkpoint state to reader failed                   
                        | When users encounter this error code, it means that 
add the split checkpoint state to reader failed, please retry it              |
+| KAFKA-04 | Add a split back to the split enumerator, it will only happen when 
a SourceReader failed  | When users encounter this error code, it means that 
add a split back to the split enumerator failed, please check it              |
+| KAFKA-05 | Error occurred when the kafka consumer thread was running         
                        | When users encounter this error code, it means that 
an error occurred when the kafka consumer thread was running, please check it |
+| KAFKA-06 | Kafka failed to consume data                                      
                        | When users encounter this error code, it means that 
Kafka failed to consume data, please check config and retry it                |
+
 ## InfluxDB Connector Error Codes
 
 | code        | description                                                    
  | solution                                                                    
                                |
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorErrorCode.java
new file mode 100644
index 000000000..e2ecfe80e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorErrorCode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum KafkaConnectorErrorCode implements SeaTunnelErrorCode { // Kafka connector error codes: each constant pairs a "KAFKA-NN" code with a description
+    VERSION_INCOMPATIBLE("KAFKA-01", "Incompatible KafkaProducer version"),
+    GET_TRANSACTIONMANAGER_FAILED("KAFKA-02", "Get transactionManager in 
KafkaProducer failed"),
+    ADD_SPLIT_CHECKPOINT_FAILED("KAFKA-03", "Add the split checkpoint state to 
reader failed"),
+    ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED("KAFKA-04", "Add a split back to the 
split enumerator failed,it will only happen when a SourceReader failed"),
+    CONSUME_THREAD_RUN_ERROR("KAFKA-05", "Error occurred when the kafka 
consumer thread was running"),
+    CONSUME_DATA_FAILED("KAFKA-06", "Kafka failed to consume data");
+    // Per-constant state: the short code string (e.g. "KAFKA-01") and its description text.
+    private final String code;
+    private final String description;
+    // Stores the code/description pair supplied by each constant declaration above.
+    KafkaConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+    // Returns the code string this constant was declared with.
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+    // Returns the description string this constant was declared with.
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+    // Delegates unchanged to the default getErrorMessage() declared on SeaTunnelErrorCode.
+    @Override
+    public String getErrorMessage() {
+        return SeaTunnelErrorCode.super.getErrorMessage();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorException.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorException.java
new file mode 100644
index 000000000..d98785171
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class KafkaConnectorException extends SeaTunnelRuntimeException { // Kafka connector exception; every constructor forwards its arguments to SeaTunnelRuntimeException
+    public KafkaConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) { // error code + message, no cause
+        super(seaTunnelErrorCode, errorMessage);
+    }
+    // Error code + message + the underlying cause.
+    public KafkaConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+    // Error code + the underlying cause, with no separate message argument.
+    public KafkaConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
index 688e1284b..34a5b5ad3 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.common.utils.ReflectionUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -131,7 +133,8 @@ public class KafkaInternalProducer<K, V> extends 
KafkaProducer<K, V> {
             return constructor.newInstance(producerId, epoch);
         } catch (InvocationTargetException | InstantiationException | 
IllegalAccessException |
                  NoSuchFieldException | NoSuchMethodException e) {
-            throw new RuntimeException("Incompatible KafkaProducer version", 
e);
+            throw new 
KafkaConnectorException(KafkaConnectorErrorCode.VERSION_INCOMPATIBLE,
+                    "Incompatible KafkaProducer version", e);
         }
     }
 
@@ -139,7 +142,8 @@ public class KafkaInternalProducer<K, V> extends 
KafkaProducer<K, V> {
         Optional<Object> transactionManagerOptional = 
ReflectionUtils.getField(this, KafkaProducer.class,
                 "transactionManager");
         if (!transactionManagerOptional.isPresent()) {
-            throw new RuntimeException("can't get transactionManager in 
KafkaProducer");
+            throw new 
KafkaConnectorException(KafkaConnectorErrorCode.GET_TRANSACTIONMANAGER_FAILED,
+                    "Can't get transactionManager in KafkaProducer");
         }
         return transactionManagerOptional.get();
     }
@@ -155,7 +159,8 @@ public class KafkaInternalProducer<K, V> extends 
KafkaProducer<K, V> {
             Class<Enum> cl = (Class<Enum>) 
Class.forName(TRANSACTION_MANAGER_STATE_ENUM);
             return Enum.valueOf(cl, enumName);
         } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Incompatible KafkaProducer version", 
e);
+            throw new 
KafkaConnectorException(KafkaConnectorErrorCode.VERSION_INCOMPATIBLE,
+                    "Incompatible KafkaProducer version", e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index dff69940b..9e651bab1 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -21,6 +21,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOT
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -32,6 +33,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
@@ -58,7 +60,9 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow, 
KafkaSinkState, Ka
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
TOPIC.key(), BOOTSTRAP_SERVERS.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
+            throw new 
KafkaConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s", getPluginName(), PluginType.SINK, result.getMsg())
+            );
         }
         this.pluginConfig = pluginConfig;
     }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index dc143ec60..f7ca6365e 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -28,7 +28,9 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
@@ -124,7 +126,8 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         try (KafkaProduceSender<?, ?> kafkaProduceSender = 
kafkaProducerSender) {
             // no-opt
         } catch (Exception e) {
-            throw new RuntimeException("Close kafka sink writer error", e);
+            throw new 
KafkaConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED,
+                    "Close kafka sink writer error", e);
         }
     }
 
@@ -177,8 +180,8 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
             List<String> rowTypeFieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
             for (String partitionKeyField : partitionKeyFields) {
                 if (!rowTypeFieldNames.contains(partitionKeyField)) {
-                    throw new IllegalArgumentException(String.format(
-                            "Partition key field not found: %s, rowType: %s", 
partitionKeyField, rowTypeFieldNames));
+                    throw new 
KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            String.format("Partition key field not found: %s, 
rowType: %s", partitionKeyField, rowTypeFieldNames));
                 }
             }
             return partitionKeyFields;
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
index 618854a38..b118c5674 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -50,7 +53,7 @@ public class KafkaConsumerThread implements Runnable {
                     task.accept(consumer);
                 }
             } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                throw new 
KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 01ab14cc6..d0a1931f6 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -34,6 +34,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPI
 
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -49,6 +50,7 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
@@ -89,7 +91,9 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     public void prepare(Config config) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(config, 
TOPIC.key(), BOOTSTRAP_SERVERS.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+            throw new 
KafkaConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s", getPluginName(), PluginType.SOURCE, result.getMsg())
+            );
         }
         this.metadata.setTopic(config.getString(TOPIC.key()));
         if (config.hasPath(PATTERN.key())) {
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 7eb68237f..28e246d4b 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -144,7 +146,7 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
                     completableFuture.complete(null);
                 });
             } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                throw new 
KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
             }
             completableFuture.join();
         });
@@ -169,7 +171,7 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
             try {
                 pendingPartitionsQueue.put(s);
             } catch (InterruptedException e) {
-                throw new RuntimeException(e);
+                throw new 
KafkaConnectorException(KafkaConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
             }
         });
     }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 71760b96b..0551103e0 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -19,6 +19,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.common.config.Common;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
 import lombok.extern.slf4j.Slf4j;
@@ -173,7 +175,7 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
             });
             return splits.stream().collect(Collectors.toMap(split -> 
split.getTopicPartition(), split -> split));
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new 
KafkaConnectorException(KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED,
 e);
         }
     }
 

Reply via email to