This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ebebf0b63 [Feature][connector-v2] add tablestore source and sink  
(#3309)
ebebf0b63 is described below

commit ebebf0b63360e9eababda42da4ddd62b65d2599b
Author: liugddx <[email protected]>
AuthorDate: Tue Nov 15 10:27:41 2022 +0800

    [Feature][connector-v2] add tablestore source and sink  (#3309)
---
 docs/en/connector-v2/sink/Tablestore.md            |  74 ++++++++++
 docs/en/connector-v2/source/Jdbc.md                |  25 ++--
 plugin-mapping.properties                          |   3 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  20 ++-
 .../jdbc/internal/dialect/JdbcDialect.java         |   7 +
 .../dialect/tablestore/TablestoreDialect.java      |  58 ++++++++
 .../tablestore/TablestoreDialectFactory.java       |  40 ++++++
 .../tablestore/TablestoreJdbcRowConverter.java     |  39 +++++
 .../dialect/tablestore/TablestoreTypeMapper.java   |  78 ++++++++++
 .../seatunnel/jdbc/source/JdbcSource.java          |   4 +-
 .../connector-tablestore/pom.xml                   |  49 +++++++
 .../tablestore/config/TablestoreConfig.java        |  58 ++++++++
 .../tablestore/config/TablestoreOptions.java       |  66 +++++++++
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 158 +++++++++++++++++++++
 .../serialize/SeaTunnelRowSerializer.java          |  27 ++++
 .../seatunnel/tablestore/sink/TablestoreSink.java  |  81 +++++++++++
 .../tablestore/sink/TablestoreSinkClient.java      | 127 +++++++++++++++++
 .../tablestore/sink/TablestoreSinkFactory.java     |  50 +++++++
 .../tablestore/sink/TablestoreWriter.java          |  48 +++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   8 +-
 21 files changed, 1004 insertions(+), 17 deletions(-)

diff --git a/docs/en/connector-v2/sink/Tablestore.md 
b/docs/en/connector-v2/sink/Tablestore.md
new file mode 100644
index 000000000..15ca34eda
--- /dev/null
+++ b/docs/en/connector-v2/sink/Tablestore.md
@@ -0,0 +1,74 @@
+# Tablestore
+
+> Tablestore sink connector
+
+## Description
+
+Write data to `Tablestore`
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name             | type   | required | default value |
+|----------------- | ------ |----------| ------------- |
+| end_point        | string | yes      | -             |
+| instance_name    | string | yes      | -             |
+| access_key_id    | string | yes      | -             |
+| access_key_secret| string | yes      | -             |
+| table            | string | yes      | -             |
+| primary_keys     | array  | yes      | -             |
+| batch_size       | string | no       | 25            |
+| batch_interval_ms| string | no       | 1000          |
+| common-options   | config | no       | -             |
+
+### end_point [string]
+
+The Tablestore endpoint to write to.
+
+### instance_name [string]
+
+The instance name of Tablestore.
+
+### access_key_id [string]
+
+The access key ID of Tablestore.
+
+### access_key_secret [string]
+
+The access key secret of Tablestore.
+
+### table [string]
+
+The name of the Tablestore table to write to.
+
+### primary_keys [array]
+
+The primary key columns of the Tablestore table.
+
+### common options [ config ]
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details.
+
+## Example
+
+```bash
+Tablestore {
+    end_point = "xxxx"
+    instance_name = "xxxx"
+    access_key_id = "xxxx"
+    access_key_secret = "xxxx"
+    table = "sink"
+    primary_keys = ["pk_1","pk_2","pk_3","pk_4"]
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Tablestore Sink Connector
+
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 3e8d91806..92b40b489 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -90,17 +90,18 @@ in parallel according to the concurrency of tasks.
 
 there are some reference value for params above.
 
-| datasource | driver                                       | url              
                                                  | maven                       
                                                                                
|
-|------------|----------------------------------------------|--------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
-| mysql      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
-| postgresql | org.postgresql.Driver                        | 
jdbc:postgresql://localhost:5432/postgres                          | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                            |
-| dm         | dm.jdbc.driver.DmDriver                      | 
jdbc:dm://localhost:5236                                           | 
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                    
                            |
-| phoenix    | org.apache.phoenix.queryserver.client.Driver | 
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
                        |
-| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
jdbc:microsoft:sqlserver://localhost:1433                          | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                            |
-| oracle     | oracle.jdbc.OracleDriver                     | 
jdbc:oracle:thin:@localhost:1521/xepdb1                            | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                            |
-| gbase8a    | com.gbase.jdbc.Driver                        | 
jdbc:gbase://e2e_gbase8aDb:5258/test                               | 
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
 |
-| starrocks  | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
-| db2        | com.ibm.db2.jcc.DB2Driver                    | 
jdbc:db2://localhost:50000/testdb                                  | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                            |
+| datasource | driver                                                          
       | url                                                                    
                              | maven                                           
                                                            |
+|------------|------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| mysql      | com.mysql.cj.jdbc.Driver                                        
       | jdbc:mysql://localhost:3306/test                                       
                              | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
+| postgresql | org.postgresql.Driver                                           
       | jdbc:postgresql://localhost:5432/postgres                              
                              | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                            |
+| dm         | dm.jdbc.driver.DmDriver                                         
       | jdbc:dm://localhost:5236                                               
                              | 
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                    
                            |
+| phoenix    | org.apache.phoenix.queryserver.client.Driver                    
       | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF     
                              | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
                        |
+| sqlserver  | com.microsoft.sqlserver.jdbc.SQLServerDriver                    
       | jdbc:microsoft:sqlserver://localhost:1433                              
                              | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                            |
+| oracle     | oracle.jdbc.OracleDriver                                        
       | jdbc:oracle:thin:@localhost:1521/xepdb1                                
                              | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                            |
+| gbase8a    | com.gbase.jdbc.Driver                                           
       | jdbc:gbase://e2e_gbase8aDb:5258/test                                   
                              | 
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
 |
+| starrocks  | com.mysql.cj.jdbc.Driver                                        
       | jdbc:mysql://localhost:3306/test                                       
                              | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                            |
+| db2        | com.ibm.db2.jcc.DB2Driver                                       
       | jdbc:db2://localhost:50000/testdb                                      
                              | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                            |
+| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver             
       | "jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance"  
                               | 
https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc      
                            |
 
 ## Example
 
@@ -145,6 +146,8 @@ parallel:
 - [Feature] Support StarRocks JDBC Source 
([3060](https://github.com/apache/incubator-seatunnel/pull/3060))
 - [Feature] Support GBase8a JDBC Source 
([3026](https://github.com/apache/incubator-seatunnel/pull/3026))
 - [Feature] Support DB2 JDBC Source 
([2410](https://github.com/apache/incubator-seatunnel/pull/2410))
+
 ### next version
 
 - [BugFix] Fix jdbc split bug 
([3220](https://github.com/apache/incubator-seatunnel/pull/3220))
+- [Feature] Support Tablestore Source 
([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 6863ef37e..fdb10982e 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -144,4 +144,5 @@ seatunnel.sink.Cassandra = connector-cassandra
 seatunnel.sink.StarRocks = connector-starrocks
 seatunnel.source.MyHours = connector-http-myhours
 seatunnel.sink.InfluxDB = connector-influxdb
-seatunnel.source.GoogleSheets = connector-google-sheets
\ No newline at end of file
+seatunnel.source.GoogleSheets = connector-google-sheets
+seatunnel.sink.Tablestore = connector-tablestore
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index c04b9c9bd..cd4a5f221 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -37,6 +37,7 @@
         <phoenix.version>5.2.5-HBase-2.x</phoenix.version>
         <oracle.version>12.2.0.1</oracle.version>
         <db2.version>db2jcc4</db2.version>
+        <tablestore.version>5.13.9</tablestore.version>
     </properties>
 
     <dependencyManagement>
@@ -84,11 +85,24 @@
                 <scope>provided</scope>
             </dependency>
 
+            <dependency>
+                <groupId>com.aliyun.openservices</groupId>
+                <artifactId>tablestore-jdbc</artifactId>
+                <version>${tablestore.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 
     <dependencies>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
@@ -119,6 +133,10 @@
             <groupId>com.ibm.db2.jcc</groupId>
             <artifactId>db2jcc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>tablestore-jdbc</artifactId>
+        </dependency>
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 12fc2cd8c..150e8e9bd 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,12 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 
 import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 
 /**
@@ -67,4 +69,9 @@ public interface JdbcDialect extends Serializable {
         return statement;
     }
 
+    default ResultSetMetaData getResultSetMetaData(Connection conn, 
JdbcSourceOptions jdbcSourceOptions) throws SQLException {
+        PreparedStatement ps = 
conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
+        return ps.getMetaData();
+    }
+
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
new file mode 100644
index 000000000..7dfa1888e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class TablestoreDialect implements JdbcDialect {
+    @Override
+    public String dialectName() {
+        return "Tablestore";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new TablestoreJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new TablestoreTypeMapper();
+    }
+
+    @Override
+    public PreparedStatement creatPreparedStatement(Connection connection, 
String queryTemplate, int fetchSize) throws SQLException {
+        PreparedStatement statement = 
connection.prepareStatement(queryTemplate);
+        statement.setFetchSize(fetchSize);
+        return statement;
+    }
+
+    @Override
+    public ResultSetMetaData getResultSetMetaData(Connection conn, 
JdbcSourceOptions jdbcSourceOptions) throws SQLException {
+        PreparedStatement ps = 
conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
+        return ps.executeQuery().getMetaData();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
new file mode 100644
index 000000000..995cbb6f3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for {@link TablestoreDialect}.
+ */
+
+@AutoService(JdbcDialectFactory.class)
+public class TablestoreDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:ots:https:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new TablestoreDialect();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java
new file mode 100644
index 000000000..81e2df6b9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class TablestoreJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return "Tablestore";
+    }
+
+    @Override
+    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, 
SeaTunnelRowType typeInfo) throws SQLException {
+        return super.toInternal(rs, metaData, typeInfo);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java
new file mode 100644
index 000000000..56b8fa507
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class TablestoreTypeMapper implements JdbcDialectTypeMapper {
+
+
+    // ============================data types=====================
+
+    private static final String TABLESTORE_UNKNOWN = "UNKNOWN";
+
+    private static final String TABLESTORE_BOOL = "BOOL";
+
+    // -------------------------number----------------------------
+    private static final String TABLESTORE_BIGINT = "BIGINT";
+    private static final String TABLESTORE_DOUBLE = "DOUBLE";
+    // -------------------------string----------------------------
+    private static final String TABLESTORE_VARCHAR = "VARCHAR";
+    private static final String TABLESTORE_MEDIUMTEXT = "MEDIUMTEXT";
+
+    // ------------------------------blob-------------------------
+    private static final String TABLESTORE_VARBINARY = "VARBINARY";
+    private static final String TABLESTORE_MEDIUMBLOB = "MEDIUMBLOB";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int 
colIndex) throws SQLException {
+        String tablestoreServerType = 
metadata.getColumnTypeName(colIndex).toUpperCase();
+        switch (tablestoreServerType) {
+            case TABLESTORE_BOOL:
+                return BasicType.BOOLEAN_TYPE;
+            case TABLESTORE_BIGINT:
+                return BasicType.LONG_TYPE;
+            case TABLESTORE_DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case TABLESTORE_VARCHAR:
+            case TABLESTORE_MEDIUMTEXT:
+                return BasicType.STRING_TYPE;
+            case TABLESTORE_VARBINARY:
+            case TABLESTORE_MEDIUMBLOB:
+                return PrimitiveByteArrayType.INSTANCE;
+            //Doesn't support yet
+            case TABLESTORE_UNKNOWN:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new UnsupportedOperationException(
+                    String.format(
+                        "Doesn't support TABLESTORE type '%s' on column '%s'  
yet.",
+                        tablestoreServerType, jdbcColumnName));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 2fd6f730d..918ea1052 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -44,7 +44,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -129,8 +128,7 @@ public class JdbcSource implements 
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
         ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
         ArrayList<String> fieldNames = new ArrayList<>();
         try {
-            PreparedStatement ps = 
conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
-            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+            ResultSetMetaData resultSetMetaData = 
jdbcDialect.getResultSetMetaData(conn, jdbcSourceOptions);
             for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
                 fieldNames.add(resultSetMetaData.getColumnName(i));
                 
seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i));
diff --git a/seatunnel-connectors-v2/connector-tablestore/pom.xml 
b/seatunnel-connectors-v2/connector-tablestore/pom.xml
new file mode 100644
index 000000000..e12a0c72d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tablestore/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-tablestore</artifactId>
+
+    <properties>
+        <tablestore.version>5.13.9</tablestore.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>tablestore</artifactId>
+            <version>${tablestore.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
new file mode 100644
index 000000000..4a79197c6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+/**
+ * Option definitions (key, value type, default and description) for the Tablestore connector.
+ *
+ * <p>All options here are declared as string options, including the numeric and list-like ones.
+ */
+public class TablestoreConfig implements Serializable {
+    /** Option {@code end_point}: string, no default. */
+    public static final Option<String> END_POINT = Options.key("end_point")
+        .stringType()
+        .noDefaultValue()
+        .withDescription(" Tablestore end_point");
+    /** Option {@code instance_name}: string, no default. */
+    public static final Option<String> INSTANCE_NAME = 
Options.key("instance_name")
+        .stringType()
+        .noDefaultValue()
+        .withDescription(" Tablestore instance_name");
+    /** Option {@code access_key_id}: string, no default. */
+    public static final Option<String> ACCESS_KEY_ID = 
Options.key("access_key_id")
+        .stringType()
+        .noDefaultValue()
+        .withDescription(" Tablestore access_key_id");
+    /** Option {@code access_key_secret}: string, no default. */
+    public static final Option<String> ACCESS_KEY_SECRET = 
Options.key("access_key_secret")
+        .stringType()
+        .noDefaultValue()
+        .withDescription(" Tablestore access_key_secret");
+    /** Option {@code table}: string, no default. */
+    public static final Option<String> TABLE = Options.key("table")
+        .stringType()
+        .noDefaultValue()
+        .withDescription(" Tablestore table");
+    /**
+     * Option {@code batch_size}: declared as a string with default {@code "25"}.
+     * NOTE(review): the value is numeric; an int-typed option looks more appropriate, but
+     * changing the type would affect code that parses {@code defaultValue()} as a string.
+     */
+    public static final Option<String> BATCH_SIZE = Options.key("batch_size")
+        .stringType()
+        .defaultValue("25")
+        .withDescription(" Tablestore batch_size");
+    /**
+     * Option {@code batch_interval_ms}: declared as a string with default {@code "1000"}.
+     * NOTE(review): numeric value stored as a string, same concern as {@code batch_size}.
+     */
+    public static final Option<String> BATCH_INTERVAL_MS = 
Options.key("batch_interval_ms")
+        .stringType()
+        .defaultValue("1000")
+        .withDescription(" Tablestore batch_interval_ms");
+    /**
+     * Option {@code primary_keys}: declared as a string, no default.
+     * NOTE(review): presumably holds a list of column names; confirm whether a list-typed
+     * option should be used instead.
+     */
+    public static final Option<String> PRIMARY_KEYS = 
Options.key("primary_keys")
+        .stringType()
+        .noDefaultValue()
+        .withDescription(" Tablestore primary_keys");
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
new file mode 100644
index 000000000..7faa8a688
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Connection and batching settings of the Tablestore connector.
+ *
+ * <p>Instances are either built field-by-field (Lombok all-args constructor, in field
+ * declaration order) or read from a plugin {@link Config}.
+ */
+@Data
+@AllArgsConstructor
+public class TablestoreOptions implements Serializable {
+
+    private String endpoint;
+
+    private String instanceName;
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String table;
+
+    private List<String> primaryKeys;
+
+    /** Number of buffered rows that triggers a flush; starts at the option's default. */
+    public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
+    /** Period of the timed flush in milliseconds; starts at the option's default. */
+    public int batchIntervalMs = Integer.parseInt(BATCH_INTERVAL_MS.defaultValue());
+
+    /**
+     * Reads all settings from the plugin configuration.
+     *
+     * <p>The connection settings, table and primary keys are read unconditionally; the two
+     * batching settings are only overridden when their key is present in {@code config}.
+     *
+     * @param config plugin configuration to read from
+     */
+    public TablestoreOptions(Config config) {
+        endpoint = config.getString(TablestoreConfig.END_POINT.key());
+        instanceName = config.getString(TablestoreConfig.INSTANCE_NAME.key());
+        accessKeyId = config.getString(TablestoreConfig.ACCESS_KEY_ID.key());
+        accessKeySecret = config.getString(TablestoreConfig.ACCESS_KEY_SECRET.key());
+        table = config.getString(TablestoreConfig.TABLE.key());
+        primaryKeys = config.getStringList(TablestoreConfig.PRIMARY_KEYS.key());
+
+        final String batchSizeKey = BATCH_SIZE.key();
+        if (config.hasPath(batchSizeKey)) {
+            batchSize = config.getInt(batchSizeKey);
+        }
+
+        final String batchIntervalKey = BATCH_INTERVAL_MS.key();
+        if (config.hasPath(batchIntervalKey)) {
+            batchIntervalMs = config.getInt(batchIntervalKey);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
new file mode 100644
index 000000000..0f3e7f3e2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import com.alicloud.openservices.tablestore.model.Column;
+import com.alicloud.openservices.tablestore.model.ColumnType;
+import com.alicloud.openservices.tablestore.model.ColumnValue;
+import com.alicloud.openservices.tablestore.model.Condition;
+import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder;
+import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
+import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
+import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
+import com.alicloud.openservices.tablestore.model.RowExistenceExpectation;
+import com.alicloud.openservices.tablestore.model.RowPutChange;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
+
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final TablestoreOptions tablestoreOptions;
+
+    public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType, 
TablestoreOptions tablestoreOptions) {
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tablestoreOptions = tablestoreOptions;
+    }
+
+    @Override
+    public RowPutChange serialize(SeaTunnelRow seaTunnelRow) {
+
+        PrimaryKeyBuilder primaryKeyBuilder = 
PrimaryKeyBuilder.createPrimaryKeyBuilder();
+        List<Column> columns = new ArrayList<>(seaTunnelRow.getFields().length 
- tablestoreOptions.getPrimaryKeys().size());
+        Arrays.stream(seaTunnelRowType.getFieldNames()).forEach(fieldName -> {
+            Object field = 
seaTunnelRow.getField(seaTunnelRowType.indexOf(fieldName));
+            int index = seaTunnelRowType.indexOf(fieldName);
+            if (tablestoreOptions.getPrimaryKeys().contains(fieldName)) {
+                primaryKeyBuilder.addPrimaryKeyColumn(
+                    this.convertPrimaryKeyColumn(fieldName, field,
+                        
this.convertPrimaryKeyType(seaTunnelRowType.getFieldType(index))));
+            } else {
+                columns.add(this.convertColumn(fieldName, field,
+                    
this.convertColumnType(seaTunnelRowType.getFieldType(index))));
+            }
+        });
+        RowPutChange rowPutChange = new 
RowPutChange(tablestoreOptions.getTable(), primaryKeyBuilder.build());
+        rowPutChange.setCondition(new 
Condition(RowExistenceExpectation.IGNORE));
+        columns.forEach(rowPutChange::addColumn);
+
+        return rowPutChange;
+    }
+
+    private ColumnType convertColumnType(SeaTunnelDataType<?> 
seaTunnelDataType) {
+        switch (seaTunnelDataType.getSqlType()) {
+            case INT:
+            case TINYINT:
+            case SMALLINT:
+            case BIGINT:
+                return ColumnType.INTEGER;
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+                return ColumnType.DOUBLE;
+            case STRING:
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                return ColumnType.STRING;
+            case BOOLEAN:
+                return ColumnType.BOOLEAN;
+            case BYTES:
+                return ColumnType.BINARY;
+            default:
+                throw new UnsupportedOperationException("Unsupported 
columnType: " + seaTunnelDataType);
+        }
+    }
+
+    private PrimaryKeyType convertPrimaryKeyType(SeaTunnelDataType<?> 
seaTunnelDataType) {
+        switch (seaTunnelDataType.getSqlType()) {
+            case INT:
+            case TINYINT:
+            case SMALLINT:
+            case BIGINT:
+                return PrimaryKeyType.INTEGER;
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+            case STRING:
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+            case BOOLEAN:
+                return PrimaryKeyType.STRING;
+            case BYTES:
+                return PrimaryKeyType.BINARY;
+            default:
+                throw new UnsupportedOperationException("Unsupported 
primaryKeyType: " + seaTunnelDataType);
+        }
+    }
+
+    private Column convertColumn(String columnName, Object value, ColumnType 
columnType) {
+        if (value == null) {
+            return null;
+        }
+        switch (columnType) {
+            case STRING:
+                return new Column(columnName, 
ColumnValue.fromString(String.valueOf(value)));
+            case INTEGER:
+                return new Column(columnName, ColumnValue.fromLong((long) 
value));
+            case BOOLEAN:
+                return new Column(columnName, 
ColumnValue.fromBoolean((boolean) value));
+            case DOUBLE:
+                return new Column(columnName, ColumnValue.fromDouble((Double) 
value));
+            case BINARY:
+                return new Column(columnName, ColumnValue.fromBinary((byte[]) 
value));
+            default:
+                throw new UnsupportedOperationException("Unsupported 
columnType: " + columnType);
+        }
+    }
+
+    private PrimaryKeyColumn convertPrimaryKeyColumn(String columnName, Object 
value, PrimaryKeyType primaryKeyType) {
+        if (value == null) {
+            return null;
+        }
+        switch (primaryKeyType) {
+            case STRING:
+                return new PrimaryKeyColumn(columnName, 
PrimaryKeyValue.fromString(String.valueOf(value)));
+            case INTEGER:
+                return new PrimaryKeyColumn(columnName, 
PrimaryKeyValue.fromLong((long) value));
+            case BINARY:
+                return new PrimaryKeyColumn(columnName, 
PrimaryKeyValue.fromBinary((byte[]) value));
+            default:
+                throw new UnsupportedOperationException("Unsupported 
primaryKeyType: " + primaryKeyType);
+        }
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..24a9eab8d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.alicloud.openservices.tablestore.model.RowPutChange;
+
+/**
+ * Converts SeaTunnel rows into Tablestore put requests.
+ */
+public interface SeaTunnelRowSerializer {
+
+    /**
+     * Converts one row into a Tablestore {@link RowPutChange}.
+     *
+     * @param seaTunnelRow the row to convert
+     * @return the put request representing {@code seaTunnelRow}
+     */
+    RowPutChange serialize(SeaTunnelRow seaTunnelRow);
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
new file mode 100644
index 000000000..56ffdda5d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSink.class)
+public class TablestoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private SeaTunnelRowType rowType;
+
+    private TablestoreOptions tablestoreOptions;
+
+    @Override
+    public String getPluginName() {
+        return "Tablestore";
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
END_POINT.key(), TABLE.key(), INSTANCE_NAME.key(), ACCESS_KEY_ID.key(), 
ACCESS_KEY_SECRET.key(), PRIMARY_KEYS.key());
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+        }
+        tablestoreOptions = new TablestoreOptions(pluginConfig);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.rowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return rowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) throws IOException {
+        return new TablestoreWriter(tablestoreOptions, rowType);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
new file mode 100644
index 000000000..ebeb6aa8b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import com.alicloud.openservices.tablestore.SyncClient;
+import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
+import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
+import com.alicloud.openservices.tablestore.model.RowPutChange;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class TablestoreSinkClient {
+    private final TablestoreOptions tablestoreOptions;
+    private ScheduledExecutorService scheduler;
+    private ScheduledFuture<?> scheduledFuture;
+    private volatile boolean initialize;
+    private volatile Exception flushException;
+    private SyncClient syncClient;
+    private final List<RowPutChange> batchList;
+
+    public TablestoreSinkClient(TablestoreOptions tablestoreOptions, 
SeaTunnelRowType typeInfo) {
+        this.tablestoreOptions = tablestoreOptions;
+        this.batchList = new ArrayList<>();
+    }
+
+    private void tryInit() throws IOException {
+        if (initialize) {
+            return;
+        }
+        syncClient = new SyncClient(
+            tablestoreOptions.getEndpoint(),
+            tablestoreOptions.getAccessKeyId(),
+            tablestoreOptions.getAccessKeySecret(),
+            tablestoreOptions.getInstanceName());
+
+        scheduler = Executors.newSingleThreadScheduledExecutor(
+            new 
ThreadFactoryBuilder().setNameFormat("Tablestore-sink-output-%s").build());
+        scheduledFuture = scheduler.scheduleAtFixedRate(
+            () -> {
+                try {
+                    flush();
+                } catch (IOException e) {
+                    flushException = e;
+                }
+            },
+            tablestoreOptions.getBatchIntervalMs(),
+            tablestoreOptions.getBatchIntervalMs(),
+            TimeUnit.MILLISECONDS);
+
+        initialize = true;
+    }
+
+    public void write(RowPutChange rowPutChange) throws IOException {
+        tryInit();
+        checkFlushException();
+        batchList.add(rowPutChange);
+        if (tablestoreOptions.getBatchSize() > 0
+            && batchList.size() >= tablestoreOptions.getBatchSize()) {
+            flush();
+        }
+    }
+
+    public void close() throws IOException {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduler.shutdown();
+        }
+        if (syncClient != null) {
+            flush();
+            syncClient.shutdown();
+        }
+    }
+
+    synchronized void flush() throws IOException {
+        checkFlushException();
+        if (batchList.isEmpty()) {
+            return;
+        }
+        BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
+        batchList.forEach(batchWriteRowRequest::addRowChange);
+        BatchWriteRowResponse response = 
syncClient.batchWriteRow(batchWriteRowRequest);
+
+        if (!response.isAllSucceed()) {
+            for (BatchWriteRowResponse.RowResult rowResult : 
response.getFailedRows()) {
+                throw new SeaTunnelException("Code: " + 
rowResult.getError().getCode()
+                    + "Message:" + rowResult.getError().getMessage());
+            }
+        }
+
+        batchList.clear();
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing items to Tablestore failed.", 
flushException);
+        }
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
new file mode 100644
index 000000000..2310292a7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS;
+import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory entry for the Tablestore sink: exposes its identifier and option rule.
+ */
+@AutoService(Factory.class)
+public class TablestoreSinkFactory implements TableSinkFactory {
+    /** Returns the identifier {@code "Tablestore"}. */
+    @Override
+    public String factoryIdentifier() {
+        return "Tablestore";
+    }
+
+    /**
+     * Declares the connection options, table, primary keys and schema as required, and the
+     * two batching options as optional.
+     *
+     * NOTE(review): a schema option is listed as required here; confirm a sink really needs it.
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+            .required(END_POINT, TABLE, INSTANCE_NAME, ACCESS_KEY_ID, 
ACCESS_KEY_SECRET, PRIMARY_KEYS, SeaTunnelSchema.SCHEMA)
+            .optional(BATCH_INTERVAL_MS, BATCH_SIZE)
+            .build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
new file mode 100644
index 000000000..b156d7317
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSeaTunnelRowSerializer;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;
+
+import java.io.IOException;
+
+/**
+ * Sink writer that serializes each incoming row and hands it to a
+ * {@link TablestoreSinkClient}.
+ */
+public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final TablestoreSinkClient tablestoreSinkClient;
+    private final SeaTunnelRowSerializer serializer;
+
+    /**
+     * @param tablestoreOptions connection and batching settings
+     * @param seaTunnelRowType row type of the incoming data
+     */
+    public TablestoreWriter(TablestoreOptions tablestoreOptions, SeaTunnelRowType seaTunnelRowType) {
+        this.serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, tablestoreOptions);
+        this.tablestoreSinkClient = new TablestoreSinkClient(tablestoreOptions, seaTunnelRowType);
+    }
+
+    /** Serializes {@code element} and passes the result to the client. */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        this.tablestoreSinkClient.write(this.serializer.serialize(element));
+    }
+
+    /** Closes the underlying client. */
+    @Override
+    public void close() throws IOException {
+        this.tablestoreSinkClient.close();
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index a2ce69fff..9696421c3 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -58,6 +58,7 @@
         <module>connector-iceberg</module>
         <module>connector-influxdb</module>
         <module>connector-amazondynamodb</module>
+        <module>connector-tablestore</module>
         <module>connector-cassandra</module>
         <module>connector-starrocks</module>
         <module>connector-google-sheets</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index e6cbc4afb..ba38d3ea3 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -321,6 +321,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-tablestore</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
             </dependencies>
         </profile>
         <profile>
@@ -677,7 +683,7 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
-                
+
                 <!-- transforms v2 -->
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>

Reply via email to