This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b73944d1d [Improve][Connector-V2][ElasticSearch] Unified exception for
ElasticSearch source & sink connector (#3569)
b73944d1d is described below
commit b73944d1dcb836dd61d6b319f172a2f987d01198
Author: FWLamb <[email protected]>
AuthorDate: Sun Nov 27 13:42:35 2022 +0800
[Improve][Connector-V2][ElasticSearch] Unified exception for ElasticSearch
source & sink connector (#3569)
* unified exception
* resolve confilcts
* resolve confilcts
* resolve confilcts
Co-authored-by: yangbinbin <[email protected]>
Co-authored-by: Hisoka <[email protected]>
---
.../connector-v2/Error-Quick-Reference-Manual.md | 12 +++++-
.../elasticsearch/client/EsRestClient.java | 48 ++++++++++++----------
.../constant/ElasticsearchVersion.java | 6 ++-
.../exception/BulkElasticsearchException.java | 30 --------------
.../exception/ElasticsearchConnectorErrorCode.java | 46 +++++++++++++++++++++
.../exception/ElasticsearchConnectorException.java | 35 ++++++++++++++++
.../GetElasticsearchVersionException.java | 25 -----------
.../exception/GetIndexDocsCountException.java | 30 --------------
.../exception/ScrollRequestException.java | 30 --------------
.../serialize/ElasticsearchRowSerializer.java | 12 +++---
.../source/DefaultSeaTunnelRowDeserializer.java | 26 +++++++-----
.../sink/ElasticsearchSinkWriter.java | 10 +++--
.../source/ElasticsearchSourceSplitEnumerator.java | 14 ++++---
13 files changed, 159 insertions(+), 165 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index ac72f52f1..707609d7a 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -1,6 +1,7 @@
# Error Quick Reference Manual
-This document records some common error codes and corresponding solutions of
SeaTunnel, aiming to quickly solve the problems encountered by users.
+This document records some common error codes and corresponding solutions of
SeaTunnel, aiming to quickly solve the
+problems encountered by users.
## SeaTunnel API Error Codes
@@ -96,6 +97,15 @@ This document records some common error codes and
corresponding solutions of Sea
| HIVE-02 | Initialize hive metastore client failed |
When users encounter this error code, it means that connect to hive metastore
service failed, please check it whether is work |
| HIVE-03 | Get hive table information from hive metastore service failed |
When users encounter this error code, it means that hive metastore service has
some problems, please check it whether is work |
+## Elasticsearch Connector Error Codes
+
+| code | description | solution
|
+|-------------------|-----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------|
+| ELASTICSEARCH-01 | Bulk es response error | When the
user encounters this error code, it means that the connection was aborted,
please check it whether is work |
+| ELASTICSEARCH-02 | Get elasticsearch version failed | When the
user encounters this error code, it means that the connection was aborted,
please check it whether is work |
+| ELASTICSEARCH-03 | Fail to scroll request | When the
user encounters this error code, it means that the connection was aborted,
please check it whether is work |
+| ELASTICSEARCH-04 | Get elasticsearch document index count failed | When the
user encounters this error code, it means that the es index may not wrong or
the connection was aborted, please check |
+
## Kafka Connector Error Codes
| code | description
| solution
|
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index e8c21f8f1..53efa36f0 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -22,10 +22,8 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterC
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetIndexDocsCountException;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ScrollRequestException;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -96,9 +94,9 @@ public class EsRestClient {
}
RestClientBuilder builder = RestClient.builder(httpHosts)
- .setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
-
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
- .setSocketTimeout(SOCKET_TIMEOUT));
+ .setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
+ .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
+ .setSocketTimeout(SOCKET_TIMEOUT));
if (StringUtils.isNotEmpty(username)) {
CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
@@ -114,7 +112,7 @@ public class EsRestClient {
try {
Response response = restClient.performRequest(request);
if (response == null) {
- throw new BulkElasticsearchException("bulk es Response is
null");
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
"bulk es Response is null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
ObjectMapper objectMapper = new ObjectMapper();
@@ -124,10 +122,10 @@ public class EsRestClient {
boolean errors = json.get("errors").asBoolean();
return new BulkResponse(errors, took, entity);
} else {
- throw new BulkElasticsearchException(String.format("bulk es
response status code=%d,request boy=%s",
response.getStatusLine().getStatusCode(), requestBody));
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
String.format("bulk es response status code=%d,request boy=%s",
response.getStatusLine().getStatusCode(), requestBody));
}
} catch (IOException e) {
- throw new BulkElasticsearchException(String.format("bulk es
error,request boy=%s", requestBody), e);
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
String.format("bulk es error,request boy=%s", requestBody), e);
}
}
@@ -144,11 +142,11 @@ public class EsRestClient {
JsonNode versionNode = jsonNode.get("version");
return versionNode.get("number").asText();
} catch (IOException e) {
- throw new GetElasticsearchVersionException("fail to get
elasticsearch version.", e);
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_ES_VERSION_FAILED,
"fail to get elasticsearch version.", e);
}
}
- public void close() {
+ public void close() {
try {
restClient.close();
} catch (IOException e) {
@@ -199,7 +197,7 @@ public class EsRestClient {
try {
Response response = restClient.performRequest(request);
if (response == null) {
- throw new ScrollRequestException("POST " + endpoint + "
response null");
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
"POST " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
@@ -213,10 +211,12 @@ public class EsRestClient {
ScrollResult scrollResult =
getDocsFromScrollResponse(responseJson);
return scrollResult;
} else {
- throw new ScrollRequestException(String.format("POST %s
response status code=%d,request boy=%s", endpoint,
response.getStatusLine().getStatusCode(), requestBody));
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
+ String.format("POST %s response status code=%d,request
boy=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody));
}
} catch (IOException e) {
- throw new ScrollRequestException(String.format("POST %s
error,request boy=%s", endpoint, requestBody), e);
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.SCROLL_REQUEST_ERROR,
+ String.format("POST %s error,request boy=%s", endpoint,
requestBody), e);
}
}
@@ -240,7 +240,7 @@ public class EsRestClient {
for (Iterator<Map.Entry<String, JsonNode>> iterator =
source.fields(); iterator.hasNext(); ) {
Map.Entry<String, JsonNode> entry = iterator.next();
String fieldName = entry.getKey();
- if (entry.getValue() instanceof TextNode){
+ if (entry.getValue() instanceof TextNode) {
doc.put(fieldName, entry.getValue().textValue());
} else {
doc.put(fieldName, entry.getValue());
@@ -257,17 +257,19 @@ public class EsRestClient {
try {
Response response = restClient.performRequest(request);
if (response == null) {
- throw new GetIndexDocsCountException("GET " + endpoint + "
response null");
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+ "GET " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
List<IndexDocsCount> indexDocsCounts =
JsonUtils.toList(entity, IndexDocsCount.class);
return indexDocsCounts;
} else {
- throw new GetIndexDocsCountException(String.format("GET %s
response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+ String.format("GET %s response status code=%d", endpoint,
response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
- throw new GetIndexDocsCountException(ex);
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
ex);
}
}
@@ -284,10 +286,12 @@ public class EsRestClient {
try {
Response response = restClient.performRequest(request);
if (response == null) {
- throw new GetIndexDocsCountException("GET " + endpoint + "
response null");
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+ "GET " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw new GetIndexDocsCountException(String.format("GET %s
response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+ String.format("GET %s response status code=%d", endpoint,
response.getStatusLine().getStatusCode()));
}
String entity = EntityUtils.toString(response.getEntity());
log.info(String.format("GET %s respnse=%s", endpoint, entity));
@@ -308,7 +312,7 @@ public class EsRestClient {
}
} catch (IOException ex) {
- throw new GetIndexDocsCountException(ex);
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
ex);
}
return mapping;
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
index 747ba7d2a..9cc1e0554 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/constant/ElasticsearchVersion.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
+
public enum ElasticsearchVersion {
ES2(2), ES5(5), ES6(6), ES7(7), ES8(8);
@@ -40,7 +43,8 @@ public enum ElasticsearchVersion {
return elasticsearchVersion;
}
}
- throw new IllegalArgumentException(String.format("version=%d,fail fo
find ElasticsearchVersion.", version));
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.GET_ES_VERSION_FAILED,
+ String.format("version=%d,fail fo find ElasticsearchVersion.",
version));
}
public static ElasticsearchVersion get(String clusterVersion) {
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
deleted file mode 100644
index 14dbfb7ae..000000000
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/BulkElasticsearchException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
-
-public class BulkElasticsearchException extends RuntimeException {
-
- public BulkElasticsearchException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public BulkElasticsearchException(String message) {
- super(message);
- }
-
-}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
new file mode 100644
index 000000000..2fb5c11c9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode {
+
+ BULK_RESPONSE_ERROR("ELASTICSEARCH-01", "Bulk es response error"),
+ GET_ES_VERSION_FAILED("ELASTICSEARCH-02", "Get elasticsearch version
failed"),
+ SCROLL_REQUEST_ERROR("ELASTICSEARCH-03", "Fail to scroll request"),
+ GET_INDEX_DOCS_COUNT_FAILED("ELASTICSEARCH-04", "Get elasticsearch
document index count failed");
+
+ private final String code;
+ private final String description;
+
+ ElasticsearchConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorException.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorException.java
new file mode 100644
index 000000000..ad41811f7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class ElasticsearchConnectorException extends SeaTunnelRuntimeException
{
+ public ElasticsearchConnectorException(SeaTunnelErrorCode
seaTunnelErrorCode, String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public ElasticsearchConnectorException(SeaTunnelErrorCode
seaTunnelErrorCode, String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public ElasticsearchConnectorException(SeaTunnelErrorCode
seaTunnelErrorCode, Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
deleted file mode 100644
index c146de07a..000000000
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetElasticsearchVersionException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
-
-public class GetElasticsearchVersionException extends RuntimeException {
-
- public GetElasticsearchVersionException(String message, Throwable cause) {
- super(message, cause);
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java
deleted file mode 100644
index 19925a966..000000000
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/GetIndexDocsCountException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
-
-public class GetIndexDocsCountException extends RuntimeException {
-
- public GetIndexDocsCountException(String message) {
- super(message);
- }
-
- public GetIndexDocsCountException(Throwable cause) {
- super(cause);
- }
-
-}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java
deleted file mode 100644
index f09483417..000000000
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ScrollRequestException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception;
-
-public class ScrollRequestException extends RuntimeException {
-
- public ScrollRequestException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ScrollRequestException(String message) {
- super(message);
- }
-
-}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
index fd7fe4a1f..27eabbf46 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java
@@ -19,8 +19,10 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.index.IndexSerializerFactory;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.type.IndexTypeSerializer;
@@ -51,14 +53,14 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
}
@Override
- public String serializeRow(SeaTunnelRow row){
+ public String serializeRow(SeaTunnelRow row) {
String[] fieldNames = seaTunnelRowType.getFieldNames();
Map<String, Object> doc = new HashMap<>(fieldNames.length);
Object[] fields = row.getFields();
for (int i = 0; i < fieldNames.length; i++) {
- Object value = fields[i];
- if (value instanceof Temporal){
- //jackson not support jdk8 new time api
+ Object value = fields[i];
+ if (value instanceof Temporal) {
+ // jackson not support jdk8 new time api
doc.put(fieldNames[i], value.toString());
} else {
doc.put(fieldNames[i], value);
@@ -80,7 +82,7 @@ public class ElasticsearchRowSerializer implements
SeaTunnelRowSerializer {
String indexDoc = objectMapper.writeValueAsString(doc);
sb.append(indexDoc);
} catch (JsonProcessingException e) {
- throw new RuntimeException("Object json deserialization
exception.", e);
+ throw new
ElasticsearchConnectorException(CommonErrorCode.JSON_OPERATION_FAILED, "Object
json deserialization exception.", e);
}
return sb.toString();
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 7dbdafa5b..63346937b 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -36,7 +36,9 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -57,7 +59,7 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
private final ObjectMapper mapper = new ObjectMapper();
- private final Map<Integer, DateTimeFormatter> dateTimeFormatterMap = new
HashMap<Integer, DateTimeFormatter>(){
+ private final Map<Integer, DateTimeFormatter> dateTimeFormatterMap = new
HashMap<Integer, DateTimeFormatter>() {
{
put("yyyy-MM-dd HH".length(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH"));
put("yyyy-MM-dd HH:mm".length(),
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"));
@@ -94,8 +96,9 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
seaTunnelFields[i] = convertValue(seaTunnelDataType,
value.toString());
}
}
- } catch (Exception ex){
- throw new RuntimeException(String.format("error
fieldName=%s,fieldValue=%s,seaTunnelDataType=%s,rowRecord=%s", fieldName,
value, seaTunnelDataType, JsonUtils.toJsonString(rowRecord)), ex);
+ } catch (Exception ex) {
+ throw new
ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ String.format("error
fieldName=%s,fieldValue=%s,seaTunnelDataType=%s,rowRecord=%s", fieldName,
value, seaTunnelDataType, JsonUtils.toJsonString(rowRecord)), ex);
}
return new SeaTunnelRow(seaTunnelFields);
}
@@ -132,7 +135,7 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
BasicType<?> elementType = arrayType.getElementType();
List<String> stringList = JsonUtils.toList(fieldValue,
String.class);
Object arr = Array.newInstance(elementType.getTypeClass(),
stringList.size());
- for (int i = 0; i < stringList.size(); i++){
+ for (int i = 0; i < stringList.size(); i++) {
Object convertValue = convertValue(elementType,
stringList.get(i));
Array.set(arr, 0, convertValue);
}
@@ -142,9 +145,10 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
- Map<String, String> stringMap = mapper.readValue(fieldValue, new
TypeReference<HashMap<String, String>>() {});
+ Map<String, String> stringMap = mapper.readValue(fieldValue, new
TypeReference<HashMap<String, String>>() {
+ });
Map<Object, Object> convertMap = new HashMap<Object, Object>();
- for (Map.Entry<String, String> entry : stringMap.entrySet()){
+ for (Map.Entry<String, String> entry : stringMap.entrySet()) {
Object convertKey = convertValue(keyType, entry.getKey());
Object convertValue = convertValue(valueType,
entry.getValue());
convertMap.put(convertKey, convertValue);
@@ -155,19 +159,19 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
} else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
return null;
} else {
- throw new UnsupportedOperationException("Unexpected value: " +
fieldType);
+ throw new
ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected value: " + fieldType);
}
}
- private LocalDateTime parseDate(String fieldValue){
+ private LocalDateTime parseDate(String fieldValue) {
String formatDate = fieldValue.replace("T", " ");
- if (fieldValue.length() == "yyyyMMdd".length() || fieldValue.length()
== "yyyy-MM-dd".length()){
+ if (fieldValue.length() == "yyyyMMdd".length() || fieldValue.length()
== "yyyy-MM-dd".length()) {
formatDate = fieldValue + " 00:00:00";
}
DateTimeFormatter dateTimeFormatter =
dateTimeFormatterMap.get(formatDate.length());
- if (dateTimeFormatter == null){
- throw new UnsupportedOperationException("unsupported date format");
+ if (dateTimeFormatter == null) {
+ throw new
ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
"unsupported date format");
}
return LocalDateTime.parse(formatDate, dateTimeFormatter);
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 251905c77..c2cc4dd69 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -20,14 +20,15 @@ package
org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.RetryUtils.RetryMaterial;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.constant.ElasticsearchVersion;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.IndexInfo;
-import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.ElasticsearchRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.serialize.SeaTunnelRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
@@ -104,14 +105,15 @@ public class ElasticsearchSinkWriter implements
SinkWriter<SeaTunnelRow, Elastic
String requestBody = String.join("\n", requestEsList) +
"\n";
BulkResponse bulkResponse = esRestClient.bulk(requestBody);
if (bulkResponse.isErrors()) {
- throw new BulkElasticsearchException("bulk es error: "
+ bulkResponse.getResponse());
+ throw new
ElasticsearchConnectorException(ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
+ "bulk es error: " + bulkResponse.getResponse());
}
return bulkResponse;
}
return null;
}, retryMaterial);
} catch (Exception e) {
- throw new SeaTunnelException("ElasticSearch execute batch
statement error", e);
+ throw new
ElasticsearchConnectorException(CommonErrorCode.SQL_OPERATION_FAILED,
"ElasticSearch execute batch statement error", e);
}
}
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
index fafd749fd..daa4f6383 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java
@@ -18,10 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.SourceIndexInfo;
+import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -91,7 +93,7 @@ public class ElasticsearchSourceSplitEnumerator implements
SourceSplitEnumerator
}
log.debug("No more splits to assign." +
- " Sending NoMoreSplitsEvent to reader {}.", readers);
+ " Sending NoMoreSplitsEvent to reader {}.", readers);
readers.forEach(context::signalNoMoreSplits);
}
@@ -101,7 +103,7 @@ public class ElasticsearchSourceSplitEnumerator implements
SourceSplitEnumerator
int ownerReader = getSplitOwner(split.splitId(), readerCount);
log.info("Assigning {} to {} reader.", split, ownerReader);
pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>())
- .add(split);
+ .add(split);
}
}
@@ -116,12 +118,12 @@ public class ElasticsearchSourceSplitEnumerator
implements SourceSplitEnumerator
List<ElasticsearchSourceSplit> assignmentForReader =
pendingSplit.remove(reader);
if (assignmentForReader != null && !assignmentForReader.isEmpty())
{
log.info("Assign splits {} to reader {}",
- assignmentForReader, reader);
+ assignmentForReader, reader);
try {
context.assignSplit(reader, assignmentForReader);
} catch (Exception e) {
log.error("Failed to assign splits {} to reader {}",
- assignmentForReader, reader, e);
+ assignmentForReader, reader, e);
pendingSplit.put(reader, assignmentForReader);
}
}
@@ -168,13 +170,13 @@ public class ElasticsearchSourceSplitEnumerator
implements SourceSplitEnumerator
@Override
public void handleSplitRequest(int subtaskId) {
- throw new UnsupportedOperationException("Unsupported
handleSplitRequest: " + subtaskId);
+ throw new
ElasticsearchConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
"Unsupported handleSplitRequest: " + subtaskId);
}
@Override
public void registerReader(int subtaskId) {
log.debug("Register reader {} to IoTDBSourceSplitEnumerator.",
- subtaskId);
+ subtaskId);
if (!pendingSplit.isEmpty()) {
assignSplit(Collections.singletonList(subtaskId));
}