This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4686f35d6 [Improve][Connector-V2][Influxdb] Unified exception for
influxdb source & sink connector (#3558)
4686f35d6 is described below
commit 4686f35d68458c2f86471ed8678a0e43152bb28a
Author: ChunFu Wu <[email protected]>
AuthorDate: Fri Nov 25 21:50:05 2022 +0800
[Improve][Connector-V2][Influxdb] Unified exception for influxdb source &
sink connector (#3558)
* [Improve][Connector-V2][Influxdb] Unified exception for influxdb source &
sink connector
* [Improve][Connector-V2][Influxdb] Unified exception for influxdb source &
sink connector
---
.../connector-v2/Error-Quick-Reference-Manual.md | 8 ++-
.../seatunnel/influxdb/client/InfluxDBClient.java | 78 +++++++++++-----------
.../influxdb/converter/InfluxDBRowConverter.java | 8 ++-
.../exception/InfluxdbConnectorErrorCode.java | 49 ++++++++++++++
.../exception/InfluxdbConnectorException.java | 36 ++++++++++
.../influxdb/serialize/DefaultSerializer.java | 36 +++++-----
.../seatunnel/influxdb/sink/InfluxDBSink.java | 13 +++-
.../influxdb/sink/InfluxDBSinkWriter.java | 52 ++++++++-------
.../seatunnel/influxdb/source/InfluxDBSource.java | 33 ++++++---
.../influxdb/source/InfluxDBSourceSplit.java | 4 +-
.../source/InfluxDBSourceSplitEnumerator.java | 8 ++-
.../influxdb/source/InfluxdbSourceReader.java | 41 ++++++------
12 files changed, 247 insertions(+), 119 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index f5aac0371..4fc996f3c 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -76,7 +76,6 @@ This document records some common error codes and
corresponding solutions of Sea
| SOCKET-02 | Failed to send message to socket server | When
the user encounters this error code, it means that there is a problem sending
data and retry is not enabled, please check |
| SOCKET-03 | Unable to write; interrupted while doing another attempt | When
the user encounters this error code, it means that the data writing is
interrupted abnormally, please check |
-
## Hive Connector Error Codes
| code | description |
solution
|
@@ -84,3 +83,10 @@ This document records some common error codes and
corresponding solutions of Sea
| HIVE-01 | Get name node host from table location failed |
When users encounter this error code, it means that the metastore information
has some problems, please check it |
| HIVE-02 | Initialize hive metastore client failed |
When users encounter this error code, it means that connecting to the hive metastore
service failed; please check whether the service is working |
| HIVE-03 | Get hive table information from hive metastore service failed |
When users encounter this error code, it means that the hive metastore service has
some problems; please check whether the service is working |
+
+## InfluxDB Connector Error Codes
+
+| code | description
| solution
|
+|-------------|------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| INFLUXDB-01 | Connect influxdb failed, due to influxdb version info is
unknown | When the user encounters this error code, it indicates that the
connection to influxdb failed. Please check |
+| INFLUXDB-02 | Get column index of query result exception
| When the user encounters this error code, it indicates that obtaining the
column index failed. Please check |
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
index 3ad3a99d5..d287fdc52 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java
@@ -19,6 +19,8 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.client;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
@@ -38,56 +40,56 @@ import java.util.concurrent.TimeUnit;
public class InfluxDBClient {
public static InfluxDB getInfluxDB(InfluxDBConfig config) throws
ConnectException {
OkHttpClient.Builder clientBuilder =
- new OkHttpClient.Builder()
- .connectTimeout(config.getConnectTimeOut(),
TimeUnit.MILLISECONDS)
- .readTimeout(config.getQueryTimeOut(),
TimeUnit.SECONDS);
+ new OkHttpClient.Builder()
+ .connectTimeout(config.getConnectTimeOut(),
TimeUnit.MILLISECONDS)
+ .readTimeout(config.getQueryTimeOut(), TimeUnit.SECONDS);
InfluxDB.ResponseFormat format =
InfluxDB.ResponseFormat.valueOf(config.getFormat());
clientBuilder.addInterceptor(
- new Interceptor() {
- @Override
- public Response intercept(Chain chain) throws IOException {
- Request request = chain.request();
- HttpUrl httpUrl =
- request.url()
- .newBuilder()
- //set epoch
- .addQueryParameter("epoch",
config.getEpoch())
- .build();
- Request build =
request.newBuilder().url(httpUrl).build();
- Response response = chain.proceed(build);
- return response;
- }
- });
- InfluxDB influxDB =
- new InfluxDBImpl(
- config.getUrl(),
- StringUtils.isEmpty(config.getUsername()) ?
StringUtils.EMPTY : config.getUsername(),
- StringUtils.isEmpty(config.getPassword()) ?
StringUtils.EMPTY : config.getPassword(),
- clientBuilder,
- format);
- String version = influxDB.version();
- if (!influxDB.ping().isGood()) {
- String errorMessage =
- String.format(
- "connect influxdb failed, the url is: {%s}",
- config.getUrl());
- throw new ConnectException(errorMessage);
+ new Interceptor() {
+ @Override
+ public Response intercept(Chain chain) throws IOException {
+ Request request = chain.request();
+ HttpUrl httpUrl =
+ request.url()
+ .newBuilder()
+ //set epoch
+ .addQueryParameter("epoch", config.getEpoch())
+ .build();
+ Request build = request.newBuilder().url(httpUrl).build();
+ return chain.proceed(build);
+ }
+ });
+ InfluxDB influxdb =
+ new InfluxDBImpl(
+ config.getUrl(),
+ StringUtils.isEmpty(config.getUsername()) ? StringUtils.EMPTY
: config.getUsername(),
+ StringUtils.isEmpty(config.getPassword()) ? StringUtils.EMPTY
: config.getPassword(),
+ clientBuilder,
+ format);
+ String version = influxdb.version();
+ if (!influxdb.ping().isGood()) {
+ throw new
InfluxdbConnectorException(InfluxdbConnectorErrorCode.CONNECT_FAILED,
+ String.format(
+ "Connect influxdb failed, the url is: {%s}",
+ config.getUrl()
+ )
+ );
}
log.info("connect influxdb successful. sever version :{}.", version);
- return influxDB;
+ return influxdb;
}
- public static void setWriteProperty(InfluxDB influxDB, SinkConfig
sinkConfig) {
+ public static void setWriteProperty(InfluxDB influxdb, SinkConfig
sinkConfig) {
String rp = sinkConfig.getRp();
if (!StringUtils.isEmpty(rp)) {
- influxDB.setRetentionPolicy(rp);
+ influxdb.setRetentionPolicy(rp);
}
}
public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws
ConnectException {
- InfluxDB influxDB = getInfluxDB(sinkConfig);
- influxDB.setDatabase(sinkConfig.getDatabase());
+ InfluxDB influxdb = getInfluxDB(sinkConfig);
+ influxdb.setDatabase(sinkConfig.getDatabase());
setWriteProperty(getInfluxDB(sinkConfig), sinkConfig);
- return influxDB;
+ return influxdb;
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
index 405ab2d43..a16121181 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/converter/InfluxDBRowConverter.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import java.util.ArrayList;
import java.util.List;
@@ -39,8 +41,7 @@ public class InfluxDBRowConverter {
SqlType fieldSqlType = seaTunnelDataType.getSqlType();
if (null == values.get(columnIndex)) {
seaTunnelField = null;
- }
- else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
+ } else if (SqlType.BOOLEAN.equals(fieldSqlType)) {
seaTunnelField =
Boolean.parseBoolean(values.get(columnIndex).toString());
} else if (SqlType.SMALLINT.equals(fieldSqlType)) {
seaTunnelField =
Short.valueOf(values.get(columnIndex).toString());
@@ -55,7 +56,8 @@ public class InfluxDBRowConverter {
} else if (SqlType.STRING.equals(fieldSqlType)) {
seaTunnelField = values.get(columnIndex);
} else {
- throw new IllegalStateException("Unexpected value: " +
seaTunnelDataType);
+ throw new
InfluxdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Unsupported data type: " + seaTunnelDataType);
}
fields.add(seaTunnelField);
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorErrorCode.java
new file mode 100644
index 000000000..3eb64be79
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorErrorCode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum InfluxdbConnectorErrorCode implements SeaTunnelErrorCode {
+
+ CONNECT_FAILED("INFLUXDB-01", "Connect influxdb failed, due to influxdb
version info is unknown"),
+ GET_COLUMN_INDEX_FAILED("INFLUXDB-02", "Get column index of query result
exception");
+
+ private final String code;
+ private final String description;
+
+ InfluxdbConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return SeaTunnelErrorCode.super.getErrorMessage();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorException.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorException.java
new file mode 100644
index 000000000..178f44b17
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/exception/InfluxdbConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.influxdb.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class InfluxdbConnectorException extends SeaTunnelRuntimeException {
+
+ public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ public InfluxdbConnectorException(SeaTunnelErrorCode seaTunnelErrorCode,
Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
index 8cc458939..8b803c746 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java
@@ -20,6 +20,8 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import com.google.common.base.Strings;
import org.apache.commons.collections4.CollectionUtils;
@@ -35,14 +37,14 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DefaultSerializer implements Serializer {
- private SeaTunnelRowType seaTunnelRowType;
+ private final SeaTunnelRowType seaTunnelRowType;
private final BiConsumer<SeaTunnelRow, Point.Builder> timestampExtractor;
private final BiConsumer<SeaTunnelRow, Point.Builder> fieldExtractor;
private final BiConsumer<SeaTunnelRow, Point.Builder> tagExtractor;
- private String measurement;
+ private final String measurement;
- private TimeUnit precision;
+ private final TimeUnit precision;
public DefaultSerializer(SeaTunnelRowType seaTunnelRowType, TimeUnit
precision, List<String> tagKeys,
String timestampKey,
@@ -67,8 +69,7 @@ public class DefaultSerializer implements Serializer {
private BiConsumer<SeaTunnelRow, Point.Builder>
createFieldExtractor(SeaTunnelRowType seaTunnelRowType, List<String> fieldKeys)
{
return (row, builder) -> {
- for (int i = 0; i < fieldKeys.size(); i++) {
- String field = fieldKeys.get(i);
+ for (String field : fieldKeys) {
int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(field);
SeaTunnelDataType dataType =
seaTunnelRowType.getFieldType(indexOfSeaTunnelRow);
Object val = row.getField(indexOfSeaTunnelRow);
@@ -96,14 +97,15 @@ public class DefaultSerializer implements Serializer {
builder.addField(field, val.toString());
break;
default:
- throw new UnsupportedOperationException("Unsupported
dataType: " + dataType);
+ throw new
InfluxdbConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+ "Unsupported data type: " + dataType);
}
}
};
}
private BiConsumer<SeaTunnelRow, Point.Builder>
createTimestampExtractor(SeaTunnelRowType seaTunnelRowType,
- String
timeKey) {
+
String timeKey) {
//not config timeKey, use processing time
if (Strings.isNullOrEmpty(timeKey)) {
return (row, builder) -> builder.time(System.currentTimeMillis(),
precision);
@@ -121,7 +123,7 @@ public class DefaultSerializer implements Serializer {
builder.time(Long.parseLong((String) time), precision);
break;
case TIMESTAMP:
- builder.time(LocalDateTime.class.cast(time)
+ builder.time(((LocalDateTime) time)
.atZone(ZoneOffset.UTC)
.toInstant()
.toEpochMilli(), precision);
@@ -136,15 +138,15 @@ public class DefaultSerializer implements Serializer {
}
private BiConsumer<SeaTunnelRow, Point.Builder>
createTagExtractor(SeaTunnelRowType seaTunnelRowType,
- List<String>
tagKeys) {
+
List<String> tagKeys) {
//not config tagKeys
if (CollectionUtils.isEmpty(tagKeys)) {
- return (row, builder) -> {};
+ return (row, builder) -> {
+ };
}
return (row, builder) -> {
- for (int i = 0; i < tagKeys.size(); i++) {
- String tagKey = tagKeys.get(i);
+ for (String tagKey : tagKeys) {
int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(tagKey);
builder.tag(tagKey,
row.getField(indexOfSeaTunnelRow).toString());
}
@@ -152,11 +154,11 @@ public class DefaultSerializer implements Serializer {
}
private List<String> getFieldKeys(SeaTunnelRowType seaTunnelRowType,
- String timestampKey,
- List<String> tagKeys) {
+ String timestampKey,
+ List<String> tagKeys) {
return Stream.of(seaTunnelRowType.getFieldNames())
- .filter(name -> CollectionUtils.isEmpty(tagKeys) ||
!tagKeys.contains(name))
- .filter(name -> StringUtils.isEmpty(timestampKey) ||
!name.equals(timestampKey))
- .collect(Collectors.toList());
+ .filter(name -> CollectionUtils.isEmpty(tagKeys) ||
!tagKeys.contains(name))
+ .filter(name -> StringUtils.isEmpty(timestampKey) ||
!name.equals(timestampKey))
+ .collect(Collectors.toList());
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
index 074e5a518..86180e552 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java
@@ -21,6 +21,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDB
import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -31,6 +32,7 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -53,7 +55,12 @@ public class InfluxDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
public void prepare(Config config) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(config, URL.key(),
KEY_MEASUREMENT.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SINK,
+ result.getMsg()
+ )
+ );
}
this.pluginConfig = config;
}
@@ -64,12 +71,12 @@ public class InfluxDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
@Override
- public SeaTunnelDataType getConsumedType() {
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return seaTunnelRowType;
}
@Override
- public AbstractSinkWriter createWriter(SinkWriter.Context context) throws
IOException {
+ public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) throws IOException {
return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
index 809a3eaaa..0b9c3beaa 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java
@@ -19,9 +19,12 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer;
import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer;
@@ -48,8 +51,8 @@ import java.util.concurrent.TimeUnit;
public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
{
private final Serializer serializer;
- private InfluxDB influxDB;
- private SinkConfig sinkConfig;
+ private InfluxDB influxdb;
+ private final SinkConfig sinkConfig;
private final List<Point> batchList;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
@@ -61,12 +64,12 @@ public class InfluxDBSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
this.serializer = new DefaultSerializer(
- seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(),
sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement());
+ seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(),
sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement());
this.batchList = new ArrayList<>();
if (batchIntervalMs != null) {
scheduler = Executors.newSingleThreadScheduledExecutor(
- new
ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build());
+ new
ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build());
scheduledFuture = scheduler.scheduleAtFixedRate(
() -> {
try {
@@ -106,9 +109,9 @@ public class InfluxDBSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
flush();
- if (influxDB != null) {
- influxDB.close();
- influxDB = null;
+ if (influxdb != null) {
+ influxdb.close();
+ influxdb = null;
}
}
@@ -117,7 +120,7 @@ public class InfluxDBSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
batchList.add(record);
if (sinkConfig.getBatchSize() > 0
- && batchList.size() >= sinkConfig.getBatchSize()) {
+ && batchList.size() >= sinkConfig.getBatchSize()) {
flush();
}
}
@@ -131,21 +134,22 @@ public class InfluxDBSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
try {
batchPoints.points(batchList);
- influxDB.write(batchPoints.build());
+ influxdb.write(batchPoints.build());
} catch (Exception e) {
log.error("Writing records to influxdb failed, retry times =
{}", i, e);
if (i >= sinkConfig.getMaxRetries()) {
- throw new IOException("Writing records to InfluxDB
failed.", e);
+ throw new
InfluxdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+ "Writing records to InfluxDB failed.", e);
}
try {
long backoff =
Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i,
- sinkConfig.getMaxRetryBackoffMs());
+ sinkConfig.getMaxRetryBackoffMs());
Thread.sleep(backoff);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
- throw new IOException(
- "Unable to flush; interrupted while doing another
attempt.", e);
+ throw new
InfluxdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+ "Unable to flush; interrupted while doing another
attempt.", e);
}
}
}
@@ -155,20 +159,22 @@ public class InfluxDBSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private void checkFlushException() {
if (flushException != null) {
- throw new RuntimeException("Writing records to InfluxDB failed.",
flushException);
+ throw new
InfluxdbConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+ "Writing records to InfluxDB failed.", flushException);
}
}
public void connect() throws ConnectException {
- if (influxDB == null) {
- influxDB = InfluxDBClient.getWriteClient(sinkConfig);
- String version = influxDB.version();
- if (!influxDB.ping().isGood()) {
- String errorMessage =
- String.format(
- "connect influxdb failed, due to influxdb
version info is unknown, the url is: {%s}",
- sinkConfig.getUrl());
- throw new ConnectException(errorMessage);
+ if (influxdb == null) {
+ influxdb = InfluxDBClient.getWriteClient(sinkConfig);
+ String version = influxdb.version();
+ if (!influxdb.ping().isGood()) {
+ throw new
InfluxdbConnectorException(InfluxdbConnectorErrorCode.CONNECT_FAILED,
+ String.format(
+ "connect influxdb failed, due to influxdb version info
is unknown, the url is: {%s}",
+ sinkConfig.getUrl()
+ )
+ );
}
log.info("connect influxdb successful. sever version :{}.",
version);
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
index 26230f0ae..2c961517f 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.source;
import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL;
import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
@@ -33,6 +34,8 @@ import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -50,7 +53,7 @@ import java.util.stream.Collectors;
@Slf4j
@AutoService(SeaTunnelSource.class)
-public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow,
InfluxDBSourceSplit, InfluxDBSourceState> {
+public class InfluxDBSource implements SeaTunnelSource<SeaTunnelRow,
InfluxDBSourceSplit, InfluxDBSourceState> {
private SeaTunnelRowType typeInfo;
private SourceConfig sourceConfig;
@@ -67,7 +70,12 @@ public class InfluxDBSource implements
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
public void prepare(Config config) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(config, SQL.key());
if (!result.isSuccess()) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
result.getMsg());
+ throw new
InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SOURCE,
+ result.getMsg()
+ )
+ );
}
try {
this.sourceConfig = SourceConfig.loadConfig(config);
@@ -75,7 +83,10 @@ public class InfluxDBSource implements
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
this.typeInfo = seatunnelSchema.getSeaTunnelRowType();
this.columnsIndexList =
initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig));
} catch (Exception e) {
- throw new PrepareFailException("InfluxDB", PluginType.SOURCE,
e.toString());
+ throw new
InfluxdbConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format("PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SOURCE, e)
+ );
}
}
@@ -85,7 +96,7 @@ public class InfluxDBSource implements
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
}
@Override
- public SeaTunnelDataType getProducedType() {
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return typeInfo;
}
@@ -104,20 +115,20 @@ public class InfluxDBSource implements
SeaTunnelSource<SeaTunnelRow, InfluxDBSou
return new InfluxDBSourceSplitEnumerator(enumeratorContext,
checkpointState, sourceConfig);
}
- private List<Integer> initColumnsIndex(InfluxDB influxDB) {
+ private List<Integer> initColumnsIndex(InfluxDB influxdb) {
//query one row to get column info
String query = sourceConfig.getSql() + QUERY_LIMIT;
- List<String> fieldNames = new ArrayList<>();
try {
- QueryResult queryResult = influxDB.query(
- new Query(query, sourceConfig.getDatabase()));
+ QueryResult queryResult = influxdb.query(
+ new Query(query, sourceConfig.getDatabase()));
List<QueryResult.Series> serieList =
queryResult.getResults().get(0).getSeries();
- fieldNames.addAll(serieList.get(0).getColumns());
+ List<String> fieldNames = new
ArrayList<>(serieList.get(0).getColumns());
- return Arrays.stream(typeInfo.getFieldNames()).map(x ->
fieldNames.indexOf(x)).collect(Collectors.toList());
+ return
Arrays.stream(typeInfo.getFieldNames()).map(fieldNames::indexOf).collect(Collectors.toList());
} catch (Exception e) {
- throw new RuntimeException("get column index of query result
exception", e);
+ throw new
InfluxdbConnectorException(InfluxdbConnectorErrorCode.GET_COLUMN_INDEX_FAILED,
+ "Get column index of query result exception", e);
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
index 37dd688f8..50d9a51e2 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplit.java
@@ -20,9 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.source;
import org.apache.seatunnel.api.source.SourceSplit;
public class InfluxDBSourceSplit implements SourceSplit {
- private String splitId;
+ private final String splitId;
- private String query;
+ private final String query;
public InfluxDBSourceSplit(String splitId, String query) {
this.query = query;
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
index aad78e6fc..fdcabac1f 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java
@@ -20,7 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.influxdb.source;
import static
org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
import lombok.extern.slf4j.Slf4j;
@@ -121,7 +123,8 @@ public class InfluxDBSourceSplitEnumerator implements
SourceSplitEnumerator<Infl
String[] sqls = sql.split(SQL_WHERE.key());
if (sqls.length > 2) {
- throw new IllegalArgumentException("sql should not contain more
than one where");
+ throw new
InfluxdbConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+ "sql should not contain more than one where");
}
int i = 0;
@@ -209,7 +212,8 @@ public class InfluxDBSourceSplitEnumerator implements
SourceSplitEnumerator<Infl
@Override
public void handleSplitRequest(int subtaskId) {
- throw new UnsupportedOperationException("Unsupported
handleSplitRequest: " + subtaskId);
+ throw new
InfluxdbConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+ String.format("Unsupported handleSplitRequest: %d", subtaskId));
}
}
diff --git
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
index 090b8bf97..915eaa734 100644
---
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
+++
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig;
import
org.apache.seatunnel.connectors.seatunnel.influxdb.converter.InfluxDBRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@@ -40,12 +42,12 @@ import java.util.Queue;
@Slf4j
public class InfluxdbSourceReader implements SourceReader<SeaTunnelRow,
InfluxDBSourceSplit> {
- private InfluxDB influxDB;
+ private InfluxDB influxdb;
InfluxDBConfig config;
private final SourceReader.Context context;
- private SeaTunnelRowType seaTunnelRowType;
+ private final SeaTunnelRowType seaTunnelRowType;
List<Integer> columnsIndexList;
private final Queue<InfluxDBSourceSplit> pendingSplits;
@@ -61,15 +63,16 @@ public class InfluxdbSourceReader implements
SourceReader<SeaTunnelRow, InfluxDB
}
public void connect() throws ConnectException {
- if (influxDB == null) {
- influxDB = InfluxDBClient.getInfluxDB(config);
- String version = influxDB.version();
- if (!influxDB.ping().isGood()) {
- String errorMessage =
- String.format(
- "connect influxdb failed, due to influxdb
version info is unknown, the url is: {%s}",
- config.getUrl());
- throw new ConnectException(errorMessage);
+ if (influxdb == null) {
+ influxdb = InfluxDBClient.getInfluxDB(config);
+ String version = influxdb.version();
+ if (!influxdb.ping().isGood()) {
+ throw new
InfluxdbConnectorException(InfluxdbConnectorErrorCode.CONNECT_FAILED,
+ String.format(
+ "connect influxdb failed, due to influxdb version info
is unknown, the url is: {%s}",
+ config.getUrl()
+ )
+ );
}
log.info("connect influxdb successful. sever version :{}.",
version);
}
@@ -82,9 +85,9 @@ public class InfluxdbSourceReader implements
SourceReader<SeaTunnelRow, InfluxDB
@Override
public void close() {
- if (influxDB != null) {
- influxDB.close();
- influxDB = null;
+ if (influxdb != null) {
+ influxdb.close();
+ influxdb = null;
}
}
@@ -98,8 +101,8 @@ public class InfluxdbSourceReader implements
SourceReader<SeaTunnelRow, InfluxDB
}
if (Boundedness.BOUNDED.equals(context.getBoundedness())
- && noMoreSplitsAssignment
- && pendingSplits.isEmpty()) {
+ && noMoreSplitsAssignment
+ && pendingSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded influxDB source");
context.signalNoMoreElement();
@@ -123,12 +126,12 @@ public class InfluxdbSourceReader implements
SourceReader<SeaTunnelRow, InfluxDB
}
@Override
- public void notifyCheckpointComplete(long checkpointId) {
+ public void notifyCheckpointComplete(long checkpointId) {
}
private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow>
output) {
- QueryResult queryResult = influxDB.query(new Query(split.getQuery(),
config.getDatabase()));
+ QueryResult queryResult = influxdb.query(new Query(split.getQuery(),
config.getDatabase()));
for (QueryResult.Result result : queryResult.getResults()) {
List<QueryResult.Series> serieList = result.getSeries();
if (CollectionUtils.isNotEmpty(serieList)) {
@@ -140,7 +143,7 @@ public class InfluxdbSourceReader implements
SourceReader<SeaTunnelRow, InfluxDB
}
} else {
log.debug(
- "split[{}] reader influxDB series is empty.",
split.splitId());
+ "split[{}] reader influxDB series is empty.",
split.splitId());
}
}
}