This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 612d0297a [Improve][Connector-V2][StarRocks] Unified exception for 
StarRocks source and sink (#3593)
612d0297a is described below

commit 612d0297a09bc46e3fbd5f6cb2ee2ab43ea8c586
Author: john <[email protected]>
AuthorDate: Mon Dec 5 18:32:31 2022 +0800

    [Improve][Connector-V2][StarRocks] Unified exception for StarRocks source 
and sink (#3593)
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |  6 ++++
 .../starrocks/client/StarRocksSinkManager.java     | 11 +++---
 .../client/StarRocksStreamLoadVisitor.java         | 25 +++++++------
 .../exception/StarRocksConnectorErrorCode.java     | 42 ++++++++++++++++++++++
 .../StarRocksConnectorException.java}              | 30 ++++++++--------
 .../serialize/StarRocksBaseSerializer.java         |  4 ++-
 .../serialize/StarRocksDelimiterParser.java        |  9 +++--
 .../seatunnel/starrocks/sink/StarRocksSink.java    |  6 +++-
 .../starrocks/sink/StarRocksSinkWriter.java        |  6 ++--
 9 files changed, 100 insertions(+), 39 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 4b5ef6b10..efca58358 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -181,3 +181,9 @@ problems encountered by users.
 | PULSAR-05 | Get last cursor of pulsar topic failed                          
| When users encounter this error code, it means that get last cursor of pulsar 
topic failed, please check it                          |
 | PULSAR-06 | Get partition information of pulsar topic failed                
| When users encounter this error code, it means that Get partition information 
of pulsar topic failed, please check it                |
 
+## StarRocks Connector Error Codes
+
+| code    | description                 | solution                             
                                                                                
                                            |
+|---------|-----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| STARROCKS-01 | Flush batch data to sink connector failed        | When users 
encounter this error code, it means that flush batch data to sink connector 
failed, please check it                                          |
+| STARROCKS-02 | Writing records to StarRocks failed. | When users encounter 
this error code, it means that writing records to StarRocks failed, please 
check whether the data from files is correct |
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
index ab05da5b5..f51feff7f 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
 
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -116,10 +118,10 @@ public class StarRocksSinkManager {
             } catch (Exception e) {
                 log.warn("Writing records to StarRocks failed, retry times = 
{}", i, e);
                 if (i >= sinkConfig.getMaxRetries()) {
-                    throw new IOException("Writing records to StarRocks 
failed.", e);
+                    throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.WRITE_RECORDS_FAILED, 
"The number of retries was exceeded, writing records to StarRocks failed.", e);
                 }
 
-                if (e instanceof StarRocksStreamLoadFailedException && 
((StarRocksStreamLoadFailedException) e).needReCreateLabel()) {
+                if (e instanceof StarRocksConnectorException && 
((StarRocksConnectorException) e).needReCreateLabel()) {
                     String newLabel = createBatchLabel();
                     log.warn(String.format("Batch label changed from [%s] to 
[%s]", tuple.getLabel(), newLabel));
                     tuple.setLabel(newLabel);
@@ -131,8 +133,7 @@ public class StarRocksSinkManager {
                     Thread.sleep(backoff);
                 } catch (InterruptedException ex) {
                     Thread.currentThread().interrupt();
-                    throw new IOException(
-                            "Unable to flush; interrupted while doing another 
attempt.", e);
+                    throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, e);
                 }
             }
         }
@@ -143,7 +144,7 @@ public class StarRocksSinkManager {
 
     private void checkFlushException() {
         if (flushException != null) {
-            throw new RuntimeException("Writing records to StarRocks failed.", 
flushException);
+            throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, 
flushException);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
index 7b277b3a9..7d5c1d71b 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser;
 
 import org.apache.commons.codec.binary.Base64;
@@ -62,7 +65,7 @@ public class StarRocksStreamLoadVisitor {
     public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws 
IOException {
         String host = getAvailableHost();
         if (null == host) {
-            throw new IOException("None of the host in `load_url` could be 
connected.");
+            throw new 
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "None of the host 
in `load_url` could be connected.");
         }
         String loadUrl = new StringBuilder(host)
                 .append("/api/")
@@ -78,7 +81,7 @@ public class StarRocksStreamLoadVisitor {
         final String keyStatus = "Status";
         if (null == loadResult || !loadResult.containsKey(keyStatus)) {
             LOG.error("unknown result status. {}", loadResult);
-            throw new IOException("Unable to flush data to StarRocks: unknown 
result status. " + loadResult);
+            throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED,  
"Unable to flush data to StarRocks: unknown result status. " + loadResult);
         }
         if (LOG.isDebugEnabled()) {
             LOG.debug(new StringBuilder("StreamLoad 
response:\n").append(JsonUtils.toJsonString(loadResult)).toString());
@@ -101,7 +104,7 @@ public class StarRocksStreamLoadVisitor {
                 errorBuilder.append(JsonUtils.toJsonString(loadResult));
                 errorBuilder.append('\n');
             }
-            throw new IOException(errorBuilder.toString());
+            throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, 
errorBuilder.toString());
         } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
             LOG.debug(new StringBuilder("StreamLoad 
response:\n").append(JsonUtils.toJsonString(loadResult)).toString());
             // has to block-checking the state to get the final result
@@ -149,7 +152,7 @@ public class StarRocksStreamLoadVisitor {
             bos.put("]".getBytes(StandardCharsets.UTF_8));
             return bos.array();
         }
-        throw new RuntimeException("Failed to join rows data, unsupported 
`format` from stream load properties:");
+        throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, 
"Failed to join rows data, unsupported `format` from stream load properties:");
     }
 
     @SuppressWarnings("unchecked")
@@ -165,12 +168,12 @@ public class StarRocksStreamLoadVisitor {
                 String queryLoadStateUrl = new 
StringBuilder(host).append("/api/").append(sinkConfig.getDatabase()).append("/get_load_state?label=").append(label).toString();
                 Map<String, Object> result = 
httpHelper.doHttpGet(queryLoadStateUrl, getLoadStateHttpHeader(label));
                 if (result == null) {
-                    throw new IOException(String.format("Failed to flush data 
to StarRocks, Error " +
+                    throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, 
String.format("Failed to flush data to StarRocks, Error " +
                             "could not get the final state of label[%s].\n", 
label), null);
                 }
                 String labelState = (String) result.get("state");
                 if (null == labelState) {
-                    throw new IOException(String.format("Failed to flush data 
to StarRocks, Error " +
+                    throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, 
String.format("Failed to flush data to StarRocks, Error " +
                             "could not get the final state of label[%s]. 
response[%s]\n", label, JsonUtils.toJsonString(result)), null);
                 }
                 LOG.info(String.format("Checking label[%s] state[%s]\n", 
label, labelState));
@@ -181,15 +184,15 @@ public class StarRocksStreamLoadVisitor {
                     case RESULT_LABEL_PREPARE:
                         continue;
                     case RESULT_LABEL_ABORTED:
-                        throw new 
StarRocksStreamLoadFailedException(String.format("Failed to flush data to 
StarRocks, Error " +
-                                "label[%s] state[%s]\n", label, labelState), 
null, true);
+                        throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, 
String.format("Failed to flush data to StarRocks, Error " +
+                                "label[%s] state[%s]\n", label, labelState), 
true);
                     case RESULT_LABEL_UNKNOWN:
                     default:
-                        throw new 
StarRocksStreamLoadFailedException(String.format("Failed to flush data to 
StarRocks, Error " +
-                                "label[%s] state[%s]\n", label, labelState), 
null);
+                        throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, 
String.format("Failed to flush data to StarRocks, Error " +
+                                "label[%s] state[%s]\n", label, labelState));
                 }
             } catch (IOException e) {
-                throw new IOException(e);
+                throw new 
StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorErrorCode.java
new file mode 100644
index 000000000..4a6a362d9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.starrocks.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum StarRocksConnectorErrorCode implements SeaTunnelErrorCode {
+  FLUSH_DATA_FAILED("STARROCKS-01", "Flush batch data to sink connector 
failed"),
+  WRITE_RECORDS_FAILED("STARROCKS-02", "Writing records to StarRocks failed.");
+    private final String code;
+    private final String description;
+
+    StarRocksConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorException.java
similarity index 51%
rename from 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
rename to 
seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorException.java
index 626b38d3f..60713e088 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadFailedException.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/exception/StarRocksConnectorException.java
@@ -15,35 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
+package org.apache.seatunnel.connectors.seatunnel.starrocks.exception;
 
-import java.io.IOException;
-import java.util.Map;
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 
-public class StarRocksStreamLoadFailedException extends IOException {
+public class StarRocksConnectorException extends SeaTunnelRuntimeException {
 
-    static final long serialVersionUID = 1L;
-
-    private final Map<String, Object> response;
     private boolean reCreateLabel;
 
-    public StarRocksStreamLoadFailedException(String message, Map<String, 
Object> response) {
-        super(message);
-        this.response = response;
+    public StarRocksConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
     }
 
-    public StarRocksStreamLoadFailedException(String message, Map<String, 
Object> response, boolean reCreateLabel) {
-        super(message);
-        this.response = response;
+    public StarRocksConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, boolean reCreateLabel) {
+        super(seaTunnelErrorCode, errorMessage);
         this.reCreateLabel = reCreateLabel;
     }
 
-    public Map<String, Object> getFailedResponse() {
-        return response;
+    public StarRocksConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public StarRocksConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
     }
 
     public boolean needReCreateLabel() {
         return reCreateLabel;
     }
-
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
index 190b18b95..7f9cc43ee 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java
@@ -18,10 +18,12 @@
 package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import lombok.Builder;
 
@@ -70,7 +72,7 @@ public class StarRocksBaseSerializer {
             case BYTES:
                 return new String((byte[]) val);
             default:
-                throw new UnsupportedOperationException("Unsupported dataType: 
" + dataType);
+                throw new 
StarRocksConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, dataType + " 
is not supported ");
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
index 1b7ea726f..52b7b73b0 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksDelimiterParser.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
+
 import com.google.common.base.Strings;
 
 import java.io.StringWriter;
@@ -36,14 +39,14 @@ public class StarRocksDelimiterParser {
         String hexStr = sp.substring(2);
         // check hex str
         if (hexStr.isEmpty()) {
-            throw new RuntimeException("Failed to parse delimiter: `Hex str is 
empty`");
+            throw new 
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to parse 
delimiter: `Hex str is empty`");
         }
         if (hexStr.length() % 2 != 0) {
-            throw new RuntimeException("Failed to parse delimiter: `Hex str 
length error`");
+            throw new 
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to parse 
delimiter: `Hex str length error`");
         }
         for (char hexChar : hexStr.toUpperCase().toCharArray()) {
             if (HEX_STRING.indexOf(hexChar) == -1) {
-                throw new RuntimeException("Failed to parse delimiter: `Hex 
str format error`");
+                throw new 
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to parse 
delimiter: `Hex str format error`");
             }
         }
         // transform to separator
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
index a1748c6ea..16f303742 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java
@@ -24,6 +24,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkCon
 import static 
org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -34,6 +35,7 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -55,7 +57,9 @@ public class StarRocksSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
         this.pluginConfig = pluginConfig;
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
NODE_URLS.key(), DATABASE.key(), TABLE.key(), USERNAME.key(), PASSWORD.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+            throw new 
StarRocksConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SINK, result.getMsg()));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
index 441af0296..6d0d44dee 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java
@@ -19,9 +19,11 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.sink;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksSinkManager;
 import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksCsvSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksISerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer;
@@ -73,7 +75,7 @@ public class StarRocksSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
             }
         } catch (IOException e) {
             log.error("Close starRocks manager failed.", e);
-            throw new IOException("Close starRocks manager failed.", e);
+            throw new 
StarRocksConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e);
         }
     }
 
@@ -84,6 +86,6 @@ public class StarRocksSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
         if 
(SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) {
             return new StarRocksJsonSerializer(seaTunnelRowType);
         }
-        throw new RuntimeException("Failed to create row serializer, 
unsupported `format` from stream load properties.");
+        throw new 
StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to create 
row serializer, unsupported `format` from stream load properties.");
     }
 }

Reply via email to