This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 12268a6f4 [Feature][Connector-V2][Cassandra] Add Cassandra Source And 
Sink Connector (#3229)
12268a6f4 is described below

commit 12268a6f4b6fd47f7bb797a13b788cbac33bbce4
Author: FWLamb <[email protected]>
AuthorDate: Tue Nov 8 19:14:33 2022 +0800

    [Feature][Connector-V2][Cassandra] Add Cassandra Source And Sink Connector 
(#3229)
---
 docs/en/Connector-v2-release-state.md              |   4 +-
 docs/en/connector-v2/sink/Cassandra.md             |  98 ++++++
 docs/en/connector-v2/source/Cassandra.md           |  82 +++++
 plugin-mapping.properties                          |   4 +-
 .../connector-cassandra/pom.xml                    |  52 +++
 .../cassandra/client/CassandraClient.java          |  65 ++++
 .../cassandra/config/CassandraConfig.java          | 116 +++++++
 .../seatunnel/cassandra/sink/CassandraSink.java    | 110 +++++++
 .../cassandra/sink/CassandraSinkWriter.java        | 148 +++++++++
 .../cassandra/source/CassandraSource.java          | 102 ++++++
 .../cassandra/source/CassandraSourceReader.java    |  76 +++++
 .../seatunnel/cassandra/util/TypeConvertUtil.java  | 309 +++++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 .../connector-cassandra-e2e/pom.xml                |  45 +++
 .../seatunnel/cassandra/CassandraIT.java           | 365 +++++++++++++++++++++
 .../src/test/resources/application.conf            |  25 ++
 .../src/test/resources/cassandra_to_cassandra.conf |  52 +++
 .../src/test/resources/init/cassandra_init.conf    | 105 ++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 19 files changed, 1758 insertions(+), 2 deletions(-)

diff --git a/docs/en/Connector-v2-release-state.md 
b/docs/en/Connector-v2-release-state.md
index 4d861d394..2691192a3 100644
--- a/docs/en/Connector-v2-release-state.md
+++ b/docs/en/Connector-v2-release-state.md
@@ -58,4 +58,6 @@ SeaTunnel uses a grading system for connectors to help you 
understand what to ex
 | [Kafka](connector-v2/source/kafka.md)                       | Source | Alpha 
 | 2.3.0-beta      |
 | [Kafka](connector-v2/sink/Kafka.md)                         | Sink   | Alpha 
 | 2.3.0-beta      |
 | [S3File](connector-v2/source/S3File.md)                     | Source | Alpha 
 | 2.3.0-beta      |
-| [S3File](connector-v2/sink/S3File.md)                       | Sink   | Alpha 
 | 2.3.0-beta      |
\ No newline at end of file
+| [S3File](connector-v2/sink/S3File.md)                       | Sink   | Alpha 
 | 2.3.0-beta      |
+| [Cassandra](connector-v2/source/Cassandra.md)               | Source | Alpha 
 | 2.3.0-beta      |
+| [Cassandra](connector-v2/sink/Cassandra.md)                 | Sink   | Alpha 
 | 2.3.0-beta      |
diff --git a/docs/en/connector-v2/sink/Cassandra.md 
b/docs/en/connector-v2/sink/Cassandra.md
new file mode 100644
index 000000000..0a4ece086
--- /dev/null
+++ b/docs/en/connector-v2/sink/Cassandra.md
@@ -0,0 +1,98 @@
+# Cassandra
+
+> Cassandra sink connector
+
+## Description
+
+Write data to Apache Cassandra.
+
+## Key features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name              | type   | required | default value |
+|-------------------|--------|----------|---------------|
+| host              | String | Yes      | -             |
+| keyspace          | String | Yes      | -             |
+| table             | String | Yes      | -             |
+| username          | String | No       | -             |
+| password          | String | No       | -             |
+| datacenter        | String | No       | datacenter1   |
+| consistency_level | String | No       | LOCAL_ONE     |
+| fields            | String | No       | -             |
+| batch_size        | String | No       | 5000          |
+| batch_type        | String | No       | UNLOGGED      |
+| async_write       | String | No       | true          |
+
+### host [string]
+
+`Cassandra` cluster address, the format is `host:port` , allowing multiple 
`hosts` to be specified. Such as
+`"cassandra1:9042,cassandra2:9042"`.
+
+### keyspace [string]
+
+The `Cassandra` keyspace.
+
+### table [String]
+
+The `Cassandra` table name.
+
+### username [string]
+
+`Cassandra` user username.
+
+### password [string]
+
+`Cassandra` user password.
+
+### datacenter [String]
+
+The `Cassandra` datacenter, default is `datacenter1`.
+
+### consistency_level [String]
+
+The `Cassandra` write consistency level, default is `LOCAL_ONE`.
+
+### fields [array]
+
+The data field that needs to be output to `Cassandra` , if not configured, it 
will be automatically adapted 
+according to the sink table `schema`.
+
+### batch_size [number]
+
+The number of rows written through 
[Cassandra-Java-Driver](https://github.com/datastax/java-driver) each time, 
+default is `5000`.
+
+### batch_type [String]
+
+The `Cassandra` batch processing mode, default is `UNLOGGED`.
+
+### async_write [boolean]
+
+Whether `cassandra` writes in asynchronous mode, default is `true`.
+
+## Examples
+
+```hocon
+sink {
+ Cassandra {
+     host = "localhost:9042"
+     username = "cassandra"
+     password = "cassandra"
+     datacenter = "datacenter1"
+     keyspace = "test"
+    }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Cassandra Sink Connector
+
+
+
diff --git a/docs/en/connector-v2/source/Cassandra.md 
b/docs/en/connector-v2/source/Cassandra.md
new file mode 100644
index 000000000..e2a6e4e8c
--- /dev/null
+++ b/docs/en/connector-v2/source/Cassandra.md
@@ -0,0 +1,82 @@
+# Cassandra
+
+> Cassandra source connector
+
+## Description
+
+Read data from Apache Cassandra.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name                    | type   | required | default value |
+|-------------------------|--------|----------|---------------|
+| host                    | String | Yes      | -             |
+| keyspace                | String | Yes      | -             |
+| cql                     | String | Yes      | -             |
+| username                | String | No       | -             |
+| password                | String | No       | -             |
+| datacenter              | String | No       | datacenter1   |
+| consistency_level       | String | No       | LOCAL_ONE     |
+
+### host [string]
+
+`Cassandra` cluster address, the format is `host:port` , allowing multiple 
`hosts` to be specified. Such as
+`"cassandra1:9042,cassandra2:9042"`.
+
+### keyspace [string]
+
+The `Cassandra` keyspace.
+
+### cql [String]
+
+The query cql used to search data through the Cassandra session.
+
+### username [string]
+
+`Cassandra` user username.
+
+### password [string]
+
+`Cassandra` user password.
+
+### datacenter [String]
+
+The `Cassandra` datacenter, default is `datacenter1`.
+
+### consistency_level [String]
+
+The `Cassandra` read consistency level, default is `LOCAL_ONE`.
+
+## Examples
+
+```hocon
+source {
+ Cassandra {
+     host = "localhost:9042"
+     username = "cassandra"
+     password = "cassandra"
+     datacenter = "datacenter1"
+     keyspace = "test"
+     cql = "select * from source_table"
+     result_table_name = "source_table"
+    }
+}
+```
+
+## Changelog
+
+### next version
+
+- Add Cassandra Source Connector
+
+
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 1ac1e2687..c943de4e3 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -139,5 +139,7 @@ seatunnel.source.S3File = connector-file-s3
 seatunnel.sink.S3File = connector-file-s3
 seatunnel.source.Amazondynamodb = connector-amazondynamodb
 seatunnel.sink.Amazondynamodb = connector-amazondynamodb
+seatunnel.source.Cassandra = connector-cassandra
+seatunnel.sink.Cassandra = connector-cassandra
 seatunnel.sink.StarRocks = connector-starrocks
-seatunnel.sink.InfluxDB = connector-influxdb
+seatunnel.sink.InfluxDB = connector-influxdb
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-cassandra/pom.xml 
b/seatunnel-connectors-v2/connector-cassandra/pom.xml
new file mode 100644
index 000000000..4be1ba14c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-cassandra</artifactId>
+
+    <properties>
+        <cassandra.driver.version>4.14.0</cassandra.driver.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.datastax.oss</groupId>
+            <artifactId>java-driver-core</artifactId>
+            <version>${cassandra.driver.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java
new file mode 100644
index 000000000..a806b48fe
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/client/CassandraClient.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.client;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.CqlSessionBuilder;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+public class CassandraClient {
+    public static CqlSessionBuilder getCqlSessionBuilder(String nodeAddress, 
String keyspace, String username, String password, String dataCenter) {
+        List<CqlSessionBuilder> cqlSessionBuilderList = 
Arrays.stream(nodeAddress.split(",")).map(address -> {
+            String[] nodeAndPort = address.split(":", 2);
+            if (StringUtils.isEmpty(username) && 
StringUtils.isEmpty(password)) {
+                return CqlSession.builder()
+                    .addContactPoint(new InetSocketAddress(nodeAndPort[0], 
Integer.parseInt(nodeAndPort[1])))
+                    .withKeyspace(keyspace)
+                    .withLocalDatacenter(dataCenter);
+            }
+            return CqlSession.builder()
+                .addContactPoint(new InetSocketAddress(nodeAndPort[0], 
Integer.parseInt(nodeAndPort[1])))
+                .withAuthCredentials(username, password)
+                .withKeyspace(keyspace)
+                .withLocalDatacenter(dataCenter);
+        }).collect(Collectors.toList());
+        return 
cqlSessionBuilderList.get(ThreadLocalRandom.current().nextInt(cqlSessionBuilderList.size()));
+    }
+
+    public static SimpleStatement createSimpleStatement(String cql, 
ConsistencyLevel consistencyLevel) {
+        return 
SimpleStatement.builder(cql).setConsistencyLevel(consistencyLevel).build();
+    }
+
+    public static ColumnDefinitions getTableSchema(CqlSession session, String 
table) {
+        try {
+            return session.execute(String.format("select * from %s limit 1", 
table))
+                .getColumnDefinitions();
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot get table schema from 
cassandra", e);
+        }
+
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
new file mode 100644
index 000000000..c2d6c129f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.datastax.oss.driver.api.core.ConsistencyLevel;
+import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
+import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.ToString;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+@ToString
+@NoArgsConstructor
+public class CassandraConfig implements Serializable {
+
+    public static final String HOST = "host";
+    public static final String USERNAME = "username";
+    public static final String PASSWORD = "password";
+    public static final String DATACENTER = "datacenter";
+    public static final String KEYSPACE = "keyspace";
+    public static final String TABLE = "table";
+    public static final String CQL = "cql";
+    public static final String FIELDS = "fields";
+    public static final String CONSISTENCY_LEVEL = "consistency_level";
+    public static final String BATCH_SIZE = "batch_size";
+    public static final String BATCH_TYPE = "batch_type";
+    public static final String ASYNC_WRITE = "async_write";
+
+    private String host;
+    private String username;
+    private String password;
+    private String datacenter;
+    private String keyspace;
+    private String table;
+    private String cql;
+    private List<String> fields;
+    private ConsistencyLevel consistencyLevel;
+    private Integer batchSize;
+    private DefaultBatchType batchType;
+    private Boolean asyncWrite;
+
+    public CassandraConfig(@NonNull String host, @NonNull String keyspace) {
+        this.host = host;
+        this.keyspace = keyspace;
+    }
+
+    public static CassandraConfig getCassandraConfig(Config config) {
+        CassandraConfig cassandraConfig = new CassandraConfig(
+            config.getString(HOST),
+            config.getString(KEYSPACE)
+        );
+        if (config.hasPath(USERNAME)) {
+            cassandraConfig.setUsername(config.getString(USERNAME));
+        }
+        if (config.hasPath(PASSWORD)) {
+            cassandraConfig.setPassword(config.getString(PASSWORD));
+        }
+        if (config.hasPath(DATACENTER)) {
+            cassandraConfig.setDatacenter(config.getString(DATACENTER));
+        } else {
+            cassandraConfig.setDatacenter("datacenter1");
+        }
+        if (config.hasPath(TABLE)) {
+            cassandraConfig.setTable(config.getString(TABLE));
+        }
+        if (config.hasPath(CQL)) {
+            cassandraConfig.setCql(config.getString(CQL));
+        }
+        if (config.hasPath(FIELDS)) {
+            cassandraConfig.setFields(config.getStringList(FIELDS));
+        }
+        if (config.hasPath(CONSISTENCY_LEVEL)) {
+            
cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.valueOf(config.getString(CONSISTENCY_LEVEL)));
+        } else {
+            
cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_ONE);
+        }
+        if (config.hasPath(BATCH_SIZE)) {
+            cassandraConfig.setBatchSize(config.getInt(BATCH_SIZE));
+        } else {
+            cassandraConfig.setBatchSize(Integer.parseInt("5000"));
+        }
+        if (config.hasPath(BATCH_TYPE)) {
+            
cassandraConfig.setBatchType(DefaultBatchType.valueOf(config.getString(BATCH_TYPE)));
+        } else {
+            cassandraConfig.setBatchType(DefaultBatchType.UNLOGGED);
+        }
+        if (config.hasPath(ASYNC_WRITE)) {
+            cassandraConfig.setAsyncWrite(config.getBoolean(ASYNC_WRITE));
+        } else {
+            cassandraConfig.setAsyncWrite(true);
+        }
+        return cassandraConfig;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
new file mode 100644
index 000000000..b9c1154b5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.TABLE;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@AutoService(SeaTunnelSink.class)
+public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+    private CassandraConfig cassandraConfig;
+
+    private SeaTunnelRowType seaTunnelRowType;
+
+    private ColumnDefinitions tableSchema;
+
+    @Override
+    public String getPluginName() {
+        return "Cassandra";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, 
KEYSPACE, TABLE);
+        if (!checkResult.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
checkResult.getMsg());
+        }
+        this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
+        try (CqlSession session = CassandraClient.getCqlSessionBuilder(
+            cassandraConfig.getHost(),
+            cassandraConfig.getKeyspace(),
+            cassandraConfig.getUsername(),
+            cassandraConfig.getPassword(),
+            cassandraConfig.getDatacenter()
+        ).build()) {
+            List<String> fields = cassandraConfig.getFields();
+            this.tableSchema = CassandraClient.getTableSchema(session, 
cassandraConfig.getTable());
+            if (fields == null || fields.isEmpty()) {
+                List<String> newFields = new ArrayList<>();
+                for (int i = 0; i < tableSchema.size(); i++) {
+                    newFields.add(tableSchema.get(i).getName().asInternal());
+                }
+                cassandraConfig.setFields(newFields);
+            } else {
+                for (String field : fields) {
+                    if (!tableSchema.contains(field)) {
+                        throw new RuntimeException("Field " + field + " does 
not exist in table " + config.getString(TABLE));
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
e.getMessage());
+        }
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) throws IOException {
+        return new CassandraSinkWriter(cassandraConfig, seaTunnelRowType, 
tableSchema);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
new file mode 100644
index 000000000..10639f7f0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
+import com.datastax.oss.driver.api.core.cql.BatchStatement;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.type.DataType;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, 
Void> {
+
+    private final CassandraConfig cassandraConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final ColumnDefinitions tableSchema;
+    private final CqlSession session;
+    private BatchStatement batchStatement;
+    private List<BoundStatement> boundStatementList;
+    private List<CompletionStage<AsyncResultSet>> completionStages;
+    private final PreparedStatement preparedStatement;
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    public CassandraSinkWriter(CassandraConfig cassandraConfig, 
SeaTunnelRowType seaTunnelRowType, ColumnDefinitions tableSchema) {
+        this.cassandraConfig = cassandraConfig;
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.tableSchema = tableSchema;
+        this.session = CassandraClient.getCqlSessionBuilder(
+            cassandraConfig.getHost(),
+            cassandraConfig.getKeyspace(),
+            cassandraConfig.getUsername(),
+            cassandraConfig.getPassword(),
+            cassandraConfig.getDatacenter()).build();
+        this.batchStatement = 
BatchStatement.builder(cassandraConfig.getBatchType()).build();
+        this.boundStatementList = new ArrayList<>();
+        this.completionStages = new ArrayList<>();
+        this.preparedStatement = session.prepare(initPrepareCQL());
+    }
+
+    @Override
+    public void write(SeaTunnelRow row) throws IOException {
+        BoundStatement boundStatement = this.preparedStatement.bind();
+        addIntoBatch(row, boundStatement);
+        if (counter.getAndIncrement() >= cassandraConfig.getBatchSize()) {
+            flush();
+            counter.set(0);
+        }
+    }
+
+    private void flush() {
+        if (cassandraConfig.getAsyncWrite()) {
+            completionStages.forEach(resultStage -> resultStage.whenComplete(
+                (resultSet, error) -> {
+                    if (error != null) {
+                        log.error(ExceptionUtils.getMessage(error));
+                    }
+                }
+            ));
+            completionStages.clear();
+        } else {
+            try {
+                
this.session.execute(this.batchStatement.addAll(boundStatementList));
+            } catch (Exception e) {
+                log.error("Batch insert error,Try inserting one by one!");
+                for (BoundStatement statement : boundStatementList) {
+                    this.session.execute(statement);
+                }
+            } finally {
+                this.batchStatement.clear();
+                this.boundStatementList.clear();
+            }
+        }
+
+    }
+
+    private void addIntoBatch(SeaTunnelRow row, BoundStatement boundStatement) 
{
+        try {
+            for (int i = 0; i < cassandraConfig.getFields().size(); i++) {
+                String fieldName = cassandraConfig.getFields().get(i);
+                DataType dataType = tableSchema.get(i).getType();
+                Object fieldValue = 
row.getField(seaTunnelRowType.indexOf(fieldName));
+                boundStatement = 
TypeConvertUtil.reconvertAndInject(boundStatement, i, dataType, fieldValue);
+            }
+            if (cassandraConfig.getAsyncWrite()) {
+                completionStages.add(session.executeAsync(boundStatement));
+            } else {
+                boundStatementList.add(boundStatement);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Add row data into batch error!", e);
+        }
+    }
+
+    private String initPrepareCQL() {
+        String[] placeholder = new String[cassandraConfig.getFields().size()];
+        Arrays.fill(placeholder, "?");
+        return String.format("INSERT INTO %s (%s) VALUES (%s)",
+            cassandraConfig.getTable(),
+            String.join(",", cassandraConfig.getFields()),
+            String.join(",", placeholder));
+    }
+
+    @Override
+    public void close() throws IOException {
+        flush();
+        try {
+            if (this.session != null) {
+                this.session.close();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to close CqlSession!", e);
+        }
+
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
new file mode 100644
index 000000000..163433c47
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
+
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.CQL;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+
+    private SeaTunnelRowType rowTypeInfo;
+    private CassandraConfig cassandraConfig;
+
+    @Override
+    public String getPluginName() {
+        return "Cassandra";
+    }
+
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, 
KEYSPACE, CQL);
+        if (!checkResult.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
checkResult.getMsg());
+        }
+        this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
+        try (CqlSession currentSession = CassandraClient.getCqlSessionBuilder(
+            cassandraConfig.getHost(),
+            cassandraConfig.getKeyspace(),
+            cassandraConfig.getUsername(),
+            cassandraConfig.getPassword(),
+            cassandraConfig.getDatacenter()).build()) {
+            Row rs = 
currentSession.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(),
 cassandraConfig.getConsistencyLevel())).one();
+            if (rs == null) {
+                throw new PrepareFailException(getPluginName(), 
PluginType.SOURCE, "No data in the table!");
+            }
+            int columnSize = rs.getColumnDefinitions().size();
+            String[] fieldNames = new String[columnSize];
+            SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType[columnSize];
+            for (int i = 0; i < columnSize; i++) {
+                fieldNames[i] = 
rs.getColumnDefinitions().get(i).getName().asInternal();
+                seaTunnelDataTypes[i] = 
TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType());
+            }
+            this.rowTypeInfo = new SeaTunnelRowType(fieldNames, 
seaTunnelDataTypes);
+        } catch (Exception e) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
e.getMessage());
+        }
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.rowTypeInfo;
+    }
+
+    @Override
+    public AbstractSingleSplitReader<SeaTunnelRow> 
createReader(SingleSplitReaderContext readerContext) throws Exception {
+        return new CassandraSourceReader(cassandraConfig, readerContext);
+    }
+
+}
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
new file mode 100644
index 000000000..e3f95629a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+@Slf4j
+public class CassandraSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
+    private final CassandraConfig cassandraConfig;
+    private final SingleSplitReaderContext readerContext;
+    private CqlSession session;
+
+    CassandraSourceReader(CassandraConfig cassandraConfig, 
SingleSplitReaderContext readerContext) {
+        this.cassandraConfig = cassandraConfig;
+        this.readerContext = readerContext;
+    }
+
+    @Override
+    public void open() throws Exception {
+        session = CassandraClient.getCqlSessionBuilder(
+            cassandraConfig.getHost(),
+            cassandraConfig.getKeyspace(),
+            cassandraConfig.getUsername(),
+            cassandraConfig.getPassword(),
+            cassandraConfig.getDatacenter()
+        ).build();
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (session != null) {
+            session.close();
+        }
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        try {
+            ResultSet resultSet = 
session.execute(CassandraClient.createSimpleStatement(cassandraConfig.getCql(), 
cassandraConfig.getConsistencyLevel()));
+            resultSet.forEach(row -> 
output.collect(TypeConvertUtil.buildSeaTunnelRow(row)));
+        } finally {
+            this.readerContext.signalNoMoreElement();
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/util/TypeConvertUtil.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/util/TypeConvertUtil.java
new file mode 100644
index 000000000..1e492c62e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/util/TypeConvertUtil.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.util;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.type.DataType;
+import com.datastax.oss.driver.internal.core.type.DefaultListType;
+import com.datastax.oss.driver.internal.core.type.DefaultMapType;
+import com.datastax.oss.driver.internal.core.type.DefaultSetType;
+import com.datastax.oss.protocol.internal.ProtocolConstants;
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+public class TypeConvertUtil {
+    public static SeaTunnelDataType<?> convert(DataType type) {
+        switch (type.getProtocolCode()) {
+            case ProtocolConstants.DataType.VARCHAR:
+            case ProtocolConstants.DataType.VARINT:
+            case ProtocolConstants.DataType.ASCII:
+            case ProtocolConstants.DataType.UUID:
+            case ProtocolConstants.DataType.INET:
+            case ProtocolConstants.DataType.TIMEUUID:
+                return BasicType.STRING_TYPE;
+            case ProtocolConstants.DataType.TINYINT:
+                return BasicType.BYTE_TYPE;
+            case ProtocolConstants.DataType.SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case ProtocolConstants.DataType.INT:
+                return BasicType.INT_TYPE;
+            case ProtocolConstants.DataType.BIGINT:
+            case ProtocolConstants.DataType.COUNTER:
+                return BasicType.LONG_TYPE;
+            case ProtocolConstants.DataType.FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case ProtocolConstants.DataType.DOUBLE:
+            case ProtocolConstants.DataType.DECIMAL:
+                return BasicType.DOUBLE_TYPE;
+            case ProtocolConstants.DataType.BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case ProtocolConstants.DataType.TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case ProtocolConstants.DataType.DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case ProtocolConstants.DataType.TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case ProtocolConstants.DataType.BLOB:
+                return ArrayType.BYTE_ARRAY_TYPE;
+            case ProtocolConstants.DataType.MAP:
+                return new MapType<>(convert(((DefaultMapType) 
type).getKeyType()), convert(((DefaultMapType) type).getValueType()));
+            case ProtocolConstants.DataType.LIST:
+                return convertToArrayType(convert(((DefaultListType) 
type).getElementType()));
+            case ProtocolConstants.DataType.SET:
+                return convertToArrayType(convert(((DefaultSetType) 
type).getElementType()));
+            default:
+                throw new RuntimeException("not supported data type: " + type);
+        }
+    }
+
+    private static ArrayType<?, ?> convertToArrayType(SeaTunnelDataType<?> 
dataType) {
+        if (dataType.equals(BasicType.STRING_TYPE)) {
+            return ArrayType.STRING_ARRAY_TYPE;
+        } else if (dataType.equals(BasicType.BYTE_TYPE)) {
+            return ArrayType.BYTE_ARRAY_TYPE;
+        } else if (dataType.equals(BasicType.SHORT_TYPE)) {
+            return ArrayType.SHORT_ARRAY_TYPE;
+        } else if (dataType.equals(BasicType.INT_TYPE)) {
+            return ArrayType.INT_ARRAY_TYPE;
+        } else if (dataType.equals(BasicType.LONG_TYPE)) {
+            return ArrayType.LONG_ARRAY_TYPE;
+        } else if (dataType.equals(BasicType.FLOAT_TYPE)) {
+            return ArrayType.FLOAT_ARRAY_TYPE;
+        } else if (dataType.equals(BasicType.DOUBLE_TYPE)) {
+            return ArrayType.DOUBLE_ARRAY_TYPE;
+        } else if (dataType.equals(BasicType.BOOLEAN_TYPE)) {
+            return ArrayType.BOOLEAN_ARRAY_TYPE;
+        } else {
+            throw new RuntimeException("not supported data type: " + dataType);
+        }
+    }
+
+    public static SeaTunnelRow buildSeaTunnelRow(Row row) {
+        DataType subType;
+        Class<?> typeClass;
+        Object[] fields = new Object[row.size()];
+        ColumnDefinitions metaData = row.getColumnDefinitions();
+        for (int i = 0; i < row.size(); i++) {
+            switch (metaData.get(i).getType().getProtocolCode()) {
+                case ProtocolConstants.DataType.ASCII:
+                case ProtocolConstants.DataType.VARCHAR:
+                    fields[i] = row.getString(i);
+                    break;
+                case ProtocolConstants.DataType.VARINT:
+                    fields[i] = 
Objects.requireNonNull(row.getBigInteger(i)).toString();
+                    break;
+                case ProtocolConstants.DataType.TIMEUUID:
+                case ProtocolConstants.DataType.UUID:
+                    fields[i] = 
Objects.requireNonNull(row.getUuid(i)).toString();
+                    break;
+                case ProtocolConstants.DataType.INET:
+                    fields[i] = 
Objects.requireNonNull(row.getInetAddress(i)).getHostAddress();
+                    break;
+                case ProtocolConstants.DataType.TINYINT:
+                    fields[i] = row.getByte(i);
+                    break;
+                case ProtocolConstants.DataType.SMALLINT:
+                    fields[i] = row.getShort(i);
+                    break;
+                case ProtocolConstants.DataType.INT:
+                    fields[i] = row.getInt(i);
+                    break;
+                case ProtocolConstants.DataType.BIGINT:
+                    fields[i] = row.getLong(i);
+                    break;
+                case ProtocolConstants.DataType.FLOAT:
+                    fields[i] = row.getFloat(i);
+                    break;
+                case ProtocolConstants.DataType.DOUBLE:
+                    fields[i] = row.getDouble(i);
+                    break;
+                case ProtocolConstants.DataType.DECIMAL:
+                    fields[i] = 
Objects.requireNonNull(row.getBigDecimal(i)).doubleValue();
+                    break;
+                case ProtocolConstants.DataType.BOOLEAN:
+                    fields[i] = row.getBoolean(i);
+                    break;
+                case ProtocolConstants.DataType.TIME:
+                    fields[i] = row.getLocalTime(i);
+                    break;
+                case ProtocolConstants.DataType.DATE:
+                    fields[i] = row.getLocalDate(i);
+                    break;
+                case ProtocolConstants.DataType.TIMESTAMP:
+                    fields[i] = 
Timestamp.from(Objects.requireNonNull(row.getInstant(i))).toLocalDateTime();
+                    break;
+                case ProtocolConstants.DataType.BLOB:
+                    fields[i] = 
ArrayUtils.toObject(Objects.requireNonNull(row.getByteBuffer(i)).array());
+                    break;
+                case ProtocolConstants.DataType.MAP:
+                    subType = metaData.get(i).getType();
+                    fields[i] = row.getMap(i, convert(((DefaultMapType) 
subType).getKeyType()).getTypeClass(), convert(((DefaultMapType) 
subType).getValueType()).getTypeClass());
+                    break;
+                case ProtocolConstants.DataType.LIST:
+                    typeClass = convert(((DefaultListType) 
metaData.get(i).getType()).getElementType()).getTypeClass();
+                    if (String.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getList(i, 
String.class)).toArray(new String[0]);
+                    } else if (Byte.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getList(i, 
Byte.class)).toArray(new Byte[0]);
+                    } else if (Short.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getList(i, 
Short.class)).toArray(new Short[0]);
+                    } else if (Integer.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getList(i, 
Integer.class)).toArray(new Integer[0]);
+                    } else if (Long.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getList(i, 
Long.class)).toArray(new Long[0]);
+                    } else if (Float.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getList(i, 
Float.class)).toArray(new Float[0]);
+                    } else if (Double.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getList(i, 
Double.class)).toArray(new Double[0]);
+                    } else if (Boolean.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getList(i, 
Boolean.class)).toArray(new Boolean[0]);
+                    } else {
+                        throw new RuntimeException("List not supported data 
type: " + typeClass.toString());
+                    }
+                    break;
+                case ProtocolConstants.DataType.SET:
+                    typeClass = convert(((DefaultSetType) 
metaData.get(i).getType()).getElementType()).getTypeClass();
+                    if (String.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getSet(i, 
String.class)).toArray(new String[0]);
+                    } else if (Byte.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getSet(i, 
Byte.class)).toArray(new Byte[0]);
+                    } else if (Short.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getSet(i, 
Short.class)).toArray(new Short[0]);
+                    } else if (Integer.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getSet(i, 
Integer.class)).toArray(new Integer[0]);
+                    } else if (Long.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getSet(i, 
Long.class)).toArray(new Long[0]);
+                    } else if (Float.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getSet(i, 
Float.class)).toArray(new Float[0]);
+                    } else if (Double.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getSet(i, 
Double.class)).toArray(new Double[0]);
+                    } else if (Boolean.class.equals(typeClass)) {
+                        fields[i] = Objects.requireNonNull(row.getSet(i, 
Boolean.class)).toArray(new Boolean[0]);
+                    } else {
+                        throw new RuntimeException("List not supported data 
type: " + typeClass.toString());
+                    }
+                    break;
+                default:
+                    fields[i] = row.getObject(i);
+            }
+        }
+        return new SeaTunnelRow(fields);
+    }
+
+    public static BoundStatement reconvertAndInject(BoundStatement statement, 
int index, DataType type, Object fileValue) {
+        switch (type.getProtocolCode()) {
+            case ProtocolConstants.DataType.VARCHAR:
+            case ProtocolConstants.DataType.ASCII:
+                statement = statement.setString(index, (String) fileValue);
+                return statement;
+            case ProtocolConstants.DataType.VARINT:
+                statement = statement.setBigInteger(index, new 
BigInteger((String) fileValue));
+                return statement;
+            case ProtocolConstants.DataType.UUID:
+            case ProtocolConstants.DataType.TIMEUUID:
+                statement = statement.setUuid(index, UUID.fromString((String) 
fileValue));
+                return statement;
+            case ProtocolConstants.DataType.INET:
+                try {
+                    statement = statement.setInetAddress(index, 
InetAddress.getByName((String) fileValue));
+                } catch (UnknownHostException e) {
+                    throw new RuntimeException(e);
+                }
+                return statement;
+            case ProtocolConstants.DataType.TINYINT:
+                statement = statement.setByte(index, (Byte) fileValue);
+                return statement;
+            case ProtocolConstants.DataType.SMALLINT:
+                statement = statement.setShort(index, (Short) fileValue);
+                return statement;
+            case ProtocolConstants.DataType.INT:
+                statement = statement.setInt(index, (Integer) fileValue);
+                return statement;
+            case ProtocolConstants.DataType.BIGINT:
+            case ProtocolConstants.DataType.COUNTER:
+                statement = statement.setLong(index, (Long) fileValue);
+                return statement;
+            case ProtocolConstants.DataType.FLOAT:
+                statement = statement.setFloat(index, (Float) fileValue);
+                return statement;
+            case ProtocolConstants.DataType.DOUBLE:
+                statement = statement.setDouble(index, (Double) fileValue);
+                return statement;
+            case ProtocolConstants.DataType.DECIMAL:
+                statement = statement.setBigDecimal(index, 
BigDecimal.valueOf((Double) fileValue));
+                return statement;
+            case ProtocolConstants.DataType.BOOLEAN:
+                statement = statement.setBoolean(index, (Boolean) fileValue);
+                return statement;
+            case ProtocolConstants.DataType.TIME:
+                statement = statement.setLocalTime(index, (LocalTime) 
fileValue);
+                return statement;
+            case ProtocolConstants.DataType.DATE:
+                statement = statement.setLocalDate(index, (LocalDate) 
fileValue);
+                return statement;
+            case ProtocolConstants.DataType.TIMESTAMP:
+                statement = statement.setInstant(index, ((LocalDateTime) 
fileValue).atZone(ZoneId.systemDefault()).toInstant());
+                return statement;
+            case ProtocolConstants.DataType.BLOB:
+                if (fileValue.getClass().equals(Object[].class)) {
+                    fileValue = Arrays.stream((Object[]) 
fileValue).toArray(Byte[]::new);
+                }
+                statement = statement.setByteBuffer(index, 
ByteBuffer.wrap(ArrayUtils.toPrimitive((Byte[]) fileValue)));
+                return statement;
+            case ProtocolConstants.DataType.MAP:
+                statement = statement.set(index, (Map<?, ?>) fileValue, 
Map.class);
+                return statement;
+            case ProtocolConstants.DataType.LIST:
+                statement = statement.set(index, Arrays.stream((Object[]) 
fileValue).collect(Collectors.toList()), List.class);
+                return statement;
+            case ProtocolConstants.DataType.SET:
+                statement = statement.set(index, Arrays.stream((Object[]) 
fileValue).collect(Collectors.toSet()), Set.class);
+                return statement;
+            default:
+                statement = statement.set(index, fileValue, Object.class);
+                return statement;
+        }
+    }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index d2fd18c6a..267ead164 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -57,6 +57,7 @@
         <module>connector-iceberg</module>
         <module>connector-influxdb</module>
         <module>connector-amazondynamodb</module>
+        <module>connector-cassandra</module>
         <module>connector-starrocks</module>
     </modules>
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml
new file mode 100644
index 000000000..d9775f3b2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-cassandra-e2e</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cassandra</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>cassandra</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
new file mode 100644
index 000000000..9e54b914d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.cql.BatchStatement;
+import com.datastax.oss.driver.api.core.cql.BatchType;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.uuid.Uuids;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+@Slf4j
+public class CassandraIT extends TestSuiteBase implements TestResource {
+    private static final String CASSANDRA_DOCKER_IMAGE = "cassandra";
+    private static final String HOST = "cassandra";
+    private static final Integer PORT = 9042;
+    private static final String INIT_CASSANDRA_PATH = 
"/init/cassandra_init.conf";
+    private static final String CASSANDRA_JOB_CONFIG = 
"/cassandra_to_cassandra.conf";
+    private static final String CASSANDRA_DRIVER_CONFIG = "/application.conf";
+    private static final String DATACENTER = "datacenter1";
+    private static final String KEYSPACE = "test";
+    private static final String SOURCE_TABLE = "source_table";
+    private static final String SINK_TABLE = "sink_table";
+    private static final String INSERT_CQL = "insert_cql";
+    private static final Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> 
TEST_DATASET = generateTestDataSet();
+    private Config config;
+    private CassandraContainer<?> container;
+    private CqlSession session;
+
+    @TestTemplate
+    public void testCassandra(TestContainer container) throws Exception {
+        Container.ExecResult execResult = 
container.executeJob(CASSANDRA_JOB_CONFIG);
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertNotNull(getRow());
+        compareResult();
+        clearSinkTable();
+        Assertions.assertNull(getRow());
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.container = new CassandraContainer<>(CASSANDRA_DOCKER_IMAGE)
+            .withNetwork(NETWORK)
+            .withNetworkAliases(HOST)
+            .withLogConsumer(new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(CASSANDRA_DOCKER_IMAGE)));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
PORT, PORT)));
+        Startables.deepStart(Stream.of(this.container)).join();
+        log.info("Cassandra container started");
+        Awaitility.given()
+            .ignoreExceptions()
+            .await()
+            .atMost(180L, TimeUnit.SECONDS)
+            .untilAsserted(this::initConnection);
+        this.initializeCassandraTable();
+        this.batchInsertData();
+    }
+
+    private void initializeCassandraTable() {
+        initCassandraConfig();
+        createKeyspace();
+        try {
+            
session.execute(SimpleStatement.builder(config.getString(SOURCE_TABLE)).setKeyspace(KEYSPACE).build());
+            
session.execute(SimpleStatement.builder(config.getString(SINK_TABLE)).setKeyspace(KEYSPACE).build());
+        } catch (Exception e) {
+            throw new RuntimeException("Initializing Cassandra table failed!", 
e);
+        }
+    }
+
+    private void initConnection() {
+        try {
+            File file = new File(CASSANDRA_DRIVER_CONFIG);
+            this.session = CqlSession.builder()
+                .addContactPoint(new InetSocketAddress(container.getHost(), 
container.getExposedPorts().get(0)))
+                .withLocalDatacenter(DATACENTER)
+                .withConfigLoader(DriverConfigLoader.fromFile(file))
+                .build();
+        } catch (Exception e) {
+            throw new RuntimeException("Init connection failed!", e);
+        }
+
+    }
+
+    private void batchInsertData() {
+        try {
+            BatchStatement batchStatement = 
BatchStatement.builder(BatchType.UNLOGGED).build();
+            BoundStatement boundStatement = session.prepare(
+                    
SimpleStatement.builder(config.getString(INSERT_CQL)).setKeyspace(KEYSPACE).build())
+                .bind();
+            for (SeaTunnelRow row : TEST_DATASET._2()) {
+                boundStatement = boundStatement
+                    .setLong(0, (Long) row.getField(0))
+                    .setString(1, (String) row.getField(1))
+                    .setLong(2, (Long) row.getField(2))
+                    .setByteBuffer(3, (ByteBuffer) row.getField(3))
+                    .setBoolean(4, (Boolean) row.getField(4))
+                    .setBigDecimal(5, (BigDecimal) row.getField(5))
+                    .setDouble(6, (Double) row.getField(6))
+                    .setFloat(7, (Float) row.getField(7))
+                    .setInt(8, (Integer) row.getField(8))
+                    .setInstant(9, (Instant) row.getField(9))
+                    .setUuid(10, (UUID) row.getField(10))
+                    .setString(11, (String) row.getField(11))
+                    .setBigInteger(12, (BigInteger) row.getField(12))
+                    .setUuid(13, (UUID) row.getField(13))
+                    .setInetAddress(14, (InetAddress) row.getField(14))
+                    .setLocalDate(15, (LocalDate) row.getField(15))
+                    .setShort(16, (Short) row.getField(16))
+                    .setByte(17, (Byte) row.getField(17))
+                    .setList(18, (List<Float>) row.getField(18), Float.class)
+                    .setList(19, (List<Integer>) row.getField(19), 
Integer.class)
+                    .setSet(20, (Set<Double>) row.getField(20), Double.class)
+                    .setSet(21, (Set<Long>) row.getField(21), Long.class)
+                    .setMap(22, (Map<String, Integer>) row.getField(22), 
String.class, Integer.class);
+                batchStatement = batchStatement.add(boundStatement);
+            }
+            session.execute(batchStatement);
+            batchStatement.clear();
+        } catch (Exception e) {
+            throw new RuntimeException("Batch insert data failed!", e);
+        }
+    }
+
+    private void compareResult() throws IOException {
+        String sourceCql = "select * from " + SOURCE_TABLE;
+        String sinkCql = "select * from " + SINK_TABLE;
+        List<String> columnList = 
Arrays.stream(generateTestDataSet()._1().getFieldNames()).collect(Collectors.toList());
+        ResultSet sourceResultSet = 
session.execute(SimpleStatement.builder(sourceCql).setKeyspace(KEYSPACE).build());
+        ResultSet sinkResultSet = 
session.execute(SimpleStatement.builder(sinkCql).setKeyspace(KEYSPACE).build());
+        Assertions.assertEquals(sourceResultSet.getColumnDefinitions().size(), 
sinkResultSet.getColumnDefinitions().size());
+        Iterator<Row> sourceIterator = sourceResultSet.iterator();
+        Iterator<Row> sinkIterator = sinkResultSet.iterator();
+        while (sourceIterator.hasNext()) {
+            if (sinkIterator.hasNext()) {
+                Row sourceNext = sourceIterator.next();
+                Row sinkNext = sinkIterator.next();
+                for (String column : columnList) {
+                    Object source = sourceNext.getObject(column);
+                    Object sink = sinkNext.getObject(column);
+                    if (!Objects.deepEquals(source, sink)) {
+                        InputStream sourceAsciiStream = sourceNext.get(column, 
ByteArrayInputStream.class);
+                        InputStream sinkAsciiStream = sinkNext.get(column, 
ByteArrayInputStream.class);
+                        Assertions.assertNotNull(sourceAsciiStream);
+                        Assertions.assertNotNull(sinkAsciiStream);
+                        String sourceValue = 
IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+                        String sinkValue = IOUtils.toString(sinkAsciiStream, 
StandardCharsets.UTF_8);
+                        Assertions.assertEquals(sourceValue, sinkValue);
+                    }
+                    Assertions.assertTrue(true);
+                }
+            }
+        }
+
+    }
+
+    private void createKeyspace() {
+        try {
+            this.session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +
+                " WITH replication = \n" +
+                "{'class':'SimpleStrategy','replication_factor':'1'};");
+        } catch (Exception e) {
+            throw new RuntimeException("Create keyspace failed!", e);
+        }
+    }
+
+    private void clearSinkTable() {
+        try {
+            session.execute(SimpleStatement.builder(String.format("truncate 
table %s", SINK_TABLE)).setKeyspace(KEYSPACE).build());
+        } catch (Exception e) {
+            throw new RuntimeException("Test clickhouse server image failed!", 
e);
+        }
+    }
+
+    private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> 
generateTestDataSet() {
+        SeaTunnelRowType rowType = new SeaTunnelRowType(
+            new String[]{
+                "id",
+                "c_ascii",
+                "c_bigint",
+                "c_blob",
+                "c_boolean",
+                "c_decimal",
+                "c_double",
+                "c_float",
+                "c_int",
+                "c_timestamp",
+                "c_uuid",
+                "c_text",
+                "c_varint",
+                "c_timeuuid",
+                "c_inet",
+                "c_date",
+                "c_smallint",
+                "c_tinyint",
+                "c_list_float",
+                "c_list_int",
+                "c_set_double",
+                "c_set_bigint",
+                "c_map"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.LONG_TYPE,
+                ArrayType.BYTE_ARRAY_TYPE,
+                BasicType.BOOLEAN_TYPE,
+                new DecimalType(9, 4),
+                BasicType.DOUBLE_TYPE,
+                BasicType.FLOAT_TYPE,
+                BasicType.INT_TYPE,
+                LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.STRING_TYPE,
+                BasicType.STRING_TYPE,
+                LocalTimeType.LOCAL_DATE_TYPE,
+                BasicType.SHORT_TYPE,
+                BasicType.BYTE_TYPE,
+                ArrayType.FLOAT_ARRAY_TYPE,
+                ArrayType.INT_ARRAY_TYPE,
+                ArrayType.DOUBLE_ARRAY_TYPE,
+                ArrayType.LONG_ARRAY_TYPE,
+                new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE)
+            });
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 50; ++i) {
+            SeaTunnelRow row;
+            try {
+                row = new SeaTunnelRow(
+                    new Object[]{
+                        (long) i,
+                        String.valueOf(i),
+                        (long) i,
+                        ByteBuffer.wrap(new byte[]{Byte.parseByte("1")}),
+                        Boolean.FALSE,
+                        BigDecimal.valueOf(11L, 2),
+                        Double.parseDouble("1.1"),
+                        Float.parseFloat("2.1"),
+                        i,
+                        Instant.now(),
+                        UUID.randomUUID(),
+                        "text",
+                        new BigInteger("12345678909876543210"),
+                        Uuids.timeBased(),
+                        InetAddress.getByName("1.2.3.4"),
+                        LocalDate.now(),
+                        Short.parseShort("1"),
+                        Byte.parseByte("1"),
+                        Collections.singletonList((float) i),
+                        Collections.singletonList(i),
+                        Collections.singleton(Double.valueOf("1.1")),
+                        Collections.singleton((long) i),
+                        Collections.singletonMap("key_" + i, i)
+                    });
+            } catch (UnknownHostException e) {
+                throw new RuntimeException("Generate Test DataSet Failed!", e);
+            }
+            rows.add(row);
+        }
+        return Tuple2.apply(rowType, rows);
+    }
+
+    private Row getRow() {
+        try {
+            String sql = String.format("select * from %s limit 1", SINK_TABLE);
+            ResultSet resultSet = 
session.execute(SimpleStatement.builder(sql).setKeyspace(KEYSPACE).build());
+            return resultSet.one();
+        } catch (Exception e) {
+            throw new RuntimeException("test cassandra server image failed!", 
e);
+        }
+    }
+
+    private void initCassandraConfig() {
+        File file = ContainerUtil.getResourcesFile(INIT_CASSANDRA_PATH);
+        Config config = ConfigFactory.parseFile(file);
+        assert config.hasPath(SOURCE_TABLE) && config.hasPath(SINK_TABLE) && 
config.hasPath(INSERT_CQL);
+        this.config = config;
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (this.session != null) {
+            this.session.close();
+        }
+        if (this.container != null) {
+            this.container.close();
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf
new file mode 100644
index 000000000..88be3f73c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/application.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+datastax-java-driver {
+    advanced.protocol.version = V5
+    profiles {
+        slow {
+          basic.request.timeout = 10 seconds
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf
new file mode 100644
index 000000000..4f42eb862
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/cassandra_to_cassandra.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set SeaTunnel environment configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin, used only to test and demonstrate the source plugin feature
+ Cassandra {
+     host = "cassandra:9042"
+     username = ""
+     password = ""
+     datacenter = "datacenter1"
+     keyspace = "test"
+     cql = "select * from source_table"
+     result_table_name = "source_table"
+    }
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+  Cassandra {
+    host = "cassandra:9042"
+    username = ""
+    password = ""
+    datacenter = "datacenter1"
+    keyspace = "test"
+    async_write = "true"
+    table = "sink_table"
+   }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf
new file mode 100644
index 000000000..62b495244
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/resources/init/cassandra_init.conf
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source_table = """
+create table if not exists source_table(
+    id              bigint,
+    c_ascii         ascii,
+    c_bigint        bigint,
+    c_blob          blob,
+    c_boolean       boolean,
+    c_decimal       decimal,
+    c_double        double,
+    c_float         float,
+    c_int           int,
+    c_timestamp     timestamp,
+    c_uuid          uuid,
+    c_text          text,
+    c_varint        varint,
+    c_timeuuid      timeuuid,
+    c_inet          inet,
+    c_date          date,
+    c_smallint      smallint,
+    c_tinyint       tinyint,
+    c_list_float    list<float>,
+    c_list_int      list<int>,
+    c_set_double    set<double>,
+    c_set_bigint    set<bigint>,
+    c_map           map<text,int>,
+    PRIMARY KEY (id)
+);
+"""
+
+sink_table = """
+create table if not exists sink_table(
+    id              bigint,
+    c_ascii         ascii,
+    c_bigint        bigint,
+    c_blob          blob,
+    c_boolean       boolean,
+    c_decimal       decimal,
+    c_double        double,
+    c_float         float,
+    c_int           int,
+    c_timestamp     timestamp,
+    c_uuid          uuid,
+    c_text          text,
+    c_varint        varint,
+    c_timeuuid      timeuuid,
+    c_inet          inet,
+    c_date          date,
+    c_smallint      smallint,
+    c_tinyint       tinyint,
+    c_list_float    list<float>,
+    c_list_int      list<int>,
+    c_set_double    set<double>,
+    c_set_bigint    set<bigint>,
+    c_map           map<text,int>,
+    PRIMARY KEY (id)
+);
+"""
+
+insert_cql = """
+insert into source_table
+(
+    id,
+    c_ascii,
+    c_bigint,
+    c_blob,
+    c_boolean,
+    c_decimal,
+    c_double,
+    c_float,
+    c_int,
+    c_timestamp,
+    c_uuid,
+    c_text,
+    c_varint,
+    c_timeuuid,
+    c_inet,
+    c_date,
+    c_smallint,
+    c_tinyint,
+    c_list_float,
+    c_list_int,
+    c_set_double,
+    c_set_bigint,
+    c_map
+)
+values
+(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
+"""
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index d5cdf69aa..e5d8ca076 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -32,6 +32,7 @@
         <module>connector-influxdb-e2e</module>
         <module>connector-amazondynamodb-e2e</module>
         <module>connector-file-local-e2e</module>
+        <module>connector-cassandra-e2e</module>
     </modules>
 
     <artifactId>seatunnel-connector-v2-e2e</artifactId>

Reply via email to