This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3b573798d [Improve][Connector-V2][Kafka]Unified exception for Kafka
source and sink connector (#3574)
3b573798d is described below
commit 3b573798db97d9aa0b41cb2828dbfc897658ad6f
Author: TaoZex <[email protected]>
AuthorDate: Sat Nov 26 20:40:01 2022 +0800
[Improve][Connector-V2][Kafka]Unified exception for Kafka source and sink
connector (#3574)
* [Improve][Connector-V2][Kafka]Unified exception for Kafka source and sink
connector
* [Improve][Connector-V2][Kafka]Remove redundant exception code
---
.../connector-v2/Error-Quick-Reference-Manual.md | 11 +++++
.../kafka/exception/KafkaConnectorErrorCode.java | 52 ++++++++++++++++++++++
.../kafka/exception/KafkaConnectorException.java | 35 +++++++++++++++
.../kafka/sink/KafkaInternalProducer.java | 11 +++--
.../connectors/seatunnel/kafka/sink/KafkaSink.java | 6 ++-
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 9 ++--
.../kafka/source/KafkaConsumerThread.java | 5 ++-
.../seatunnel/kafka/source/KafkaSource.java | 6 ++-
.../seatunnel/kafka/source/KafkaSourceReader.java | 6 ++-
.../kafka/source/KafkaSourceSplitEnumerator.java | 4 +-
10 files changed, 133 insertions(+), 12 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index cf47f0cbc..502905018 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -97,6 +97,17 @@ This document records some common error codes and
corresponding solutions of Sea
| HIVE-02 | Initialize hive metastore client failed |
When users encounter this error code, it means that connect to hive metastore
service failed, please check it whether is work |
| HIVE-03 | Get hive table information from hive metastore service failed |
When users encounter this error code, it means that hive metastore service has
some problems, please check it whether is work |
+## Kafka Connector Error Codes
+
+| code | description
| solution
|
+|----------|-------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------|
+| KAFKA-01 | Incompatible KafkaProducer version
| When users encounter this error code, it means that
KafkaProducer version is incompatible, please check it |
+| KAFKA-02 | Get transactionManager in KafkaProducer exception
| When users encounter this error code, it means that
can not get transactionManager in KafkaProducer, please check it |
+| KAFKA-03 | Add the split checkpoint state to reader failed
| When users encounter this error code, it means that
add the split checkpoint state to reader failed, please retry it |
+| KAFKA-04 | Add a split back to the split enumerator,it will only happen when
a SourceReader failed | When users encounter this error code, it means that
add a split back to the split enumerator failed, please check it |
+| KAFKA-05 | Error occurred when the kafka consumer thread was running
| When users encounter this error code, it means that
an error occurred when the kafka consumer thread was running, please check it |
+| KAFKA-06 | Kafka failed to consume data
| When users encounter this error code, it means that
Kafka failed to consume data, please check config and retry it |
+
## InfluxDB Connector Error Codes
| code | description
| solution
|
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorErrorCode.java
new file mode 100644
index 000000000..e2ecfe80e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorErrorCode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum KafkaConnectorErrorCode implements SeaTunnelErrorCode {
+ VERSION_INCOMPATIBLE("KAFKA-01", "Incompatible KafkaProducer version"),
+ GET_TRANSACTIONMANAGER_FAILED("KAFKA-02", "Get transactionManager in
KafkaProducer failed"),
+ ADD_SPLIT_CHECKPOINT_FAILED("KAFKA-03", "Add the split checkpoint state to
reader failed"),
+ ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED("KAFKA-04", "Add a split back to the
split enumerator failed,it will only happen when a SourceReader failed"),
+ CONSUME_THREAD_RUN_ERROR("KAFKA-05", "Error occurred when the kafka
consumer thread was running"),
+ CONSUME_DATA_FAILED("KAFKA-06", "Kafka failed to consume data");
+
+ private final String code;
+ private final String description;
+
+ KafkaConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return SeaTunnelErrorCode.super.getErrorMessage();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorException.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorException.java
new file mode 100644
index 000000000..d98785171
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/exception/KafkaConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class KafkaConnectorException extends SeaTunnelRuntimeException {
+ public KafkaConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public KafkaConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public KafkaConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
index 688e1284b..34a5b5ad3 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.common.utils.ReflectionUtils;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -131,7 +133,8 @@ public class KafkaInternalProducer<K, V> extends
KafkaProducer<K, V> {
return constructor.newInstance(producerId, epoch);
} catch (InvocationTargetException | InstantiationException |
IllegalAccessException |
NoSuchFieldException | NoSuchMethodException e) {
- throw new RuntimeException("Incompatible KafkaProducer version",
e);
+ throw new
KafkaConnectorException(KafkaConnectorErrorCode.VERSION_INCOMPATIBLE,
+ "Incompatible KafkaProducer version", e);
}
}
@@ -139,7 +142,8 @@ public class KafkaInternalProducer<K, V> extends
KafkaProducer<K, V> {
Optional<Object> transactionManagerOptional =
ReflectionUtils.getField(this, KafkaProducer.class,
"transactionManager");
if (!transactionManagerOptional.isPresent()) {
- throw new RuntimeException("can't get transactionManager in
KafkaProducer");
+ throw new
KafkaConnectorException(KafkaConnectorErrorCode.GET_TRANSACTIONMANAGER_FAILED,
+ "Can't get transactionManager in KafkaProducer");
}
return transactionManagerOptional.get();
}
@@ -155,7 +159,8 @@ public class KafkaInternalProducer<K, V> extends
KafkaProducer<K, V> {
Class<Enum> cl = (Class<Enum>)
Class.forName(TRANSACTION_MANAGER_STATE_ENUM);
return Enum.valueOf(cl, enumName);
} catch (ClassNotFoundException e) {
- throw new RuntimeException("Incompatible KafkaProducer version",
e);
+ throw new
KafkaConnectorException(KafkaConnectorErrorCode.VERSION_INCOMPATIBLE,
+ "Incompatible KafkaProducer version", e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index dff69940b..9e651bab1 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -21,6 +21,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOT
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -32,6 +33,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
@@ -58,7 +60,9 @@ public class KafkaSink implements SeaTunnelSink<SeaTunnelRow,
KafkaSinkState, Ka
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
TOPIC.key(), BOOTSTRAP_SERVERS.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ throw new
KafkaConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s", getPluginName(), PluginType.SINK, result.getMsg())
+ );
}
this.pluginConfig = pluginConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index dc143ec60..f7ca6365e 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -28,7 +28,9 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
@@ -124,7 +126,8 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
try (KafkaProduceSender<?, ?> kafkaProduceSender =
kafkaProducerSender) {
// no-opt
} catch (Exception e) {
- throw new RuntimeException("Close kafka sink writer error", e);
+ throw new
KafkaConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED,
+ "Close kafka sink writer error", e);
}
}
@@ -177,8 +180,8 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
List<String> rowTypeFieldNames =
Arrays.asList(seaTunnelRowType.getFieldNames());
for (String partitionKeyField : partitionKeyFields) {
if (!rowTypeFieldNames.contains(partitionKeyField)) {
- throw new IllegalArgumentException(String.format(
- "Partition key field not found: %s, rowType: %s",
partitionKeyField, rowTypeFieldNames));
+ throw new
KafkaConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ String.format("Partition key field not found: %s,
rowType: %s", partitionKeyField, rowTypeFieldNames));
}
}
return partitionKeyFields;
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
index 618854a38..b118c5674 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -50,7 +53,7 @@ public class KafkaConsumerThread implements Runnable {
task.accept(consumer);
}
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw new
KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 01ab14cc6..d0a1931f6 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -34,6 +34,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPI
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -49,6 +50,7 @@ import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
@@ -89,7 +91,9 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
public void prepare(Config config) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(config,
TOPIC.key(), BOOTSTRAP_SERVERS.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
KafkaConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message:
%s", getPluginName(), PluginType.SOURCE, result.getMsg())
+ );
}
this.metadata.setTopic(config.getString(TOPIC.key()));
if (config.hasPath(PATTERN.key())) {
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 7eb68237f..28e246d4b 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -144,7 +146,7 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
completableFuture.complete(null);
});
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw new
KafkaConnectorException(KafkaConnectorErrorCode.CONSUME_DATA_FAILED, e);
}
completableFuture.join();
});
@@ -169,7 +171,7 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
try {
pendingPartitionsQueue.put(s);
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ throw new
KafkaConnectorException(KafkaConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
}
});
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 71760b96b..0551103e0 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -19,6 +19,8 @@ package
org.apache.seatunnel.connectors.seatunnel.kafka.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.config.Common;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import lombok.extern.slf4j.Slf4j;
@@ -173,7 +175,7 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
});
return splits.stream().collect(Collectors.toMap(split ->
split.getTopicPartition(), split -> split));
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new
KafkaConnectorException(KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED,
e);
}
}