This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ebebf0b63 [Feature][connector-v2] add tablestore source and sink
(#3309)
ebebf0b63 is described below
commit ebebf0b63360e9eababda42da4ddd62b65d2599b
Author: liugddx <[email protected]>
AuthorDate: Tue Nov 15 10:27:41 2022 +0800
[Feature][connector-v2] add tablestore source and sink (#3309)
---
docs/en/connector-v2/sink/Tablestore.md | 74 ++++++++++
docs/en/connector-v2/source/Jdbc.md | 25 ++--
plugin-mapping.properties | 3 +-
seatunnel-connectors-v2/connector-jdbc/pom.xml | 20 ++-
.../jdbc/internal/dialect/JdbcDialect.java | 7 +
.../dialect/tablestore/TablestoreDialect.java | 58 ++++++++
.../tablestore/TablestoreDialectFactory.java | 40 ++++++
.../tablestore/TablestoreJdbcRowConverter.java | 39 +++++
.../dialect/tablestore/TablestoreTypeMapper.java | 78 ++++++++++
.../seatunnel/jdbc/source/JdbcSource.java | 4 +-
.../connector-tablestore/pom.xml | 49 +++++++
.../tablestore/config/TablestoreConfig.java | 58 ++++++++
.../tablestore/config/TablestoreOptions.java | 66 +++++++++
.../serialize/DefaultSeaTunnelRowSerializer.java | 158 +++++++++++++++++++++
.../serialize/SeaTunnelRowSerializer.java | 27 ++++
.../seatunnel/tablestore/sink/TablestoreSink.java | 81 +++++++++++
.../tablestore/sink/TablestoreSinkClient.java | 127 +++++++++++++++++
.../tablestore/sink/TablestoreSinkFactory.java | 50 +++++++
.../tablestore/sink/TablestoreWriter.java | 48 +++++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 8 +-
21 files changed, 1004 insertions(+), 17 deletions(-)
diff --git a/docs/en/connector-v2/sink/Tablestore.md
b/docs/en/connector-v2/sink/Tablestore.md
new file mode 100644
index 000000000..15ca34eda
--- /dev/null
+++ b/docs/en/connector-v2/sink/Tablestore.md
@@ -0,0 +1,74 @@
+# Tablestore
+
+> Tablestore sink connector
+
+## Description
+
+Write data to `Tablestore`
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|----------------- | ------ |----------| ------------- |
+| end_point | string | yes | - |
+| instance_name | string | yes | - |
+| access_key_id | string | yes | - |
+| access_key_secret| string | yes | - |
+| table | string | yes | - |
+| primary_keys | array | yes | - |
+| batch_size | string | no | 25 |
+| batch_interval_ms| string | no | 1000 |
+| common-options | config | no | - |
+
+### end_point [string]
+
+The endpoint of the Tablestore service to write to.
+
+### instance_name [string]
+
+The name of the Tablestore instance.
+
+### access_key_id [string]
+
+The access key ID of Tablestore.
+
+### access_key_secret [string]
+
+The access key secret of Tablestore.
+
+### table [string]
+
+The name of the Tablestore table to write to.
+
+### primary_keys [array]
+
+The primary key column names of the Tablestore table.
+
+### common-options [config]
+
+Sink plugin common parameters. Please refer to [Sink Common Options](common-options.md) for details.
+
+## Example
+
+```bash
+Tablestore {
+ end_point = "xxxx"
+ instance_name = "xxxx"
+ access_key_id = "xxxx"
+ access_key_secret = "xxxx"
+ table = "sink"
+ primary_keys = ["pk_1","pk_2","pk_3","pk_4"]
+ }
+```
+
+## Changelog
+
+### next version
+
+- Add Tablestore Sink Connector
+
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 3e8d91806..92b40b489 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -90,17 +90,18 @@ in parallel according to the concurrency of tasks.
there are some reference value for params above.
-| datasource | driver | url
| maven
|
-|------------|----------------------------------------------|--------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
-| mysql | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| postgresql | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
-| dm | dm.jdbc.driver.DmDriver |
jdbc:dm://localhost:5236 |
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
-| phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF |
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
-| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:microsoft:sqlserver://localhost:1433 |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
-| oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
-| gbase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test |
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
|
-| starrocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
+| datasource | driver
| url
| maven
|
+|------------|------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
+| mysql | com.mysql.cj.jdbc.Driver
| jdbc:mysql://localhost:3306/test
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| postgresql | org.postgresql.Driver
| jdbc:postgresql://localhost:5432/postgres
|
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
+| dm | dm.jdbc.driver.DmDriver
| jdbc:dm://localhost:5236
|
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
+| phoenix | org.apache.phoenix.queryserver.client.Driver
| jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
+| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver
| jdbc:microsoft:sqlserver://localhost:1433
|
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
+| oracle | oracle.jdbc.OracleDriver
| jdbc:oracle:thin:@localhost:1521/xepdb1
|
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
+| gbase8a | com.gbase.jdbc.Driver
| jdbc:gbase://e2e_gbase8aDb:5258/test
|
https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar
|
+| starrocks | com.mysql.cj.jdbc.Driver
| jdbc:mysql://localhost:3306/test
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| db2 | com.ibm.db2.jcc.DB2Driver
| jdbc:db2://localhost:50000/testdb
|
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
+| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver
| "jdbc:ots:https://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance"
|
https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc
|
## Example
@@ -145,6 +146,8 @@ parallel:
- [Feature] Support StarRocks JDBC Source
([3060](https://github.com/apache/incubator-seatunnel/pull/3060))
- [Feature] Support GBase8a JDBC Source
([3026](https://github.com/apache/incubator-seatunnel/pull/3026))
- [Feature] Support DB2 JDBC Source
([2410](https://github.com/apache/incubator-seatunnel/pull/2410))
+
### next version
- [BugFix] Fix jdbc split bug
([3220](https://github.com/apache/incubator-seatunnel/pull/3220))
+- [Feature] Support Tablestore Source
([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 6863ef37e..fdb10982e 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -144,4 +144,5 @@ seatunnel.sink.Cassandra = connector-cassandra
seatunnel.sink.StarRocks = connector-starrocks
seatunnel.source.MyHours = connector-http-myhours
seatunnel.sink.InfluxDB = connector-influxdb
-seatunnel.source.GoogleSheets = connector-google-sheets
\ No newline at end of file
+seatunnel.source.GoogleSheets = connector-google-sheets
+seatunnel.sink.Tablestore = connector-tablestore
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index c04b9c9bd..cd4a5f221 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -37,6 +37,7 @@
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<oracle.version>12.2.0.1</oracle.version>
<db2.version>db2jcc4</db2.version>
+ <tablestore.version>5.13.9</tablestore.version>
</properties>
<dependencyManagement>
@@ -84,11 +85,24 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.aliyun.openservices</groupId>
+ <artifactId>tablestore-jdbc</artifactId>
+ <version>${tablestore.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
</dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
@@ -119,6 +133,10 @@
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.aliyun.openservices</groupId>
+ <artifactId>tablestore-jdbc</artifactId>
+ </dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 12fc2cd8c..150e8e9bd 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -17,12 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
/**
@@ -67,4 +69,9 @@ public interface JdbcDialect extends Serializable {
return statement;
}
+ default ResultSetMetaData getResultSetMetaData(Connection conn,
JdbcSourceOptions jdbcSourceOptions) throws SQLException {
+ PreparedStatement ps =
conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
+ return ps.getMetaData();
+ }
+
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
new file mode 100644
index 000000000..7dfa1888e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class TablestoreDialect implements JdbcDialect {
+ @Override
+ public String dialectName() {
+ return "Tablestore";
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new TablestoreJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new TablestoreTypeMapper();
+ }
+
+ @Override
+ public PreparedStatement creatPreparedStatement(Connection connection,
String queryTemplate, int fetchSize) throws SQLException {
+ PreparedStatement statement =
connection.prepareStatement(queryTemplate);
+ statement.setFetchSize(fetchSize);
+ return statement;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData(Connection conn,
JdbcSourceOptions jdbcSourceOptions) throws SQLException {
+ PreparedStatement ps =
conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
+ return ps.executeQuery().getMetaData();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
new file mode 100644
index 000000000..995cbb6f3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for {@link TablestoreDialect}.
+ */
+
+@AutoService(JdbcDialectFactory.class)
+public class TablestoreDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:ots:https:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new TablestoreDialect();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java
new file mode 100644
index 000000000..81e2df6b9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreJdbcRowConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class TablestoreJdbcRowConverter extends AbstractJdbcRowConverter {
+
+ @Override
+ public String converterName() {
+ return "Tablestore";
+ }
+
+ @Override
+ public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData,
SeaTunnelRowType typeInfo) throws SQLException {
+ return super.toInternal(rs, metaData, typeInfo);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java
new file mode 100644
index 000000000..56b8fa507
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreTypeMapper.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+@Slf4j
+public class TablestoreTypeMapper implements JdbcDialectTypeMapper {
+
+
+ // ============================data types=====================
+
+ private static final String TABLESTORE_UNKNOWN = "UNKNOWN";
+
+ private static final String TABLESTORE_BOOL = "BOOL";
+
+ // -------------------------number----------------------------
+ private static final String TABLESTORE_BIGINT = "BIGINT";
+ private static final String TABLESTORE_DOUBLE = "DOUBLE";
+ // -------------------------string----------------------------
+ private static final String TABLESTORE_VARCHAR = "VARCHAR";
+ private static final String TABLESTORE_MEDIUMTEXT = "MEDIUMTEXT";
+
+ // ------------------------------blob-------------------------
+ private static final String TABLESTORE_VARBINARY = "VARBINARY";
+ private static final String TABLESTORE_MEDIUMBLOB = "MEDIUMBLOB";
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @Override
+ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int
colIndex) throws SQLException {
+ String tablestoreServerType =
metadata.getColumnTypeName(colIndex).toUpperCase();
+ switch (tablestoreServerType) {
+ case TABLESTORE_BOOL:
+ return BasicType.BOOLEAN_TYPE;
+ case TABLESTORE_BIGINT:
+ return BasicType.LONG_TYPE;
+ case TABLESTORE_DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+ case TABLESTORE_VARCHAR:
+ case TABLESTORE_MEDIUMTEXT:
+ return BasicType.STRING_TYPE;
+ case TABLESTORE_VARBINARY:
+ case TABLESTORE_MEDIUMBLOB:
+ return PrimitiveByteArrayType.INSTANCE;
+ //Doesn't support yet
+ case TABLESTORE_UNKNOWN:
+ default:
+ final String jdbcColumnName = metadata.getColumnName(colIndex);
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support TABLESTORE type '%s' on column '%s'
yet.",
+ tablestoreServerType, jdbcColumnName));
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 2fd6f730d..918ea1052 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -44,7 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -129,8 +128,7 @@ public class JdbcSource implements
SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit
ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
try {
- PreparedStatement ps =
conn.prepareStatement(jdbcSourceOptions.getJdbcConnectionOptions().getQuery());
- ResultSetMetaData resultSetMetaData = ps.getMetaData();
+ ResultSetMetaData resultSetMetaData =
jdbcDialect.getResultSetMetaData(conn, jdbcSourceOptions);
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
fieldNames.add(resultSetMetaData.getColumnName(i));
seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i));
diff --git a/seatunnel-connectors-v2/connector-tablestore/pom.xml
b/seatunnel-connectors-v2/connector-tablestore/pom.xml
new file mode 100644
index 000000000..e12a0c72d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-tablestore/pom.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-tablestore</artifactId>
+
+ <properties>
+ <tablestore.version>5.13.9</tablestore.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun.openservices</groupId>
+ <artifactId>tablestore</artifactId>
+ <version>${tablestore.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
new file mode 100644
index 000000000..4a79197c6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+
+public class TablestoreConfig implements Serializable {
+ public static final Option<String> END_POINT = Options.key("end_point")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(" Tablestore end_point");
+ public static final Option<String> INSTANCE_NAME =
Options.key("instance_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(" Tablestore instance_name");
+ public static final Option<String> ACCESS_KEY_ID =
Options.key("access_key_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(" Tablestore access_key_id");
+ public static final Option<String> ACCESS_KEY_SECRET =
Options.key("access_key_secret")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(" Tablestore access_key_secret");
+ public static final Option<String> TABLE = Options.key("table")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(" Tablestore table");
+ public static final Option<String> BATCH_SIZE = Options.key("batch_size")
+ .stringType()
+ .defaultValue("25")
+ .withDescription(" Tablestore batch_size");
+ public static final Option<String> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
+ .stringType()
+ .defaultValue("1000")
+ .withDescription(" Tablestore batch_interval_ms");
+ public static final Option<String> PRIMARY_KEYS =
Options.key("primary_keys")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(" Tablestore primary_keys");
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
new file mode 100644
index 000000000..7faa8a688
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
+
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class TablestoreOptions implements Serializable {
+
+
+ private String endpoint;
+
+ private String instanceName;
+
+ private String accessKeyId;
+
+ private String accessKeySecret;
+
+ private String table;
+
+ private List<String> primaryKeys;
+
+ public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
+ public int batchIntervalMs =
Integer.parseInt(BATCH_INTERVAL_MS.defaultValue());
+
+ public TablestoreOptions(Config config) {
+ this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
+ this.instanceName =
config.getString(TablestoreConfig.INSTANCE_NAME.key());
+ this.accessKeyId =
config.getString(TablestoreConfig.ACCESS_KEY_ID.key());
+ this.accessKeySecret =
config.getString(TablestoreConfig.ACCESS_KEY_SECRET.key());
+ this.table = config.getString(TablestoreConfig.TABLE.key());
+ this.primaryKeys =
config.getStringList(TablestoreConfig.PRIMARY_KEYS.key());
+
+ if (config.hasPath(BATCH_SIZE.key())) {
+ this.batchSize = config.getInt(BATCH_SIZE.key());
+ }
+ if (config.hasPath(TablestoreConfig.BATCH_INTERVAL_MS.key())) {
+ this.batchIntervalMs =
config.getInt(TablestoreConfig.BATCH_INTERVAL_MS.key());
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
new file mode 100644
index 000000000..0f3e7f3e2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import com.alicloud.openservices.tablestore.model.Column;
+import com.alicloud.openservices.tablestore.model.ColumnType;
+import com.alicloud.openservices.tablestore.model.ColumnValue;
+import com.alicloud.openservices.tablestore.model.Condition;
+import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder;
+import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
+import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
+import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
+import com.alicloud.openservices.tablestore.model.RowExistenceExpectation;
+import com.alicloud.openservices.tablestore.model.RowPutChange;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
+
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final TablestoreOptions tablestoreOptions;
+
+ public DefaultSeaTunnelRowSerializer(SeaTunnelRowType seaTunnelRowType,
TablestoreOptions tablestoreOptions) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.tablestoreOptions = tablestoreOptions;
+ }
+
+ @Override
+ public RowPutChange serialize(SeaTunnelRow seaTunnelRow) {
+
+ PrimaryKeyBuilder primaryKeyBuilder =
PrimaryKeyBuilder.createPrimaryKeyBuilder();
+ List<Column> columns = new ArrayList<>(seaTunnelRow.getFields().length
- tablestoreOptions.getPrimaryKeys().size());
+ Arrays.stream(seaTunnelRowType.getFieldNames()).forEach(fieldName -> {
+ Object field =
seaTunnelRow.getField(seaTunnelRowType.indexOf(fieldName));
+ int index = seaTunnelRowType.indexOf(fieldName);
+ if (tablestoreOptions.getPrimaryKeys().contains(fieldName)) {
+ primaryKeyBuilder.addPrimaryKeyColumn(
+ this.convertPrimaryKeyColumn(fieldName, field,
+
this.convertPrimaryKeyType(seaTunnelRowType.getFieldType(index))));
+ } else {
+ columns.add(this.convertColumn(fieldName, field,
+
this.convertColumnType(seaTunnelRowType.getFieldType(index))));
+ }
+ });
+ RowPutChange rowPutChange = new
RowPutChange(tablestoreOptions.getTable(), primaryKeyBuilder.build());
+ rowPutChange.setCondition(new
Condition(RowExistenceExpectation.IGNORE));
+ columns.forEach(rowPutChange::addColumn);
+
+ return rowPutChange;
+ }
+
+ private ColumnType convertColumnType(SeaTunnelDataType<?>
seaTunnelDataType) {
+ switch (seaTunnelDataType.getSqlType()) {
+ case INT:
+ case TINYINT:
+ case SMALLINT:
+ case BIGINT:
+ return ColumnType.INTEGER;
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ return ColumnType.DOUBLE;
+ case STRING:
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ return ColumnType.STRING;
+ case BOOLEAN:
+ return ColumnType.BOOLEAN;
+ case BYTES:
+ return ColumnType.BINARY;
+ default:
+ throw new UnsupportedOperationException("Unsupported
columnType: " + seaTunnelDataType);
+ }
+ }
+
+ private PrimaryKeyType convertPrimaryKeyType(SeaTunnelDataType<?>
seaTunnelDataType) {
+ switch (seaTunnelDataType.getSqlType()) {
+ case INT:
+ case TINYINT:
+ case SMALLINT:
+ case BIGINT:
+ return PrimaryKeyType.INTEGER;
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ case STRING:
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case BOOLEAN:
+ return PrimaryKeyType.STRING;
+ case BYTES:
+ return PrimaryKeyType.BINARY;
+ default:
+ throw new UnsupportedOperationException("Unsupported
primaryKeyType: " + seaTunnelDataType);
+ }
+ }
+
+ private Column convertColumn(String columnName, Object value, ColumnType
columnType) {
+ if (value == null) {
+ return null;
+ }
+ switch (columnType) {
+ case STRING:
+ return new Column(columnName,
ColumnValue.fromString(String.valueOf(value)));
+ case INTEGER:
+ return new Column(columnName, ColumnValue.fromLong((long)
value));
+ case BOOLEAN:
+ return new Column(columnName,
ColumnValue.fromBoolean((boolean) value));
+ case DOUBLE:
+ return new Column(columnName, ColumnValue.fromDouble((Double)
value));
+ case BINARY:
+ return new Column(columnName, ColumnValue.fromBinary((byte[])
value));
+ default:
+ throw new UnsupportedOperationException("Unsupported
columnType: " + columnType);
+ }
+ }
+
+ private PrimaryKeyColumn convertPrimaryKeyColumn(String columnName, Object
value, PrimaryKeyType primaryKeyType) {
+ if (value == null) {
+ return null;
+ }
+ switch (primaryKeyType) {
+ case STRING:
+ return new PrimaryKeyColumn(columnName,
PrimaryKeyValue.fromString(String.valueOf(value)));
+ case INTEGER:
+ return new PrimaryKeyColumn(columnName,
PrimaryKeyValue.fromLong((long) value));
+ case BINARY:
+ return new PrimaryKeyColumn(columnName,
PrimaryKeyValue.fromBinary((byte[]) value));
+ default:
+ throw new UnsupportedOperationException("Unsupported
primaryKeyType: " + primaryKeyType);
+ }
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..24a9eab8d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.alicloud.openservices.tablestore.model.RowPutChange;
+/**
+ * Strategy for turning SeaTunnel rows into Tablestore put operations.
+ */
+public interface SeaTunnelRowSerializer {
+ /**
+  * Converts one row into a Tablestore {@link RowPutChange}.
+  *
+  * @param seaTunnelRow the row to convert
+  * @return the put operation built from {@code seaTunnelRow}
+  */
+ RowPutChange serialize(SeaTunnelRow seaTunnelRow);
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
new file mode 100644
index 000000000..56ffdda5d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * SeaTunnel sink plugin, registered under the name {@code "Tablestore"}, that hands rows to a
+ * {@link TablestoreWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class TablestoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private SeaTunnelRowType rowType;
+
+    private TablestoreOptions tablestoreOptions;
+
+    @Override
+    public String getPluginName() {
+        return "Tablestore";
+    }
+
+    /**
+     * Checks that every mandatory option is present and captures the configuration.
+     *
+     * @param pluginConfig configuration of this sink
+     * @throws PrepareFailException if any mandatory option is missing
+     */
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+            END_POINT.key(), TABLE.key(), INSTANCE_NAME.key(),
+            ACCESS_KEY_ID.key(), ACCESS_KEY_SECRET.key(), PRIMARY_KEYS.key());
+        if (!result.isSuccess()) {
+            // This plugin is a sink, so the failure is attributed to PluginType.SINK.
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        tablestoreOptions = new TablestoreOptions(pluginConfig);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.rowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return rowType;
+    }
+
+    /**
+     * Creates the writer using the options captured in {@link #prepare(Config)} and the row
+     * type received through {@link #setTypeInfo(SeaTunnelRowType)}.
+     */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+        return new TablestoreWriter(tablestoreOptions, rowType);
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
new file mode 100644
index 000000000..ebeb6aa8b
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+
+import com.alicloud.openservices.tablestore.SyncClient;
+import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
+import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
+import com.alicloud.openservices.tablestore.model.RowPutChange;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class TablestoreSinkClient {
+ private final TablestoreOptions tablestoreOptions;
+ private ScheduledExecutorService scheduler;
+ private ScheduledFuture<?> scheduledFuture;
+ private volatile boolean initialize;
+ private volatile Exception flushException;
+ private SyncClient syncClient;
+ private final List<RowPutChange> batchList;
+
+ public TablestoreSinkClient(TablestoreOptions tablestoreOptions,
SeaTunnelRowType typeInfo) {
+ this.tablestoreOptions = tablestoreOptions;
+ this.batchList = new ArrayList<>();
+ }
+
+ private void tryInit() throws IOException {
+ if (initialize) {
+ return;
+ }
+ syncClient = new SyncClient(
+ tablestoreOptions.getEndpoint(),
+ tablestoreOptions.getAccessKeyId(),
+ tablestoreOptions.getAccessKeySecret(),
+ tablestoreOptions.getInstanceName());
+
+ scheduler = Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("Tablestore-sink-output-%s").build());
+ scheduledFuture = scheduler.scheduleAtFixedRate(
+ () -> {
+ try {
+ flush();
+ } catch (IOException e) {
+ flushException = e;
+ }
+ },
+ tablestoreOptions.getBatchIntervalMs(),
+ tablestoreOptions.getBatchIntervalMs(),
+ TimeUnit.MILLISECONDS);
+
+ initialize = true;
+ }
+
+ public void write(RowPutChange rowPutChange) throws IOException {
+ tryInit();
+ checkFlushException();
+ batchList.add(rowPutChange);
+ if (tablestoreOptions.getBatchSize() > 0
+ && batchList.size() >= tablestoreOptions.getBatchSize()) {
+ flush();
+ }
+ }
+
+ public void close() throws IOException {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ scheduler.shutdown();
+ }
+ if (syncClient != null) {
+ flush();
+ syncClient.shutdown();
+ }
+ }
+
+ synchronized void flush() throws IOException {
+ checkFlushException();
+ if (batchList.isEmpty()) {
+ return;
+ }
+ BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
+ batchList.forEach(batchWriteRowRequest::addRowChange);
+ BatchWriteRowResponse response =
syncClient.batchWriteRow(batchWriteRowRequest);
+
+ if (!response.isAllSucceed()) {
+ for (BatchWriteRowResponse.RowResult rowResult :
response.getFailedRows()) {
+ throw new SeaTunnelException("Code: " +
rowResult.getError().getCode()
+ + "Message:" + rowResult.getError().getMessage());
+ }
+ }
+
+ batchList.clear();
+ }
+
+ private void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Writing items to Tablestore failed.",
flushException);
+ }
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
new file mode 100644
index 000000000..2310292a7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_INTERVAL_MS;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS;
+import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory that registers the Tablestore sink under the identifier {@code "Tablestore"} and
+ * declares which options it accepts.
+ */
+@AutoService(Factory.class)
+public class TablestoreSinkFactory implements TableSinkFactory {
+
+    /** @return the identifier under which this factory is registered */
+    @Override
+    public String factoryIdentifier() {
+        return "Tablestore";
+    }
+
+    /**
+     * Declares the option contract of the sink: connection, table, primary keys and schema are
+     * required; the two batching options are optional.
+     *
+     * @return the option rule describing required and optional settings
+     */
+    @Override
+    public OptionRule optionRule() {
+        final OptionRule rule = OptionRule.builder()
+            .required(
+                END_POINT,
+                TABLE,
+                INSTANCE_NAME,
+                ACCESS_KEY_ID,
+                ACCESS_KEY_SECRET,
+                PRIMARY_KEYS,
+                SeaTunnelSchema.SCHEMA)
+            .optional(
+                BATCH_INTERVAL_MS,
+                BATCH_SIZE)
+            .build();
+        return rule;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
new file mode 100644
index 000000000..b156d7317
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSeaTunnelRowSerializer;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;
+
+import java.io.IOException;
+
+/**
+ * Sink writer that serializes each incoming row and passes the result to a
+ * {@link TablestoreSinkClient}.
+ */
+public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+
+    private final SeaTunnelRowSerializer serializer;
+    private final TablestoreSinkClient tablestoreSinkClient;
+
+    /**
+     * @param tablestoreOptions sink options shared by the client and the serializer
+     * @param seaTunnelRowType  row type of the incoming data
+     */
+    public TablestoreWriter(TablestoreOptions tablestoreOptions, SeaTunnelRowType seaTunnelRowType) {
+        this.tablestoreSinkClient = new TablestoreSinkClient(tablestoreOptions, seaTunnelRowType);
+        this.serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, tablestoreOptions);
+    }
+
+    /**
+     * Serializes the row and forwards it to the client.
+     *
+     * @param element the row to write
+     * @throws IOException if the client reports a write failure
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        this.tablestoreSinkClient.write(this.serializer.serialize(element));
+    }
+
+    /** Closes the underlying client. */
+    @Override
+    public void close() throws IOException {
+        this.tablestoreSinkClient.close();
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index a2ce69fff..9696421c3 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -58,6 +58,7 @@
<module>connector-iceberg</module>
<module>connector-influxdb</module>
<module>connector-amazondynamodb</module>
+ <module>connector-tablestore</module>
<module>connector-cassandra</module>
<module>connector-starrocks</module>
<module>connector-google-sheets</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index e6cbc4afb..ba38d3ea3 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -321,6 +321,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-tablestore</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -677,7 +683,7 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
-
+
<!-- transforms v2 -->
<dependency>
<groupId>org.apache.seatunnel</groupId>