This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new eebee8785b [Feature][Connector-V2] add fluss Connector (#10121)
eebee8785b is described below

commit eebee8785ba366e6d23037287a024011195878ff
Author: dyp12 <[email protected]>
AuthorDate: Mon Dec 29 10:35:51 2025 +0800

    [Feature][Connector-V2] add fluss Connector (#10121)
---
 docs/en/connector-v2/changelog/connector-fluss.md  |   6 +
 docs/en/connector-v2/sink/Fluss.md                 | 352 ++++++++++++++++++
 docs/zh/connector-v2/changelog/connector-fluss.md  |   6 +
 docs/zh/connector-v2/sink/Fluss.md                 | 351 ++++++++++++++++++
 plugin-mapping.properties                          |   1 +
 seatunnel-connectors-v2/connector-fluss/pom.xml    |  51 +++
 .../seatunnel/fluss/config/FlussBaseOptions.java   |  49 +++
 .../seatunnel/fluss/config/FlussSinkOptions.java   |  20 +
 .../connectors/seatunnel/fluss/sink/FlussSink.java |  58 +++
 .../seatunnel/fluss/sink/FlussSinkFactory.java     |  53 +++
 .../seatunnel/fluss/sink/FlussSinkWriter.java      | 228 ++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   7 +
 .../connector-fluss-e2e/pom.xml                    |  82 +++++
 .../seatunnel/e2e/connector/fluss/FlussSinkIT.java | 402 +++++++++++++++++++++
 .../src/test/resources/fake_to_fluss.conf          |  96 +++++
 .../resources/fake_to_multipletable_fluss.conf     | 200 ++++++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 .../seatunnel/format/json/JsonToRowConverters.java |  13 +
 .../seatunnel/format/json/RowToJsonConverters.java |  10 +
 .../format/json/JsonRowDataSerDeSchemaTest.java    |  47 ++-
 21 files changed, 2020 insertions(+), 14 deletions(-)

diff --git a/docs/en/connector-v2/changelog/connector-fluss.md 
b/docs/en/connector-v2/changelog/connector-fluss.md
new file mode 100644
index 0000000000..97ff142816
--- /dev/null
+++ b/docs/en/connector-v2/changelog/connector-fluss.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+|--------|--------|---------|
+
+</details>
diff --git a/docs/en/connector-v2/sink/Fluss.md 
b/docs/en/connector-v2/sink/Fluss.md
new file mode 100644
index 0000000000..06f70d0a27
--- /dev/null
+++ b/docs/en/connector-v2/sink/Fluss.md
@@ -0,0 +1,352 @@
+import ChangeLog from '../changelog/connector-fluss.md';
+
+# Fluss
+
+> Fluss sink connector
+
+## Support These Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [support multiple table write](../../concept/connector-v2-features.md)
+
+## Description
+
+Used to send data to Fluss. Both support streaming and batch mode.
+
+## Using Dependency
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-client</artifactId>
+            <version>0.7.0</version>
+        </dependency>
+
+## Sink Options
+
+| Name              | Type   | Required | Default | Description                
                                                                                
 |
+|-------------------|--------|----------|---------|-------------------------------------------------------------------------------------------------------------|
+| bootstrap.servers | string | yes      | -       | The bootstrap servers for 
the Fluss sink connection.                                                      
  |
+| database          | string | no       | -       | The name of Fluss 
database, If not set, the table name will be the name of the upstream db        
          |
+| table             | string | no       | -       | The name of Fluss table, 
If not set, the table name will be the name of the upstream table               
   |
+| client.config     | Map    | no       | -       | set other client config. 
Please refer to  
https://fluss.apache.org/docs/engine-flink/options/#other-options |
+
+
+### database [string]
+
+The name of Fluss database, If not set, the table name will be the name of the 
upstream db
+
+for example:
+
+1. test_${schema_name}_test
+2. sink_sinkdb
+3. ss_${database_name}
+
+
+### table [string]
+
+The name of Fluss table, If not set, the table name will be the name of the 
upstream table
+
+for example:
+1. test_${table_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+
+## Data Type Mapping
+
+| StarRocks Data type | Fluss Data type |
+|---------------------|-----------------|
+| BOOLEAN             | BOOLEAN         |
+| TINYINT             | TINYINT         |
+| SMALLINT            | SMALLINT        |
+| INT                 | INT             |
+| BIGINT              | BIGINT          |
+| FLOAT               | FLOAT           |
+| DOUBLE              | DOUBLE          |
+| DOUBLE              | DOUBLE          |
+| BYTES               | BYTES           |
+| DATE                | DATE            |
+| TIME                | TIME            |
+| TIMESTAMP           | TIMESTAMP       |
+| TIMESTAMP_TZ        | TIMESTAMP_TZ    |
+| STRING              | STRING          |
+
+## Task Example
+
+### Simple
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+        {
+        row.num = 7
+          schema {
+            table = "test.table1"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    }
+      ]
+}
+}
+
+transform {
+}
+
+sink {
+  Fluss {
+    bootstrap.servers="fluss_coordinator_e2e:9123"
+    database = "fluss_db_${database_name}"
+    table = "fluss_tb_${table_name}"
+  }
+}
+```
+
+### Multiple table
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+        {
+        row.num = 7
+          schema {
+            table = "test2.table1"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    },
+    {
+        row.num = 7
+          schema {
+            table = "test2.table2"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    },
+    {
+        row.num = 7
+          schema {
+            table = "test3.table3"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    }
+      ]
+}
+}
+
+transform {
+}
+
+sink {
+  Fluss {
+    bootstrap.servers="fluss_coordinator_e2e:9123"
+    database = "fluss_db_${database_name}"
+    table = "fluss_tb_${table_name}"
+  }
+}
+```
+
+
+## Changelog
+
+<ChangeLog />
+
diff --git a/docs/zh/connector-v2/changelog/connector-fluss.md 
b/docs/zh/connector-v2/changelog/connector-fluss.md
new file mode 100644
index 0000000000..97ff142816
--- /dev/null
+++ b/docs/zh/connector-v2/changelog/connector-fluss.md
@@ -0,0 +1,6 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+|--------|--------|---------|
+
+</details>
diff --git a/docs/zh/connector-v2/sink/Fluss.md 
b/docs/zh/connector-v2/sink/Fluss.md
new file mode 100644
index 0000000000..bc32f8db8f
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Fluss.md
@@ -0,0 +1,351 @@
+import ChangeLog from '../changelog/connector-fluss.md';
+
+# Fluss
+
+> Fluss 数据接收器
+
+## 引擎支持
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## 主要特性
+
+- [ ] [精准一次](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+- [x] [支持多表写入](../../concept/connector-v2-features.md)
+
+## 描述
+
+该接收器用于将数据写入到Fluss中。支持批和流两种模式。
+
+## 依赖
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-client</artifactId>
+            <version>0.7.0</version>
+        </dependency>
+
+
+## 接收器选项
+
+| 名称                | 类型     | 是否必须 | 默认值 | Description                        
                                              |
+|-------------------|--------|------|-----|----------------------------------------------------------------------------------|
+| bootstrap.servers | string | yes  | -   | fluss 集群地址                         
                                              |
+| database          | string | no   | -   | 指定目标 Fluss 表所在的数据库的名称, 
如果没有设置该值,则表名与上游库名相同                                       |
+| table             | string | no   | -   | 指定目标 Fluss 表的名称, 
如果没有设置该值,则表名与上游表名相同                                             |
+| client.config     | Map    | no   | -   | 设置其他客户端配置. 参考  
https://fluss.apache.org/docs/engine-flink/options/#other-options |
+
+
+### database [string]
+
+database选项参数可以填入一任意库名,这个名字最终会被用作目标表的库名,并且支持变量(`${database_name}`,`${schema_name}`)。
+替换规则如下:`${schema_name}` 将替换传递给目标端的 SCHEMA 名称,`${database_name}` 将替换传递给目标端的库名。
+
+例如:
+1. test_${schema_name}_test
+2. sink_sinkdb
+3. ss_${database_name}
+
+
+### table [string]
+
+table选项参数可以填入一任意表名,这个名字最终会被用作目标表的表名,并且支持变量(`${table_name}`,`${schema_name}`)。
+替换规则如下:`${schema_name}` 将替换传递给目标端的 SCHEMA 名称,`${table_name}` 将替换传递给目标端的表名。
+
+例如:
+1. test_${schema_name}_test
+2. sink_sinktable
+3. ss_${table_name}
+
+## 数据类型映射
+
+| FLuss数据类型    | SeaTunnel数据类型 |
+|--------------|---------------|
+| BOOLEAN      | BOOLEAN       |
+| TINYINT      | TINYINT       |
+| SMALLINT     | SMALLINT      |
+| INT          | INT           |
+| BIGINT       | BIGINT        |
+| FLOAT        | FLOAT         |
+| DOUBLE       | DOUBLE        |
+| DOUBLE       | DOUBLE        |
+| BYTES        | BYTES         |
+| DATE         | DATE          |
+| TIME         | TIME          |
+| TIMESTAMP    | TIMESTAMP     |
+| TIMESTAMP_TZ | TIMESTAMP_TZ  |
+| STRING       | STRING        |
+
+
+## 任务示例
+
+### 简单示例
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+        {
+        row.num = 7
+          schema {
+            table = "test.table1"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    }
+      ]
+}
+}
+
+transform {
+}
+
+sink {
+  Fluss {
+    bootstrap.servers="fluss_coordinator_e2e:9123"
+    database = "fluss_db_${database_name}"
+    table = "fluss_tb_${table_name}"
+  }
+}
+```
+### 多表写入
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+        {
+        row.num = 7
+          schema {
+            table = "test2.table1"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    },
+    {
+        row.num = 7
+          schema {
+            table = "test2.table2"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    },
+    {
+        row.num = 7
+          schema {
+            table = "test3.table3"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    }
+      ]
+}
+}
+
+transform {
+}
+
+sink {
+  Fluss {
+    bootstrap.servers="fluss_coordinator_e2e:9123"
+    database = "fluss_db_${database_name}"
+    table = "fluss_tb_${table_name}"
+  }
+}
+```
+
+## 变更日志
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 4172161f5e..74b84a4758 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -150,6 +150,7 @@ seatunnel.sink.GraphQL = connector-graphql
 seatunnel.sink.Aerospike = connector-aerospike
 seatunnel.sink.SensorsData = connector-sensorsdata
 seatunnel.sink.HugeGraph = connector-hugegraph
+seatunnel.sink.Fluss = connector-fluss
 
 # For custom transforms, make sure to use the 
seatunnel.transform.[PluginIdentifier]=[JarPerfix] naming convention. For 
example:
 # seatunnel.transform.Sql = seatunnel-transforms-v2
diff --git a/seatunnel-connectors-v2/connector-fluss/pom.xml 
b/seatunnel-connectors-v2/connector-fluss/pom.xml
new file mode 100644
index 0000000000..8db032b6f9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fluss/pom.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-fluss</artifactId>
+    <name>SeaTunnel : Connectors V2 : Fluss</name>
+
+    <properties>
+        <fluss.client.version>0.7.0</fluss.client.version>
+        <connector.name>connector.fluss</connector.name>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-client</artifactId>
+            <version>${fluss.client.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java
 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java
new file mode 100644
index 0000000000..e582c760ec
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussBaseOptions.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.fluss.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class FlussBaseOptions implements Serializable {
+    public static final String CONNECTOR_IDENTITY = "Fluss";
+    public static final Option<String> BOOTSTRAP_SERVERS =
+            Options.key("bootstrap.servers")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Fluss cluster address");
+    public static final Option<String> DATABASE =
+            Options.key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The name of Fluss database");
+
+    public static final Option<String> TABLE =
+            Options.key("table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The name of Fluss table");
+
+    public static final Option<Map<String, String>> CLIENT_CONFIG =
+            Options.key("client.config")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription("The parameter of Fluss client add to 
Connection ");
+}
diff --git 
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java
 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java
new file mode 100644
index 0000000000..293770eba9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/config/FlussSinkOptions.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fluss.config;
+
+public class FlussSinkOptions extends FlussBaseOptions {}
diff --git 
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java
 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java
new file mode 100644
index 0000000000..771d414c70
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSink.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fluss.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Optional;
+
+@Slf4j
+public class FlussSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
+
+    private final ReadonlyConfig pluginConfig;
+    private final CatalogTable catalogTable;
+
+    public FlussSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+        this.pluginConfig = pluginConfig;
+        this.catalogTable = catalogTable;
+    }
+
+    @Override
+    public FlussSinkWriter createWriter(SinkWriter.Context context) {
+        return new FlussSinkWriter(context, catalogTable, pluginConfig);
+    }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
+
+    @Override
+    public String getPluginName() {
+        return FlussSinkOptions.CONNECTOR_IDENTITY;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java
 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java
new file mode 100644
index 0000000000..13ee142468
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fluss.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions;
+
+import com.google.auto.service.AutoService;
+
+import static 
org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA;
+
+@AutoService(Factory.class)
+public class FlussSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return FlussSinkOptions.CONNECTOR_IDENTITY;
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(FlussSinkOptions.BOOTSTRAP_SERVERS)
+                .optional(FlussSinkOptions.DATABASE)
+                .optional(FlussSinkOptions.TABLE)
+                .optional(FlussSinkOptions.CLIENT_CONFIG)
+                .optional(MULTI_TABLE_SINK_REPLICA)
+                .build();
+    }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new FlussSink(context.getOptions(), 
context.getCatalogTable());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java
 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java
new file mode 100644
index 0000000000..91881d30e5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-fluss/src/main/java/org/apache/seatunnel/connectors/seatunnel/fluss/sink/FlussSinkWriter.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fluss.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.fluss.config.FlussSinkOptions;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.AppendWriter;
+import com.alibaba.fluss.client.table.writer.TableWriter;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.Map;
+import java.util.Optional;
+
+@Slf4j
+public class FlussSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
+        implements SupportMultiTableSinkWriter<Void> {
+
+    private Connection connection;
+    private TableWriter writer;
+    private Table table;
+    private String dbName;
+    private String tableName;
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    public FlussSinkWriter(
+            SinkWriter.Context context, CatalogTable catalogTable, 
ReadonlyConfig pluginConfig) {
+        seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
+        Configuration flussConfig = new Configuration();
+        flussConfig.setString(
+                FlussSinkOptions.BOOTSTRAP_SERVERS.key(),
+                pluginConfig.get(FlussSinkOptions.BOOTSTRAP_SERVERS));
+        Optional<Map<String, String>> clientConfig =
+                pluginConfig.getOptional(FlussSinkOptions.CLIENT_CONFIG);
+        if (clientConfig.isPresent()) {
+            clientConfig
+                    .get()
+                    .forEach(
+                            (k, v) -> {
+                                flussConfig.setString(k, v);
+                            });
+        }
+        log.info("Connect to Fluss with config: {}", flussConfig);
+        connection = ConnectionFactory.createConnection(flussConfig);
+        log.info("Connect to Fluss success");
+        dbName =
+                pluginConfig
+                        .getOptional(FlussSinkOptions.DATABASE)
+                        .orElseGet(() -> 
catalogTable.getTableId().getDatabaseName());
+        tableName =
+                pluginConfig
+                        .getOptional(FlussSinkOptions.TABLE)
+                        .orElseGet(() -> 
catalogTable.getTableId().getTableName());
+        TablePath tablePath = TablePath.of(dbName, tableName);
+        table = connection.getTable(tablePath);
+        if (table.getTableInfo().hasPrimaryKey()) {
+            log.info("Table {} has primary key, use upsert writer", tableName);
+            writer = table.newUpsert().createWriter();
+        } else {
+            log.info("Table {} has no primary key, use append writer", 
tableName);
+            writer = table.newAppend().createWriter();
+        }
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) {
+        RowKind rowKind = element.getRowKind();
+        GenericRow genericRow = new GenericRow(element.getFields().length);
+        for (int i = 0; i < element.getFields().length; i++) {
+            genericRow.setField(
+                    i,
+                    convert(
+                            seaTunnelRowType.getFieldType(i),
+                            seaTunnelRowType.getFieldName(i),
+                            element.getField(i)));
+        }
+
+        if (writer instanceof UpsertWriter) {
+            UpsertWriter upsertWriter = (UpsertWriter) writer;
+            switch (rowKind) {
+                case INSERT:
+                case UPDATE_AFTER:
+                    upsertWriter.upsert(genericRow);
+                    break;
+                case DELETE:
+                    upsertWriter.delete(genericRow);
+                    break;
+                case UPDATE_BEFORE:
+                    return;
+                default:
+                    throw CommonError.unsupportedRowKind(
+                            FlussSinkOptions.CONNECTOR_IDENTITY, tableName, 
rowKind.shortString());
+            }
+        } else if (writer instanceof AppendWriter) {
+            AppendWriter appendWriter = (AppendWriter) writer;
+            switch (rowKind) {
+                case INSERT:
+                case UPDATE_AFTER:
+                    appendWriter.append(genericRow);
+                    break;
+                case DELETE:
+                case UPDATE_BEFORE:
+                    return;
+                default:
+                    throw CommonError.unsupportedRowKind(
+                            FlussSinkOptions.CONNECTOR_IDENTITY, tableName, 
rowKind.shortString());
+            }
+        } else {
+            throw CommonError.unsupportedOperation(
+                    FlussSinkOptions.CONNECTOR_IDENTITY, 
writer.getClass().getName());
+        }
+    }
+
+    @Override
+    public Optional<Void> prepareCommit(long checkpointId) throws IOException {
+        writer.flush();
+        return super.prepareCommit(checkpointId);
+    }
+
+    @Override
+    public void close() {
+        log.info("Close Fluss table.");
+        try {
+            if (table != null) {
+                table.close();
+            }
+        } catch (Exception e) {
+            throw CommonError.closeFailed("Close Fluss table failed.", e);
+        }
+
+        log.info("Close Fluss connection.");
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            throw CommonError.closeFailed("Close Fluss connection failed.", e);
+        }
+    }
+
+    protected Object convert(SeaTunnelDataType dataType, String fieldName, 
Object val) {
+        if (val == null) {
+            return null;
+        }
+        switch (dataType.getSqlType()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BYTES:
+                return val;
+            case STRING:
+                return BinaryString.fromString((String) val);
+            case DECIMAL:
+                return Decimal.fromBigDecimal(
+                        (BigDecimal) val,
+                        ((DecimalType) dataType).getPrecision(),
+                        ((DecimalType) dataType).getScale());
+            case DATE:
+                return (int) ((LocalDate) val).toEpochDay();
+            case TIME:
+                return (int) (((LocalTime) val).toNanoOfDay() / 1_000_000);
+            case TIMESTAMP:
+                return TimestampNtz.fromLocalDateTime((LocalDateTime) val);
+            case TIMESTAMP_TZ:
+                if (val instanceof Instant) {
+                    return TimestampLtz.fromInstant((Instant) val);
+                } else if (val instanceof OffsetDateTime) {
+                    return TimestampLtz.fromInstant(((OffsetDateTime) 
val).toInstant());
+                }
+                throw CommonError.unsupportedDataType(
+                        FlussSinkOptions.CONNECTOR_IDENTITY,
+                        dataType.getSqlType().name(),
+                        fieldName);
+            default:
+                throw CommonError.unsupportedDataType(
+                        FlussSinkOptions.CONNECTOR_IDENTITY,
+                        dataType.getSqlType().name(),
+                        fieldName);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index c587da3740..37e0e668a6 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -89,6 +89,7 @@
         <module>connector-graphql</module>
         <module>connector-aerospike</module>
         <module>connector-sensorsdata</module>
+        <module>connector-fluss</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index be4ba48396..6093ac3fb3 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -657,6 +657,13 @@
                     <scope>provided</scope>
                 </dependency>
 
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-fluss</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+
                 <!-- jdbc driver -->
                 <dependency>
                     <groupId>com.aliyun.phoenix</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml
new file mode 100644
index 0000000000..4105b14d24
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-fluss-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Fluss</name>
+
+    <dependencies>
+        <!-- SeaTunnel connectors -->
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fluss</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-cdc-mysql</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- test dependencies on TestContainers -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java
new file mode 100644
index 0000000000..04230c362e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/java/org/apache/seatunnel/e2e/connector/fluss/FlussSinkIT.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.fluss;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.client.table.scanner.log.LogScanner;
+import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.DatabaseDescriptor;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.CloseableIterator;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+public class FlussSinkIT extends TestSuiteBase implements TestResource {
+    private static final String DOCKER_IMAGE = "fluss/fluss:0.7.0";
+    private static final String DOCKER_ZK_IMAGE = "zookeeper:3.9.2";
+
+    private static final String FLUSS_Coordinator_HOST = 
"fluss_coordinator_e2e";
+    private static final String FLUSS_Tablet_HOST = "fluss_tablet_e2e";
+    private static final String ZK_HOST = "zk_e2e";
+    private static final int ZK_PORT = 2181;
+    private static final int FLUSS_Coordinator_PORT = 9123;
+    private static final int FLUSS_Tablet_PORT = 9124;
+    private static final int FLUSS_Coordinator_LOCAL_PORT = 8123;
+    private static final int FLUSS_Tablet_LOCAL_PORT = 8124;
+
+    private GenericContainer<?> zookeeperServer;
+    private GenericContainer<?> coordinatorServer;
+    private GenericContainer<?> tabletServer;
+
+    private Connection flussConnection;
+
+    private static final String DB_NAME = "fluss_db_test";
+    private static final String DB_NAME_2 = "fluss_db_test2";
+    private static final String DB_NAME_3 = "fluss_db_test3";
+    private static final String TABLE_NAME = "fluss_tb_table1";
+    private static final String TABLE_NAME_2 = "fluss_tb_table2";
+    private static final String TABLE_NAME_3 = "fluss_tb_table3";
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        createZookeeperContainer();
+        createFlussContainer();
+    }
+
+    private void createFlussContainer() {
+        log.info("Starting FlussServer container...");
+        String coordinatorEnv =
+                String.format(
+                        "zookeeper.address: %s:%d\n"
+                                + "bind.listeners: INTERNAL://%s:%d, 
LOCALCLIENT://%s:%d \n"
+                                + "advertised.listeners: INTERNAL://%s:%d, 
LOCALCLIENT://localhost:%d\n"
+                                + "internal.listener.name: INTERNAL",
+                        ZK_HOST,
+                        ZK_PORT,
+                        FLUSS_Coordinator_HOST,
+                        FLUSS_Coordinator_PORT,
+                        FLUSS_Coordinator_HOST,
+                        FLUSS_Coordinator_LOCAL_PORT,
+                        FLUSS_Coordinator_HOST,
+                        FLUSS_Coordinator_PORT,
+                        FLUSS_Coordinator_LOCAL_PORT);
+        coordinatorServer =
+                new GenericContainer<>(DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(FLUSS_Coordinator_HOST)
+                        .withEnv("FLUSS_PROPERTIES", coordinatorEnv)
+                        .withCommand("coordinatorServer")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger("coordinatorServer")));
+        coordinatorServer.setPortBindings(
+                Lists.newArrayList(
+                        String.format(
+                                "%s:%s",
+                                FLUSS_Coordinator_LOCAL_PORT, 
FLUSS_Coordinator_LOCAL_PORT)));
+        Startables.deepStart(Stream.of(coordinatorServer)).join();
+        given().ignoreExceptions()
+                .await()
+                .atMost(120, TimeUnit.SECONDS)
+                .pollInterval(5, TimeUnit.SECONDS)
+                .until(
+                        () ->
+                                checkPort(
+                                        coordinatorServer.getHost(),
+                                        FLUSS_Coordinator_LOCAL_PORT,
+                                        1000));
+        log.info("coordinatorServer container start success");
+
+        String tabletEnv =
+                String.format(
+                        "zookeeper.address: %s:%d\n"
+                                + "bind.listeners: INTERNAL://%s:%d, 
LOCALCLIENT://%s:%d\n"
+                                + "advertised.listeners: INTERNAL://%s:%d, 
LOCALCLIENT://localhost:%d\n"
+                                + "internal.listener.name: INTERNAL\n"
+                                + "tablet-server.id: 0\n"
+                                + "kv.snapshot.interval: 0s\n"
+                                + "data.dir: /tmp/fluss/data\n"
+                                + "remote.data.dir: /tmp/fluss/remote-data",
+                        ZK_HOST,
+                        ZK_PORT,
+                        FLUSS_Tablet_HOST,
+                        FLUSS_Tablet_PORT,
+                        FLUSS_Tablet_HOST,
+                        FLUSS_Tablet_LOCAL_PORT,
+                        FLUSS_Tablet_HOST,
+                        FLUSS_Tablet_PORT,
+                        FLUSS_Tablet_LOCAL_PORT);
+        tabletServer =
+                new GenericContainer<>(DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(FLUSS_Tablet_HOST)
+                        .withEnv("FLUSS_PROPERTIES", tabletEnv)
+                        .withCommand("tabletServer")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger("tabletServer")));
+        tabletServer.setPortBindings(
+                Lists.newArrayList(
+                        String.format("%s:%s", FLUSS_Tablet_LOCAL_PORT, 
FLUSS_Tablet_LOCAL_PORT)));
+        Startables.deepStart(Stream.of(tabletServer)).join();
+        given().ignoreExceptions()
+                .await()
+                .atMost(120, TimeUnit.SECONDS)
+                .pollInterval(5, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeConnection);
+        log.info("tabletServer container start success");
+        log.info("FlussServer Containers are started");
+    }
+
+    private void createZookeeperContainer() {
+        log.info("Starting ZookeeperServer container...");
+        zookeeperServer =
+                new GenericContainer<>(DOCKER_ZK_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(ZK_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(DOCKER_ZK_IMAGE)));
+        zookeeperServer.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", ZK_PORT, ZK_PORT)));
+        Startables.deepStart(Stream.of(zookeeperServer)).join();
+        given().ignoreExceptions()
+                .await()
+                .atMost(60, TimeUnit.SECONDS)
+                .pollInterval(5, TimeUnit.SECONDS)
+                .until(() -> checkPort(zookeeperServer.getHost(), ZK_PORT, 
1000));
+        log.info("ZookeeperServer Containers are started");
+    }
+
+    private void initializeConnection() throws ExecutionException, 
InterruptedException {
+        Configuration flussConfig = new Configuration();
+        flussConfig.setString(
+                "bootstrap.servers",
+                coordinatorServer.getHost() + ":" + 
FLUSS_Coordinator_LOCAL_PORT);
+        flussConnection = ConnectionFactory.createConnection(flussConfig);
+        createDb(flussConnection, DB_NAME);
+    }
+
+    public void createDb(Connection connection, String dbName)
+            throws ExecutionException, InterruptedException {
+        Admin admin = connection.getAdmin();
+        DatabaseDescriptor descriptor = DatabaseDescriptor.builder().build();
+        admin.dropDatabase(dbName, true, true).get();
+        admin.createDatabase(dbName, descriptor, true).get();
+    }
+
+    public Schema getFlussSchema() {
+        return Schema.newBuilder()
+                .column("fbytes", DataTypes.BYTES())
+                .column("fboolean", DataTypes.BOOLEAN())
+                .column("fint", DataTypes.INT())
+                .column("ftinyint", DataTypes.TINYINT())
+                .column("fsmallint", DataTypes.SMALLINT())
+                .column("fbigint", DataTypes.BIGINT())
+                .column("ffloat", DataTypes.FLOAT())
+                .column("fdouble", DataTypes.DOUBLE())
+                .column("fdecimal", DataTypes.DECIMAL(30, 8))
+                .column("fstring", DataTypes.STRING())
+                .column("fdate", DataTypes.DATE())
+                .column("ftime", DataTypes.TIME())
+                .column("ftimestamp", DataTypes.TIMESTAMP())
+                .column("ftimestamp_ltz", DataTypes.TIMESTAMP_LTZ())
+                .primaryKey("fstring")
+                .build();
+    }
+
+    public void createTable(Connection connection, String dbName, String 
tableName, Schema schema)
+            throws ExecutionException, InterruptedException {
+        Admin admin = connection.getAdmin();
+        TableDescriptor tableDescriptor = 
TableDescriptor.builder().schema(schema).build();
+        TablePath tablePath = TablePath.of(dbName, tableName);
+        admin.dropTable(tablePath, true).get();
+        admin.createTable(tablePath, tableDescriptor, true).get(); // blocking 
call
+    }
+
+    public static boolean checkPort(String host, int port, int timeoutMs) 
throws IOException {
+        try (Socket socket = new Socket()) {
+            socket.connect(new java.net.InetSocketAddress(host, port), 
timeoutMs);
+            return true;
+        } catch (Exception e) {
+            throw e;
+        }
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (tabletServer != null) {
+            tabletServer.close();
+        }
+        if (coordinatorServer != null) {
+            coordinatorServer.close();
+        }
+        if (zookeeperServer != null) {
+            zookeeperServer.close();
+        }
+    }
+
+    @TestTemplate
+    public void testFlussSink(TestContainer container) throws Exception {
+        log.info(" create fluss table");
+        createDb(flussConnection, DB_NAME);
+        createTable(flussConnection, DB_NAME, TABLE_NAME, getFlussSchema());
+        Container.ExecResult execFake2fluss = 
container.executeJob("/fake_to_fluss.conf");
+        Assertions.assertEquals(0, execFake2fluss.getExitCode(), 
execFake2fluss.getStderr());
+        checkFlussData(DB_NAME, TABLE_NAME);
+    }
+
+    @TestTemplate
+    public void testFlussMultiTableSink(TestContainer container) throws 
Exception {
+        log.info(" create fluss tables");
+        createDb(flussConnection, DB_NAME_2);
+        createDb(flussConnection, DB_NAME_3);
+        createTable(flussConnection, DB_NAME_2, TABLE_NAME, getFlussSchema());
+        createTable(flussConnection, DB_NAME_2, TABLE_NAME_2, 
getFlussSchema());
+        createTable(flussConnection, DB_NAME_3, TABLE_NAME_3, 
getFlussSchema());
+
+        Container.ExecResult execFake2fluss =
+                container.executeJob("/fake_to_multipletable_fluss.conf");
+        Assertions.assertEquals(0, execFake2fluss.getExitCode(), 
execFake2fluss.getStderr());
+        checkFlussData(DB_NAME_2, TABLE_NAME);
+        checkFlussData(DB_NAME_2, TABLE_NAME_2);
+        checkFlussData(DB_NAME_3, TABLE_NAME_3);
+    }
+
+    public void checkFlussData(String dbName, String tableName) throws 
IOException {
+        // check log data
+        List<GenericRow> streamData =
+                getFlussTableStreamData(flussConnection, dbName, tableName, 
10);
+        checkFlussTableStreamData(streamData);
+        // check data
+        List<GenericRow> data = getFlussTableData(flussConnection, dbName, 
tableName, 10);
+        checkFlussTableData(data);
+    }
+
+    public void checkFlussTableData(List<GenericRow> streamData) {
+        Assertions.assertEquals(3, streamData.size());
+        List<String> expectedResult =
+                Arrays.asList(
+                        "([109, 105, 73, 90, 
106],true,1940337748,73,17489,7408919466156976747,9.434991E37,3.140411637757371E307,4029933791018936000000.00000000,aaaaa,20091,9010000,2025-05-27T21:56:09,2025-09-27T18:54:08Z)",
+                        "([109, 105, 73, 90, 
106],true,90650390,37,22504,5851888708829345169,2.6221706E36,1.8915341983748786E307,3093109630614623000000.00000000,bbbbb,20089,76964000,2025-05-08T05:26:18,2025-08-04T08:49:45Z)",
+                        "([109, 105, 73, 90, 
106],true,388742243,89,15831,159071788675312856,7.310445E37,1.2166972324288247E308,7994947075691901000000.00000000,ddddd,20092,55687000,2025-07-18T08:59:49,2025-09-12T15:46:25Z)");
+        ArrayList<String> result = new ArrayList<>();
+        for (GenericRow streamDatum : streamData) {
+            result.add(streamDatum.toString());
+        }
+        Assertions.assertEquals(expectedResult, result);
+    }
+
+    public void checkFlussTableStreamData(List<GenericRow> streamData) {
+        Assertions.assertEquals(7, streamData.size());
+        List<String> expectedResult =
+                Arrays.asList(
+                        "([109, 105, 73, 90, 
106],true,1940337748,73,17489,7408919466156976747,9.434991E37,3.140411637757371E307,4029933791018936000000.00000000,aaaaa,20091,9010000,2025-05-27T21:56:09,2025-09-27T18:54:08Z)",
+                        "([109, 105, 73, 90, 
106],true,90650390,37,22504,5851888708829345169,2.6221706E36,1.8915341983748786E307,3093109630614623000000.00000000,bbbbb,20089,76964000,2025-05-08T05:26:18,2025-08-04T08:49:45Z)",
+                        "([109, 105, 73, 90, 
106],true,2146418323,79,19821,6393905306944584839,2.0462337E38,1.4868114385836557E308,5594947262031770000000.00000000,ccccc,20367,79840000,2025-03-25T01:49:14,2025-07-03T03:52:06Z)",
+                        "([109, 105, 73, 90, 
106],true,2146418323,79,19821,6393905306944584839,2.0462337E38,1.4868114385836557E308,5594947262031770000000.00000000,ccccc,20367,79840000,2025-03-25T01:49:14,2025-07-03T03:52:06Z)",
+                        "([109, 105, 73, 90, 
106],true,82794384,27,30339,5826566947079347516,2.2137477E37,1.7737681870839753E308,3984670873242882300000.00000000,ddddd,20344,37972000,2025-01-27T19:20:51,2025-11-06T18:38:54Z)",
+                        "([109, 105, 73, 90, 
106],true,82794384,27,30339,5826566947079347516,2.2137477E37,1.7737681870839753E308,3984670873242882300000.00000000,ddddd,20344,37972000,2025-01-27T19:20:51,2025-11-06T18:38:54Z)",
+                        "([109, 105, 73, 90, 
106],true,388742243,89,15831,159071788675312856,7.310445E37,1.2166972324288247E308,7994947075691901000000.00000000,ddddd,20092,55687000,2025-07-18T08:59:49,2025-09-12T15:46:25Z)");
+        ArrayList<String> result = new ArrayList<>();
+        for (GenericRow streamDatum : streamData) {
+            result.add(streamDatum.toString());
+        }
+        Assertions.assertEquals(expectedResult, result);
+    }
+
+    public List<GenericRow> getFlussTableStreamData(
+            Connection connection, String dbName, String tableName, int 
scanNum) {
+        TablePath tablePath = TablePath.of(dbName, tableName);
+        Table table = connection.getTable(tablePath);
+        LogScanner logScanner = table.newScan().createLogScanner();
+        int numBuckets = table.getTableInfo().getNumBuckets();
+        for (int i = 0; i < numBuckets; i++) {
+            logScanner.subscribeFromBeginning(i);
+        }
+        int scanned = 0;
+        List<GenericRow> rows = new ArrayList<>();
+
+        while (true) {
+            if (scanned > scanNum) break;
+            log.info("Polling for stream records...");
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (TableBucket bucket : scanRecords.buckets()) {
+                for (ScanRecord record : scanRecords.records(bucket)) {
+                    GenericRow row = (GenericRow) record.getRow();
+                    rows.add(row);
+                }
+            }
+            scanned++;
+        }
+        return rows;
+    }
+
+    public List<GenericRow> getFlussTableData(
+            Connection connection, String dbName, String tableName, int 
scanNum)
+            throws IOException {
+        TablePath tablePath = TablePath.of(dbName, tableName);
+        Table table = connection.getTable(tablePath);
+        LogScanner logScanner = table.newScan().createLogScanner();
+        int numBuckets = table.getTableInfo().getNumBuckets();
+        for (int i = 0; i < numBuckets; i++) {
+            logScanner.subscribeFromBeginning(i);
+        }
+        int scanned = 0;
+        List<GenericRow> rows = new ArrayList<>();
+
+        while (true) {
+            if (scanned > scanNum) break;
+            log.info("Polling for records...");
+            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
+            for (TableBucket bucket : scanRecords.buckets()) {
+                CloseableIterator<InternalRow> data =
+                        table.newScan()
+                                .limit(10)
+                                .createBatchScanner(bucket)
+                                .pollBatch(Duration.ofSeconds(5));
+                while (data.hasNext()) {
+                    rows.add((GenericRow) data.next());
+                }
+            }
+            scanned++;
+        }
+        return rows;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf
new file mode 100644
index 0000000000..27ea0435bb
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_fluss.conf
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+        {
+        row.num = 7
+          schema {
+            table = "test.table1"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    }
+      ]
+}
+}
+
+transform {
+}
+
+sink {
+  Fluss {
+    bootstrap.servers="fluss_coordinator_e2e:9123"
+    database = "fluss_db_${database_name}"
+    table = "fluss_tb_${table_name}"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf
new file mode 100644
index 0000000000..b1b1307652
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-fluss-e2e/src/test/resources/fake_to_multipletable_fluss.conf
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    parallelism = 1
+    tables_configs = [
+        {
+        row.num = 7
+          schema {
+            table = "test2.table1"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    },
+    {
+        row.num = 7
+          schema {
+            table = "test2.table2"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    },
+    {
+        row.num = 7
+          schema {
+            table = "test3.table3"
+            fields {
+               fbytes = bytes
+                   fboolean = boolean
+                   fint = int
+                   ftinyint = tinyint
+                   fsmallint = smallint
+                   fbigint = bigint
+                   ffloat = float
+                   fdouble = double
+                   fdecimal = "decimal(30, 8)"
+                   fstring = string
+                   fdate = date
+                   ftime = time
+                   ftimestamp = timestamp
+                   ftimestamp_ltz = timestamp_tz
+                   }
+           }
+           rows = [
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 1940337748, 73, 17489, 
7408919466156976747, 9.434991E37, 3.140411637757371E307, 
4029933791018936061944.80602290, "aaaaa", "2025-01-03", "02:30:10", 
"2025-05-27T21:56:09", "2025-09-28T02:54:08+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 90650390, 37, 22504, 5851888708829345169, 
2.6221706E36, 1.8915341983748786E307, 3093109630614622831876.71725344, "bbbbb", 
"2025-01-01", "21:22:44", "2025-05-08T05:26:18", "2025-08-04T16:49:45+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = DELETE
+        fields = ["bWlJWmo=", true, 2146418323, 79, 19821, 
6393905306944584839, 2.0462337E38, 1.4868114385836557E308, 
5594947262031769994080.35717665, "ccccc", "2025-10-06", "22:10:40", 
"2025-03-25T01:49:14", "2025-07-03T11:52:06+08:00"]
+      }
+      {
+        kind = INSERT
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = ["bWlJWmo=", true, 82794384, 27, 30339, 5826566947079347516, 
2.2137477E37, 1.7737681870839753E308, 3984670873242882274814.90739768, "ddddd", 
"2025-09-13", "10:32:52", "2025-01-27T19:20:51", "2025-11-07T02:38:54+08:00"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = ["bWlJWmo=", true, 388742243, 89, 15831, 159071788675312856, 
7.310445E37, 1.2166972324288247E308, 7994947075691901110245.55960937, "ddddd", 
"2025-01-04", "15:28:07", "2025-07-18T08:59:49", "2025-09-12T23:46:25+08:00"]
+      }
+    ]
+    }
+      ]
+}
+}
+
+transform {
+}
+
+sink {
+  Fluss {
+    bootstrap.servers="fluss_coordinator_e2e:9123"
+    database = "fluss_db_${database_name}"
+    table = "fluss_tb_${table_name}"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 8d885e75a5..a60fe67ef2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -90,6 +90,7 @@
         <module>connector-aerospike-e2e</module>
         <module>connector-sensorsdata-e2e</module>
         <module>connector-hugegraph-e2e</module>
+        <module>connector-fluss-e2e</module>
     </modules>
 
     <dependencies>
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
index 949392d172..a452db6436 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java
@@ -41,6 +41,7 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.temporal.ChronoField;
@@ -155,6 +156,13 @@ public class JsonToRowConverters implements Serializable {
                         return convertToLocalDateTime(jsonNode, fieldName);
                     }
                 };
+            case TIMESTAMP_TZ:
+                return new JsonToObjectConverter() {
+                    @Override
+                    public Object convert(JsonNode jsonNode, String fieldName) 
{
+                        return convertToOffsetDateTime(jsonNode, fieldName);
+                    }
+                };
             case FLOAT:
                 return new JsonToObjectConverter() {
                     @Override
@@ -284,6 +292,11 @@ public class JsonToRowConverters implements Serializable {
         return LocalDateTime.of(localDate, localTime);
     }
 
+    private OffsetDateTime convertToOffsetDateTime(JsonNode jsonNode, String 
fieldName) {
+        String datetimeStr = jsonNode.asText();
+        return OffsetDateTime.parse(datetimeStr);
+    }
+
     private String convertToString(JsonNode jsonNode) {
         if (jsonNode.isContainerNode()) {
             return jsonNode.toString();
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
index 2cf8ae092e..13a30442d1 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/RowToJsonConverters.java
@@ -37,6 +37,7 @@ import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.function.Function;
@@ -44,6 +45,7 @@ import java.util.function.IntFunction;
 
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
 
 public class RowToJsonConverters implements Serializable {
 
@@ -183,6 +185,14 @@ public class RowToJsonConverters implements Serializable {
                                 
.textNode(ISO_LOCAL_DATE_TIME.format((LocalDateTime) value));
                     }
                 };
+            case TIMESTAMP_TZ:
+                return new RowToJsonConverter() {
+                    @Override
+                    public JsonNode convert(ObjectMapper mapper, JsonNode 
reuse, Object value) {
+                        return mapper.getNodeFactory()
+                                
.textNode(ISO_OFFSET_DATE_TIME.format((OffsetDateTime) value));
+                    }
+                };
             case ARRAY:
                 return createArrayConverter((ArrayType) type);
             case MAP:
diff --git 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
index 8471756b8b..13d8b5c820 100644
--- 
a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java
@@ -47,6 +47,7 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.TemporalQueries;
 import java.util.HashMap;
@@ -76,6 +77,7 @@ public class JsonRowDataSerDeSchemaTest {
         String name = "asdlkjasjkdla998y1122";
         LocalDate date = LocalDate.parse("1990-10-14");
         LocalTime time = LocalTime.parse("12:12:43");
+        OffsetDateTime offsetDateTime = 
OffsetDateTime.parse("2025-09-12T23:46:25+08:00");
         Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
         Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 
12:12:43.123456789");
         Map<String, Long> map = new HashMap<>();
@@ -100,6 +102,7 @@ public class JsonRowDataSerDeSchemaTest {
         root.put("name", name);
         root.put("date", "1990-10-14");
         root.put("time", "12:12:43");
+        root.put("timestamp_tz", "2025-09-12T23:46:25+08:00");
         root.put("timestamp3", "1990-10-14T12:12:43.123");
         root.put("timestamp9", "1990-10-14T12:12:43.123456789");
         root.putObject("map").put("element", 123);
@@ -121,6 +124,7 @@ public class JsonRowDataSerDeSchemaTest {
                             "name",
                             "date",
                             "time",
+                            "timestamp_tz",
                             "timestamp3",
                             "timestamp9",
                             "map",
@@ -136,6 +140,7 @@ public class JsonRowDataSerDeSchemaTest {
                             STRING_TYPE,
                             LocalTimeType.LOCAL_DATE_TYPE,
                             LocalTimeType.LOCAL_TIME_TYPE,
+                            LocalTimeType.OFFSET_DATE_TIME_TYPE,
                             LocalTimeType.LOCAL_DATE_TIME_TYPE,
                             LocalTimeType.LOCAL_DATE_TIME_TYPE,
                             new MapType(STRING_TYPE, LONG_TYPE),
@@ -150,6 +155,7 @@ public class JsonRowDataSerDeSchemaTest {
                                         "name",
                                         "date",
                                         "time",
+                                        "timestamp_tz",
                                         "timestamp3",
                                         "timestamp9",
                                         "map",
@@ -164,6 +170,7 @@ public class JsonRowDataSerDeSchemaTest {
                                         STRING_TYPE,
                                         LocalTimeType.LOCAL_DATE_TIME_TYPE,
                                         LocalTimeType.LOCAL_TIME_TYPE,
+                                        LocalTimeType.OFFSET_DATE_TIME_TYPE,
                                         LocalTimeType.LOCAL_DATE_TIME_TYPE,
                                         LocalTimeType.LOCAL_DATE_TIME_TYPE,
                                         new MapType(STRING_TYPE, LONG_TYPE),
@@ -175,7 +182,7 @@ public class JsonRowDataSerDeSchemaTest {
         JsonDeserializationSchema deserializationSchema =
                 new JsonDeserializationSchema(catalogTables, false, false);
 
-        SeaTunnelRow expected = new SeaTunnelRow(13);
+        SeaTunnelRow expected = new SeaTunnelRow(14);
         expected.setField(0, true);
         expected.setField(1, intValue);
         expected.setField(2, longValue);
@@ -183,13 +190,14 @@ public class JsonRowDataSerDeSchemaTest {
         expected.setField(4, name);
         expected.setField(5, date);
         expected.setField(6, time);
-        expected.setField(7, timestamp3.toLocalDateTime());
-        expected.setField(8, timestamp9.toLocalDateTime());
-        expected.setField(9, map);
-        expected.setField(10, multiSet);
-        expected.setField(11, nestedMap);
-
-        SeaTunnelRow rowFieldRow = new SeaTunnelRow(12);
+        expected.setField(7, offsetDateTime);
+        expected.setField(8, timestamp3.toLocalDateTime());
+        expected.setField(9, timestamp9.toLocalDateTime());
+        expected.setField(10, map);
+        expected.setField(11, multiSet);
+        expected.setField(12, nestedMap);
+
+        SeaTunnelRow rowFieldRow = new SeaTunnelRow(13);
         rowFieldRow.setField(0, true);
         rowFieldRow.setField(1, intValue);
         rowFieldRow.setField(2, longValue);
@@ -197,13 +205,14 @@ public class JsonRowDataSerDeSchemaTest {
         rowFieldRow.setField(4, name);
         rowFieldRow.setField(5, timestamp3.toLocalDateTime());
         rowFieldRow.setField(6, time);
-        rowFieldRow.setField(7, timestamp3.toLocalDateTime());
-        rowFieldRow.setField(8, timestamp9.toLocalDateTime());
-        rowFieldRow.setField(9, map);
-        rowFieldRow.setField(10, multiSet);
-        rowFieldRow.setField(11, nestedMap);
+        rowFieldRow.setField(7, offsetDateTime);
+        rowFieldRow.setField(8, timestamp3.toLocalDateTime());
+        rowFieldRow.setField(9, timestamp9.toLocalDateTime());
+        rowFieldRow.setField(10, map);
+        rowFieldRow.setField(11, multiSet);
+        rowFieldRow.setField(12, nestedMap);
 
-        expected.setField(12, rowFieldRow);
+        expected.setField(13, rowFieldRow);
 
         SeaTunnelRow seaTunnelRow = 
deserializationSchema.deserialize(serializedJson);
         assertEquals(expected, seaTunnelRow);
@@ -678,6 +687,16 @@ public class JsonRowDataSerDeSchemaTest {
         assertEquals(
                 "{\"timestamp\":\"2022-09-24T22:45:00.000123456\"}",
                 new String(new JsonSerializationSchema(schema, 
"\\N").serialize(row)));
+
+        schema =
+                new SeaTunnelRowType(
+                        new String[] {"timestamp_tz"},
+                        new SeaTunnelDataType[] 
{LocalTimeType.OFFSET_DATE_TIME_TYPE});
+        OffsetDateTime offsetDateTime = 
OffsetDateTime.parse("2025-09-12T23:46:25+08:00");
+        row = new SeaTunnelRow(new Object[] {offsetDateTime});
+        assertEquals(
+                "{\"timestamp_tz\":\"2025-09-12T23:46:25+08:00\"}",
+                new String(new JsonSerializationSchema(schema, 
"\\N").serialize(row)));
     }
 
     @Test

Reply via email to