This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 06359ea76 [Feature][Connector V2] expose configurable options in IoTDB
(#3387)
06359ea76 is described below
commit 06359ea76a094c586c70d7a060778156e580f8e1
Author: john <[email protected]>
AuthorDate: Mon Nov 21 09:57:38 2022 +0800
[Feature][Connector V2] expose configurable options in IoTDB (#3387)
Co-authored-by: wenwei <[email protected]>
---
.../seatunnel/iotdb/config/CommonConfig.java | 9 +-
.../seatunnel/iotdb/config/SinkConfig.java | 95 +++++++++++-----------
.../seatunnel/iotdb/config/SourceConfig.java | 31 +++----
.../connectors/seatunnel/iotdb/sink/IoTDBSink.java | 2 +-
.../seatunnel/iotdb/sink/IoTDBSinkFactory.java | 60 ++++++++++++++
.../seatunnel/iotdb/source/IoTDBSource.java | 4 +-
.../seatunnel/iotdb/source/IoTDBSourceFactory.java | 56 +++++++++++++
.../seatunnel/iotdb/source/IoTDBSourceReader.java | 41 +++++-----
.../iotdb/source/IoTDBSourceSplitEnumerator.java | 10 +--
9 files changed, 217 insertions(+), 91 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
index c52a743dc..e82f6f98f 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
@@ -28,9 +31,9 @@ import java.util.List;
@AllArgsConstructor
public class CommonConfig {
- public static final String NODE_URLS = "node_urls";
- public static final String USERNAME = "username";
- public static final String PASSWORD = "password";
+ public static final Option<String> NODE_URLS =
Options.key("node_urls").stringType().noDefaultValue().withDescription("node
urls");
+ public static final Option<String> USERNAME =
Options.key("username").stringType().noDefaultValue().withDescription("username");
+ public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("password");
private final List<String> nodeUrls;
private final String username;
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
index f942af672..65a482b27 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
@@ -20,6 +20,9 @@ package
org.apache.seatunnel.connectors.seatunnel.iotdb.config;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.Getter;
@@ -35,28 +38,28 @@ import java.util.List;
@ToString
public class SinkConfig extends CommonConfig {
- public static final String KEY_TIMESTAMP = "key_timestamp";
- public static final String KEY_DEVICE = "key_device";
- public static final String KEY_MEASUREMENT_FIELDS =
"key_measurement_fields";
- public static final String STORAGE_GROUP = "storage_group";
- public static final String BATCH_SIZE = "batch_size";
- public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
- public static final String MAX_RETRIES = "max_retries";
- public static final String RETRY_BACKOFF_MULTIPLIER_MS =
"retry_backoff_multiplier_ms";
- public static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
- public static final String DEFAULT_THRIFT_BUFFER_SIZE =
"default_thrift_buffer_size";
- public static final String MAX_THRIFT_FRAME_SIZE = "max_thrift_frame_size";
- public static final String ZONE_ID = "zone_id";
- public static final String ENABLE_RPC_COMPRESSION =
"enable_rpc_compression";
- public static final String CONNECTION_TIMEOUT_IN_MS =
"connection_timeout_in_ms";
-
private static final int DEFAULT_BATCH_SIZE = 1024;
+ public static final Option<String> KEY_TIMESTAMP =
Options.key("key_timestamp").stringType().noDefaultValue().withDescription("key
timestamp");
+ public static final Option<String> KEY_DEVICE =
Options.key("key_device").stringType().noDefaultValue().withDescription("key
device");
+ public static final Option<List<String>> KEY_MEASUREMENT_FIELDS =
Options.key("key_measurement_fields").listType().noDefaultValue().withDescription("key
measurement fields");
+ public static final Option<String> STORAGE_GROUP =
Options.key("storage_group").stringType().noDefaultValue().withDescription("store
group");
+ public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size").intType().defaultValue(DEFAULT_BATCH_SIZE).withDescription("batch
size");
+ public static final Option<String> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms").stringType().noDefaultValue().withDescription("batch
interval ms");
+ public static final Option<Integer> MAX_RETRIES =
Options.key("max_retries").intType().noDefaultValue().withDescription("max
retries");
+ public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
Options.key("retry_backoff_multiplier_ms").intType().noDefaultValue().withDescription("retry
backoff multiplier ms ");
+ public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
Options.key("max_retry_backoff_ms").intType().noDefaultValue().withDescription("max
retry backoff ms ");
+ public static final Option<Integer> DEFAULT_THRIFT_BUFFER_SIZE =
Options.key("default_thrift_buffer_size").intType().noDefaultValue().withDescription("default
thrift buffer size");
+ public static final Option<Integer> MAX_THRIFT_FRAME_SIZE =
Options.key("max_thrift_frame_size").intType().noDefaultValue().withDescription("max
thrift frame size");
+ public static final Option<String> ZONE_ID =
Options.key("zone_id").stringType().noDefaultValue().withDescription("zone id");
+ public static final Option<Boolean> ENABLE_RPC_COMPRESSION =
Options.key("enable_rpc_compression").booleanType().noDefaultValue().withDescription("enable
rpc comm");
+ public static final Option<Integer> CONNECTION_TIMEOUT_IN_MS =
Options.key("connection_timeout_in_ms").intType().noDefaultValue().withDescription("connection
timeout ms");
+
private String keyTimestamp;
private String keyDevice;
private List<String> keyMeasurementFields;
private String storageGroup;
- private int batchSize = DEFAULT_BATCH_SIZE;
+ private int batchSize = BATCH_SIZE.defaultValue();
private Integer batchIntervalMs;
private int maxRetries;
private int retryBackoffMultiplierMs;
@@ -75,56 +78,56 @@ public class SinkConfig extends CommonConfig {
public static SinkConfig loadConfig(Config pluginConfig) {
SinkConfig sinkConfig = new SinkConfig(
- pluginConfig.getStringList(NODE_URLS),
- pluginConfig.getString(USERNAME),
- pluginConfig.getString(PASSWORD));
+ pluginConfig.getStringList(NODE_URLS.key()),
+ pluginConfig.getString(USERNAME.key()),
+ pluginConfig.getString(PASSWORD.key()));
- sinkConfig.setKeyDevice(pluginConfig.getString(KEY_DEVICE));
- if (pluginConfig.hasPath(KEY_TIMESTAMP)) {
- sinkConfig.setKeyTimestamp(pluginConfig.getString(KEY_TIMESTAMP));
+ sinkConfig.setKeyDevice(pluginConfig.getString(KEY_DEVICE.key()));
+ if (pluginConfig.hasPath(KEY_TIMESTAMP.key())) {
+
sinkConfig.setKeyTimestamp(pluginConfig.getString(KEY_TIMESTAMP.key()));
}
- if (pluginConfig.hasPath(KEY_MEASUREMENT_FIELDS)) {
-
sinkConfig.setKeyMeasurementFields(pluginConfig.getStringList(KEY_MEASUREMENT_FIELDS));
+ if (pluginConfig.hasPath(KEY_MEASUREMENT_FIELDS.key())) {
+
sinkConfig.setKeyMeasurementFields(pluginConfig.getStringList(KEY_MEASUREMENT_FIELDS.key()));
}
- if (pluginConfig.hasPath(STORAGE_GROUP)) {
- sinkConfig.setStorageGroup(pluginConfig.getString(STORAGE_GROUP));
+ if (pluginConfig.hasPath(STORAGE_GROUP.key())) {
+
sinkConfig.setStorageGroup(pluginConfig.getString(STORAGE_GROUP.key()));
}
- if (pluginConfig.hasPath(BATCH_SIZE)) {
- int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE));
+ if (pluginConfig.hasPath(BATCH_SIZE.key())) {
+ int batchSize =
checkIntArgument(pluginConfig.getInt(BATCH_SIZE.key()));
sinkConfig.setBatchSize(batchSize);
}
- if (pluginConfig.hasPath(BATCH_INTERVAL_MS)) {
- int batchIntervalMs =
checkIntArgument(pluginConfig.getInt(BATCH_INTERVAL_MS));
+ if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) {
+ int batchIntervalMs =
checkIntArgument(pluginConfig.getInt(BATCH_INTERVAL_MS.key()));
sinkConfig.setBatchIntervalMs(batchIntervalMs);
}
- if (pluginConfig.hasPath(MAX_RETRIES)) {
- int maxRetries =
checkIntArgument(pluginConfig.getInt(MAX_RETRIES));
+ if (pluginConfig.hasPath(MAX_RETRIES.key())) {
+ int maxRetries =
checkIntArgument(pluginConfig.getInt(MAX_RETRIES.key()));
sinkConfig.setMaxRetries(maxRetries);
}
- if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
- int retryBackoffMultiplierMs =
checkIntArgument(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+ if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
+ int retryBackoffMultiplierMs =
checkIntArgument(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
sinkConfig.setRetryBackoffMultiplierMs(retryBackoffMultiplierMs);
}
- if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS)) {
- int maxRetryBackoffMs =
checkIntArgument(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS));
+ if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
+ int maxRetryBackoffMs =
checkIntArgument(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS.key()));
sinkConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
}
- if (pluginConfig.hasPath(DEFAULT_THRIFT_BUFFER_SIZE)) {
- int thriftDefaultBufferSize =
checkIntArgument(pluginConfig.getInt(DEFAULT_THRIFT_BUFFER_SIZE));
+ if (pluginConfig.hasPath(DEFAULT_THRIFT_BUFFER_SIZE.key())) {
+ int thriftDefaultBufferSize =
checkIntArgument(pluginConfig.getInt(DEFAULT_THRIFT_BUFFER_SIZE.key()));
sinkConfig.setThriftDefaultBufferSize(thriftDefaultBufferSize);
}
- if (pluginConfig.hasPath(MAX_THRIFT_FRAME_SIZE)) {
- int thriftMaxFrameSize =
checkIntArgument(pluginConfig.getInt(MAX_THRIFT_FRAME_SIZE));
+ if (pluginConfig.hasPath(MAX_THRIFT_FRAME_SIZE.key())) {
+ int thriftMaxFrameSize =
checkIntArgument(pluginConfig.getInt(MAX_THRIFT_FRAME_SIZE.key()));
sinkConfig.setThriftMaxFrameSize(thriftMaxFrameSize);
}
- if (pluginConfig.hasPath(ZONE_ID)) {
- sinkConfig.setZoneId(ZoneId.of(pluginConfig.getString(ZONE_ID)));
+ if (pluginConfig.hasPath(ZONE_ID.key())) {
+
sinkConfig.setZoneId(ZoneId.of(pluginConfig.getString(ZONE_ID.key())));
}
- if (pluginConfig.hasPath(ENABLE_RPC_COMPRESSION)) {
-
sinkConfig.setEnableRPCCompression(pluginConfig.getBoolean(ENABLE_RPC_COMPRESSION));
+ if (pluginConfig.hasPath(ENABLE_RPC_COMPRESSION.key())) {
+
sinkConfig.setEnableRPCCompression(pluginConfig.getBoolean(ENABLE_RPC_COMPRESSION.key()));
}
- if (pluginConfig.hasPath(CONNECTION_TIMEOUT_IN_MS)) {
- int connectionTimeoutInMs =
checkIntArgument(pluginConfig.getInt(CONNECTION_TIMEOUT_IN_MS));
+ if (pluginConfig.hasPath(CONNECTION_TIMEOUT_IN_MS.key())) {
+ int connectionTimeoutInMs =
checkIntArgument(pluginConfig.getInt(CONNECTION_TIMEOUT_IN_MS.key()));
checkNotNull(sinkConfig.getEnableRPCCompression());
sinkConfig.setConnectionTimeoutInMs(connectionTimeoutInMs);
}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
index 5c952b416..90cc33b8b 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
/**
* SourceConfig is the configuration for the IotDBSource.
* <p>
@@ -25,19 +28,19 @@ package
org.apache.seatunnel.connectors.seatunnel.iotdb.config;
*/
public class SourceConfig {
- public static final String SQL = "sql";
+ public static final Option<String> SQL =
Options.key("sql").stringType().noDefaultValue().withDescription("sql");
/*---------------------- single node configurations
-------------------------*/
/**
* The host of the IotDB server.
*/
- public static final String HOST = "host";
+ public static final Option<String> HOST =
Options.key("host").stringType().noDefaultValue().withDescription("host");
/*
* The port of the IotDB server.
*/
- public static final String PORT = "port";
+ public static final Option<Integer> PORT =
Options.key("port").intType().noDefaultValue().withDescription("port");
/*---------------------- multiple node configurations
-------------------------*/
@@ -45,58 +48,58 @@ public class SourceConfig {
/**
* Username for the source.
*/
- public static final String USERNAME = "username";
+ public static final Option<String> USERNAME =
Options.key("username").stringType().noDefaultValue().withDescription("usernam");
/**
* Password for the source.
*/
- public static final String PASSWORD = "password";
+ public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("password");
/**
* multiple nodes
*/
- public static final String NODE_URLS = "node_urls";
+ public static final Option<String> NODE_URLS =
Options.key("node_urls").stringType().noDefaultValue().withDescription("node
urls");
/*---------------------- other configurations -------------------------*/
/**
* Fetches the next batch of data from the source.
*/
- public static final String FETCH_SIZE = "fetch_size";
+ public static final Option<Integer> FETCH_SIZE =
Options.key("fetch_size").intType().noDefaultValue().withDescription("fetch
size");
/**
* thrift default buffer size
*/
- public static final String THRIFT_DEFAULT_BUFFER_SIZE =
"thrift_default_buffer_size";
+ public static final Option<Integer> THRIFT_DEFAULT_BUFFER_SIZE =
Options.key("thrift_default_buffer_size").intType().noDefaultValue().withDescription("
default thrift buffer size of iot db ");
/**
* thrift max frame size
*/
- public static final String THRIFT_MAX_FRAME_SIZE = "thrift_max_frame_size";
+ public static final Option<Integer> THRIFT_MAX_FRAME_SIZE =
Options.key("thrift_max_frame_size").intType().noDefaultValue().withDescription("thrift
max frame size ");
/**
* cassandra default buffer size
*/
- public static final String ENABLE_CACHE_LEADER = "enable_cache_leader";
+ public static final Option<Boolean> ENABLE_CACHE_LEADER =
Options.key("enable_cache_leader").booleanType().noDefaultValue().withDescription("enable
cache leader ");
/**
* Version represents the SQL semantic version used by the client, which
is used to be compatible with the SQL semantics of 0.12 when upgrading 0.13.
The possible values are: V_0_12, V_0_13.
*/
- public static final String VERSION = "version";
+ public static final Option<String> VERSION =
Options.key("version").stringType().noDefaultValue().withDescription("version");
/**
* Query lower bound of the time range to be read.
*/
- public static final String LOWER_BOUND = "lower_bound";
+ public static final Option<Long> LOWER_BOUND =
Options.key("lower_bound").longType().noDefaultValue().withDescription("low
bound");
/**
* Query upper bound of the time range to be read.
*/
- public static final String UPPER_BOUND = "upper_bound";
+ public static final Option<Long> UPPER_BOUND =
Options.key("upper_bound").longType().noDefaultValue().withDescription("upper
bound");
/**
* Query num partitions to be read.
*/
- public static final String NUM_PARTITIONS = "num_partitions";
+ public static final Option<Integer> NUM_PARTITIONS =
Options.key("num_partitions").intType().noDefaultValue().withDescription("num
partitions");
}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
index fe00e2d81..aa813b95e 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
@@ -51,7 +51,7 @@ public class IoTDBSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
NODE_URLS, USERNAME, PASSWORD, KEY_DEVICE);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
NODE_URLS.key(), USERNAME.key(), PASSWORD.key(), KEY_DEVICE.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkFactory.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkFactory.java
new file mode 100644
index 000000000..ac2beb0a6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.NODE_URLS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.BATCH_INTERVAL_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.CONNECTION_TIMEOUT_IN_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.DEFAULT_THRIFT_BUFFER_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.ENABLE_RPC_COMPRESSION;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.KEY_DEVICE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.KEY_MEASUREMENT_FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.KEY_TIMESTAMP;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.MAX_RETRIES;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.MAX_RETRY_BACKOFF_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.MAX_THRIFT_FRAME_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.STORAGE_GROUP;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.ZONE_ID;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class IoTDBSinkFactory implements TableSinkFactory{
+ @Override
+ public String factoryIdentifier() {
+ return "IoTDB";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(NODE_URLS, USERNAME, PASSWORD, KEY_DEVICE)
+ .optional(KEY_TIMESTAMP, KEY_MEASUREMENT_FIELDS,
STORAGE_GROUP, BATCH_SIZE, BATCH_INTERVAL_MS,
+ MAX_RETRIES, RETRY_BACKOFF_MULTIPLIER_MS,
MAX_RETRY_BACKOFF_MS, DEFAULT_THRIFT_BUFFER_SIZE,
+ MAX_THRIFT_FRAME_SIZE, ZONE_ID,
ENABLE_RPC_COMPRESSION, CONNECTION_TIMEOUT_IN_MS)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
index 592c0739e..003a67fd1 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
@@ -59,9 +59,9 @@ public class IoTDBSource implements
SeaTunnelSource<SeaTunnelRow, IoTDBSourceSpl
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HOST, PORT);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
HOST.key(), PORT.key());
if (!result.isSuccess()) {
- result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS);
+ result = CheckConfigUtil.checkAllExists(pluginConfig,
NODE_URLS.key());
if (!result.isSuccess()) {
throw new PrepareFailException(getPluginName(),
PluginType.SOURCE, "host and port and node urls are both empty");
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
new file mode 100644
index 000000000..ee92a50ea
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.NODE_URLS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.USERNAME;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.ENABLE_CACHE_LEADER;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.FETCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.LOWER_BOUND;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NUM_PARTITIONS;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.SQL;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_MAX_FRAME_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.UPPER_BOUND;
+import static
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.VERSION;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class IoTDBSourceFactory implements TableSourceFactory{
+ @Override
+ public String factoryIdentifier() {
+ return "IoTDB";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(NODE_URLS, USERNAME, PASSWORD, SQL)
+ .optional(HOST, PORT, FETCH_SIZE, THRIFT_DEFAULT_BUFFER_SIZE,
THRIFT_MAX_FRAME_SIZE,
+ ENABLE_CACHE_LEADER, VERSION, LOWER_BOUND,
UPPER_BOUND, NUM_PARTITIONS)
+ .build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
index a7e8aaa8a..28510361a 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
@@ -87,7 +87,9 @@ public class IoTDBSourceReader implements
SourceReader<SeaTunnelRow, IoTDBSource
public void close() throws IOException {
//nothing to do
try {
- session.close();
+ if (session != null) {
+ session.close();
+ }
} catch (IoTDBConnectionException e) {
throw new IOException("close IoTDB session failed", e);
}
@@ -123,37 +125,36 @@ public class IoTDBSourceReader implements
SourceReader<SeaTunnelRow, IoTDBSource
private Session buildSession(Map<String, Object> conf) {
Session.Builder sessionBuilder = new Session.Builder();
- if (conf.containsKey(HOST)) {
+ if (conf.containsKey(HOST.key())) {
sessionBuilder
- .host((String) conf.get(HOST))
- .port(Integer.parseInt(conf.get(PORT).toString()))
+ .host((String) conf.get(HOST.key()))
+ .port(Integer.parseInt(conf.get(PORT.key()).toString()))
.build();
} else {
- String nodeUrlsString = (String) conf.get(NODE_URLS);
-
+ String nodeUrlsString = (String) conf.get(NODE_URLS.key());
List<String> nodes =
Stream.of(nodeUrlsString.split(NODES_SPLIT)).collect(Collectors.toList());
sessionBuilder.nodeUrls(nodes);
}
- if (null != conf.get(FETCH_SIZE)) {
-
sessionBuilder.fetchSize(Integer.parseInt(conf.get(FETCH_SIZE).toString()));
+ if (null != conf.get(FETCH_SIZE.key())) {
+
sessionBuilder.fetchSize(Integer.parseInt(conf.get(FETCH_SIZE.key()).toString()));
}
- if (null != conf.get(USERNAME)) {
- sessionBuilder.username((String) conf.get(USERNAME));
+ if (null != conf.get(USERNAME.key())) {
+ sessionBuilder.username((String) conf.get(USERNAME.key()));
}
- if (null != conf.get(PASSWORD)) {
- sessionBuilder.password((String) conf.get(PASSWORD));
+ if (null != conf.get(PASSWORD.key())) {
+ sessionBuilder.password((String) conf.get(PASSWORD.key()));
}
- if (null != conf.get(THRIFT_DEFAULT_BUFFER_SIZE)) {
-
sessionBuilder.thriftDefaultBufferSize(Integer.parseInt(conf.get(THRIFT_DEFAULT_BUFFER_SIZE).toString()));
+ if (null != conf.get(THRIFT_DEFAULT_BUFFER_SIZE.key())) {
+
sessionBuilder.thriftDefaultBufferSize(Integer.parseInt(conf.get(THRIFT_DEFAULT_BUFFER_SIZE.key()).toString()));
}
- if (null != conf.get(THRIFT_MAX_FRAME_SIZE)) {
-
sessionBuilder.thriftMaxFrameSize(Integer.parseInt(conf.get(THRIFT_MAX_FRAME_SIZE).toString()));
+ if (null != conf.get(THRIFT_MAX_FRAME_SIZE.key())) {
+
sessionBuilder.thriftMaxFrameSize(Integer.parseInt(conf.get(THRIFT_MAX_FRAME_SIZE.key()).toString()));
}
- if (null != conf.get(ENABLE_CACHE_LEADER)) {
-
sessionBuilder.enableCacheLeader(Boolean.parseBoolean(conf.get(ENABLE_CACHE_LEADER).toString()));
+ if (null != conf.get(ENABLE_CACHE_LEADER.key())) {
+
sessionBuilder.enableCacheLeader(Boolean.parseBoolean(conf.get(ENABLE_CACHE_LEADER.key()).toString()));
}
- if (null != conf.get(VERSION)) {
- Version version = Version.valueOf(conf.get(VERSION).toString());
+ if (null != conf.get(VERSION.key())) {
+ Version version =
Version.valueOf(conf.get(VERSION.key()).toString());
sessionBuilder.version(version);
}
return sessionBuilder.build();
diff --git
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
index 5d7a83c59..5820d1d4b 100644
---
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
@@ -113,16 +113,16 @@ public class IoTDBSourceSplitEnumerator implements
SourceSplitEnumerator<IoTDBSo
* split 2: select * from test where (time >= 6 and time < 11) and ( age
> 0 and age < 10 )
*/
private Set<IoTDBSourceSplit> getIotDBSplit() {
- String sql = conf.get(SQL).toString();
+ String sql = conf.get(SQL.key()).toString();
Set<IoTDBSourceSplit> iotDBSourceSplits = new HashSet<>();
// no need numPartitions, use one partition
- if (!conf.containsKey(NUM_PARTITIONS)) {
+ if (!conf.containsKey(NUM_PARTITIONS.key())) {
iotDBSourceSplits.add(new IoTDBSourceSplit(DEFAULT_PARTITIONS,
sql));
return iotDBSourceSplits;
}
- long start = Long.parseLong(conf.get(LOWER_BOUND).toString());
- long end = Long.parseLong(conf.get(UPPER_BOUND).toString());
- int numPartitions =
Integer.parseInt(conf.get(NUM_PARTITIONS).toString());
+ long start = Long.parseLong(conf.get(LOWER_BOUND.key()).toString());
+ long end = Long.parseLong(conf.get(UPPER_BOUND.key()).toString());
+ int numPartitions =
Integer.parseInt(conf.get(NUM_PARTITIONS.key()).toString());
String sqlBase = sql;
String sqlAlign = null;
String sqlCondition = null;