This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 06359ea76 [Feature][Connector V2] expose configurable options in IoTDB 
(#3387)
06359ea76 is described below

commit 06359ea76a094c586c70d7a060778156e580f8e1
Author: john <[email protected]>
AuthorDate: Mon Nov 21 09:57:38 2022 +0800

    [Feature][Connector V2] expose configurable options in IoTDB (#3387)
    
    
    Co-authored-by: wenwei <[email protected]>
---
 .../seatunnel/iotdb/config/CommonConfig.java       |  9 +-
 .../seatunnel/iotdb/config/SinkConfig.java         | 95 +++++++++++-----------
 .../seatunnel/iotdb/config/SourceConfig.java       | 31 +++----
 .../connectors/seatunnel/iotdb/sink/IoTDBSink.java |  2 +-
 .../seatunnel/iotdb/sink/IoTDBSinkFactory.java     | 60 ++++++++++++++
 .../seatunnel/iotdb/source/IoTDBSource.java        |  4 +-
 .../seatunnel/iotdb/source/IoTDBSourceFactory.java | 56 +++++++++++++
 .../seatunnel/iotdb/source/IoTDBSourceReader.java  | 41 +++++-----
 .../iotdb/source/IoTDBSourceSplitEnumerator.java   | 10 +--
 9 files changed, 217 insertions(+), 91 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
index c52a743dc..e82f6f98f 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.ToString;
@@ -28,9 +31,9 @@ import java.util.List;
 @AllArgsConstructor
 public class CommonConfig {
 
-    public static final String NODE_URLS = "node_urls";
-    public static final String USERNAME = "username";
-    public static final String PASSWORD = "password";
+    public static final Option<String> NODE_URLS = 
Options.key("node_urls").stringType().noDefaultValue().withDescription("node 
urls");
+    public static final Option<String> USERNAME = 
Options.key("username").stringType().noDefaultValue().withDescription("username");
+    public static final Option<String> PASSWORD = 
Options.key("password").stringType().noDefaultValue().withDescription("password");
 
     private final List<String> nodeUrls;
     private final String username;
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
index f942af672..65a482b27 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
@@ -20,6 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.iotdb.config;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.Getter;
@@ -35,28 +38,28 @@ import java.util.List;
 @ToString
 public class SinkConfig extends CommonConfig {
 
-    public static final String KEY_TIMESTAMP = "key_timestamp";
-    public static final String KEY_DEVICE = "key_device";
-    public static final String KEY_MEASUREMENT_FIELDS = 
"key_measurement_fields";
-    public static final String STORAGE_GROUP = "storage_group";
-    public static final String BATCH_SIZE = "batch_size";
-    public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
-    public static final String MAX_RETRIES = "max_retries";
-    public static final String RETRY_BACKOFF_MULTIPLIER_MS = 
"retry_backoff_multiplier_ms";
-    public static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
-    public static final String DEFAULT_THRIFT_BUFFER_SIZE = 
"default_thrift_buffer_size";
-    public static final String MAX_THRIFT_FRAME_SIZE = "max_thrift_frame_size";
-    public static final String ZONE_ID = "zone_id";
-    public static final String ENABLE_RPC_COMPRESSION = 
"enable_rpc_compression";
-    public static final String CONNECTION_TIMEOUT_IN_MS = 
"connection_timeout_in_ms";
-
     private static final int DEFAULT_BATCH_SIZE = 1024;
 
+    public static final Option<String> KEY_TIMESTAMP = 
Options.key("key_timestamp").stringType().noDefaultValue().withDescription("key 
timestamp");
+    public static final Option<String> KEY_DEVICE = 
Options.key("key_device").stringType().noDefaultValue().withDescription("key 
device");
+    public static final Option<List<String>> KEY_MEASUREMENT_FIELDS = 
Options.key("key_measurement_fields").listType().noDefaultValue().withDescription("key
 measurement fields");
+    public static final Option<String> STORAGE_GROUP = 
Options.key("storage_group").stringType().noDefaultValue().withDescription("storage
 group");
+    public static final Option<Integer> BATCH_SIZE =  
Options.key("batch_size").intType().defaultValue(DEFAULT_BATCH_SIZE).withDescription("batch
 size");
+    public static final Option<Integer> BATCH_INTERVAL_MS = 
Options.key("batch_interval_ms").intType().noDefaultValue().withDescription("batch
 interval ms");
+    public static final Option<Integer> MAX_RETRIES = 
Options.key("max_retries").intType().noDefaultValue().withDescription("max 
retries");
+    public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS = 
Options.key("retry_backoff_multiplier_ms").intType().noDefaultValue().withDescription("retry
 backoff multiplier ms ");
+    public static final Option<Integer> MAX_RETRY_BACKOFF_MS = 
Options.key("max_retry_backoff_ms").intType().noDefaultValue().withDescription("max
 retry backoff ms ");
+    public static final Option<Integer> DEFAULT_THRIFT_BUFFER_SIZE = 
Options.key("default_thrift_buffer_size").intType().noDefaultValue().withDescription("default
 thrift buffer size");
+    public static final Option<Integer> MAX_THRIFT_FRAME_SIZE = 
Options.key("max_thrift_frame_size").intType().noDefaultValue().withDescription("max
 thrift frame size");
+    public static final Option<String> ZONE_ID = 
Options.key("zone_id").stringType().noDefaultValue().withDescription("zone id");
+    public static final Option<Boolean> ENABLE_RPC_COMPRESSION = 
Options.key("enable_rpc_compression").booleanType().noDefaultValue().withDescription("enable
 rpc compression");
+    public static final Option<Integer> CONNECTION_TIMEOUT_IN_MS = 
Options.key("connection_timeout_in_ms").intType().noDefaultValue().withDescription("connection
 timeout ms");
+
     private String keyTimestamp;
     private String keyDevice;
     private List<String> keyMeasurementFields;
     private String storageGroup;
-    private int batchSize = DEFAULT_BATCH_SIZE;
+    private int batchSize = BATCH_SIZE.defaultValue();
     private Integer batchIntervalMs;
     private int maxRetries;
     private int retryBackoffMultiplierMs;
@@ -75,56 +78,56 @@ public class SinkConfig extends CommonConfig {
 
     public static SinkConfig loadConfig(Config pluginConfig) {
         SinkConfig sinkConfig = new SinkConfig(
-                pluginConfig.getStringList(NODE_URLS),
-                pluginConfig.getString(USERNAME),
-                pluginConfig.getString(PASSWORD));
+                pluginConfig.getStringList(NODE_URLS.key()),
+                pluginConfig.getString(USERNAME.key()),
+                pluginConfig.getString(PASSWORD.key()));
 
-        sinkConfig.setKeyDevice(pluginConfig.getString(KEY_DEVICE));
-        if (pluginConfig.hasPath(KEY_TIMESTAMP)) {
-            sinkConfig.setKeyTimestamp(pluginConfig.getString(KEY_TIMESTAMP));
+        sinkConfig.setKeyDevice(pluginConfig.getString(KEY_DEVICE.key()));
+        if (pluginConfig.hasPath(KEY_TIMESTAMP.key())) {
+            
sinkConfig.setKeyTimestamp(pluginConfig.getString(KEY_TIMESTAMP.key()));
         }
-        if (pluginConfig.hasPath(KEY_MEASUREMENT_FIELDS)) {
-            
sinkConfig.setKeyMeasurementFields(pluginConfig.getStringList(KEY_MEASUREMENT_FIELDS));
+        if (pluginConfig.hasPath(KEY_MEASUREMENT_FIELDS.key())) {
+            
sinkConfig.setKeyMeasurementFields(pluginConfig.getStringList(KEY_MEASUREMENT_FIELDS.key()));
         }
-        if (pluginConfig.hasPath(STORAGE_GROUP)) {
-            sinkConfig.setStorageGroup(pluginConfig.getString(STORAGE_GROUP));
+        if (pluginConfig.hasPath(STORAGE_GROUP.key())) {
+            
sinkConfig.setStorageGroup(pluginConfig.getString(STORAGE_GROUP.key()));
         }
-        if (pluginConfig.hasPath(BATCH_SIZE)) {
-            int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE));
+        if (pluginConfig.hasPath(BATCH_SIZE.key())) {
+            int batchSize = 
checkIntArgument(pluginConfig.getInt(BATCH_SIZE.key()));
             sinkConfig.setBatchSize(batchSize);
         }
-        if (pluginConfig.hasPath(BATCH_INTERVAL_MS)) {
-            int batchIntervalMs = 
checkIntArgument(pluginConfig.getInt(BATCH_INTERVAL_MS));
+        if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) {
+            int batchIntervalMs = 
checkIntArgument(pluginConfig.getInt(BATCH_INTERVAL_MS.key()));
             sinkConfig.setBatchIntervalMs(batchIntervalMs);
         }
-        if (pluginConfig.hasPath(MAX_RETRIES)) {
-            int maxRetries = 
checkIntArgument(pluginConfig.getInt(MAX_RETRIES));
+        if (pluginConfig.hasPath(MAX_RETRIES.key())) {
+            int maxRetries = 
checkIntArgument(pluginConfig.getInt(MAX_RETRIES.key()));
             sinkConfig.setMaxRetries(maxRetries);
         }
-        if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
-            int retryBackoffMultiplierMs = 
checkIntArgument(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+        if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
+            int retryBackoffMultiplierMs = 
checkIntArgument(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
             sinkConfig.setRetryBackoffMultiplierMs(retryBackoffMultiplierMs);
         }
-        if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS)) {
-            int maxRetryBackoffMs = 
checkIntArgument(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS));
+        if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
+            int maxRetryBackoffMs = 
checkIntArgument(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS.key()));
             sinkConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
         }
-        if (pluginConfig.hasPath(DEFAULT_THRIFT_BUFFER_SIZE)) {
-            int thriftDefaultBufferSize = 
checkIntArgument(pluginConfig.getInt(DEFAULT_THRIFT_BUFFER_SIZE));
+        if (pluginConfig.hasPath(DEFAULT_THRIFT_BUFFER_SIZE.key())) {
+            int thriftDefaultBufferSize = 
checkIntArgument(pluginConfig.getInt(DEFAULT_THRIFT_BUFFER_SIZE.key()));
             sinkConfig.setThriftDefaultBufferSize(thriftDefaultBufferSize);
         }
-        if (pluginConfig.hasPath(MAX_THRIFT_FRAME_SIZE)) {
-            int thriftMaxFrameSize = 
checkIntArgument(pluginConfig.getInt(MAX_THRIFT_FRAME_SIZE));
+        if (pluginConfig.hasPath(MAX_THRIFT_FRAME_SIZE.key())) {
+            int thriftMaxFrameSize = 
checkIntArgument(pluginConfig.getInt(MAX_THRIFT_FRAME_SIZE.key()));
             sinkConfig.setThriftMaxFrameSize(thriftMaxFrameSize);
         }
-        if (pluginConfig.hasPath(ZONE_ID)) {
-            sinkConfig.setZoneId(ZoneId.of(pluginConfig.getString(ZONE_ID)));
+        if (pluginConfig.hasPath(ZONE_ID.key())) {
+            
sinkConfig.setZoneId(ZoneId.of(pluginConfig.getString(ZONE_ID.key())));
         }
-        if (pluginConfig.hasPath(ENABLE_RPC_COMPRESSION)) {
-            
sinkConfig.setEnableRPCCompression(pluginConfig.getBoolean(ENABLE_RPC_COMPRESSION));
+        if (pluginConfig.hasPath(ENABLE_RPC_COMPRESSION.key())) {
+            
sinkConfig.setEnableRPCCompression(pluginConfig.getBoolean(ENABLE_RPC_COMPRESSION.key()));
         }
-        if (pluginConfig.hasPath(CONNECTION_TIMEOUT_IN_MS)) {
-            int connectionTimeoutInMs = 
checkIntArgument(pluginConfig.getInt(CONNECTION_TIMEOUT_IN_MS));
+        if (pluginConfig.hasPath(CONNECTION_TIMEOUT_IN_MS.key())) {
+            int connectionTimeoutInMs = 
checkIntArgument(pluginConfig.getInt(CONNECTION_TIMEOUT_IN_MS.key()));
             checkNotNull(sinkConfig.getEnableRPCCompression());
             sinkConfig.setConnectionTimeoutInMs(connectionTimeoutInMs);
         }
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
index 5c952b416..90cc33b8b 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SourceConfig.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
 /**
  * SourceConfig is the configuration for the IotDBSource.
  * <p>
@@ -25,19 +28,19 @@ package 
org.apache.seatunnel.connectors.seatunnel.iotdb.config;
  */
 public class SourceConfig {
 
-    public static final String SQL = "sql";
+    public static final Option<String> SQL = 
Options.key("sql").stringType().noDefaultValue().withDescription("sql");
 
     /*---------------------- single node configurations 
-------------------------*/
 
     /**
      * The host of the IotDB server.
      */
-    public static final String HOST = "host";
+    public static final Option<String> HOST = 
Options.key("host").stringType().noDefaultValue().withDescription("host");
 
     /*
      * The port of the IotDB server.
      */
-    public static final String PORT = "port";
+    public static final Option<Integer> PORT = 
Options.key("port").intType().noDefaultValue().withDescription("port");
 
 
     /*---------------------- multiple node configurations 
-------------------------*/
@@ -45,58 +48,58 @@ public class SourceConfig {
     /**
      * Username for the source.
      */
-    public static final String USERNAME = "username";
+    public static final Option<String> USERNAME = 
Options.key("username").stringType().noDefaultValue().withDescription("username");
 
     /**
      * Password for the source.
      */
-    public static final String PASSWORD = "password";
+    public static final Option<String> PASSWORD = 
Options.key("password").stringType().noDefaultValue().withDescription("password");
 
     /**
      * multiple nodes
      */
-    public static final String NODE_URLS = "node_urls";
+    public static final Option<String> NODE_URLS = 
Options.key("node_urls").stringType().noDefaultValue().withDescription("node 
urls");
 
     /*---------------------- other configurations -------------------------*/
 
     /**
      * Fetches the next batch of data from the source.
      */
-    public static final String FETCH_SIZE = "fetch_size";
+    public static final Option<Integer> FETCH_SIZE = 
Options.key("fetch_size").intType().noDefaultValue().withDescription("fetch 
size");
 
     /**
      * thrift default buffer size
      */
-    public static final String THRIFT_DEFAULT_BUFFER_SIZE = 
"thrift_default_buffer_size";
+    public static final Option<Integer> THRIFT_DEFAULT_BUFFER_SIZE = 
Options.key("thrift_default_buffer_size").intType().noDefaultValue().withDescription("
 default thrift buffer size of iot db ");
 
     /**
      * thrift max frame size
      */
-    public static final String THRIFT_MAX_FRAME_SIZE = "thrift_max_frame_size";
+    public static final Option<Integer> THRIFT_MAX_FRAME_SIZE = 
Options.key("thrift_max_frame_size").intType().noDefaultValue().withDescription("thrift
 max frame size ");
 
     /**
      * whether to enable cache leader
      */
-    public static final String ENABLE_CACHE_LEADER = "enable_cache_leader";
+    public static final Option<Boolean> ENABLE_CACHE_LEADER = 
Options.key("enable_cache_leader").booleanType().noDefaultValue().withDescription("enable
 cache leader ");
 
     /**
      * Version represents the SQL semantic version used by the client, which 
is used to be compatible with the SQL semantics of 0.12 when upgrading 0.13. 
The possible values are: V_0_12, V_0_13.
      */
-    public static final String VERSION = "version";
+    public static final Option<String> VERSION = 
Options.key("version").stringType().noDefaultValue().withDescription("version");
 
     /**
      * Query lower bound of the time range to be read.
      */
-    public static final String LOWER_BOUND = "lower_bound";
+    public static final Option<Long> LOWER_BOUND = 
Options.key("lower_bound").longType().noDefaultValue().withDescription("low 
bound");
 
     /**
      * Query upper bound of the time range to be read.
      */
-    public static final String UPPER_BOUND = "upper_bound";
+    public static final Option<Long> UPPER_BOUND = 
Options.key("upper_bound").longType().noDefaultValue().withDescription("upper 
bound");
 
     /**
      * Query num partitions to be read.
      */
-    public static final String NUM_PARTITIONS = "num_partitions";
+    public static final Option<Integer> NUM_PARTITIONS = 
Options.key("num_partitions").intType().noDefaultValue().withDescription("num 
partitions");
 
 }
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
index fe00e2d81..aa813b95e 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
@@ -51,7 +51,7 @@ public class IoTDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
NODE_URLS, USERNAME, PASSWORD, KEY_DEVICE);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
NODE_URLS.key(), USERNAME.key(), PASSWORD.key(), KEY_DEVICE.key());
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
         }
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkFactory.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkFactory.java
new file mode 100644
index 000000000..ac2beb0a6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.NODE_URLS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.BATCH_INTERVAL_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.CONNECTION_TIMEOUT_IN_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.DEFAULT_THRIFT_BUFFER_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.ENABLE_RPC_COMPRESSION;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.KEY_DEVICE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.KEY_MEASUREMENT_FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.KEY_TIMESTAMP;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.MAX_RETRIES;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.MAX_RETRY_BACKOFF_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.MAX_THRIFT_FRAME_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.STORAGE_GROUP;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.ZONE_ID;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class IoTDBSinkFactory implements TableSinkFactory{
+    @Override
+    public String factoryIdentifier() {
+        return "IoTDB";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(NODE_URLS, USERNAME, PASSWORD, KEY_DEVICE)
+                .optional(KEY_TIMESTAMP, KEY_MEASUREMENT_FIELDS, 
STORAGE_GROUP, BATCH_SIZE, BATCH_INTERVAL_MS,
+                        MAX_RETRIES, RETRY_BACKOFF_MULTIPLIER_MS, 
MAX_RETRY_BACKOFF_MS, DEFAULT_THRIFT_BUFFER_SIZE,
+                        MAX_THRIFT_FRAME_SIZE, ZONE_ID, 
ENABLE_RPC_COMPRESSION, CONNECTION_TIMEOUT_IN_MS)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
index 592c0739e..003a67fd1 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSource.java
@@ -59,9 +59,9 @@ public class IoTDBSource implements 
SeaTunnelSource<SeaTunnelRow, IoTDBSourceSpl
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HOST, PORT);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HOST.key(), PORT.key());
         if (!result.isSuccess()) {
-            result = CheckConfigUtil.checkAllExists(pluginConfig, NODE_URLS);
+            result = CheckConfigUtil.checkAllExists(pluginConfig, 
NODE_URLS.key());
 
             if (!result.isSuccess()) {
                 throw new PrepareFailException(getPluginName(), 
PluginType.SOURCE, "host and port and node urls are both empty");
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
new file mode 100644
index 000000000..ee92a50ea
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.source;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.NODE_URLS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.CommonConfig.USERNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.ENABLE_CACHE_LEADER;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.FETCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.LOWER_BOUND;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.NUM_PARTITIONS;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.PORT;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.SQL;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_DEFAULT_BUFFER_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.THRIFT_MAX_FRAME_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.UPPER_BOUND;
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SourceConfig.VERSION;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class IoTDBSourceFactory implements TableSourceFactory{
+    @Override
+    public String factoryIdentifier() {
+        return "IoTDB";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(NODE_URLS, USERNAME, PASSWORD, SQL)
+                .optional(HOST, PORT, FETCH_SIZE, THRIFT_DEFAULT_BUFFER_SIZE, 
THRIFT_MAX_FRAME_SIZE,
+                        ENABLE_CACHE_LEADER, VERSION, LOWER_BOUND, 
UPPER_BOUND, NUM_PARTITIONS)
+                .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
index a7e8aaa8a..28510361a 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceReader.java
@@ -87,7 +87,9 @@ public class IoTDBSourceReader implements 
SourceReader<SeaTunnelRow, IoTDBSource
     public void close() throws IOException {
         //nothing to do
         try {
-            session.close();
+            if (session != null) {
+                session.close();
+            }
         } catch (IoTDBConnectionException e) {
             throw new IOException("close IoTDB session failed", e);
         }
@@ -123,37 +125,36 @@ public class IoTDBSourceReader implements 
SourceReader<SeaTunnelRow, IoTDBSource
 
     private Session buildSession(Map<String, Object> conf) {
         Session.Builder sessionBuilder = new Session.Builder();
-        if (conf.containsKey(HOST)) {
+        if (conf.containsKey(HOST.key())) {
             sessionBuilder
-                    .host((String) conf.get(HOST))
-                    .port(Integer.parseInt(conf.get(PORT).toString()))
+                    .host((String) conf.get(HOST.key()))
+                    .port(Integer.parseInt(conf.get(PORT.key()).toString()))
                     .build();
         } else {
-            String nodeUrlsString = (String) conf.get(NODE_URLS);
-
+            String nodeUrlsString = (String) conf.get(NODE_URLS.key());
             List<String> nodes = 
Stream.of(nodeUrlsString.split(NODES_SPLIT)).collect(Collectors.toList());
             sessionBuilder.nodeUrls(nodes);
         }
-        if (null != conf.get(FETCH_SIZE)) {
-            
sessionBuilder.fetchSize(Integer.parseInt(conf.get(FETCH_SIZE).toString()));
+        if (null != conf.get(FETCH_SIZE.key())) {
+            
sessionBuilder.fetchSize(Integer.parseInt(conf.get(FETCH_SIZE.key()).toString()));
         }
-        if (null != conf.get(USERNAME)) {
-            sessionBuilder.username((String) conf.get(USERNAME));
+        if (null != conf.get(USERNAME.key())) {
+            sessionBuilder.username((String) conf.get(USERNAME.key()));
         }
-        if (null != conf.get(PASSWORD)) {
-            sessionBuilder.password((String) conf.get(PASSWORD));
+        if (null != conf.get(PASSWORD.key())) {
+            sessionBuilder.password((String) conf.get(PASSWORD.key()));
         }
-        if (null != conf.get(THRIFT_DEFAULT_BUFFER_SIZE)) {
-            
sessionBuilder.thriftDefaultBufferSize(Integer.parseInt(conf.get(THRIFT_DEFAULT_BUFFER_SIZE).toString()));
+        if (null != conf.get(THRIFT_DEFAULT_BUFFER_SIZE.key())) {
+            
sessionBuilder.thriftDefaultBufferSize(Integer.parseInt(conf.get(THRIFT_DEFAULT_BUFFER_SIZE.key()).toString()));
         }
-        if (null != conf.get(THRIFT_MAX_FRAME_SIZE)) {
-            
sessionBuilder.thriftMaxFrameSize(Integer.parseInt(conf.get(THRIFT_MAX_FRAME_SIZE).toString()));
+        if (null != conf.get(THRIFT_MAX_FRAME_SIZE.key())) {
+            
sessionBuilder.thriftMaxFrameSize(Integer.parseInt(conf.get(THRIFT_MAX_FRAME_SIZE.key()).toString()));
         }
-        if (null != conf.get(ENABLE_CACHE_LEADER)) {
-            
sessionBuilder.enableCacheLeader(Boolean.parseBoolean(conf.get(ENABLE_CACHE_LEADER).toString()));
+        if (null != conf.get(ENABLE_CACHE_LEADER.key())) {
+            
sessionBuilder.enableCacheLeader(Boolean.parseBoolean(conf.get(ENABLE_CACHE_LEADER.key()).toString()));
         }
-        if (null != conf.get(VERSION)) {
-            Version version = Version.valueOf(conf.get(VERSION).toString());
+        if (null != conf.get(VERSION.key())) {
+            Version version = 
Version.valueOf(conf.get(VERSION.key()).toString());
             sessionBuilder.version(version);
         }
         return sessionBuilder.build();
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
index 5d7a83c59..5820d1d4b 100644
--- 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/source/IoTDBSourceSplitEnumerator.java
@@ -113,16 +113,16 @@ public class IoTDBSourceSplitEnumerator implements 
SourceSplitEnumerator<IoTDBSo
      * split 2: select * from test  where (time >= 6 and time < 11) and (  age 
> 0 and age < 10 )
      */
     private Set<IoTDBSourceSplit> getIotDBSplit() {
-        String sql = conf.get(SQL).toString();
+        String sql = conf.get(SQL.key()).toString();
         Set<IoTDBSourceSplit> iotDBSourceSplits = new HashSet<>();
         // no need numPartitions, use one partition
-        if (!conf.containsKey(NUM_PARTITIONS)) {
+        if (!conf.containsKey(NUM_PARTITIONS.key())) {
             iotDBSourceSplits.add(new IoTDBSourceSplit(DEFAULT_PARTITIONS, 
sql));
             return iotDBSourceSplits;
         }
-        long start = Long.parseLong(conf.get(LOWER_BOUND).toString());
-        long end = Long.parseLong(conf.get(UPPER_BOUND).toString());
-        int numPartitions = 
Integer.parseInt(conf.get(NUM_PARTITIONS).toString());
+        long start = Long.parseLong(conf.get(LOWER_BOUND.key()).toString());
+        long end = Long.parseLong(conf.get(UPPER_BOUND.key()).toString());
+        int numPartitions = 
Integer.parseInt(conf.get(NUM_PARTITIONS.key()).toString());
         String sqlBase = sql;
         String sqlAlign = null;
         String sqlCondition = null;

Reply via email to