This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c1bbbd59d [Feature][Connector-V2] Support IoTDB sink (#2407)
c1bbbd59d is described below

commit c1bbbd59d55b82d93b4dbc3d0aa3450cc2c91b7b
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 17 17:30:41 2022 +0800

    [Feature][Connector-V2] Support IoTDB sink (#2407)
    
    * [Feature][Connector-V2] Support IoTDB sink
    
    
    * update
    
    * update docs
    
    Co-authored-by: wanghailin <[email protected]>
---
 docs/en/connector-v2/sink/IoTDB.md                 | 117 +++++++++++
 plugin-mapping.properties                          |   1 +
 pom.xml                                            |  21 ++
 seatunnel-connectors-v2-dist/pom.xml               |   7 +-
 .../connector-iotdb}/pom.xml                       |  18 +-
 .../seatunnel/iotdb/config/CommonConfig.java       |  38 ++++
 .../seatunnel/iotdb/config/SinkConfig.java         | 147 ++++++++++++++
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 177 +++++++++++++++++
 .../seatunnel/iotdb/serialize/IoTDBRecord.java     |  37 ++++
 .../iotdb/serialize/SeaTunnelRowSerializer.java    |  25 +++
 .../connectors/seatunnel/iotdb/sink/IoTDBSink.java |  63 ++++++
 .../seatunnel/iotdb/sink/IoTDBSinkClient.java      | 214 +++++++++++++++++++++
 .../seatunnel/iotdb/sink/IoTDBSinkWriter.java      |  58 ++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   4 +
 .../e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java    | 126 ++++++++++++
 .../test/resources/iotdb/fakesource_to_iotdb.conf  |  59 ++++++
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   5 +
 .../e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java    | 125 ++++++++++++
 .../test/resources/iotdb/fakesource_to_iotdb.conf  |  58 ++++++
 20 files changed, 1295 insertions(+), 6 deletions(-)

diff --git a/docs/en/connector-v2/sink/IoTDB.md 
b/docs/en/connector-v2/sink/IoTDB.md
new file mode 100644
index 000000000..3ea624fd5
--- /dev/null
+++ b/docs/en/connector-v2/sink/IoTDB.md
@@ -0,0 +1,117 @@
+# IoTDB
+
+> IoTDB sink connector
+
+## Description
+
+Used to write data to IoTDB. Supports Batch and Streaming mode.
+
+:::tip
+
+There is a conflict of thrift version between IoTDB and Spark.Therefore, you 
need to execute `rm -f $SPARK_HOME/jars/libthrift*` and `cp 
$IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/` to resolve it.
+
+:::
+
+## Options
+
+| name                          | type              | required | default value 
|
+|-------------------------------|-------------------|----------|---------------|
+| node_urls                     | list              | yes      | -             
|
+| username                      | string            | yes      | -             
|
+| password                      | string            | yes      | -             
|
+| batch_size                    | int               | no       | 1024          
|
+| batch_interval_ms             | int               | no       | -             
|
+| max_retries                   | int               | no       | -             
|
+| retry_backoff_multiplier_ms   | int               | no       | -             
|
+| max_retry_backoff_ms          | int               | no       | -             
|
+| default_thrift_buffer_size    | int               | no       | -             
|
+| max_thrift_frame_size         | int               | no       | -             
|
+| zone_id                       | string            | no       | -             
|
+| enable_rpc_compression        | boolean           | no       | -             
|
+| connection_timeout_in_ms      | int               | no       | -             
|
+| timeseries_options            | list              | no       | -             
|
+| timeseries_options.path       | string            | no       | -             
|
+| timeseries_options.data_type  | string            | no       | -             
|
+| common-options                | string            | no       | -             
|
+
+### node_urls [list]
+
+`IoTDB` cluster address, the format is `["host:port", ...]`
+
+### username [string]
+
+`IoTDB` user username
+
+### password [string]
+
+`IoTDB` user password
+
+### batch_size [int]
+
+For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the IoTDB
+
+### batch_interval_ms [int]
+
+For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the IoTDB
+
+### max_retries [int]
+
+The number of retries to flush failed
+
+### retry_backoff_multiplier_ms [int]
+
+Using as a multiplier for generating the next delay for backoff
+
+### max_retry_backoff_ms [int]
+
+The amount of time to wait before attempting to retry a request to `IoTDB`
+
+### default_thrift_buffer_size [int]
+
+Thrift init buffer size in `IoTDB` client
+
+### max_thrift_frame_size [int]
+
+Thrift max frame size in `IoTDB` client
+
+### zone_id [string]
+
+java.time.ZoneId in `IoTDB` client
+
+### enable_rpc_compression [boolean]
+
+Enable rpc compression in `IoTDB` client
+
+### connection_timeout_in_ms [int]
+
+The maximum time (in ms) to wait when connect `IoTDB`
+
+### timeseries_options [list]
+
+Timeseries options
+
+### timeseries_options.path [string]
+
+Timeseries path
+
+### timeseries_options.data_type [string]
+
+Timeseries data type
+
+### common options [string]
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
+
+## Examples
+
+```hocon
+sink {
+  IoTDB {
+    node_urls = ["localhost:6667"]
+    username = "root"
+    password = "root"
+    batch_size = 1024
+    batch_interval_ms = 1000
+  }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 6f0fffb56..28bbf5041 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -115,3 +115,4 @@ seatunnel.source.Pulsar = connector-pulsar
 seatunnel.source.Hudi = connector-hudi
 seatunnel.sink.DingTalk = connector-dingtalk
 seatunnel.sink.elasticsearch = connector-elasticsearch
+seatunnel.sink.IoTDB = connector-iotdb
diff --git a/pom.xml b/pom.xml
index 120c5a73d..70d209999 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,6 +225,8 @@
         <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
         
<elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
         <checker.qual.version>3.10.0</checker.qual.version>
+        <iotdb.version>0.13.1</iotdb.version>
+        <awaitility.version>4.2.0</awaitility.version>
     </properties>
 
     <dependencyManagement>
@@ -940,6 +942,25 @@
                 <artifactId>checker-qual</artifactId>
                 <version>${checker.qual.version}</version>
             </dependency>
+            
+            <dependency>
+                <groupId>org.apache.iotdb</groupId>
+                <artifactId>iotdb-session</artifactId>
+                <version>${iotdb.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>ch.qos.logback</groupId>
+                        <artifactId>logback-classic</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.awaitility</groupId>
+                <artifactId>awaitility</artifactId>
+                <version>${awaitility.version}</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/seatunnel-connectors-v2-dist/pom.xml 
b/seatunnel-connectors-v2-dist/pom.xml
index 0f50bba0e..5cb853f27 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -121,11 +121,16 @@
             <artifactId>connector-email</artifactId>
             <version>${project.version}</version>
         </dependency>
-          <dependency>
+        <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-elasticsearch</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-iotdb</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml 
b/seatunnel-connectors-v2/connector-iotdb/pom.xml
similarity index 81%
copy from seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
copy to seatunnel-connectors-v2/connector-iotdb/pom.xml
index 925bbd385..f9635cb83 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-connectors-v2/connector-iotdb/pom.xml
@@ -1,42 +1,50 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
+
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
+
        http://www.apache.org/licenses/LICENSE-2.0
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
+
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0";
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-e2e</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>jar</packaging>
 
-    <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
+    <artifactId>connector-iotdb</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-spark</artifactId>
+            <artifactId>seatunnel-api</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-connectors-v2-dist</artifactId>
+            <artifactId>connector-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
new file mode 100644
index 000000000..c52a743dc
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/CommonConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+
+import java.util.List;
+
+@Getter
+@ToString
+@AllArgsConstructor
+public class CommonConfig {
+
+    public static final String NODE_URLS = "node_urls";
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+
+    private final List<String> nodeUrls;
+    private final String username;
+    private final String password;
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
new file mode 100644
index 000000000..b6320da8f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/config/SinkConfig.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.config;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+@Setter
+@Getter
+@ToString
+public class SinkConfig extends CommonConfig {
+
+    public static final String BATCH_SIZE = "batch_size";
+    public static final String BATCH_INTERVAL_MS = "batch_interval_ms";
+    public static final String MAX_RETRIES = "max_retries";
+    public static final String RETRY_BACKOFF_MULTIPLIER_MS = 
"retry_backoff_multiplier_ms";
+    public static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms";
+    public static final String DEFAULT_THRIFT_BUFFER_SIZE = 
"default_thrift_buffer_size";
+    public static final String MAX_THRIFT_FRAME_SIZE = "max_thrift_frame_size";
+    public static final String ZONE_ID = "zone_id";
+    public static final String ENABLE_RPC_COMPRESSION = 
"enable_rpc_compression";
+    public static final String CONNECTION_TIMEOUT_IN_MS = 
"connection_timeout_in_ms";
+    public static final String TIMESERIES_OPTIONS = "timeseries_options";
+    public static final String TIMESERIES_OPTION_PATH = "path";
+    public static final String TIMESERIES_OPTION_DATA_TYPE = "data_type";
+
+    private static final int DEFAULT_BATCH_SIZE = 1024;
+
+    private int batchSize = DEFAULT_BATCH_SIZE;
+    private Integer batchIntervalMs;
+    private int maxRetries;
+    private int retryBackoffMultiplierMs;
+    private int maxRetryBackoffMs;
+    private Integer thriftDefaultBufferSize;
+    private Integer thriftMaxFrameSize;
+    private ZoneId zoneId;
+    private Boolean enableRPCCompression;
+    private Integer connectionTimeoutInMs;
+    private List<TimeseriesOption> timeseriesOptions;
+
+    public SinkConfig(@NonNull List<String> nodeUrls,
+                      @NonNull String username,
+                      @NonNull String password) {
+        super(nodeUrls, username, password);
+    }
+
+    public static SinkConfig loadConfig(Config pluginConfig) {
+        SinkConfig sinkConfig = new SinkConfig(
+                pluginConfig.getStringList(NODE_URLS),
+                pluginConfig.getString(USERNAME),
+                pluginConfig.getString(PASSWORD));
+        if (pluginConfig.hasPath(BATCH_SIZE)) {
+            int batchSize = checkIntArgument(pluginConfig.getInt(BATCH_SIZE));
+            sinkConfig.setBatchSize(batchSize);
+        }
+        if (pluginConfig.hasPath(BATCH_INTERVAL_MS)) {
+            int batchIntervalMs = 
checkIntArgument(pluginConfig.getInt(BATCH_INTERVAL_MS));
+            sinkConfig.setBatchIntervalMs(batchIntervalMs);
+        }
+        if (pluginConfig.hasPath(MAX_RETRIES)) {
+            int maxRetries = 
checkIntArgument(pluginConfig.getInt(MAX_RETRIES));
+            sinkConfig.setMaxRetries(maxRetries);
+        }
+        if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) {
+            int retryBackoffMultiplierMs = 
checkIntArgument(pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS));
+            sinkConfig.setRetryBackoffMultiplierMs(retryBackoffMultiplierMs);
+        }
+        if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS)) {
+            int maxRetryBackoffMs = 
checkIntArgument(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS));
+            sinkConfig.setMaxRetryBackoffMs(maxRetryBackoffMs);
+        }
+        if (pluginConfig.hasPath(DEFAULT_THRIFT_BUFFER_SIZE)) {
+            int thriftDefaultBufferSize = 
checkIntArgument(pluginConfig.getInt(DEFAULT_THRIFT_BUFFER_SIZE));
+            sinkConfig.setThriftDefaultBufferSize(thriftDefaultBufferSize);
+        }
+        if (pluginConfig.hasPath(MAX_THRIFT_FRAME_SIZE)) {
+            int thriftMaxFrameSize = 
checkIntArgument(pluginConfig.getInt(MAX_THRIFT_FRAME_SIZE));
+            sinkConfig.setThriftMaxFrameSize(thriftMaxFrameSize);
+        }
+        if (pluginConfig.hasPath(ZONE_ID)) {
+            sinkConfig.setZoneId(ZoneId.of(pluginConfig.getString(ZONE_ID)));
+        }
+        if (pluginConfig.hasPath(ENABLE_RPC_COMPRESSION)) {
+            
sinkConfig.setEnableRPCCompression(pluginConfig.getBoolean(ENABLE_RPC_COMPRESSION));
+        }
+        if (pluginConfig.hasPath(CONNECTION_TIMEOUT_IN_MS)) {
+            int connectionTimeoutInMs = 
checkIntArgument(pluginConfig.getInt(CONNECTION_TIMEOUT_IN_MS));
+            checkNotNull(sinkConfig.getEnableRPCCompression());
+            sinkConfig.setConnectionTimeoutInMs(connectionTimeoutInMs);
+        }
+        if (pluginConfig.hasPath(TIMESERIES_OPTIONS)) {
+            List<? extends Config> timeseriesConfigs = 
pluginConfig.getConfigList(TIMESERIES_OPTIONS);
+            List<TimeseriesOption> timeseriesOptions = new 
ArrayList<>(timeseriesConfigs.size());
+            for (Config timeseriesConfig : timeseriesConfigs) {
+                String timeseriesPath = 
timeseriesConfig.getString(TIMESERIES_OPTION_PATH);
+                String timeseriesDataType = 
timeseriesConfig.getString(TIMESERIES_OPTION_DATA_TYPE);
+                TimeseriesOption timeseriesOption = new TimeseriesOption(
+                        timeseriesPath, 
TSDataType.valueOf(timeseriesDataType));
+                timeseriesOptions.add(timeseriesOption);
+            }
+            sinkConfig.setTimeseriesOptions(timeseriesOptions);
+        }
+        return sinkConfig;
+    }
+
+    private static int checkIntArgument(int args) {
+        checkArgument(args > 0);
+        return args;
+    }
+
+    @Getter
+    @ToString
+    @AllArgsConstructor
+    public static class TimeseriesOption implements Serializable {
+        private String path;
+        private TSDataType dataType = TSDataType.TEXT;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java
new file mode 100644
index 000000000..b0ba1e288
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/DefaultSeaTunnelRowSerializer.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.serialize;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig.TimeseriesOption;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
+
+    private static final String FIELD_DEVICE = "device";
+    private static final String FIELD_TIMESTAMP = "timestamp";
+    private static final String FIELD_MEASUREMENTS = "measurements";
+    private static final String FIELD_TYPES = "types";
+    private static final String FIELD_VALUES = "values";
+    private static final String SEPARATOR = ",";
+
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final Map<String, TimeseriesOption> timeseriesOptionMap;
+    private final Function<SeaTunnelRow, String> deviceExtractor;
+    private final Function<SeaTunnelRow, Long> timestampExtractor;
+    private final Function<SeaTunnelRow, List<String>> measurementsExtractor;
+    private final Function<SeaTunnelRow, List<TSDataType>> typesExtractor;
+
+    public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType,
+                                         List<TimeseriesOption> 
timeseriesOptions) {
+        validateRowTypeSchema(seaTunnelRowType);
+
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.timeseriesOptionMap = Optional.ofNullable(timeseriesOptions)
+                .orElse(Collections.emptyList()).stream()
+                .collect(Collectors.toMap(option -> option.getPath(), option 
-> option));
+
+        final List<String> rowTypeFields = 
Arrays.asList(seaTunnelRowType.getFieldNames());
+        final int deviceIndex = seaTunnelRowType.indexOf(FIELD_DEVICE);
+        this.deviceExtractor = seaTunnelRow -> 
seaTunnelRow.getField(deviceIndex).toString();
+        final int timestampIndex = seaTunnelRowType.indexOf(FIELD_TIMESTAMP);
+        this.timestampExtractor = rowTypeFields.contains(FIELD_TIMESTAMP) ?
+            seaTunnelRow -> 
Long.parseLong(seaTunnelRow.getField(timestampIndex).toString()) :
+            seaTunnelRow -> System.currentTimeMillis();
+        final int measurementsIndex = 
seaTunnelRowType.indexOf(FIELD_MEASUREMENTS);
+        this.measurementsExtractor = seaTunnelRow ->
+                
Arrays.asList(seaTunnelRow.getField(measurementsIndex).toString().split(SEPARATOR));
+        final boolean containsTypesField = rowTypeFields.contains(FIELD_TYPES);
+        final int typesIndex = containsTypesField ? 
seaTunnelRowType.indexOf(FIELD_TYPES) : -1;
+        this.typesExtractor = seaTunnelRow -> {
+            if (!containsTypesField) {
+                return null;
+            }
+            return 
Arrays.stream(seaTunnelRow.getField(typesIndex).toString().split(SEPARATOR))
+                    .map(type -> TSDataType.valueOf(type))
+                    .collect(Collectors.toList());
+        };
+    }
+
+    @Override
+    public IoTDBRecord serialize(SeaTunnelRow seaTunnelRow) {
+        String device = deviceExtractor.apply(seaTunnelRow);
+        Long timestamp = timestampExtractor.apply(seaTunnelRow);
+        List<String> measurements = measurementsExtractor.apply(seaTunnelRow);
+        List<TSDataType> types = typesExtractor.apply(seaTunnelRow);
+        List<Object> values = extractValues(device, measurements, types, 
seaTunnelRow);
+        return new IoTDBRecord(device, timestamp, measurements, types, values);
+    }
+
+    private void validateRowTypeSchema(SeaTunnelRowType seaTunnelRowType) 
throws IllegalArgumentException {
+        List<String> rowTypeFields = 
Lists.newArrayList(seaTunnelRowType.getFieldNames());
+        checkArgument(rowTypeFields.contains(FIELD_DEVICE));
+        checkArgument(rowTypeFields.contains(FIELD_MEASUREMENTS));
+        checkArgument(rowTypeFields.contains(FIELD_VALUES));
+
+        rowTypeFields.remove(FIELD_DEVICE);
+        rowTypeFields.remove(FIELD_TIMESTAMP);
+        rowTypeFields.remove(FIELD_MEASUREMENTS);
+        rowTypeFields.remove(FIELD_TYPES);
+        rowTypeFields.remove(FIELD_VALUES);
+        checkArgument(rowTypeFields.isEmpty(),
+                "Illegal SeaTunnelRowType fields: " + rowTypeFields);
+    }
+
+    private List<Object> extractValues(String device,
+                                       List<String> measurements,
+                                       List<TSDataType> tsDataTypes,
+                                       SeaTunnelRow seaTunnelRow) {
+        int valuesIndex = seaTunnelRowType.indexOf(FIELD_VALUES);
+        String[] valuesStr = StringUtils.trim(
+                
seaTunnelRow.getField(valuesIndex).toString()).split(SEPARATOR);
+        if (tsDataTypes == null || tsDataTypes.isEmpty()) {
+            convertTextValues(device, measurements, valuesStr);
+            return Arrays.asList(valuesStr);
+        }
+
+        List<Object> values = new ArrayList<>();
+        for (int i = 0; i < valuesStr.length; i++) {
+            TSDataType tsDataType = tsDataTypes.get(i);
+            switch (tsDataType) {
+                case INT32:
+                    values.add(Integer.valueOf(valuesStr[i]));
+                    break;
+                case INT64:
+                    values.add(Long.valueOf(valuesStr[i]));
+                    break;
+                case FLOAT:
+                    values.add(Float.valueOf(valuesStr[i]));
+                    break;
+                case DOUBLE:
+                    values.add(Double.valueOf(valuesStr[i]));
+                    break;
+                case BOOLEAN:
+                    values.add(Boolean.valueOf(valuesStr[i]));
+                    break;
+                case TEXT:
+                    String value = valuesStr[i];
+                    if (!value.startsWith("\"") && !value.startsWith("'")) {
+                        value = convertToTextValue(value);
+                    }
+                    values.add(value);
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unsupported 
dataType: " + tsDataType);
+            }
+        }
+        return values;
+    }
+
+    private void convertTextValues(String device, List<String> measurements, 
String[] values) {
+        if (device != null
+                && measurements != null
+                && values != null
+                && !timeseriesOptionMap.isEmpty()
+                && measurements.size() == values.length) {
+            for (int i = 0; i < measurements.size(); i++) {
+                String measurement = device + TsFileConstant.PATH_SEPARATOR + 
measurements.get(i);
+                TimeseriesOption timeseriesOption = 
timeseriesOptionMap.get(measurement);
+                if (timeseriesOption != null && 
TSDataType.TEXT.equals(timeseriesOption.getDataType())) {
+                    // The TEXT data type should be covered by " or '
+                    values[i] = convertToTextValue(values[i]);
+                }
+            }
+        }
+    }
+
+    private String convertToTextValue(Object value) {
+        return "'" + value + "'";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/IoTDBRecord.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/IoTDBRecord.java
new file mode 100644
index 000000000..4b5c06325
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/IoTDBRecord.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.serialize;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+
+@Getter
+@ToString
+@AllArgsConstructor
+public class IoTDBRecord {
+
+    private String device;
+    private Long timestamp;
+    private List<String> measurements;
+    private List<TSDataType> types;
+    private List<Object> values;
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/SeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..0af7b496b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+public interface SeaTunnelRowSerializer {
+
+    IoTDBRecord serialize(SeaTunnelRow seaTunnelRow);
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
new file mode 100644
index 000000000..6ba43079b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class IoTDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private Config pluginConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+
+    @Override
+    public String getPluginName() {
+        return "IoTDB";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        this.pluginConfig = pluginConfig;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
+        return new IoTDBSinkWriter(pluginConfig, seaTunnelRowType);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java
new file mode 100644
index 000000000..c56204c38
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkClient.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.IoTDBRecord;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class IoTDBSinkClient {
+
+    private final SinkConfig sinkConfig;
+    private final List<IoTDBRecord> batchList;
+
+    private Session session;
+    private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> scheduledFuture;
+    private volatile boolean initialize;
+    private volatile Exception flushException;
+
+    public IoTDBSinkClient(SinkConfig sinkConfig) {
+        this.sinkConfig = sinkConfig;
+        this.batchList = new ArrayList<>();
+    }
+
+    private void tryInit() throws IOException {
+        if (initialize) {
+            return;
+        }
+
+        Session.Builder sessionBuilder = new Session.Builder()
+                .nodeUrls(sinkConfig.getNodeUrls())
+                .username(sinkConfig.getUsername())
+                .password(sinkConfig.getPassword());
+        if (sinkConfig.getThriftDefaultBufferSize() != null) {
+            
sessionBuilder.thriftDefaultBufferSize(sinkConfig.getThriftDefaultBufferSize());
+        }
+        if (sinkConfig.getThriftMaxFrameSize() != null) {
+            
sessionBuilder.thriftMaxFrameSize(sinkConfig.getThriftMaxFrameSize());
+        }
+        if (sinkConfig.getZoneId() != null) {
+            sessionBuilder.zoneId(sinkConfig.getZoneId());
+        }
+
+        session = sessionBuilder.build();
+        try {
+            if (sinkConfig.getConnectionTimeoutInMs() != null) {
+                session.open(sinkConfig.getEnableRPCCompression(), 
sinkConfig.getConnectionTimeoutInMs());
+            } else if (sinkConfig.getEnableRPCCompression() != null) {
+                session.open(sinkConfig.getEnableRPCCompression());
+            } else {
+                session.open();
+            }
+        } catch (IoTDBConnectionException e) {
+            log.error("Initialize IoTDB client failed.", e);
+            throw new IOException(e);
+        }
+
+        if (sinkConfig.getBatchIntervalMs() != null) {
+            scheduler = Executors.newSingleThreadScheduledExecutor(
+                    new 
ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build());
+            scheduledFuture = scheduler.scheduleAtFixedRate(
+                () -> {
+                    try {
+                        flush();
+                    } catch (IOException e) {
+                        flushException = e;
+                    }
+                },
+                    sinkConfig.getBatchIntervalMs(),
+                    sinkConfig.getBatchIntervalMs(),
+                    TimeUnit.MILLISECONDS);
+        }
+        initialize = true;
+    }
+
+    public synchronized void write(IoTDBRecord record) throws IOException {
+        tryInit();
+        checkFlushException();
+
+        batchList.add(record);
+        if (sinkConfig.getBatchSize() > 0
+                && batchList.size() >= sinkConfig.getBatchSize()) {
+            flush();
+        }
+    }
+
+    public synchronized void close() throws IOException {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduler.shutdown();
+        }
+
+        flush();
+
+        try {
+            session.close();
+        } catch (IoTDBConnectionException e) {
+            log.error("Close IoTDB client failed.", e);
+            throw new IOException("Close IoTDB client failed.", e);
+        }
+    }
+
+    private synchronized void flush() throws IOException {
+        checkFlushException();
+        if (batchList.isEmpty()) {
+            return;
+        }
+
+        BatchRecords batchRecords = new BatchRecords(batchList);
+        for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
+            try {
+                if (batchRecords.getTypesList().isEmpty()) {
+                    session.insertRecords(batchRecords.getDeviceIds(),
+                            batchRecords.getTimestamps(),
+                            batchRecords.getMeasurementsList(),
+                            batchRecords.getStringValuesList());
+                } else {
+                    session.insertRecords(batchRecords.getDeviceIds(),
+                            batchRecords.getTimestamps(),
+                            batchRecords.getMeasurementsList(),
+                            batchRecords.getTypesList(),
+                            batchRecords.getValuesList());
+                }
+            } catch (IoTDBConnectionException | StatementExecutionException e) 
{
+                log.error("Writing records to IoTDB failed, retry times = {}", 
i, e);
+                if (i >= sinkConfig.getMaxRetries()) {
+                    throw new IOException("Writing records to IoTDB failed.", 
e);
+                }
+
+                try {
+                    long backoff = 
Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i,
+                            sinkConfig.getMaxRetryBackoffMs());
+                    Thread.sleep(backoff);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException(
+                            "Unable to flush; interrupted while doing another 
attempt.", e);
+                }
+            }
+        }
+
+        batchList.clear();
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to IoTDB failed.", 
flushException);
+        }
+    }
+
+    @Getter
+    private class BatchRecords {
+        private List<String> deviceIds;
+        private List<Long> timestamps;
+        private List<List<String>> measurementsList;
+        private List<List<TSDataType>> typesList;
+        private List<List<Object>> valuesList;
+
+        public BatchRecords(List<IoTDBRecord> batchList) {
+            int batchSize = batchList.size();
+            this.deviceIds = new ArrayList<>(batchSize);
+            this.timestamps = new ArrayList<>(batchSize);
+            this.measurementsList = new ArrayList<>(batchSize);
+            this.typesList = new ArrayList<>(batchSize);
+            this.valuesList = new ArrayList<>(batchSize);
+
+            for (IoTDBRecord record : batchList) {
+                deviceIds.add(record.getDevice());
+                timestamps.add(record.getTimestamp());
+                measurementsList.add(record.getMeasurements());
+                if (record.getTypes() != null && !record.getTypes().isEmpty()) 
{
+                    typesList.add(record.getTypes());
+                }
+                valuesList.add(record.getValues());
+            }
+        }
+
+        private List<List<String>> getStringValuesList() {
+            List<?> tmp = valuesList;
+            return (List<List<String>>) tmp;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkWriter.java
 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkWriter.java
new file mode 100644
index 000000000..fc37c5434
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iotdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/iotdb/sink/IoTDBSinkWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iotdb.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.IoTDBRecord;
+import 
org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.SeaTunnelRowSerializer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+@Slf4j
+public class IoTDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final SeaTunnelRowSerializer serializer;
+    private final IoTDBSinkClient sinkClient;
+
+    public IoTDBSinkWriter(Config pluginConfig,
+                           SeaTunnelRowType seaTunnelRowType) {
+        SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig);
+        this.serializer = new DefaultSeaTunnelRowSerializer(
+                seaTunnelRowType, sinkConfig.getTimeseriesOptions());
+        this.sinkClient = new IoTDBSinkClient(sinkConfig);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        IoTDBRecord record = serializer.serialize(element);
+        sinkClient.write(record);
+    }
+
+    @Override
+    public void close() throws IOException {
+        sinkClient.close();
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 083bae888..ed84c6dce 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -48,6 +48,7 @@
         <module>connector-email</module>
         <module>connector-dingtalk</module>
         <module>connector-elasticsearch</module>
+        <module>connector-iotdb</module>
     </modules>
 
     <dependencies>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index b4c2f6081..ffde5c595 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -55,6 +55,10 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java
new file mode 100644
index 000000000..b126831cc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.iotdb;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoTDBIT extends FlinkContainer {
+
+    private static final String IOTDB_DOCKER_IMAGE = 
"apache/iotdb:0.13.1-node";
+    private static final String IOTDB_HOST = "flink_e2e_iotdb_sink";
+    private static final int IOTDB_PORT = 6667;
+    private static final String IOTDB_USERNAME = "root";
+    private static final String IOTDB_PASSWORD = "root";
+
+    private GenericContainer<?> iotdbServer;
+    private Session session;
+
+    @BeforeEach
+    public void startIoTDBContainer() throws Exception {
+        iotdbServer = new GenericContainer<>(IOTDB_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(IOTDB_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        iotdbServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:6667", IOTDB_PORT)));
+        Startables.deepStart(Stream.of(iotdbServer)).join();
+        log.info("IoTDB container started");
+        // wait for IoTDB fully start
+        session = createSession();
+        given().ignoreExceptions()
+                .await()
+                .atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(() -> session.open());
+        initIoTDBTimeseries();
+    }
+
+    /**
+     * fake source -> IoTDB sink
+     */
+    @Test
+    public void testFakeSourceToIoTDB() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/iotdb/fakesource_to_iotdb.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // query result
+        SessionDataSet dataSet = session.executeQueryStatement("select status, 
value from root.ln.d1");
+        List<Object> actual = new ArrayList<>();
+        while (dataSet.hasNext()) {
+            RowRecord row = dataSet.next();
+            List<Field> fields = row.getFields();
+            Field status = fields.get(0);
+            Field val = fields.get(1);
+            actual.add(Arrays.asList(status.getBoolV(), val.getLongV()));
+        }
+        List<Object> expected = Arrays.asList(
+                Arrays.asList(Boolean.TRUE, Long.valueOf(1001)),
+                Arrays.asList(Boolean.FALSE, Long.valueOf(1002)));
+        Assertions.assertIterableEquals(expected, actual);
+    }
+
+    private Session createSession() {
+        return new Session.Builder()
+                .host("localhost")
+                .port(IOTDB_PORT)
+                .username(IOTDB_USERNAME)
+                .password(IOTDB_PASSWORD)
+                .build();
+    }
+
+    private void initIoTDBTimeseries() throws Exception {
+        session.setStorageGroup("root.ln");
+        session.createTimeseries("root.ln.d1.status",
+                TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY);
+        session.createTimeseries("root.ln.d1.value",
+                TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY);
+    }
+
+    @AfterEach
+    public void closeIoTDBContainer() {
+        if (iotdbServer != null) {
+            iotdbServer.stop();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
new file mode 100644
index 000000000..da1ae4936
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+    # You can set flink configuration here
+    execution.parallelism = 1
+    job.mode = "BATCH"
+    #execution.checkpoint.interval = 10000
+    #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+    FakeSource {
+        result_table_name = "fake"
+        field_name = "name, age"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of source plugins,
+    # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+        sql = "select * from (values('root.ln.d1', '1660147200000', 
'status,value', 'true,1001'), ('root.ln.d1', '1660233600000', 'status,value', 
'false,1002')) t (device, `timestamp`, measurements, `values`)"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+    IoTDB {
+        node_urls = ["flink_e2e_iotdb_sink:6667"]
+        username = "root"
+        password = "root"
+        batch_size = 1
+        batch_interval_ms = 10
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of sink plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/sink/IoTDB
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 925bbd385..4da27bcf0 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -37,6 +37,11 @@
             <artifactId>seatunnel-connectors-v2-dist</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java
new file mode 100644
index 000000000..66b044fb0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.iotdb;
+
+import static org.awaitility.Awaitility.given;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.SessionDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class FakeSourceToIoTDBIT extends SparkContainer {
+
+    private static final String IOTDB_DOCKER_IMAGE = 
"apache/iotdb:0.13.1-node";
+    private static final String IOTDB_HOST = "spark_e2e_iotdb_sink";
+    private static final int IOTDB_PORT = 6668;
+    private static final String IOTDB_USERNAME = "root";
+    private static final String IOTDB_PASSWORD = "root";
+
+    private GenericContainer<?> iotdbServer;
+    private Session session;
+
+    @BeforeEach
+    public void startIoTDBContainer() throws Exception {
+        iotdbServer = new GenericContainer<>(IOTDB_DOCKER_IMAGE)
+                .withNetwork(NETWORK)
+                .withNetworkAliases(IOTDB_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));
+        iotdbServer.setPortBindings(Lists.newArrayList(
+                String.format("%s:6667", IOTDB_PORT)));
+        Startables.deepStart(Stream.of(iotdbServer)).join();
+        log.info("IoTDB container started");
+        // wait for IoTDB fully start
+        session = createSession();
+        given().ignoreExceptions()
+                .await()
+                .atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(() -> session.open());
+        initIoTDBTimeseries();
+    }
+
+    /**
+     * fake source -> IoTDB sink
+     */
+    //@Test
+    public void testFakeSourceToIoTDB() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/iotdb/fakesource_to_iotdb.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        // query result
+        SessionDataSet dataSet = session.executeQueryStatement("select status, 
value from root.ln.d1");
+        List<Object> actual = new ArrayList<>();
+        while (dataSet.hasNext()) {
+            RowRecord row = dataSet.next();
+            List<Field> fields = row.getFields();
+            Field status = fields.get(0);
+            Field val = fields.get(1);
+            actual.add(Arrays.asList(status.getBoolV(), val.getLongV()));
+        }
+        List<Object> expected = Arrays.asList(
+                Arrays.asList(Boolean.TRUE, Long.valueOf(1001)),
+                Arrays.asList(Boolean.FALSE, Long.valueOf(1002)));
+        Assertions.assertIterableEquals(expected, actual);
+    }
+
+    private Session createSession() {
+        return new Session.Builder()
+                .host("localhost")
+                .port(IOTDB_PORT)
+                .username(IOTDB_USERNAME)
+                .password(IOTDB_PASSWORD)
+                .build();
+    }
+
+    private void initIoTDBTimeseries() throws Exception {
+        session.setStorageGroup("root.ln");
+        session.createTimeseries("root.ln.d1.status",
+                TSDataType.BOOLEAN, TSEncoding.PLAIN, CompressionType.SNAPPY);
+        session.createTimeseries("root.ln.d1.value",
+                TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY);
+    }
+
+    @AfterEach
+    public void closeIoTDBContainer() {
+        if (iotdbServer != null) {
+            iotdbServer.stop();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
new file mode 100644
index 000000000..9c7e521b7
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        result_table_name = "fake"
+        field_name = "name, age"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of source plugins,
+    # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+transform {
+    sql {
+        sql = "select * from (values('root.ln.d1', '1660147200000', 
'status,value', 'true,1001'), ('root.ln.d1', '1660233600000', 'status,value', 
'false,1002')) t (device, `timestamp`, measurements, `values`)"
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+    IoTDB {
+        node_urls = ["spark_e2e_iotdb_sink:6668"]
+        username = "root"
+        password = "root"
+        batch_size = 1
+        batch_interval_ms = 10
+    }
+
+    # If you would like to get more information about how to configure 
seatunnel and see full list of sink plugins,
+    # please go to https://seatunnel.apache.org/docs/connector-v2/sink/IoTDB
+}
\ No newline at end of file

Reply via email to