This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4686f35d6 [Improve][Connector-V2][Influxdb] Unified exception for influxdb source & sink connector (#3558)
4686f35d6 is described below

commit 4686f35d68458c2f86471ed8678a0e43152bb28a
Author: ChunFu Wu <[email protected]>
AuthorDate: Fri Nov 25 21:50:05 2022 +0800

    [Improve][Connector-V2][Influxdb] Unified exception for influxdb source & sink connector (#3558)
    
    * [Improve][Connector-V2][Influxdb] Unified exception for influxdb source & sink connector
    
    * [Improve][Connector-V2][Influxdb] Unified exception for influxdb source & sink connector
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |  8 ++-
 .../seatunnel/influxdb/client/InfluxDBClient.java  | 78 +++++++++++-----------
 .../influxdb/converter/InfluxDBRowConverter.java   |  8 ++-
 .../exception/InfluxdbConnectorErrorCode.java      | 49 ++++++++++++++
 .../exception/InfluxdbConnectorException.java      | 36 ++++++++++
 .../influxdb/serialize/DefaultSerializer.java      | 36 +++++-----
 .../seatunnel/influxdb/sink/InfluxDBSink.java      | 13 +++-
 .../influxdb/sink/InfluxDBSinkWriter.java          | 52 ++++++++-------
 .../seatunnel/influxdb/source/InfluxDBSource.java  | 33 ++++++---
 .../influxdb/source/InfluxDBSourceSplit.java       |  4 +-
 .../source/InfluxDBSourceSplitEnumerator.java      |  8 ++-
 .../influxdb/source/InfluxdbSourceReader.java      | 41 ++++++------
 12 files changed, 247 insertions(+), 119 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index f5aac0371..4fc996f3c 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -76,7 +76,6 @@ This document records some common error codes and 
corresponding solutions of Sea
 | SOCKET-02 | Failed to send message to socket server                  | When 
the user encounters this error code, it means that there is a problem sending 
data and retry is not enabled, please check |
 | SOCKET-03 | Unable to write; interrupted while doing another attempt | When 
the user encounters this error code, it means that the data writing is 
interrupted abnormally, please check               |
 
-
 ## Hive Connector Error Codes
 
 | code    | description                                                   | 
solution                                                                        
                                              |
@@ -84,3 +83,10 @@ This document records some common error codes and 
corresponding solutions of Sea
 | HIVE-01 | Get name node host from table location failed                 | 
When users encounter this error code, it means that the metastore inforamtion 
has some problems, please check it              |
 | HIVE-02 | Initialize hive metastore client failed                       | 
When users encounter this error code, it means that connect to hive metastore 
service failed, please check it whether is work |
 | HIVE-03 | Get hive table information from hive metastore service failed | 
When users encounter this error code, it means that hive metastore service has 
some problems, please check it whether is work |
+
+## InfluxDB Connector Error Codes
+
+| code        | description                                                      | solution                                                                                                    |
+|-------------|------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| INFLUXDB-01 | Connect influxdb failed, due to influxdb version info is unknown | When the user encounters this error code, it indicates that the connection to influxdb failed. Please check |
+| INFLUXDB-02 | Get column index of query result exception                       | When the user encounters this error code, it indicates that obtaining the column index failed. Please check |
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
index 3ad3a99d5..d287fdc52 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
@@ -19,6 +19,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.influxdb.client;
 
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.HttpUrl;
@@ -38,56 +40,56 @@ import java.util.concurrent.TimeUnit;
 public class InfluxDBClient {
     public static InfluxDB getInfluxDB(InfluxDBConfig config) throws 
ConnectException {
         OkHttpClient.Builder clientBuilder =
-                new OkHttpClient.Builder()
-                        .connectTimeout(config.getConnectTimeOut(), 
TimeUnit.MILLISECONDS)
-                        .readTimeout(config.getQueryTimeOut(), 
TimeUnit.SECONDS);
+            new OkHttpClient.Builder()
+                .connectTimeout(config.getConnectTimeOut(), 
TimeUnit.MILLISECONDS)
+                .readTimeout(config.getQueryTimeOut(), TimeUnit.SECONDS);
         InfluxDB.ResponseFormat format = 
InfluxDB.ResponseFormat.valueOf(config.getFormat());
         clientBuilder.addInterceptor(
-                new Interceptor() {
-                    @Override
-                    public Response intercept(Chain chain) throws IOException {
-                        Request request = chain.request();
-                        HttpUrl httpUrl =
-                                request.url()
-                                        .newBuilder()
-                                        //set epoch
-                                        .addQueryParameter("epoch", 
config.getEpoch())
-                                        .build();
-                        Request build = 
request.newBuilder().url(httpUrl).build();
-                        Response response = chain.proceed(build);
-                        return response;
-                    }
-                });
-        InfluxDB influxDB =
-                new InfluxDBImpl(
-                        config.getUrl(),
-                        StringUtils.isEmpty(config.getUsername()) ? 
StringUtils.EMPTY : config.getUsername(),
-                        StringUtils.isEmpty(config.getPassword()) ? 
StringUtils.EMPTY : config.getPassword(),
-                        clientBuilder,
-                        format);
-        String version = influxDB.version();
-        if (!influxDB.ping().isGood()) {
-            String errorMessage =
-                    String.format(
-                            "connect influxdb failed, the url is: {%s}",
-                            config.getUrl());
-            throw new ConnectException(errorMessage);
+            new Interceptor() {
+                @Override
+                public Response intercept(Chain chain) throws IOException {
+                    Request request = chain.request();
+                    HttpUrl httpUrl =
+                        request.url()
+                            .newBuilder()
+                            //set epoch
+                            .addQueryParameter("epoch", config.getEpoch())
+                            .build();
+                    Request build = request.newBuilder().url(httpUrl).build();
+                    return chain.proceed(build);
+                }
+            });
+        InfluxDB influxdb =
+            new InfluxDBImpl(
+                config.getUrl(),
+                StringUtils.isEmpty(config.getUsername()) ? StringUtils.EMPTY 
: config.getUsername(),
+                StringUtils.isEmpty(config.getPassword()) ? StringUtils.EMPTY 
: config.getPassword(),
+                clientBuilder,
+                format);
+        String version = influxdb.version();
+        if (!influxdb.ping().isGood()) {
+            throw new 
InfluxdbConnectorException(InfluxdbConnectorErrorCode.CONNECT_FAILED,
+                String.format(
+                    "Connect influxdb failed, the url is: {%s}",
+                    config.getUrl()
+                )
+            );
         }
         log.info("connect influxdb successful. sever version :{}.", version);
-        return influxDB;
+        return influxdb;
     }
 
-    public static void setWriteProperty(InfluxDB influxDB, SinkConfig 
sinkConfig) {
+    public static void setWriteProperty(InfluxDB influxdb, SinkConfig 
sinkConfig) {
         String rp = sinkConfig.getRp();
         if (!StringUtils.isEmpty(rp)) {
-            influxDB.setRetentionPolicy(rp);
+            influxdb.setRetentionPolicy(rp);
         }
     }
 
     public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws 
ConnectException {
-        InfluxDB influxDB = getInfluxDB(sinkConfig);
-        influxDB.setDatabase(sinkConfig.getDatabase());
+        InfluxDB influxdb = getInfluxDB(sinkConfig);
+        influxdb.setDatabase(sinkConfig.getDatabase());
         setWriteProperty(getInfluxDB(sinkConfig), sinkConfig);
-        return  influxDB;
+        return influxdb;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
index 405ab2d43..a16121181 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -39,8 +41,7 @@ public class InfluxDBRowConverter {
             SqlType fieldSqlType = seaTunnelDataType.getSqlType();
             if (null == values.get(columnIndex)) {
                 seaTunnelField = null;
-            }
-            else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
+            } else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
                 seaTunnelField = 
Boolean.parseBoolean(values.get(columnIndex).toString());
             } else if (SqlType.SMALLINT.equals(fieldSqlType)) {
                 seaTunnelField = 
Short.valueOf(values.get(columnIndex).toString());
@@ -55,7 +56,8 @@ public class InfluxDBRowConverter {
             } else if (SqlType.STRING.equals(fieldSqlType)) {
                 seaTunnelField = values.get(columnIndex);
             } else {
-                throw new IllegalStateException("Unexpected value: " + 
seaTunnelDataType);
+                throw new 
InfluxdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "Unsupported data type: " + seaTunnelDataType);
             }
 
             fields.add(seaTunnelField);
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorErrorCode.java
new file mode 100644
index 000000000..3eb64be79
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorErrorCode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum InfluxdbConnectorErrorCode implements SeaTunnelErrorCode {
+
+    CONNECT_FAILED("INFLUXDB-01", "Connect influxdb failed, due to influxdb 
version info is unknown"),
+    GET_COLUMN_INDEX_FAILED("INFLUXDB-02", "Get column index of query result 
exception");
+
+    private final String code;
+    private final String description;
+
+    InfluxdbConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    @Override
+    public String getErrorMessage() {
+        return SeaTunnelErrorCode.super.getErrorMessage();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorException.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorException.java
new file mode 100644
index 000000000..178f44b17
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class InfluxdbConnectorException extends SeaTunnelRuntimeException {
+
+    public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
index 8cc458939..8b803c746 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
@@ -20,6 +20,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.influxdb.serialize;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 
 import com.google.common.base.Strings;
 import org.apache.commons.collections4.CollectionUtils;
@@ -35,14 +37,14 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class DefaultSerializer implements Serializer {
-    private SeaTunnelRowType seaTunnelRowType;
+    private final SeaTunnelRowType seaTunnelRowType;
 
     private final BiConsumer<SeaTunnelRow, Point.Builder> timestampExtractor;
     private final BiConsumer<SeaTunnelRow, Point.Builder> fieldExtractor;
     private final BiConsumer<SeaTunnelRow, Point.Builder> tagExtractor;
-    private String measurement;
+    private final String measurement;
 
-    private TimeUnit precision;
+    private final TimeUnit precision;
 
     public DefaultSerializer(SeaTunnelRowType seaTunnelRowType, TimeUnit 
precision, List<String> tagKeys,
                              String timestampKey,
@@ -67,8 +69,7 @@ public class DefaultSerializer implements Serializer {
 
     private BiConsumer<SeaTunnelRow, Point.Builder> 
createFieldExtractor(SeaTunnelRowType seaTunnelRowType, List<String> fieldKeys) 
{
         return (row, builder) -> {
-            for (int i = 0; i < fieldKeys.size(); i++) {
-                String field = fieldKeys.get(i);
+            for (String field : fieldKeys) {
                 int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(field);
                 SeaTunnelDataType dataType = 
seaTunnelRowType.getFieldType(indexOfSeaTunnelRow);
                 Object val = row.getField(indexOfSeaTunnelRow);
@@ -96,14 +97,15 @@ public class DefaultSerializer implements Serializer {
                         builder.addField(field, val.toString());
                         break;
                     default:
-                        throw new UnsupportedOperationException("Unsupported 
dataType: " + dataType);
+                        throw new 
InfluxdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Unsupported data type: " + dataType);
                 }
             }
         };
     }
 
     private BiConsumer<SeaTunnelRow, Point.Builder> 
createTimestampExtractor(SeaTunnelRowType seaTunnelRowType,
-                                                                  String 
timeKey) {
+                                                                             
String timeKey) {
         //not config timeKey, use processing time
         if (Strings.isNullOrEmpty(timeKey)) {
             return (row, builder) -> builder.time(System.currentTimeMillis(), 
precision);
@@ -121,7 +123,7 @@ public class DefaultSerializer implements Serializer {
                     builder.time(Long.parseLong((String) time), precision);
                     break;
                 case TIMESTAMP:
-                    builder.time(LocalDateTime.class.cast(time)
+                    builder.time(((LocalDateTime) time)
                         .atZone(ZoneOffset.UTC)
                         .toInstant()
                         .toEpochMilli(), precision);
@@ -136,15 +138,15 @@ public class DefaultSerializer implements Serializer {
     }
 
     private BiConsumer<SeaTunnelRow, Point.Builder> 
createTagExtractor(SeaTunnelRowType seaTunnelRowType,
-                                                            List<String> 
tagKeys) {
+                                                                       
List<String> tagKeys) {
         //not config tagKeys
         if (CollectionUtils.isEmpty(tagKeys)) {
-            return (row, builder) -> {};
+            return (row, builder) -> {
+            };
         }
 
         return (row, builder) -> {
-            for (int i = 0; i < tagKeys.size(); i++) {
-                String tagKey = tagKeys.get(i);
+            for (String tagKey : tagKeys) {
                 int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(tagKey);
                 builder.tag(tagKey, 
row.getField(indexOfSeaTunnelRow).toString());
             }
@@ -152,11 +154,11 @@ public class DefaultSerializer implements Serializer {
     }
 
     private List<String> getFieldKeys(SeaTunnelRowType seaTunnelRowType,
-                                            String timestampKey,
-                                            List<String> tagKeys) {
+                                      String timestampKey,
+                                      List<String> tagKeys) {
         return Stream.of(seaTunnelRowType.getFieldNames())
-                    .filter(name -> CollectionUtils.isEmpty(tagKeys) || 
!tagKeys.contains(name))
-                    .filter(name -> StringUtils.isEmpty(timestampKey) || 
!name.equals(timestampKey))
-                    .collect(Collectors.toList());
+            .filter(name -> CollectionUtils.isEmpty(tagKeys) || 
!tagKeys.contains(name))
+            .filter(name -> StringUtils.isEmpty(timestampKey) || 
!name.equals(timestampKey))
+            .collect(Collectors.toList());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 074e5a518..86180e552 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -21,6 +21,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDB
 import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -31,6 +32,7 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -53,7 +55,12 @@ public class InfluxDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     public void prepare(Config config) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(config, URL.key(), 
KEY_MEASUREMENT.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+            throw new 
InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SINK,
+                    result.getMsg()
+                )
+            );
         }
         this.pluginConfig = config;
     }
@@ -64,12 +71,12 @@ public class InfluxDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public SeaTunnelDataType getConsumedType() {
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
         return seaTunnelRowType;
     }
 
     @Override
-    public AbstractSinkWriter createWriter(SinkWriter.Context context) throws 
IOException {
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) throws IOException {
         return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
index 809a3eaaa..0b9c3beaa 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -19,9 +19,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer;
 
@@ -48,8 +51,8 @@ import java.util.concurrent.TimeUnit;
 public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> 
{
 
     private final Serializer serializer;
-    private InfluxDB influxDB;
-    private SinkConfig sinkConfig;
+    private InfluxDB influxdb;
+    private final SinkConfig sinkConfig;
     private final List<Point> batchList;
     private ScheduledExecutorService scheduler;
     private ScheduledFuture<?> scheduledFuture;
@@ -61,12 +64,12 @@ public class InfluxDBSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
         this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
         this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
         this.serializer = new DefaultSerializer(
-                seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(), 
sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement());
+            seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(), 
sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement());
         this.batchList = new ArrayList<>();
 
         if (batchIntervalMs != null) {
             scheduler = Executors.newSingleThreadScheduledExecutor(
-                    new 
ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build());
+                new 
ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build());
             scheduledFuture = scheduler.scheduleAtFixedRate(
                 () -> {
                     try {
@@ -106,9 +109,9 @@ public class InfluxDBSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
 
         flush();
 
-        if (influxDB != null) {
-            influxDB.close();
-            influxDB = null;
+        if (influxdb != null) {
+            influxdb.close();
+            influxdb = null;
         }
     }
 
@@ -117,7 +120,7 @@ public class InfluxDBSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
 
         batchList.add(record);
         if (sinkConfig.getBatchSize() > 0
-                && batchList.size() >= sinkConfig.getBatchSize()) {
+            && batchList.size() >= sinkConfig.getBatchSize()) {
             flush();
         }
     }
@@ -131,21 +134,22 @@ public class InfluxDBSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
         for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
             try {
                 batchPoints.points(batchList);
-                influxDB.write(batchPoints.build());
+                influxdb.write(batchPoints.build());
             } catch (Exception e) {
                 log.error("Writing records to influxdb failed, retry times = 
{}", i, e);
                 if (i >= sinkConfig.getMaxRetries()) {
-                    throw new IOException("Writing records to InfluxDB 
failed.", e);
+                    throw new 
InfluxdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+                        "Writing records to InfluxDB failed.", e);
                 }
 
                 try {
                     long backoff = 
Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i,
-                            sinkConfig.getMaxRetryBackoffMs());
+                        sinkConfig.getMaxRetryBackoffMs());
                     Thread.sleep(backoff);
                 } catch (InterruptedException ex) {
                     Thread.currentThread().interrupt();
-                    throw new IOException(
-                            "Unable to flush; interrupted while doing another 
attempt.", e);
+                    throw new 
InfluxdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+                        "Unable to flush; interrupted while doing another 
attempt.", e);
                 }
             }
         }
@@ -155,20 +159,22 @@ public class InfluxDBSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
 
     private void checkFlushException() {
         if (flushException != null) {
-            throw new RuntimeException("Writing records to InfluxDB failed.", 
flushException);
+            throw new 
InfluxdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+                "Writing records to InfluxDB failed.", flushException);
         }
     }
 
     public void connect() throws ConnectException {
-        if (influxDB == null) {
-            influxDB = InfluxDBClient.getWriteClient(sinkConfig);
-            String version = influxDB.version();
-            if (!influxDB.ping().isGood()) {
-                String errorMessage =
-                        String.format(
-                                "connect influxdb failed, due to influxdb 
version info is unknown, the url is: {%s}",
-                                sinkConfig.getUrl());
-                throw new ConnectException(errorMessage);
+        if (influxdb == null) {
+            influxdb = InfluxDBClient.getWriteClient(sinkConfig);
+            String version = influxdb.version();
+            if (!influxdb.ping().isGood()) {
+                throw new 
InfluxdbConnectorException(InfluxdbConnectorErrorCode.CONNECT_FAILED,
+                    String.format(
+                        "connect influxdb failed, due to influxdb version info 
is unknown, the url is: {%s}",
+                        sinkConfig.getUrl()
+                    )
+                );
             }
             log.info("connect influxdb successful. sever version :{}.", 
version);
         }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 26230f0ae..2c961517f 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.influxdb.source;
 import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -33,6 +34,8 @@ import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -50,7 +53,7 @@ import java.util.stream.Collectors;
 
 @Slf4j
 @AutoService(SeaTunnelSource.class)
-public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, 
InfluxDBSourceSplit, InfluxDBSourceState>  {
+public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow, 
InfluxDBSourceSplit, InfluxDBSourceState> {
     private SeaTunnelRowType typeInfo;
     private SourceConfig sourceConfig;
 
@@ -67,7 +70,12 @@ public class InfluxDBSource implements 
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
     public void prepare(Config config) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+            throw new 
InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SOURCE,
+                    result.getMsg()
+                )
+            );
         }
         try {
             this.sourceConfig = SourceConfig.loadConfig(config);
@@ -75,7 +83,10 @@ public class InfluxDBSource implements 
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
             this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
             this.columnsIndexList = 
initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
         } catch (Exception e) {
-            throw new PrepareFailException("InfluxDB", PluginType.SOURCE, 
e.toString());
+            throw new 
InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SOURCE, e)
+            );
         }
     }
 
@@ -85,7 +96,7 @@ public class InfluxDBSource implements 
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
     }
 
     @Override
-    public SeaTunnelDataType getProducedType() {
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
         return typeInfo;
     }
 
@@ -104,20 +115,20 @@ public class InfluxDBSource implements 
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
         return new InfluxDBSourceSplitEnumerator(enumeratorContext, 
checkpointState, sourceConfig);
     }
 
-    private List<Integer> initColumnsIndex(InfluxDB influxDB)  {
+    private List<Integer> initColumnsIndex(InfluxDB influxdb) {
         //query one row to get column info
         String query = sourceConfig.getSql() + QUERY_LIMIT;
-        List<String> fieldNames = new ArrayList<>();
         try {
-            QueryResult queryResult = influxDB.query(
-                    new Query(query, sourceConfig.getDatabase()));
+            QueryResult queryResult = influxdb.query(
+                new Query(query, sourceConfig.getDatabase()));
 
             List<QueryResult.Series> serieList = 
queryResult.getResults().get(0).getSeries();
-            fieldNames.addAll(serieList.get(0).getColumns());
+            List<String> fieldNames = new 
ArrayList<>(serieList.get(0).getColumns());
 
-            return Arrays.stream(typeInfo.getFieldNames()).map(x -> 
fieldNames.indexOf(x)).collect(Collectors.toList());
+            return 
Arrays.stream(typeInfo.getFieldNames()).map(fieldNames::indexOf).collect(Collectors.toList());
         } catch (Exception e) {
-            throw new RuntimeException("get column index of query result 
exception", e);
+            throw new 
InfluxdbConnectorException(InfluxdbConnectorErrorCode.GET_COLUMN_INDEX_FAILED,
+                "Get column index of query result exception", e);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
index 37dd688f8..50d9a51e2 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
@@ -20,9 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.influxdb.source;
 import org.apache.seatunnel.api.source.SourceSplit;
 
 public class InfluxDBSourceSplit implements SourceSplit {
-    private String splitId;
+    private final String splitId;
 
-    private String query;
+    private final String query;
 
     public InfluxDBSourceSplit(String splitId, String query) {
         this.query = query;
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index aad78e6fc..fdcabac1f 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -20,7 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.influxdb.source;
 import static 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
 
 import lombok.extern.slf4j.Slf4j;
@@ -121,7 +123,8 @@ public class InfluxDBSourceSplitEnumerator implements 
SourceSplitEnumerator<Infl
 
         String[] sqls = sql.split(SQL_WHERE.key());
         if (sqls.length > 2) {
-            throw new IllegalArgumentException("sql should not contain more 
than one where");
+            throw new 
InfluxdbConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                "sql should not contain more than one where");
         }
 
         int i = 0;
@@ -209,7 +212,8 @@ public class InfluxDBSourceSplitEnumerator implements 
SourceSplitEnumerator<Infl
 
     @Override
     public void handleSplitRequest(int subtaskId) {
-        throw new UnsupportedOperationException("Unsupported 
handleSplitRequest: " + subtaskId);
+        throw new 
InfluxdbConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+            String.format("Unsupported handleSplitRequest: %d", subtaskId));
     }
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
index 090b8bf97..915eaa734 100644
--- 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
@@ -40,12 +42,12 @@ import java.util.Queue;
 
 @Slf4j
 public class InfluxdbSourceReader implements SourceReader<SeaTunnelRow, 
InfluxDBSourceSplit> {
-    private InfluxDB influxDB;
+    private InfluxDB influxdb;
     InfluxDBConfig config;
 
     private final SourceReader.Context context;
 
-    private SeaTunnelRowType seaTunnelRowType;
+    private final SeaTunnelRowType seaTunnelRowType;
 
     List<Integer> columnsIndexList;
     private final Queue<InfluxDBSourceSplit> pendingSplits;
@@ -61,15 +63,16 @@ public class InfluxdbSourceReader implements 
SourceReader<SeaTunnelRow, InfluxDB
     }
 
     public void connect() throws ConnectException {
-        if (influxDB == null) {
-            influxDB = InfluxDBClient.getInfluxDB(config);
-            String version = influxDB.version();
-            if (!influxDB.ping().isGood()) {
-                String errorMessage =
-                        String.format(
-                                "connect influxdb failed, due to influxdb 
version info is unknown, the url is: {%s}",
-                                config.getUrl());
-                throw new ConnectException(errorMessage);
+        if (influxdb == null) {
+            influxdb = InfluxDBClient.getInfluxDB(config);
+            String version = influxdb.version();
+            if (!influxdb.ping().isGood()) {
+                throw new 
InfluxdbConnectorException(InfluxdbConnectorErrorCode.CONNECT_FAILED,
+                    String.format(
+                        "connect influxdb failed, due to influxdb version info 
is unknown, the url is: {%s}",
+                        config.getUrl()
+                    )
+                );
             }
             log.info("connect influxdb successful. sever version :{}.", 
version);
         }
@@ -82,9 +85,9 @@ public class InfluxdbSourceReader implements 
SourceReader<SeaTunnelRow, InfluxDB
 
     @Override
     public void close() {
-        if (influxDB != null) {
-            influxDB.close();
-            influxDB = null;
+        if (influxdb != null) {
+            influxdb.close();
+            influxdb = null;
         }
     }
 
@@ -98,8 +101,8 @@ public class InfluxdbSourceReader implements 
SourceReader<SeaTunnelRow, InfluxDB
         }
 
         if (Boundedness.BOUNDED.equals(context.getBoundedness())
-                && noMoreSplitsAssignment
-                && pendingSplits.isEmpty()) {
+            && noMoreSplitsAssignment
+            && pendingSplits.isEmpty()) {
             // signal to the source that we have reached the end of the data.
             log.info("Closed the bounded influxDB source");
             context.signalNoMoreElement();
@@ -123,12 +126,12 @@ public class InfluxdbSourceReader implements 
SourceReader<SeaTunnelRow, InfluxDB
     }
 
     @Override
-    public void notifyCheckpointComplete(long checkpointId)  {
+    public void notifyCheckpointComplete(long checkpointId) {
 
     }
 
     private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow> 
output) {
-        QueryResult queryResult = influxDB.query(new Query(split.getQuery(), 
config.getDatabase()));
+        QueryResult queryResult = influxdb.query(new Query(split.getQuery(), 
config.getDatabase()));
         for (QueryResult.Result result : queryResult.getResults()) {
             List<QueryResult.Series> serieList = result.getSeries();
             if (CollectionUtils.isNotEmpty(serieList)) {
@@ -140,7 +143,7 @@ public class InfluxdbSourceReader implements 
SourceReader<SeaTunnelRow, InfluxDB
                 }
             } else {
                 log.debug(
-                        "split[{}] reader influxDB series is empty.", 
split.splitId());
+                    "split[{}] reader influxDB series is empty.", 
split.splitId());
             }
         }
     }


Reply via email to