This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b73944d1d [Improve][Connector-V2][ElasticSearch] Unified exception for 
ElasticSearch source & sink connector (#3569)
b73944d1d is described below

commit b73944d1dcb836dd61d6b319f172a2f987d01198
Author: FWLamb <[email protected]>
AuthorDate: Sun Nov 27 13:42:35 2022 +0800

    [Improve][Connector-V2][ElasticSearch] Unified exception for ElasticSearch 
source & sink connector (#3569)
    
    * unified exception
    
    * resolve confilcts
    
    * resolve confilcts
    
    * resolve confilcts
    
    Co-authored-by: yangbinbin <[email protected]>
    Co-authored-by: Hisoka <[email protected]>
---
 .../connector-v2/Error-Quick-Reference-Manual.md   | 12 +++++-
 .../elasticsearch/client/EsRestClient.java         | 48 ++++++++++++----------
 .../constant/ElasticsearchVersion.java             |  6 ++-
 .../exception/BulkElasticsearchException.java      | 30 --------------
 .../exception/ElasticsearchConnectorErrorCode.java | 46 +++++++++++++++++++++
 .../exception/ElasticsearchConnectorException.java | 35 ++++++++++++++++
 .../GetElasticsearchVersionException.java          | 25 -----------
 .../exception/GetIndexDocsCountException.java      | 30 --------------
 .../exception/ScrollRequestException.java          | 30 --------------
 .../serialize/ElasticsearchRowSerializer.java      | 12 +++---
 .../source/DefaultSeaTunnelRowDeserializer.java    | 26 +++++++-----
 .../sink/ElasticsearchSinkWriter.java              | 10 +++--
 .../source/ElasticsearchSourceSplitEnumerator.java | 14 ++++---
 13 files changed, 159 insertions(+), 165 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index ac72f52f1..707609d7a 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -1,6 +1,7 @@
 # Error Quick Reference Manual
 
-This document records some common error codes and corresponding solutions of 
SeaTunnel, aiming to quickly solve the problems encountered by users.
+This document records some common error codes and corresponding solutions of 
SeaTunnel, aiming to quickly solve the
+problems encountered by users.
 
 ## SeaTunnel API Error Codes
 
@@ -96,6 +97,15 @@ This document records some common error codes and 
corresponding solutions of Sea
 | HIVE-02 | Initialize hive metastore client failed                       | 
When users encounter this error code, it means that connect to hive metastore 
service failed, please check it whether is work |
 | HIVE-03 | Get hive table information from hive metastore service failed | 
When users encounter this error code, it means that hive metastore service has 
some problems, please check it whether is work |
 
+## Elasticsearch Connector Error Codes
+
+| code              | description                                   | solution 
                                                                                
                                      |
+|-------------------|-----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------|
+| ELASTICSEARCH-01  | Bulk es response error                        | When the 
user encounters this error code, it means that the connection was aborted, 
please check it whether is work            |
+| ELASTICSEARCH-02  | Get elasticsearch version failed              | When the 
user encounters this error code, it means that the connection was aborted, 
please check it whether is work            |
+| ELASTICSEARCH-03  | Fail to scroll request                        | When the 
user encounters this error code, it means that the connection was aborted, 
please check it whether is work            |
+| ELASTICSEARCH-04  | Get elasticsearch document index count failed | When the 
user encounters this error code, it means that the es index may not wrong or 
the connection was aborted, please check |
+
 ## Kafka Connector Error Codes
 
 | code     | description                                                       
                        | solution                                              
                                                                            |
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index e8c21f8f1..53efa36f0 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -22,10 +22,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterC
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetIndexDocsCountException;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ScrollRequestException;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -96,9 +94,9 @@ public class EsRestClient {
         }
 
         RestClientBuilder builder = RestClient.builder(httpHosts)
-                .setRequestConfigCallback(requestConfigBuilder -> 
requestConfigBuilder
-                        
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
-                        .setSocketTimeout(SOCKET_TIMEOUT));
+            .setRequestConfigCallback(requestConfigBuilder -> 
requestConfigBuilder
+                .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
+                .setSocketTimeout(SOCKET_TIMEOUT));
 
         if (StringUtils.isNotEmpty(username)) {
             CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
@@ -114,7 +112,7 @@ public class EsRestClient {
         try {
             Response response = restClient.performRequest(request);
             if (response == null) {
-                throw new BulkElasticsearchException("bulk es Response is 
null");
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
 "bulk es Response is null");
             }
             if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                 ObjectMapper objectMapper = new ObjectMapper();
@@ -124,10 +122,10 @@ public class EsRestClient {
                 boolean errors = json.get("errors").asBoolean();
                 return new BulkResponse(errors, took, entity);
             } else {
-                throw new BulkElasticsearchException(String.format("bulk es 
response status code=%d,request boy=%s", 
response.getStatusLine().getStatusCode(), requestBody));
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
 String.format("bulk es response status code=%d,request boy=%s", 
response.getStatusLine().getStatusCode(), requestBody));
             }
         } catch (IOException e) {
-            throw new BulkElasticsearchException(String.format("bulk es 
error,request boy=%s", requestBody), e);
+            throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
 String.format("bulk es error,request boy=%s", requestBody), e);
         }
     }
 
@@ -144,11 +142,11 @@ public class EsRestClient {
             JsonNode versionNode = jsonNode.get("version");
             return versionNode.get("number").asText();
         } catch (IOException e) {
-            throw new GetElasticsearchVersionException("fail to get 
elasticsearch version.", e);
+            throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_ES_VERSION_FAILED,
 "fail to get elasticsearch version.", e);
         }
     }
 
-    public void close()  {
+    public void close() {
         try {
             restClient.close();
         } catch (IOException e) {
@@ -199,7 +197,7 @@ public class EsRestClient {
         try {
             Response response = restClient.performRequest(request);
             if (response == null) {
-                throw new ScrollRequestException("POST " + endpoint + " 
response null");
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
 "POST " + endpoint + " response null");
             }
             if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                 String entity = EntityUtils.toString(response.getEntity());
@@ -213,10 +211,12 @@ public class EsRestClient {
                 ScrollResult scrollResult = 
getDocsFromScrollResponse(responseJson);
                 return scrollResult;
             } else {
-                throw new ScrollRequestException(String.format("POST %s 
response status code=%d,request boy=%s", endpoint, 
response.getStatusLine().getStatusCode(), requestBody));
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
+                    String.format("POST %s response status code=%d,request 
boy=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody));
             }
         } catch (IOException e) {
-            throw new ScrollRequestException(String.format("POST %s 
error,request boy=%s", endpoint, requestBody), e);
+            throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
+                String.format("POST %s error,request boy=%s", endpoint, 
requestBody), e);
 
         }
     }
@@ -240,7 +240,7 @@ public class EsRestClient {
             for (Iterator<Map.Entry<String, JsonNode>> iterator = 
source.fields(); iterator.hasNext(); ) {
                 Map.Entry<String, JsonNode> entry = iterator.next();
                 String fieldName = entry.getKey();
-                if (entry.getValue() instanceof TextNode){
+                if (entry.getValue() instanceof TextNode) {
                     doc.put(fieldName, entry.getValue().textValue());
                 } else {
                     doc.put(fieldName, entry.getValue());
@@ -257,17 +257,19 @@ public class EsRestClient {
         try {
             Response response = restClient.performRequest(request);
             if (response == null) {
-                throw new GetIndexDocsCountException("GET " + endpoint + " 
response null");
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+                    "GET " + endpoint + " response null");
             }
             if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                 String entity = EntityUtils.toString(response.getEntity());
                 List<IndexDocsCount> indexDocsCounts = 
JsonUtils.toList(entity, IndexDocsCount.class);
                 return indexDocsCounts;
             } else {
-                throw new GetIndexDocsCountException(String.format("GET %s 
response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+                    String.format("GET %s response status code=%d", endpoint, 
response.getStatusLine().getStatusCode()));
             }
         } catch (IOException ex) {
-            throw new GetIndexDocsCountException(ex);
+            throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
 ex);
         }
     }
 
@@ -284,10 +286,12 @@ public class EsRestClient {
         try {
             Response response = restClient.performRequest(request);
             if (response == null) {
-                throw new GetIndexDocsCountException("GET " + endpoint + " 
response null");
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+                    "GET " + endpoint + " response null");
             }
             if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-                throw new GetIndexDocsCountException(String.format("GET %s 
response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
+                throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+                    String.format("GET %s response status code=%d", endpoint, 
response.getStatusLine().getStatusCode()));
             }
             String entity = EntityUtils.toString(response.getEntity());
             log.info(String.format("GET %s respnse=%s", endpoint, entity));
@@ -308,7 +312,7 @@ public class EsRestClient {
 
             }
         } catch (IOException ex) {
-            throw new GetIndexDocsCountException(ex);
+            throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
 ex);
         }
         return mapping;
     }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
index 747ba7d2a..9cc1e0554 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
 
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
+
 public enum ElasticsearchVersion {
     ES2(2), ES5(5), ES6(6), ES7(7), ES8(8);
 
@@ -40,7 +43,8 @@ public enum ElasticsearchVersion {
                 return elasticsearchVersion;
             }
         }
-        throw new IllegalArgumentException(String.format("version=%d,fail fo 
find ElasticsearchVersion.", version));
+        throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_ES_VERSION_FAILED,
+            String.format("version=%d,fail fo find ElasticsearchVersion.", 
version));
     }
 
     public static ElasticsearchVersion get(String clusterVersion) {
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
deleted file mode 100644
index 14dbfb7ae..000000000
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
-
-public class BulkElasticsearchException extends RuntimeException {
-
-    public BulkElasticsearchException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public BulkElasticsearchException(String message) {
-        super(message);
-    }
-
-}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
new file mode 100644
index 000000000..2fb5c11c9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -0,0 +1,46 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode {
+
+    BULK_RESPONSE_ERROR("ELASTICSEARCH-01", "Bulk es response error"),
+    GET_ES_VERSION_FAILED("ELASTICSEARCH-02", "Get elasticsearch version 
failed"),
+    SCROLL_REQUEST_ERROR("ELASTICSEARCH-03", "Fail to scroll request"),
+    GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch 
document index count failed");
+
+    private final String code;
+    private final String description;
+
+    ElasticsearchConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorException.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorException.java
new file mode 100644
index 000000000..ad41811f7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class ElasticsearchConnectorException extends SeaTunnelRuntimeException 
{
+    public ElasticsearchConnectorException(SeaTunnelErrorCode 
seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public ElasticsearchConnectorException(SeaTunnelErrorCode 
seaTunnelErrorCode, String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public ElasticsearchConnectorException(SeaTunnelErrorCode 
seaTunnelErrorCode, Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
deleted file mode 100644
index c146de07a..000000000
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
-
-public class GetElasticsearchVersionException extends RuntimeException {
-
-    public GetElasticsearchVersionException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java
deleted file mode 100644
index 19925a966..000000000
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
-
-public class GetIndexDocsCountException extends RuntimeException {
-
-    public GetIndexDocsCountException(String message) {
-        super(message);
-    }
-
-    public GetIndexDocsCountException(Throwable cause) {
-        super(cause);
-    }
-
-}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java
deleted file mode 100644
index f09483417..000000000
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
-
-public class ScrollRequestException extends RuntimeException {
-
-    public ScrollRequestException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ScrollRequestException(String message) {
-        super(message);
-    }
-
-}
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index fd7fe4a1f..27eabbf46 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -19,8 +19,10 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializerFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
@@ -51,14 +53,14 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
     }
 
     @Override
-    public String serializeRow(SeaTunnelRow row){
+    public String serializeRow(SeaTunnelRow row) {
         String[] fieldNames = seaTunnelRowType.getFieldNames();
         Map<String, Object> doc = new HashMap<>(fieldNames.length);
         Object[] fields = row.getFields();
         for (int i = 0; i < fieldNames.length; i++) {
-            Object value =  fields[i];
-            if (value instanceof Temporal){
-                //jackson not support jdk8 new time api
+            Object value = fields[i];
+            if (value instanceof Temporal) {
+                // jackson not support jdk8 new time api
                 doc.put(fieldNames[i], value.toString());
             } else {
                 doc.put(fieldNames[i], value);
@@ -80,7 +82,7 @@ public class ElasticsearchRowSerializer implements 
SeaTunnelRowSerializer {
             String indexDoc = objectMapper.writeValueAsString(doc);
             sb.append(indexDoc);
         } catch (JsonProcessingException e) {
-            throw new RuntimeException("Object json deserialization 
exception.", e);
+            throw new 
ElasticsearchConnectorException(CommonErrorCode.JSON_OPERATION_FAILED, "Object 
json deserialization exception.", e);
         }
 
         return sb.toString();
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 7dbdafa5b..63346937b 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -36,7 +36,9 @@ import 
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -57,7 +59,7 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
 
     private final ObjectMapper mapper = new ObjectMapper();
 
-    private final Map<Integer, DateTimeFormatter> dateTimeFormatterMap = new 
HashMap<Integer, DateTimeFormatter>(){
+    private final Map<Integer, DateTimeFormatter> dateTimeFormatterMap = new 
HashMap<Integer, DateTimeFormatter>() {
         {
             put("yyyy-MM-dd HH".length(), 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH"));
             put("yyyy-MM-dd HH:mm".length(), 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"));
@@ -94,8 +96,9 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
                     seaTunnelFields[i] = convertValue(seaTunnelDataType, 
value.toString());
                 }
             }
-        } catch (Exception ex){
-            throw new RuntimeException(String.format("error 
fieldName=%s,fieldValue=%s,seaTunnelDataType=%s,rowRecord=%s", fieldName, 
value, seaTunnelDataType, JsonUtils.toJsonString(rowRecord)), ex);
+        } catch (Exception ex) {
+            throw new 
ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                String.format("error 
fieldName=%s,fieldValue=%s,seaTunnelDataType=%s,rowRecord=%s", fieldName, 
value, seaTunnelDataType, JsonUtils.toJsonString(rowRecord)), ex);
         }
         return new SeaTunnelRow(seaTunnelFields);
     }
@@ -132,7 +135,7 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
             BasicType<?> elementType = arrayType.getElementType();
             List<String> stringList = JsonUtils.toList(fieldValue, 
String.class);
             Object arr = Array.newInstance(elementType.getTypeClass(), 
stringList.size());
-            for (int i = 0; i < stringList.size(); i++){
+            for (int i = 0; i < stringList.size(); i++) {
                 Object convertValue = convertValue(elementType, 
stringList.get(i));
                 Array.set(arr, 0, convertValue);
             }
@@ -142,9 +145,10 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
             SeaTunnelDataType<?> keyType = mapType.getKeyType();
 
             SeaTunnelDataType<?> valueType = mapType.getValueType();
-            Map<String, String> stringMap = mapper.readValue(fieldValue,  new 
TypeReference<HashMap<String, String>>() {});
+            Map<String, String> stringMap = mapper.readValue(fieldValue, new 
TypeReference<HashMap<String, String>>() {
+            });
             Map<Object, Object> convertMap = new HashMap<Object, Object>();
-            for (Map.Entry<String, String> entry : stringMap.entrySet()){
+            for (Map.Entry<String, String> entry : stringMap.entrySet()) {
                 Object convertKey = convertValue(keyType, entry.getKey());
                 Object convertValue = convertValue(valueType, 
entry.getValue());
                 convertMap.put(convertKey, convertValue);
@@ -155,19 +159,19 @@ public class DefaultSeaTunnelRowDeserializer implements 
SeaTunnelRowDeserializer
         } else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
             return null;
         } else {
-            throw new UnsupportedOperationException("Unexpected value: " + 
fieldType);
+            throw new 
ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
"Unexpected value: " + fieldType);
         }
 
     }
 
-    private LocalDateTime parseDate(String fieldValue){
+    private LocalDateTime parseDate(String fieldValue) {
         String formatDate = fieldValue.replace("T", " ");
-        if (fieldValue.length() == "yyyyMMdd".length() || fieldValue.length() 
== "yyyy-MM-dd".length()){
+        if (fieldValue.length() == "yyyyMMdd".length() || fieldValue.length() 
== "yyyy-MM-dd".length()) {
             formatDate = fieldValue + " 00:00:00";
         }
         DateTimeFormatter dateTimeFormatter = 
dateTimeFormatterMap.get(formatDate.length());
-        if (dateTimeFormatter == null){
-            throw new UnsupportedOperationException("unsupported date format");
+        if (dateTimeFormatter == null) {
+            throw new 
ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, 
"unsupported date format");
         }
         return LocalDateTime.parse(formatDate, dateTimeFormatter);
     }
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 251905c77..c2cc4dd69 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -20,14 +20,15 @@ package 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.RetryUtils.RetryMaterial;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
 import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
-import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
@@ -104,14 +105,15 @@ public class ElasticsearchSinkWriter implements 
SinkWriter<SeaTunnelRow, Elastic
                     String requestBody = String.join("\n", requestEsList) + 
"\n";
                     BulkResponse bulkResponse = esRestClient.bulk(requestBody);
                     if (bulkResponse.isErrors()) {
-                        throw new BulkElasticsearchException("bulk es error: " 
+ bulkResponse.getResponse());
+                        throw new 
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
+                            "bulk es error: " + bulkResponse.getResponse());
                     }
                     return bulkResponse;
                 }
                 return null;
             }, retryMaterial);
         } catch (Exception e) {
-            throw new SeaTunnelException("ElasticSearch execute batch 
statement error", e);
+            throw new 
ElasticsearchConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, 
"ElasticSearch execute batch statement error", e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index fafd749fd..daa4f6383 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -18,10 +18,12 @@
 package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
 import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -91,7 +93,7 @@ public class ElasticsearchSourceSplitEnumerator implements 
SourceSplitEnumerator
         }
 
         log.debug("No more splits to assign." +
-                " Sending NoMoreSplitsEvent to reader {}.", readers);
+            " Sending NoMoreSplitsEvent to reader {}.", readers);
         readers.forEach(context::signalNoMoreSplits);
     }
 
@@ -101,7 +103,7 @@ public class ElasticsearchSourceSplitEnumerator implements 
SourceSplitEnumerator
             int ownerReader = getSplitOwner(split.splitId(), readerCount);
             log.info("Assigning {} to {} reader.", split, ownerReader);
             pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>())
-                    .add(split);
+                .add(split);
         }
     }
 
@@ -116,12 +118,12 @@ public class ElasticsearchSourceSplitEnumerator 
implements SourceSplitEnumerator
             List<ElasticsearchSourceSplit> assignmentForReader = 
pendingSplit.remove(reader);
             if (assignmentForReader != null && !assignmentForReader.isEmpty()) 
{
                 log.info("Assign splits {} to reader {}",
-                        assignmentForReader, reader);
+                    assignmentForReader, reader);
                 try {
                     context.assignSplit(reader, assignmentForReader);
                 } catch (Exception e) {
                     log.error("Failed to assign splits {} to reader {}",
-                            assignmentForReader, reader, e);
+                        assignmentForReader, reader, e);
                     pendingSplit.put(reader, assignmentForReader);
                 }
             }
@@ -168,13 +170,13 @@ public class ElasticsearchSourceSplitEnumerator 
implements SourceSplitEnumerator
 
     @Override
     public void handleSplitRequest(int subtaskId) {
-        throw new UnsupportedOperationException("Unsupported 
handleSplitRequest: " + subtaskId);
+        throw new 
ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, 
"Unsupported handleSplitRequest: " + subtaskId);
     }
 
     @Override
     public void registerReader(int subtaskId) {
         log.debug("Register reader {} to IoTDBSourceSplitEnumerator.",
-                subtaskId);
+            subtaskId);
         if (!pendingSplit.isEmpty()) {
             assignSplit(Collections.singletonList(subtaskId));
         }


Reply via email to