This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4fe932341 [Improve][Connector-V2][Pulsar] Unified exception for Pulsar
source &… (#3590)
4fe932341 is described below
commit 4fe9323419e52af60dea6a9b17f8cc8cd6c0d499
Author: john <[email protected]>
AuthorDate: Sat Dec 3 21:29:07 2022 +0800
[Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &…
(#3590)
* [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source and
sink
---
.../connector-v2/Error-Quick-Reference-Manual.md | 14 ++++++-
.../seatunnel/pulsar/config/PulsarConfigUtil.java | 11 +++--
.../pulsar/exception/PulsarConnectorErrorCode.java | 48 ++++++++++++++++++++++
.../pulsar/exception/PulsarConnectorException.java | 35 ++++++++++++++++
.../seatunnel/pulsar/source/PulsarSource.java | 20 ++++-----
.../source/enumerator/PulsarSplitEnumerator.java | 6 ++-
.../cursor/start/SubscriptionStartCursor.java | 4 +-
.../cursor/stop/LatestMessageStopCursor.java | 4 +-
.../enumerator/discoverer/TopicListDiscoverer.java | 4 +-
.../discoverer/TopicPatternDiscoverer.java | 6 ++-
.../pulsar/source/reader/PulsarSourceReader.java | 8 ++--
.../source/reader/PulsarSplitReaderThread.java | 4 +-
12 files changed, 138 insertions(+), 26 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 4ed61a0df..4b5ef6b10 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -168,4 +168,16 @@ problems encountered by users.
| CLICKHOUSE-03 | Can’t delete directory | When users encounter this error code, it means that the directory does not exist or does not have permission, please check |
| CLICKHOUSE-04 | Ssh operation failed, such as (login,connect,authentication,close) etc... | When users encounter this error code, it means that the ssh request failed, please check your network environment |
| CLICKHOUSE-05 | Get cluster list from clickhouse failed | When users encounter this error code, it means that the clickhouse cluster is not configured correctly, please check |
-| CLICKHOUSE-06 | Shard key not found in table | When users encounter this error code, it means that the shard key of the distributed table is not configured, please check |
\ No newline at end of file
+| CLICKHOUSE-06 | Shard key not found in table | When users encounter this error code, it means that the shard key of the distributed table is not configured, please check |
+
+## Pulsar Connector Error Codes
+
+| code | description | solution |
+|--------------|--------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| PULSAR-01 | Open pulsar admin failed | When users encounter this error code, it means that open pulsar admin failed, please check it |
+| PULSAR-02 | Open pulsar client failed | When users encounter this error code, it means that open pulsar client failed, please check it |
+| PULSAR-03 | Pulsar authentication failed | When users encounter this error code, it means that Pulsar Authentication failed, please check it |
+| PULSAR-04 | Subscribe topic from pulsar failed | When users encounter this error code, it means that Subscribe topic from pulsar failed, please check it |
+| PULSAR-05 | Get last cursor of pulsar topic failed | When users encounter this error code, it means that get last cursor of pulsar topic failed, please check it |
+| PULSAR-06 | Get partition information of pulsar topic failed | When users encounter this error code, it means that Get partition information of pulsar topic failed, please check it |
+
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
index 85703ad56..287d38e45 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
@@ -43,7 +46,7 @@ public class PulsarConfigUtil {
try {
return builder.build();
} catch (PulsarClientException e) {
- throw new RuntimeException(e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, e);
}
}
@@ -54,7 +57,7 @@ public class PulsarConfigUtil {
try {
return builder.build();
} catch (PulsarClientException e) {
- throw new RuntimeException(e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_CLIENT_FAILED, e);
}
}
@@ -74,10 +77,10 @@ public class PulsarConfigUtil {
try {
return
AuthenticationFactory.create(config.getAuthPluginClassName(),
config.getAuthParams());
} catch (PulsarClientException.UnsupportedAuthenticationException
e) {
- throw new RuntimeException("Failed to create the
authentication plug-in.", e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.PULSAR_AUTHENTICATION_FAILED,
e);
}
} else {
- throw new IllegalArgumentException("Authentication parameters are
required when using authentication plug-in.");
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.PULSAR_AUTHENTICATION_FAILED,
"Authentication parameters are required when using authentication plug-in.");
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
new file mode 100644
index 000000000..513e5dac6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorErrorCode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum PulsarConnectorErrorCode implements SeaTunnelErrorCode {
+
+ OPEN_PULSAR_ADMIN_FAILED("PULSAR-01", "Open pulsar admin failed"),
+ OPEN_PULSAR_CLIENT_FAILED("PULSAR-02", "Open pulsar client failed"),
+ PULSAR_AUTHENTICATION_FAILED("PULSAR-03", "Pulsar authentication failed"),
+ SUBSCRIBE_TOPIC_FAILED("PULSAR-04", "Subscribe topic from pulsar failed"),
+ GET_LAST_CURSOR_FAILED("PULSAR-05", "Get last cursor of pulsar topic
failed"),
+ GET_TOPIC_PARTITION_FAILED("PULSAR-06", "Get partition information of
pulsar topic failed");
+
+ private final String code;
+ private final String description;
+
+ PulsarConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorException.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorException.java
new file mode 100644
index 000000000..687d32896
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/exception/PulsarConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class PulsarConnectorException extends SeaTunnelRuntimeException {
+ public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public PulsarConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index 769e1143a..fe35f4bac 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -33,13 +33,12 @@ import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProp
import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode.LATEST;
-import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER;
import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
import static
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -56,6 +55,7 @@ import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfi
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
@@ -103,7 +103,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T,
PulsarPartitionSplit,
public void prepare(Config config) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(config,
SUBSCRIPTION_NAME.key(), CLIENT_SERVICE_URL.key(), ADMIN_SERVICE_URL.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(),
PluginType.SOURCE, result.getMsg()));
}
// admin config
@@ -155,7 +155,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T,
PulsarPartitionSplit,
if (partitionDiscoverer instanceof TopicPatternDiscoverer
&& partitionDiscoveryIntervalMs > 0
&& Boundedness.BOUNDED == stopCursor.getBoundedness()) {
- throw new IllegalArgumentException("Bounded streams do not support
dynamic partition discovery.");
+ throw new
PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
"Bounded streams do not support dynamic partition discovery.");
}
}
@@ -177,12 +177,12 @@ public class PulsarSource<T> implements
SeaTunnelSource<T, PulsarPartitionSplit,
break;
case TIMESTAMP:
if
(StringUtils.isBlank(config.getString(CURSOR_STARTUP_TIMESTAMP.key()))) {
- throw new IllegalArgumentException(String.format("The '%s'
property is required when the '%s' is 'timestamp'.",
CURSOR_STARTUP_TIMESTAMP.key(), CURSOR_STARTUP_MODE.key()));
+ throw new
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format("The '%s' property is required when the '%s' is 'timestamp'.",
CURSOR_STARTUP_TIMESTAMP.key(), CURSOR_STARTUP_MODE.key()));
}
setOption(config, CURSOR_STARTUP_TIMESTAMP.key(),
config::getLong, timestamp -> this.startCursor =
StartCursor.timestamp(timestamp));
break;
default:
- throw new IllegalArgumentException(String.format("The %s mode
is not supported.", startMode));
+ throw new
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format("The %s mode is not supported.", startMode));
}
}
@@ -197,12 +197,12 @@ public class PulsarSource<T> implements
SeaTunnelSource<T, PulsarPartitionSplit,
break;
case TIMESTAMP:
if
(StringUtils.isBlank(config.getString(CURSOR_STOP_TIMESTAMP.key()))) {
- throw new IllegalArgumentException(String.format("The '%s'
property is required when the '%s' is 'timestamp'.",
CURSOR_STOP_TIMESTAMP.key(), CURSOR_STOP_MODE.key()));
+ throw new
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format("The '%s' property is required when the '%s' is 'timestamp'.",
CURSOR_STOP_TIMESTAMP.key(), CURSOR_STOP_MODE.key()));
}
setOption(config, CURSOR_STARTUP_TIMESTAMP.key(),
config::getLong, timestamp -> this.stopCursor =
StopCursor.timestamp(timestamp));
break;
default:
- throw new IllegalArgumentException(String.format("The %s mode
is not supported.", stopMode));
+ throw new
PulsarConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("The %s mode is not supported.", stopMode));
}
}
@@ -214,12 +214,12 @@ public class PulsarSource<T> implements
SeaTunnelSource<T, PulsarPartitionSplit,
String topicPattern = config.getString(TOPIC_PATTERN.key());
if (StringUtils.isNotBlank(topicPattern)) {
if (this.partitionDiscoverer != null) {
- throw new IllegalArgumentException(String.format("The
properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key()));
+ throw new
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(),
TOPIC_PATTERN.key()));
}
this.partitionDiscoverer = new
TopicPatternDiscoverer(Pattern.compile(topicPattern));
}
if (this.partitionDiscoverer == null) {
- throw new IllegalArgumentException(String.format("The properties
'%s' or '%s' is required.", TOPIC.key(), TOPIC_PATTERN.key()));
+ throw new
PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
String.format("The properties '%s' or '%s' is required.", TOPIC.key(),
TOPIC_PATTERN.key()));
}
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
index 8b7f6f8dd..c9d8a0d0e 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
@@ -19,8 +19,10 @@ package
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
@@ -107,10 +109,10 @@ public class PulsarSplitEnumerator implements
SourceSplitEnumerator<PulsarPartit
StopCursor stopCursor,
String subscriptionName,
Set<TopicPartition> assignedPartitions) {
- if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
+ if (partitionDiscoverer instanceof TopicPatternDiscoverer
&& partitionDiscoveryIntervalMs > 0
&& Boundedness.BOUNDED == stopCursor.getBoundedness()) {
- throw new IllegalArgumentException("Bounded streams do not support
dynamic partition discovery.");
+ throw new
PulsarConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, "Bounded
streams do not support dynamic partition discovery.");
}
this.context = context;
this.adminConfig = adminConfig;
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
index e62d24079..a6c053c4f 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
@@ -18,6 +18,8 @@
package
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -48,7 +50,7 @@ public class SubscriptionStartCursor implements StartCursor {
}
pulsarAdmin.topics().createSubscription(partition.getFullTopicName(),
subscription, CursorResetStrategy.EARLIEST == cursorResetStrategy ?
MessageId.earliest : MessageId.latest);
} catch (PulsarAdminException e) {
- throw new RuntimeException(e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
index 14aeeff28..51a7fdb5d 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
@@ -18,6 +18,8 @@
package
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -41,7 +43,7 @@ public class LatestMessageStopCursor implements StopCursor {
try {
messageId = admin.topics().getLastMessageId(topic);
} catch (PulsarAdminException e) {
- throw new RuntimeException("Failed to get the last cursor", e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.GET_LAST_CURSOR_FAILED,
"Failed to get the last cursor", e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
index b362446d0..3dcf042e6 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
@@ -18,6 +18,8 @@
package
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -53,7 +55,7 @@ public class TopicListDiscoverer implements PulsarDiscoverer {
return PulsarDiscoverer.toTopicPartitions(topicName,
metadata.partitions);
} catch (PulsarAdminException e) {
// This method would cause the failure for subscriber.
- throw new IllegalStateException(e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.SUBSCRIBE_TOPIC_FAILED, e);
}
})
.filter(Objects::nonNull)
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
index d5eaedac5..d6a22cdea 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
@@ -17,6 +17,8 @@
package
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -71,14 +73,14 @@ public class TopicPatternDiscoverer implements
PulsarDiscoverer {
return PulsarDiscoverer.toTopicPartitions(topicName,
metadata.partitions);
} catch (PulsarAdminException e) {
// This method would cause the failure for subscriber.
- throw new IllegalStateException(e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.GET_TOPIC_PARTITION_FAILED,
e);
}
}).filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
} catch (PulsarAdminException e) {
// This method would cause the failure for subscriber.
- throw new IllegalStateException(e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.GET_TOPIC_PARTITION_FAILED,
e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
index ddb2e9147..af3cf5120 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
@@ -21,9 +21,11 @@ import
org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.common.Handover;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
@@ -113,7 +115,7 @@ public class PulsarSourceReader<T> implements
SourceReader<T, PulsarPartitionSpl
try {
reader.close();
} catch (IOException e) {
- throw new RuntimeException("Failed to close the split reader
thread.", e);
+ throw new
PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to
close the split reader thread.", e);
}
});
}
@@ -166,7 +168,7 @@ public class PulsarSourceReader<T> implements
SourceReader<T, PulsarPartitionSpl
splitReaders.put(split.splitId(), splitReaderThread);
splitReaderThread.start();
} catch (PulsarClientException e) {
- throw new RuntimeException("Failed to start the split reader
thread.", e);
+ throw new
PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to
start the split reader thread.", e);
}
});
}
@@ -216,7 +218,7 @@ public class PulsarSourceReader<T> implements
SourceReader<T, PulsarPartitionSpl
try {
splitReaders.get(splitId).close();
} catch (IOException e) {
- throw new RuntimeException("Failed to close the split
reader thread.", e);
+ throw new
PulsarConnectorException(CommonErrorCode.READER_OPERATION_FAILED, "Failed to
close the split reader thread.", e);
}
}
});
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
index 9817046b7..0fe1e5e26 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
@@ -20,6 +20,8 @@ package
org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
import org.apache.seatunnel.common.Handover;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import
org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
@@ -137,7 +139,7 @@ public class PulsarSplitReaderThread extends Thread
implements Closeable {
try {
return consumerBuilder.subscribe();
} catch (PulsarClientException e) {
- throw new RuntimeException("Failed to create pulsar consumer:", e);
+ throw new
PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED,
"Failed to create pulsar consumer:", e);
}
}