This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 720ed0c756 [Feature][Connector-redis] fix redis cluster bug and add 
cluster e2e (#9869)
720ed0c756 is described below

commit 720ed0c756a483b096c43ecdd752499f09927ff5
Author: JeremyXin <[email protected]>
AuthorDate: Sat Oct 18 19:46:33 2025 +0800

    [Feature][Connector-redis] fix redis cluster bug and add cluster e2e (#9869)
    
    Co-authored-by: JeremyXin <[email protected]>
---
 docs/en/connector-v2/sink/Redis.md                 |  14 +-
 docs/en/connector-v2/source/Redis.md               |  34 +-
 docs/zh/connector-v2/sink/Redis.md                 |  49 ++-
 docs/zh/connector-v2/source/Redis.md               |  34 +-
 seatunnel-connectors-v2/connector-redis/pom.xml    |   6 +
 .../seatunnel/redis/client/RedisClient.java        |  15 +-
 .../seatunnel/redis/client/RedisClusterClient.java |  86 +++-
 .../seatunnel/redis/client/RedisSingleClient.java  |  16 +
 .../seatunnel/redis/config/JedisWrapper.java       |  79 ++++
 .../seatunnel/redis/config/RedisBaseOptions.java   |   9 +-
 .../seatunnel/redis/config/RedisParameters.java    |  17 +-
 .../seatunnel/redis/exception/RedisErrorCode.java  |   5 +-
 .../seatunnel/redis/sink/RedisSinkWriter.java      | 115 +++--
 .../seatunnel/redis/source/RedisSource.java        |  52 ++-
 .../seatunnel/redis/sink/RedisSinkWriterTest.java  | 114 +++++
 .../e2e/connector/redis/RedisClusterIT.java        | 473 +++++++++++++++++++++
 .../resources/cluster-redis-to-redis-scan.conf     |  43 ++
 .../cluster-redis-to-redis-type-hash.conf          | 111 +++++
 .../resources/cluster-redis-to-redis-type-key.conf | 111 +++++
 .../cluster-redis-to-redis-type-list.conf          | 110 +++++
 .../resources/cluster-redis-to-redis-type-set.conf | 110 +++++
 .../cluster-redis-to-redis-type-zset.conf          | 110 +++++
 .../container/seatunnel/SeaTunnelContainer.java    |   4 +-
 23 files changed, 1623 insertions(+), 94 deletions(-)

diff --git a/docs/en/connector-v2/sink/Redis.md 
b/docs/en/connector-v2/sink/Redis.md
index 1e6a2861a7..f05eaca3ec 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -32,6 +32,7 @@ Used to write data to Redis.
 | value_field        | string  | no                    | -             |
 | hash_key_field     | string  | no                    | -             |
 | hash_value_field   | string  | no                    | -             |
+| field_delimiter    | string  | no                    | ','           |
 | common-options     |         | no                    | -             |
 
 ### host [string]
@@ -119,7 +120,7 @@ redis nodes information, used in cluster mode, must like as 
the following format
 
 ### format [string]
 
-The format of upstream data, now only support `json`, `text` will be supported 
later, default `json`.
+The format of upstream data. Currently `json` and `text` are supported; the default is 
`json`.
 
 When you assign format is `json`, for example:
 
@@ -134,9 +135,18 @@ Connector will generate data as the following and write it 
to redis:
 ```json
 
 {"code":  200, "data":  "get success", "success":  "true"}
+```
 
+When you set format to `text` and field_delimiter to `#`, the connector 
will generate the following data and write it to redis:
+```text
+200#get success#true
 ```
 
+### field_delimiter [string]
+Field delimiter, used to tell the connector how to split a row into fields.
+
+Currently, it only needs to be configured when format is `text`. The default is ",".
+
 ### expire [long]
 
 Set redis expiration time, the unit is second. The default value is -1, keys 
do not automatically expire by default.
@@ -219,7 +229,7 @@ custom key:
 Redis {
   host = localhost
   port = 6379
-  key = "name:{name}"
+  key = "name:${name}"
   support_custom_key = true
   data_type = key
 }
diff --git a/docs/en/connector-v2/source/Redis.md 
b/docs/en/connector-v2/source/Redis.md
index 99b3678c8d..f1daa06c9a 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -37,6 +37,7 @@ Used to read data from Redis.
 | schema              | config | yes when format=json           | -            
 |
 | format              | string | no                             | json         
 |
 | single_field_name   | string | yes when read_key_enabled=true | -            
 |
+| field_delimiter     | string | no                             | ','          
 |
 | common-options      |        | no                             | -            
 |
 
 ### host [string]
@@ -252,21 +253,44 @@ connector will generate data as the following:
 | ---- | ----------- | ------- |
 | 200  | get success | true    |
 
-when you assign format is `text`, connector will do nothing for upstream data, 
for example:
+When you set format to `text`, specifying the schema 
information is optional. 
 
-upstream data is the following:
-
-```json
-{"code":  200, "data":  "get success", "success":  true}
+For example, upstream data is the following:
 
+```text
+200#get success#true
 ```
 
+If you do not specify a schema, the connector will treat the upstream data as 
follows:
+
+| content                                                  |
+| -------------------------------------------------------- |
+| 200#get success#true |
+
+If you specify a schema, you should set both the `schema` and 
`field_delimiter` options as follows:
+
+```hocon
+field_delimiter = "#"
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
 connector will generate data as the following:
 
 | content                                                  |
 | -------------------------------------------------------- |
 | {"code":  200, "data":  "get success", "success":  true} |
 
+### field_delimiter [string]
+Field delimiter, used to tell the connector how to split a row into fields.
+
+Currently, it only needs to be configured when format is `text`. The default is ",".
+
 ### schema [config]
 
 #### fields [config]
diff --git a/docs/zh/connector-v2/sink/Redis.md 
b/docs/zh/connector-v2/sink/Redis.md
index 8391c5c0be..72e36aa003 100644
--- a/docs/zh/connector-v2/sink/Redis.md
+++ b/docs/zh/connector-v2/sink/Redis.md
@@ -17,22 +17,23 @@ import ChangeLog from '../changelog/connector-redis.md';
 | name               | type    |       required        | default value |
 |--------------------|---------|-----------------------|---------------|
 | host               | string  | `mode=single`时必须      | -             |
-| port               | int     | no                    | 6379          |
-| key                | string  | yes                   | -             |
-| data_type          | string  | yes                   | -             |
-| batch_size         | int     | no                    | 10            |
-| user               | string  | no                    | -             |
-| auth               | string  | no                    | -             |
-| db_num             | int     | no                    | 0             |
-| mode               | string  | no                    | single        |
-| nodes              | list    | yes when mode=cluster | -             |
-| format             | string  | no                    | json          |
-| expire             | long    | no                    | -1            |
-| support_custom_key | boolean | no                    | false         |
-| value_field        | string  | no                    | -             |
-| hash_key_field     | string  | no                    | -             |
-| hash_value_field   | string  | no                    | -             |
-| common-options     |         | no                    | -             |
+| port               | int     | 否                 | 6379          |
+| key                | string  | 是                 | -             |
+| data_type          | string  | 是                 | -             |
+| batch_size         | int     | 否                 | 10            |
+| user               | string  | 否                 | -             |
+| auth               | string  | 否                 | -             |
+| db_num             | int     | 否                 | 0             |
+| mode               | string  | 否                 | single        |
+| nodes              | list    | `mode=cluster`时必须 | -             |
+| format             | string  | 否                 | json          |
+| expire             | long    | 否                 | -1            |
+| support_custom_key | boolean | 否                 | false         |
+| value_field        | string  | 否                 | -             |
+| hash_key_field     | string  | 否                 | -             |
+| hash_value_field   | string  | 否                 | -             |
+| field_delimiter    | string  | 否                 | ","           |
+| common-options     |         | 否                 | -             |
 
 ### host [string]
 
@@ -114,7 +115,7 @@ Redis 节点信息,在集群模式下使用,必须按如下格式:
 
 ### format [string]
 
-上游数据的格式,目前只支持 `json`,以后会支持 `text`,默认 `json`。
+上游数据的格式,目前只支持 `json`,`text`,默认 `json`。
 
 当你指定格式为 `json` 时,例如:
 
@@ -130,6 +131,18 @@ Redis 节点信息,在集群模式下使用,必须按如下格式:
 {"code":  200, "data":  "获取成功", "success":  "true"}
 ```
 
+当你指定format为`text`,并设置field_delimiter为`#`时,连接器将生成如下数据并将其写入redis:
+
+```text
+200#get success#true
+```
+
+### field_delimiter [string]
+字段分隔符,用于告诉连接器如何分割字段。
+
+目前仅当格式为text时需要配置。默认为","。
+
+
 ### expire [long]
 
 设置 Redis 的过期时间,单位为秒。默认值为 -1,表示键不会自动过期。
@@ -210,7 +223,7 @@ Redis {
 Redis {
   host = localhost
   port = 6379
-  key = "name:{name}"
+  key = "name:${name}"
   support_custom_key = true
   data_type = key
 }
diff --git a/docs/zh/connector-v2/source/Redis.md 
b/docs/zh/connector-v2/source/Redis.md
index b2edca5ab2..6f9ce5f947 100644
--- a/docs/zh/connector-v2/source/Redis.md
+++ b/docs/zh/connector-v2/source/Redis.md
@@ -34,6 +34,7 @@ import ChangeLog from '../changelog/connector-redis.md';
 | nodes               | list   | `mode=cluster` 时必须 | -      |
 | schema              | config | `format=json` 时必须  | -      |
 | format              | string | 否                  | json   |
+| field_delimiter     | string | 否                  | ','    |
 | common-options      |        | 否                  | -      |
 
 ### host [string]
@@ -203,21 +204,44 @@ schema {
 | ---- | ----------- | ------- |
 | 200  | get success | true    |
 
-当指定格式为 `text` 时,连接器不会对上游数据做任何处理,例如:
+当指定格式为 `text` 时,可以选择是否指定schema参数。
 
-当上游数据如下时:
+例如, 当上游数据如下时:
 
-```json
-{"code":  200, "data":  "get success", "success":  true}
+```text
+200#get success#true
+```
+
+如果不指定schema参数,连接器将按照以下方式处理上游数据:
+
+| content                                                  |
+| -------------------------------------------------------- |
+| 200#get success#true |
+
+如果指定schema参数,此时需要同时配置`schema`和`field_delimiter`,如下所示:
+```hocon
+field_delimiter = "#"
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
 
 ```
 
-连接器将会生成如下格式数据:
+连接器将生成如下数据:
 
 | content                                                  |
 | -------------------------------------------------------- |
 | {"code":  200, "data":  "get success", "success":  true} |
 
+### field_delimiter [string]
+字段分隔符,用于告诉连接器如何分割字段。
+
+目前仅当格式为text时需要配置。默认为","。
+
 ### schema [config]
 
 #### fields [config]
diff --git a/seatunnel-connectors-v2/connector-redis/pom.xml 
b/seatunnel-connectors-v2/connector-redis/pom.xml
index 56e08122d9..755d1fb509 100644
--- a/seatunnel-connectors-v2/connector-redis/pom.xml
+++ b/seatunnel-connectors-v2/connector-redis/pom.xml
@@ -47,6 +47,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>redis.clients</groupId>
             <artifactId>jedis</artifactId>
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
index af7894795d..39362d27cc 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public abstract class RedisClient extends Jedis {
+public abstract class RedisClient {
 
     protected final RedisParameters redisParameters;
 
@@ -62,14 +62,14 @@ public abstract class RedisClient extends Jedis {
         if (redisVersion <= REDIS_5) {
             return scanOnRedis5(cursor, scanParams, type);
         } else {
-            return jedis.scan(cursor, scanParams, type.name());
+            return scanKeyResult(cursor, scanParams, type);
         }
     }
 
     // When the version is earlier than redis5, scan command does not support 
type
     private ScanResult<String> scanOnRedis5(
             String cursor, ScanParams scanParams, RedisDataType type) {
-        ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+        ScanResult<String> scanResult = scanKeyResult(cursor, scanParams, 
null);
         String resultCursor = scanResult.getCursor();
         List<String> keys = scanResult.getResult();
         List<String> typeKeys = new ArrayList<>(keys.size());
@@ -82,6 +82,15 @@ public abstract class RedisClient extends Jedis {
         return new ScanResult<>(resultCursor, typeKeys);
     }
 
+    public void close() {
+        if (jedis != null) {
+            jedis.close();
+        }
+    }
+
+    public abstract ScanResult<String> scanKeyResult(
+            String cursor, ScanParams scanParams, RedisDataType type);
+
     public abstract List<String> batchGetString(List<String> keys);
 
     public abstract List<List<String>> batchGetList(List<String> keys);
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
index 6b12e0d40c..40dac98dae 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
@@ -18,12 +18,16 @@
 package org.apache.seatunnel.connectors.seatunnel.redis.client;
 
 import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.JedisWrapper;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 
 import org.apache.commons.collections4.CollectionUtils;
 
+import redis.clients.jedis.ConnectionPool;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -31,8 +35,14 @@ import java.util.Map;
 import java.util.Set;
 
 public class RedisClusterClient extends RedisClient {
+    private final List<Map.Entry<String, ConnectionPool>> nodes;
+    private final JedisWrapper jedisWrapper;
+
     public RedisClusterClient(RedisParameters redisParameters, Jedis jedis, 
int redisVersion) {
         super(redisParameters, jedis, redisVersion);
+
+        this.jedisWrapper = (JedisWrapper) jedis;
+        this.nodes = new 
ArrayList<>(jedisWrapper.getClusterNodes().entrySet());
     }
 
     @Override
@@ -103,9 +113,9 @@ public class RedisClusterClient extends RedisClient {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
             if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
-                RedisDataType.STRING.del(this, keys.get(i), values.get(i));
+                RedisDataType.STRING.del(jedis, keys.get(i), values.get(i));
             } else {
-                RedisDataType.STRING.set(this, keys.get(i), values.get(i), 
expireSeconds);
+                RedisDataType.STRING.set(jedis, keys.get(i), values.get(i), 
expireSeconds);
             }
         }
     }
@@ -116,9 +126,9 @@ public class RedisClusterClient extends RedisClient {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
             if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
-                RedisDataType.LIST.del(this, keys.get(i), values.get(i));
+                RedisDataType.LIST.del(jedis, keys.get(i), values.get(i));
             } else {
-                RedisDataType.LIST.set(this, keys.get(i), values.get(i), 
expireSeconds);
+                RedisDataType.LIST.set(jedis, keys.get(i), values.get(i), 
expireSeconds);
             }
         }
     }
@@ -129,9 +139,9 @@ public class RedisClusterClient extends RedisClient {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
             if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
-                RedisDataType.SET.del(this, keys.get(i), values.get(i));
+                RedisDataType.SET.del(jedis, keys.get(i), values.get(i));
             } else {
-                RedisDataType.SET.set(this, keys.get(i), values.get(i), 
expireSeconds);
+                RedisDataType.SET.set(jedis, keys.get(i), values.get(i), 
expireSeconds);
             }
         }
     }
@@ -142,9 +152,9 @@ public class RedisClusterClient extends RedisClient {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
             if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
-                RedisDataType.HASH.del(this, keys.get(i), values.get(i));
+                RedisDataType.HASH.del(jedis, keys.get(i), values.get(i));
             } else {
-                RedisDataType.HASH.set(this, keys.get(i), values.get(i), 
expireSeconds);
+                RedisDataType.HASH.set(jedis, keys.get(i), values.get(i), 
expireSeconds);
             }
         }
     }
@@ -155,10 +165,66 @@ public class RedisClusterClient extends RedisClient {
         int size = keys.size();
         for (int i = 0; i < size; i++) {
             if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) == 
RowKind.UPDATE_BEFORE) {
-                RedisDataType.ZSET.del(this, keys.get(i), values.get(i));
+                RedisDataType.ZSET.del(jedis, keys.get(i), values.get(i));
+            } else {
+                RedisDataType.ZSET.set(jedis, keys.get(i), values.get(i), 
expireSeconds);
+            }
+        }
+    }
+
+    /** In cluster mode, traverse and scan each node key */
+    @Override
+    public ScanResult<String> scanKeyResult(
+            final String cursor, final ScanParams params, final RedisDataType 
type) {
+        // Create a composite cursor to traverse the cluster nodes
+        // the format is "Node Index:Node cursor"
+        int nodeIndex = 0;
+        String nodeCursor = cursor;
+        boolean isFirstScan = !cursor.contains(":");
+
+        if (!ScanParams.SCAN_POINTER_START.equals(cursor) && 
cursor.contains(":")) {
+            String[] parts = cursor.split(":", 2);
+            nodeIndex = Integer.parseInt(parts[0]);
+            nodeCursor = parts[1];
+        }
+
+        // All nodes have been scanned
+        if (nodeIndex >= nodes.size()) {
+            return new ScanResult<>(ScanParams.SCAN_POINTER_START, new 
ArrayList<>());
+        }
+
+        List<String> resultKeys;
+        String nextCursor;
+
+        Map.Entry<String, ConnectionPool> connectionPoolEntry = 
nodes.get(nodeIndex);
+        Jedis jedis = jedisWrapper.getJedis(connectionPoolEntry.getKey());
+
+        // Perform the scan operation
+        ScanResult<String> scanResult;
+        if (type != null) {
+            // redis 7
+            scanResult = jedis.scan(nodeCursor, params, type.name());
+        } else {
+            // redis 5
+            scanResult = jedis.scan(nodeCursor, params);
+        }
+
+        resultKeys = new ArrayList<>(scanResult.getResult());
+
+        // Generate the next cursor
+        if (!isFirstScan && 
ScanParams.SCAN_POINTER_START.equals(scanResult.getCursor())) {
+            // The current node scan has been completed. Move to the next node
+            nodeIndex++;
+            if (nodeIndex < nodes.size()) {
+                nextCursor = nodeIndex + ":" + ScanParams.SCAN_POINTER_START;
             } else {
-                RedisDataType.ZSET.set(this, keys.get(i), values.get(i), 
expireSeconds);
+                nextCursor = ScanParams.SCAN_POINTER_START;
             }
+        } else {
+            // The current node has not been fully scanned. Update the 
composite cursor
+            nextCursor = nodeIndex + ":" + scanResult.getCursor();
         }
+
+        return new ScanResult<>(nextCursor, resultKeys);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
index 07efefe337..c9148fc46f 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.redis.client;
 
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode;
@@ -29,6 +30,8 @@ import redis.clients.jedis.Jedis;
 import redis.clients.jedis.Pipeline;
 import redis.clients.jedis.Response;
 import redis.clients.jedis.exceptions.JedisException;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -260,6 +263,19 @@ public class RedisSingleClient extends RedisClient {
         processResponses(responses);
     }
 
+    @Override
+    public ScanResult<String> scanKeyResult(
+            String cursor, ScanParams scanParams, RedisDataType type) {
+
+        if (type == null) {
+            // redis 5
+            return jedis.scan(cursor, scanParams);
+        } else {
+            // redis 7
+            return jedis.scan(cursor, scanParams, type.name());
+        }
+    }
+
     private void processResponses(List<Response<?>> responseList) {
         try {
             for (Response<?> response : responseList) {
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
index 8348f3561f..cac0441b16 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
@@ -17,16 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redis.config;
 
+import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode;
+
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.ConnectionPool;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisCluster;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.GET_REDIS_INFO_ERROR;
 
+@Slf4j
 public class JedisWrapper extends Jedis {
     private final JedisCluster jedisCluster;
+    private final Map<String, Jedis> jedisPoolMap = new ConcurrentHashMap<>();
 
     public JedisWrapper(@NonNull JedisCluster jedisCluster) {
         this.jedisCluster = jedisCluster;
@@ -82,8 +92,77 @@ public class JedisWrapper extends Jedis {
         return jedisCluster.zrange(key, start, stop);
     }
 
+    @Override
+    public String info() {
+        Map<String, ConnectionPool> nodes = jedisCluster.getClusterNodes();
+        if (nodes.isEmpty()) {
+            throw new RedisConnectorException(
+                    GET_REDIS_INFO_ERROR, "No available nodes in cluster");
+        }
+
+        // Traverse all nodes and try to obtain the info
+        for (Map.Entry<String, ConnectionPool> entry : nodes.entrySet()) {
+            try {
+                Jedis jedis = getJedis(entry.getKey());
+                return jedis.info();
+            } catch (Exception e) {
+                log.warn("Failed to get info from node: {}", entry.getKey(), 
e);
+            }
+        }
+
+        throw new RedisConnectorException(
+                GET_REDIS_INFO_ERROR, "Failed to get redis info from all node 
in cluster");
+    }
+
+    @Override
+    public String type(String key) {
+        return jedisCluster.type(key);
+    }
+
+    public Map<String, ConnectionPool> getClusterNodes() {
+        return jedisCluster.getClusterNodes();
+    }
+
+    @Override
+    public long expire(final String key, final long seconds) {
+        return jedisCluster.expire(key, seconds);
+    }
+
     @Override
     public void close() {
         jedisCluster.close();
+        jedisPoolMap.values().forEach(Jedis::close);
+        jedisPoolMap.clear();
+    }
+
+    public Jedis getJedis(String node) {
+        Jedis jedis = jedisPoolMap.get(node);
+        if (jedis != null) {
+            return jedis;
+        }
+
+        // Lazy initialization
+        Map<String, ConnectionPool> clusterNodes = 
jedisCluster.getClusterNodes();
+        ConnectionPool connectionPool = clusterNodes.get(node);
+        if (connectionPool == null) {
+            throw new RedisConnectorException(
+                    RedisErrorCode.REDIS_CONNECTION_ERROR, "Node not found in 
cluster: " + node);
+        }
+
+        return getOrCreateJedis(node, connectionPool);
+    }
+
+    private Jedis getOrCreateJedis(String node, ConnectionPool connectionPool) 
{
+        return jedisPoolMap.computeIfAbsent(
+                node,
+                k -> {
+                    try {
+                        return new Jedis(connectionPool.getResource());
+                    } catch (Exception e) {
+                        throw new RedisConnectorException(
+                                RedisErrorCode.REDIS_CONNECTION_ERROR,
+                                "Redis connection error. node: " + node);
+                    }
+                });
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
index 7f398c9bcd..2694c61b9d 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
@@ -109,8 +109,15 @@ public class RedisBaseOptions {
                             "batch_size is used to control the size of a batch 
of data during read and write operations"
                                     + ",default 10");
 
+    public static final Option<String> FIELD_DELIMITER =
+            Options.key("field_delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription(
+                            "The separator between columns in a row of data. 
Only needed by `text` file format. default is ','");
+
     public enum Format {
         JSON,
-        // TEXT will be supported later
+        TEXT,
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 5f4fee58b9..0ca8d0bfb3 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -26,8 +26,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClusterClient
 import 
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisSingleClient;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 
-import org.apache.logging.log4j.core.util.Assert;
-
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import redis.clients.jedis.ConnectionPoolConfig;
@@ -42,6 +40,7 @@ import java.util.List;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.GET_REDIS_VERSION_INFO_FAILED;
 import static 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.INVALID_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.REDIS_NODE_EMPTY_ERROR;
 
 @Data
 @Slf4j
@@ -66,6 +65,8 @@ public class RedisParameters implements Serializable {
     private String valueField;
     private String hashKeyField;
     private String hashValueField;
+    private String fieldDelimiter;
+    private RedisBaseOptions.Format format;
 
     private int redisVersion;
 
@@ -138,6 +139,12 @@ public class RedisParameters implements Serializable {
         if (config.getOptional(RedisSinkOptions.HASH_VALUE_FIELD).isPresent()) 
{
             this.hashValueField = 
config.get(RedisSinkOptions.HASH_VALUE_FIELD);
         }
+
+        // set format, default json
+        this.format = config.get(RedisBaseOptions.FORMAT);
+
+        // set field delimiter, only need when format is TEXT
+        this.fieldDelimiter = config.get(RedisBaseOptions.FIELD_DELIMITER);
     }
 
     public RedisClient buildRedisClient() {
@@ -192,7 +199,10 @@ public class RedisParameters implements Serializable {
                 return jedis;
             case CLUSTER:
                 HashSet<HostAndPort> nodes = new HashSet<>();
-                Assert.requireNonEmpty(redisNodes, "nodes parameter must not 
be empty");
+                if (redisNodes.isEmpty()) {
+                    throw new RedisConnectorException(
+                            REDIS_NODE_EMPTY_ERROR, "Redis nodes parameter 
must not be empty");
+                }
                 for (String redisNode : redisNodes) {
                     String[] splits = redisNode.split(":");
                     if (splits.length != 2) {
@@ -220,7 +230,6 @@ public class RedisParameters implements Serializable {
                     jedisCluster = new JedisCluster(nodes);
                 }
                 JedisWrapper jedisWrapper = new JedisWrapper(jedisCluster);
-                jedisWrapper.select(dbNum);
                 return jedisWrapper;
             default:
                 // do nothing
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
index a82a1425de..076a1a69c2 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
@@ -21,7 +21,10 @@ import 
org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 public enum RedisErrorCode implements SeaTunnelErrorCode {
     GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the 
redis version"),
     INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config"),
-    GET_RESPONSE_FAILED("RedisErrorCode-03", "Failed to get the write 
response");
+    GET_RESPONSE_FAILED("RedisErrorCode-03", "Failed to get the write 
response"),
+    GET_REDIS_INFO_ERROR("RedisErrorCode-04", "Failed to get redis info in 
cluster mode."),
+    REDIS_NODE_EMPTY_ERROR("RedisErrorCode-05", "Redis nodes parameter is 
empty"),
+    REDIS_CONNECTION_ERROR("RedisErrorCode-06", "Redis connection error");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 9207477d84..0adab4300f 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -19,19 +19,26 @@ package 
org.apache.seatunnel.connectors.seatunnel.redis.sink;
 
 import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.PlaceholderUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,12 +47,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+@Slf4j
 public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
         implements SupportMultiTableSinkWriter<Void> {
-    private static final String REDIS_GROUP_DELIMITER = ":";
-    private static final String LEFT_PLACEHOLDER_MARKER = "{";
-    private static final String RIGHT_PLACEHOLDER_MARKER = "}";
+    private static final Pattern LEGACY_PLACEHOLDER_PATTERN =
+            Pattern.compile("(?<!\\$)\\{([^}]+)\\}");
+    private static final Pattern PLACEHOLDER_PATTERN = 
Pattern.compile("\\$\\{([^}]+)\\}");
     private final SeaTunnelRowType seaTunnelRowType;
     private final RedisParameters redisParameters;
     private final SerializationSchema serializationSchema;
@@ -60,9 +70,7 @@ public class RedisSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters 
redisParameters) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.redisParameters = redisParameters;
-        // TODO according to format to initialize serializationSchema
-        // Now temporary using json serializationSchema
-        this.serializationSchema = new 
JsonSerializationSchema(seaTunnelRowType);
+        this.serializationSchema = createSerializationSchema(redisParameters, 
seaTunnelRowType);
         this.redisClient = redisParameters.buildRedisClient();
         this.batchSize = redisParameters.getBatchSize();
         this.rowKinds = new ArrayList<>(batchSize);
@@ -81,6 +89,8 @@ public class RedisSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
         if (keyBuffer.size() >= batchSize) {
             flush();
         }
+
+        log.debug("write redis key: {}, value: {}, rowKind: {}", key, value, 
element.getRowKind());
     }
 
     private String getKey(SeaTunnelRow element, List<String> fields) {
@@ -101,28 +111,40 @@ public class RedisSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
         }
     }
 
-    private String getCustomKey(SeaTunnelRow element, List<String> fields, 
String keyField) {
-        String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER);
-        StringBuilder key = new StringBuilder();
-        for (int i = 0; i < keyFieldSegments.length; i++) {
-            String keyFieldSegment = keyFieldSegments[i];
-            if (keyFieldSegment.startsWith(LEFT_PLACEHOLDER_MARKER)
-                    && keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) {
-                String realKeyField = keyFieldSegment.substring(1, 
keyFieldSegment.length() - 1);
-                if (fields.contains(realKeyField)) {
-                    Object realFieldValue = 
element.getField(fields.indexOf(realKeyField));
-                    key.append(realFieldValue == null ? "" : 
realFieldValue.toString());
-                } else {
-                    key.append(keyFieldSegment);
-                }
-            } else {
-                key.append(keyFieldSegment);
-            }
-            if (i != keyFieldSegments.length - 1) {
-                key.append(REDIS_GROUP_DELIMITER);
-            }
+    protected String getCustomKey(SeaTunnelRow element, List<String> fields, 
String keyField) {
+        // First, detect and convert the old format placeholders to the new 
format
+        String normalizedKeyField = normalizePlaceholders(keyField);
+
+        Matcher matcher = PLACEHOLDER_PATTERN.matcher(normalizedKeyField);
+
+        Map<String, String> placeholderValues = new HashMap<>();
+
+        while (matcher.find()) {
+            String fieldName = matcher.group(1);
+            String fieldValue = getFieldValue(element, fields, fieldName);
+            placeholderValues.put(fieldName, fieldValue);
+        }
+
+        return placeholderValues.keySet().stream()
+                .reduce(
+                        normalizedKeyField,
+                        (result, placeholderName) -> {
+                            return PlaceholderUtils.replacePlaceholders(
+                                    result,
+                                    placeholderName,
+                                    placeholderValues.get(placeholderName),
+                                    null);
+                        });
+    }
+
+    private String getFieldValue(SeaTunnelRow element, List<String> fields, 
String fieldName) {
+        if (fields.contains(fieldName)) {
+            Object fieldValue = element.getField(fields.indexOf(fieldName));
+            return fieldValue == null ? "" : fieldValue.toString();
+        } else {
+            // If the field does not exist, return the original field name
+            return fieldName;
         }
-        return key.toString();
     }
 
     private String getValue(SeaTunnelRow element, List<String> fields) {
@@ -219,6 +241,45 @@ public class RedisSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
                 "UnSupport redisDataType,only support 
string,list,hash,set,zset");
     }
 
+    private SerializationSchema createSerializationSchema(
+            RedisParameters redisParameters, SeaTunnelRowType rowType) {
+
+        RedisBaseOptions.Format format = redisParameters.getFormat();
+
+        switch (format) {
+            case JSON:
+                return new JsonSerializationSchema(rowType);
+            case TEXT:
+                String fieldDelimiter = redisParameters.getFieldDelimiter();
+                return TextSerializationSchema.builder()
+                        .seaTunnelRowType(rowType)
+                        .delimiter(fieldDelimiter)
+                        .build();
+            default:
+                throw new RedisConnectorException(
+                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                        String.format(
+                                "PluginName: %s, PluginType: %s, Message: %s",
+                                RedisBaseOptions.CONNECTOR_IDENTITY,
+                                PluginType.SINK,
+                                "Unsupported format: " + format));
+        }
+    }
+
+    private String normalizePlaceholders(String input) {
+        if (input == null) {
+            return input;
+        }
+
+        Matcher legacyMatcher = LEGACY_PLACEHOLDER_PATTERN.matcher(input);
+        if (legacyMatcher.find()) {
+            // Convert legacy format {fieldName} to ${fieldName}
+            return legacyMatcher.replaceAll("\\$\\{$1\\}");
+        }
+
+        return input;
+    }
+
     @Override
     public void close() throws IOException {
         flush();
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index 27deb707d8..caf972b211 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -21,7 +21,7 @@ import 
org.apache.seatunnel.shade.com.google.common.collect.Lists;
 
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -36,6 +36,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import java.util.List;
 
@@ -54,25 +55,42 @@ public class RedisSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
     public RedisSource(ReadonlyConfig readonlyConfig) {
 
         this.redisParameters.buildWithConfig(readonlyConfig);
+
+        createCatalogTableAndDeserializationSchema(readonlyConfig);
+    }
+
+    private void createCatalogTableAndDeserializationSchema(ReadonlyConfig 
readonlyConfig) {
         // TODO: use format SPI
         // default use json format
-        if (readonlyConfig.getOptional(RedisBaseOptions.FORMAT).isPresent()) {
-            if 
(!readonlyConfig.getOptional(SinkConnectorCommonOptions.SCHEMA).isPresent()) {
-                throw new RedisConnectorException(
-                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                        String.format(
-                                "PluginName: %s, PluginType: %s, Message: %s",
-                                getPluginName(),
-                                PluginType.SOURCE,
-                                "Must config schema when format parameter been 
config"));
-            }
+        RedisBaseOptions.Format format = 
readonlyConfig.get(RedisBaseOptions.FORMAT);
+
+        // if config schema, create deserialization schema and catalog table 
by config
+        // else create catalog with simple text
+        if 
(readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
+            this.catalogTable = 
CatalogTableUtil.buildWithConfig(readonlyConfig);
+            this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
 
-            RedisBaseOptions.Format format = 
readonlyConfig.get(RedisBaseOptions.FORMAT);
-            if (RedisBaseOptions.Format.JSON.equals(format)) {
-                this.catalogTable = 
CatalogTableUtil.buildWithConfig(readonlyConfig);
-                this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
-                this.deserializationSchema =
-                        new JsonDeserializationSchema(catalogTable, false, 
false);
+            switch (format) {
+                case JSON:
+                    this.deserializationSchema =
+                            new JsonDeserializationSchema(catalogTable, false, 
false);
+                    break;
+                case TEXT:
+                    String fieldDelimiter = 
readonlyConfig.get(RedisBaseOptions.FIELD_DELIMITER);
+                    this.deserializationSchema =
+                            TextDeserializationSchema.builder()
+                                    .seaTunnelRowType(seaTunnelRowType)
+                                    .delimiter(fieldDelimiter)
+                                    .build();
+                    break;
+                default:
+                    throw new RedisConnectorException(
+                            SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                            String.format(
+                                    "PluginName: %s, PluginType: %s, Message: 
%s",
+                                    getPluginName(),
+                                    PluginType.SOURCE,
+                                    "Unsupported format: " + format));
             }
         } else {
             this.catalogTable = CatalogTableUtil.buildSimpleTextTable();
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriterTest.java
 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriterTest.java
new file mode 100644
index 0000000000..b3f2259189
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.sink;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+
+import static org.mockito.Mockito.when;
+
+public class RedisSinkWriterTest {
+
+    private RedisClient mockRedisClient;
+
+    private RedisParameters mockRedisParameters;
+
+    private SeaTunnelRowType rowType;
+    private RedisSinkWriter redisSinkWriter;
+
+    @BeforeEach
+    void setUp() {
+        rowType =
+                new SeaTunnelRowType(
+                        new String[] {"id", "name", "age", "email"},
+                        new SeaTunnelDataType<?>[] {
+                            BasicType.INT_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.INT_TYPE,
+                            BasicType.STRING_TYPE
+                        });
+
+        mockRedisParameters = Mockito.mock(RedisParameters.class);
+        mockRedisClient = Mockito.mock(RedisClient.class);
+
+        
when(mockRedisParameters.buildRedisClient()).thenReturn(mockRedisClient);
+        when(mockRedisParameters.getBatchSize()).thenReturn(3);
+        
when(mockRedisParameters.getFormat()).thenReturn(RedisBaseOptions.Format.JSON);
+        when(mockRedisParameters.getFieldDelimiter()).thenReturn(",");
+    }
+
+    @Test
+    void testGetCustomKey() {
+        // Set custom key mode
+        
when(mockRedisParameters.getKeyField()).thenReturn("user:${id}:profile");
+        when(mockRedisParameters.getSupportCustomKey()).thenReturn(true);
+        
when(mockRedisParameters.getRedisDataType()).thenReturn(RedisDataType.STRING);
+        when(mockRedisParameters.getExpire()).thenReturn(3600L);
+
+        redisSinkWriter = new RedisSinkWriter(rowType, mockRedisParameters);
+
+        // create test data
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "Alice", 25, 
"[email protected]"});
+        row.setRowKind(RowKind.INSERT);
+
+        String customKey =
+                redisSinkWriter.getCustomKey(
+                        row,
+                        Arrays.asList(rowType.getFieldNames()),
+                        mockRedisParameters.getKeyField());
+
+        Assertions.assertEquals("user:1:profile", customKey);
+    }
+
+    @Test
+    public void testLegacyCustomKey() {
+        
when(mockRedisParameters.getKeyField()).thenReturn("user:{id}:profile");
+
+        when(mockRedisParameters.getSupportCustomKey()).thenReturn(true);
+        
when(mockRedisParameters.getRedisDataType()).thenReturn(RedisDataType.STRING);
+        when(mockRedisParameters.getExpire()).thenReturn(3600L);
+
+        redisSinkWriter = new RedisSinkWriter(rowType, mockRedisParameters);
+
+        // create test data
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "Alice", 25, 
"[email protected]"});
+        row.setRowKind(RowKind.INSERT);
+
+        String customKey =
+                redisSinkWriter.getCustomKey(
+                        row,
+                        Arrays.asList(rowType.getFieldNames()),
+                        mockRedisParameters.getKeyField());
+
+        Assertions.assertEquals("user:1:profile", customKey);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
new file mode 100644
index 0000000000..f57214ae0c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.redis;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.ConnectionPoolConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+@Slf4j
+public class RedisClusterIT extends TestSuiteBase implements TestResource {
+
+    private static final int REDIS_CLUSTER_SIZE = 3;
+
+    private GenericContainer<?>[] redisClusterNodes;
+    private JedisCluster jedisCluster;
+
+    private RedisContainerInfo redisContainerInfo =
+            new RedisContainerInfo("redis-cluster-e2e", 6379, "SeaTunnel", 
"redis:7");
+
+    private static final int[] REDIS_PORTS = {6379, 6380, 6381};
+    private static final int[] REDIS_BUS_PORTS = {16379, 16380, 16381};
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        setupRedisContainer();
+        createRedisCluster();
+        waitForRedisClusterReady();
+        initJedisCluster();
+        initSourceData();
+    }
+
+    private void setupRedisContainer() {
+        redisClusterNodes = new GenericContainer[REDIS_CLUSTER_SIZE];
+
+        for (int i = 0; i < REDIS_CLUSTER_SIZE; i++) {
+            String nodeName = "redis-cluster-" + (i + 1);
+            int redisPort = REDIS_PORTS[i];
+            int busPort = REDIS_BUS_PORTS[i];
+
+            // Get the host machine's IP address
+            String hostIp = getHostIpAddress();
+            String redisCommand =
+                    String.format(
+                            "redis-server --cluster-enabled yes --port %d 
--protected-mode no "
+                                    + "--bind 0.0.0.0 --cluster-announce-ip %s 
--cluster-announce-port %d "
+                                    + "--cluster-announce-bus-port %d 
--requirepass %s",
+                            redisPort,
+                            hostIp,
+                            redisPort,
+                            busPort,
+                            redisContainerInfo.getPassword());
+
+            redisClusterNodes[i] =
+                    new 
GenericContainer<>(DockerImageName.parse(redisContainerInfo.getImageName()))
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases(nodeName)
+                            .withExposedPorts(redisPort, busPort)
+                            .withLogConsumer(
+                                    new Slf4jLogConsumer(
+                                            DockerLoggerFactory.getLogger(
+                                                    
redisContainerInfo.getImageName())))
+                            .withCommand("sh", "-c", redisCommand)
+                            .waitingFor(
+                                    new HostPortWaitStrategy()
+                                            
.withStartupTimeout(Duration.ofMinutes(2)));
+
+            // Set the fixed port mapping
+            redisClusterNodes[i].setPortBindings(
+                    Arrays.asList(redisPort + ":" + redisPort, busPort + ":" + 
busPort));
+        }
+
+        Startables.deepStart(Stream.of(redisClusterNodes)).join();
+        log.info("Redis cluster nodes started with ports: {}", 
Arrays.toString(REDIS_PORTS));
+    }
+
+    private void createRedisCluster() {
+        try {
+            String hostIp = getHostIpAddress();
+            StringBuilder clusterCreateCmd =
+                    new StringBuilder(
+                            "redis-cli --cluster create --cluster-replicas 0 
--cluster-yes ");
+
+            for (int port : REDIS_PORTS) {
+                
clusterCreateCmd.append(hostIp).append(":").append(port).append(" ");
+            }
+
+            clusterCreateCmd.append("-a 
").append(redisContainerInfo.getPassword());
+
+            log.info("Creating cluster with command: {}", clusterCreateCmd);
+
+            Container.ExecResult result =
+                    redisClusterNodes[0].execInContainer("sh", "-c", 
clusterCreateCmd.toString());
+
+            // Wait for the cluster to be created
+            Thread.sleep(5000);
+
+            if (result.getExitCode() != 0) {
+                throw new RuntimeException("Failed to create Redis cluster: " 
+ result.getStderr());
+            }
+
+            log.info("Redis cluster created successfully");
+        } catch (Exception e) {
+            throw new RuntimeException("Error creating Redis cluster", e);
+        }
+    }
+
+    private void waitForRedisClusterReady() {
+        log.info("Waiting for Redis cluster to be ready...");
+
+        int maxRetries = 10;
+        int retryCount = 0;
+
+        while (retryCount < maxRetries) {
+            try {
+                boolean allReady = true;
+
+                for (int i = 0; i < REDIS_CLUSTER_SIZE; i++) {
+                    Container.ExecResult result =
+                            redisClusterNodes[i].execInContainer(
+                                    "redis-cli",
+                                    "-p",
+                                    String.valueOf(REDIS_PORTS[i]),
+                                    "-a",
+                                    redisContainerInfo.getPassword(),
+                                    "ping");
+
+                    if (!"PONG".equals(result.getStdout().trim())) {
+                        allReady = false;
+                        break;
+                    }
+                }
+
+                if (allReady) {
+                    log.info("All Redis nodes are ready after {} attempts", 
retryCount + 1);
+                    return;
+                }
+
+            } catch (Exception e) {
+                log.debug(
+                        "Redis readiness check failed, attempt {}: {}",
+                        retryCount + 1,
+                        e.getMessage());
+            }
+
+            retryCount++;
+            try {
+                Thread.sleep(3000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        throw new RuntimeException("Redis cluster failed to become ready 
within timeout");
+    }
+
+    private void initJedisCluster() {
+        Set<HostAndPort> jedisClusterNodes = new HashSet<>();
+
+        String hostIp = getHostIpAddress();
+        for (int port : REDIS_PORTS) {
+            jedisClusterNodes.add(new HostAndPort(hostIp, port));
+        }
+
+        ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
+
+        try {
+            this.jedisCluster =
+                    new JedisCluster(
+                            jedisClusterNodes,
+                            10000,
+                            10000,
+                            3,
+                            redisContainerInfo.getPassword(),
+                            poolConfig);
+
+            log.info("JedisCluster initialized successfully");
+
+        } catch (Exception e) {
+            log.error("Failed to create JedisCluster", e);
+            throw e;
+        }
+    }
+
+    private void initSourceData() {
+        JsonSerializationSchema jsonSerializationSchema =
+                new JsonSerializationSchema(generateTestDataSet().getKey());
+        List<SeaTunnelRow> rows = generateTestDataSet().getValue();
+
+        for (int i = 0; i < rows.size(); i++) {
+            jedisCluster.set(
+                    "key_test" + i, new 
String(jsonSerializationSchema.serialize(rows.get(i))));
+        }
+
+        log.info("Initialized {} test records in Redis cluster", rows.size());
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (jedisCluster != null) {
+            try {
+                jedisCluster.close();
+
+                log.info("JedisCluster closed successfully");
+            } catch (Exception e) {
+                log.warn("Error closing JedisCluster", e);
+            }
+        }
+
+        if (redisClusterNodes != null) {
+            for (GenericContainer<?> container : redisClusterNodes) {
+                if (container != null) {
+                    try {
+                        container.close();
+                    } catch (Exception e) {
+                        log.warn("Error stopping container", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Runs {@code /cluster-redis-to-redis-scan.conf} and expects the job to exit with code 0 and
+     * the list {@code key_list} to hold 100 entries. The list is deleted afterwards and checked
+     * to be empty.
+     */
+    @TestTemplate
+    public void testRedisClusterScan(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sinkKey = "key_list";
+        try {
+            Container.ExecResult jobResult =
+                    container.executeJob("/cluster-redis-to-redis-scan.conf");
+            Assertions.assertEquals(0, jobResult.getExitCode());
+
+            long writtenEntries = jedisCluster.llen(sinkKey);
+            Assertions.assertEquals(100, writtenEntries);
+        } finally {
+            // Clean up even when an assertion above failed, so later tests start from scratch.
+            jedisCluster.del(sinkKey);
+            Assertions.assertEquals(0, jedisCluster.llen(sinkKey));
+        }
+    }
+
+    /**
+     * Runs {@code /cluster-redis-to-redis-type-key.conf} and expects the job to exit with code 0
+     * and each of the keys {@code cluster-key-value-check-0} .. {@code cluster-key-value-check-99}
+     * to hold the value {@code "string"}. The keys are deleted afterwards.
+     */
+    @TestTemplate
+    public void testRedisClusterCustomValueWithKeyType(TestContainer container)
+            throws IOException, InterruptedException {
+        final String keyPrefix = "cluster-key-value-check-";
+        try {
+            Container.ExecResult jobResult =
+                    container.executeJob("/cluster-redis-to-redis-type-key.conf");
+            Assertions.assertEquals(0, jobResult.getExitCode());
+
+            int matched = 0;
+            for (int idx = 0; idx < 100; idx++) {
+                String stored = jedisCluster.get(keyPrefix + idx);
+                if (stored == null) {
+                    // Missing keys are not counted; the total below catches them.
+                    continue;
+                }
+                Assertions.assertEquals("string", stored);
+                matched++;
+            }
+            Assertions.assertEquals(100, matched);
+        } finally {
+            for (int idx = 0; idx < 100; idx++) {
+                jedisCluster.del(keyPrefix + idx);
+            }
+        }
+    }
+
+    /**
+     * Runs {@code /cluster-redis-to-redis-type-set.conf} and expects the job to exit with code 0
+     * and the set {@code cluster-set-value-check} to contain 100 members. The set is deleted
+     * afterwards.
+     */
+    @TestTemplate
+    public void testRedisClusterCustomValueWithSetType(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sinkKey = "cluster-set-value-check";
+        try {
+            Container.ExecResult jobResult =
+                    container.executeJob("/cluster-redis-to-redis-type-set.conf");
+            Assertions.assertEquals(0, jobResult.getExitCode());
+
+            long memberCount = jedisCluster.scard(sinkKey);
+            Assertions.assertEquals(100, memberCount);
+        } finally {
+            jedisCluster.del(sinkKey);
+        }
+    }
+
+    /**
+     * Runs {@code /cluster-redis-to-redis-type-list.conf} and expects the job to exit with code 0
+     * and the list {@code cluster-list-value-check} to contain 100 elements. The list is deleted
+     * afterwards.
+     */
+    @TestTemplate
+    public void testRedisClusterCustomValueWithListType(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sinkKey = "cluster-list-value-check";
+        try {
+            Container.ExecResult jobResult =
+                    container.executeJob("/cluster-redis-to-redis-type-list.conf");
+            Assertions.assertEquals(0, jobResult.getExitCode());
+
+            long elementCount = jedisCluster.llen(sinkKey);
+            Assertions.assertEquals(100, elementCount);
+        } finally {
+            jedisCluster.del(sinkKey);
+        }
+    }
+
+    /**
+     * Runs {@code /cluster-redis-to-redis-type-zset.conf} and expects the job to exit with code 0
+     * and the sorted set {@code cluster-zset-value-check} to contain 100 members. The sorted set
+     * is deleted afterwards.
+     */
+    @TestTemplate
+    public void testRedisClusterCustomValueWithZSetType(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sinkKey = "cluster-zset-value-check";
+        try {
+            Container.ExecResult jobResult =
+                    container.executeJob("/cluster-redis-to-redis-type-zset.conf");
+            Assertions.assertEquals(0, jobResult.getExitCode());
+
+            long memberCount = jedisCluster.zcard(sinkKey);
+            Assertions.assertEquals(100, memberCount);
+        } finally {
+            jedisCluster.del(sinkKey);
+        }
+    }
+
+    /**
+     * Runs {@code /cluster-redis-to-redis-type-hash.conf} and expects the job to exit with code 0
+     * and the hash {@code cluster-hash-value-check} to hold 100 fields, where each of the fields
+     * {@code "0"} .. {@code "99"} maps to {@code "string"}. The hash is deleted afterwards.
+     */
+    @TestTemplate
+    public void testRedisClusterCustomValueWithHashType(TestContainer container)
+            throws IOException, InterruptedException {
+        final String sinkKey = "cluster-hash-value-check";
+        try {
+            Container.ExecResult jobResult =
+                    container.executeJob("/cluster-redis-to-redis-type-hash.conf");
+            Assertions.assertEquals(0, jobResult.getExitCode());
+
+            long fieldCount = jedisCluster.hlen(sinkKey);
+            Assertions.assertEquals(100, fieldCount);
+
+            for (int idx = 0; idx < 100; idx++) {
+                String hashField = String.valueOf(idx);
+                Assertions.assertEquals("string", jedisCluster.hget(sinkKey, hashField));
+            }
+        } finally {
+            jedisCluster.del(sinkKey);
+        }
+    }
+
+    /**
+     * Builds the fixture used by the cluster tests: a 15-column row type together with 100 rows.
+     * The rows differ only in {@code id} (0..99), apart from the date and timestamp columns,
+     * which are read from the clock when each row is created.
+     *
+     * @return a pair holding the row type (key) and the generated rows (value)
+     */
+    protected Pair<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet() {
+        String[] columnNames =
+                new String[] {
+                    "id",
+                    "c_map",
+                    "c_array",
+                    "c_string",
+                    "c_boolean",
+                    "c_tinyint",
+                    "c_smallint",
+                    "c_int",
+                    "c_bigint",
+                    "c_float",
+                    "c_double",
+                    "c_decimal",
+                    "c_bytes",
+                    "c_date",
+                    "c_timestamp"
+                };
+        // Column types, positionally aligned with columnNames.
+        SeaTunnelDataType<?>[] columnTypes =
+                new SeaTunnelDataType<?>[] {
+                    BasicType.LONG_TYPE,
+                    new MapType<>(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                    ArrayType.BYTE_ARRAY_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.BOOLEAN_TYPE,
+                    BasicType.BYTE_TYPE,
+                    BasicType.SHORT_TYPE,
+                    BasicType.INT_TYPE,
+                    BasicType.LONG_TYPE,
+                    BasicType.FLOAT_TYPE,
+                    BasicType.DOUBLE_TYPE,
+                    new DecimalType(2, 1),
+                    PrimitiveByteArrayType.INSTANCE,
+                    LocalTimeType.LOCAL_DATE_TYPE,
+                    LocalTimeType.LOCAL_DATE_TIME_TYPE
+                };
+        SeaTunnelRowType rowType = new SeaTunnelRowType(columnNames, columnTypes);
+
+        List<SeaTunnelRow> generatedRows = new ArrayList<>();
+        for (int rowId = 0; rowId < 100; rowId++) {
+            Object[] fields =
+                    new Object[] {
+                        Long.valueOf(rowId),
+                        Collections.singletonMap("key", (short) 1),
+                        new Byte[] {(byte) 1},
+                        "string",
+                        Boolean.FALSE,
+                        (byte) 1,
+                        (short) 1,
+                        1,
+                        1L,
+                        1.1f,
+                        1.1d,
+                        BigDecimal.valueOf(11, 1),
+                        "test".getBytes(),
+                        LocalDate.now(),
+                        LocalDateTime.now()
+                    };
+            generatedRows.add(new SeaTunnelRow(fields));
+        }
+        return Pair.of(rowType, generatedRows);
+    }
+
+    /**
+     * Looks up a non-loopback IPv4 address of this host by walking every address of every
+     * network interface. When several addresses qualify, the last one enumerated is returned.
+     *
+     * @return the selected address in textual form, or an empty string when no address qualifies
+     *     or the interfaces cannot be enumerated
+     */
+    private String getHostIpAddress() {
+        String ip = "";
+        try {
+            Enumeration<NetworkInterface> networkInterfaces =
+                    NetworkInterface.getNetworkInterfaces();
+            // Older JDKs document a null return when the machine has no interfaces at all.
+            while (networkInterfaces != null && networkInterfaces.hasMoreElements()) {
+                NetworkInterface networkInterface = networkInterfaces.nextElement();
+                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
+                while (inetAddresses.hasMoreElements()) {
+                    InetAddress inetAddress = inetAddresses.nextElement();
+                    if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
+                        ip = inetAddress.getHostAddress();
+                    }
+                }
+            }
+        } catch (SocketException ex) {
+            // Report through the class logger rather than dumping the stack trace to stderr.
+            log.error("Failed to enumerate network interfaces while resolving the host IP", ex);
+        }
+        if (ip.isEmpty()) {
+            // Make the fallback visible: callers receive "" and may fail later with a less
+            // obvious connection error.
+            log.warn("No non-loopback IPv4 address found for this host");
+        }
+        return ip;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-scan.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-scan.conf
new file mode 100644
index 0000000000..71bdf85b9f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-scan.conf
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    keys = "key_test*"
+    data_type = string
+    batch_size = 33
+  }
+}
+
+sink {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    key = "key_list"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-hash.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-hash.conf
new file mode 100644
index 0000000000..17bc037794
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-hash.conf
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    keys = "key_test*"
+    data_type = key
+    batch_size = 33
+    format = "json"
+        schema = {
+          table = "RedisDatabase.RedisTable"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "c_map"
+              type = "map<string, smallint>"
+            },
+            {
+              name = "c_array"
+              type = "array<tinyint>"
+            },
+            {
+              name = "c_string"
+              type = "string"
+            },
+            {
+              name = "c_boolean"
+              type = "boolean"
+            },
+            {
+              name = "c_tinyint"
+              type = "tinyint"
+            },
+            {
+              name = "c_smallint"
+              type = "smallint"
+            },
+            {
+              name = "c_int"
+              type = "int"
+            },
+            {
+              name = "c_bigint"
+              type = "bigint"
+            },
+            {
+              name = "c_float"
+              type = "float"
+            },
+            {
+              name = "c_double"
+              type = "double"
+            },
+            {
+              name = "c_decimal"
+              type = "decimal(2,1)"
+            },
+            {
+              name = "c_bytes"
+              type = "bytes"
+            },
+            {
+              name = "c_date"
+              type = "date"
+            },
+            {
+              name = "c_timestamp"
+              type = "timestamp"
+            }
+          ]
+        }
+  }
+}
+
+sink {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    key = "cluster-hash-value-check"
+    hash_key_field = "id"
+    hash_value_field = "c_string"
+    data_type = hash
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-key.conf
new file mode 100644
index 0000000000..efce13dbb1
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-key.conf
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    keys = "key_test*"
+    data_type = key
+    batch_size = 33
+    format = "json"
+        schema = {
+          table = "RedisDatabase.RedisTable"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "c_map"
+              type = "map<string, smallint>"
+            },
+            {
+              name = "c_array"
+              type = "array<tinyint>"
+            },
+            {
+              name = "c_string"
+              type = "string"
+            },
+            {
+              name = "c_boolean"
+              type = "boolean"
+            },
+            {
+              name = "c_tinyint"
+              type = "tinyint"
+            },
+            {
+              name = "c_smallint"
+              type = "smallint"
+            },
+            {
+              name = "c_int"
+              type = "int"
+            },
+            {
+              name = "c_bigint"
+              type = "bigint"
+            },
+            {
+              name = "c_float"
+              type = "float"
+            },
+            {
+              name = "c_double"
+              type = "double"
+            },
+            {
+              name = "c_decimal"
+              type = "decimal(2,1)"
+            },
+            {
+              name = "c_bytes"
+              type = "bytes"
+            },
+            {
+              name = "c_date"
+              type = "date"
+            },
+            {
+              name = "c_timestamp"
+              type = "timestamp"
+            }
+          ]
+        }
+  }
+}
+
+sink {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    key = "cluster-key-value-check-${id}"
+    support_custom_key = true
+    value_field = "c_string"
+    data_type = key
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-list.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-list.conf
new file mode 100644
index 0000000000..b55a5f925b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-list.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    keys = "key_test*"
+    data_type = key
+    batch_size = 33
+    format = "json"
+        schema = {
+          table = "RedisDatabase.RedisTable"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "c_map"
+              type = "map<string, smallint>"
+            },
+            {
+              name = "c_array"
+              type = "array<tinyint>"
+            },
+            {
+              name = "c_string"
+              type = "string"
+            },
+            {
+              name = "c_boolean"
+              type = "boolean"
+            },
+            {
+              name = "c_tinyint"
+              type = "tinyint"
+            },
+            {
+              name = "c_smallint"
+              type = "smallint"
+            },
+            {
+              name = "c_int"
+              type = "int"
+            },
+            {
+              name = "c_bigint"
+              type = "bigint"
+            },
+            {
+              name = "c_float"
+              type = "float"
+            },
+            {
+              name = "c_double"
+              type = "double"
+            },
+            {
+              name = "c_decimal"
+              type = "decimal(2,1)"
+            },
+            {
+              name = "c_bytes"
+              type = "bytes"
+            },
+            {
+              name = "c_date"
+              type = "date"
+            },
+            {
+              name = "c_timestamp"
+              type = "timestamp"
+            }
+          ]
+        }
+  }
+}
+
+sink {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    key = "cluster-list-value-check"
+    value_field = "id"
+    data_type = list
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-set.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-set.conf
new file mode 100644
index 0000000000..9eceea2f8b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-set.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    keys = "key_test*"
+    data_type = key
+    batch_size = 33
+    format = "json"
+        schema = {
+          table = "RedisDatabase.RedisTable"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "c_map"
+              type = "map<string, smallint>"
+            },
+            {
+              name = "c_array"
+              type = "array<tinyint>"
+            },
+            {
+              name = "c_string"
+              type = "string"
+            },
+            {
+              name = "c_boolean"
+              type = "boolean"
+            },
+            {
+              name = "c_tinyint"
+              type = "tinyint"
+            },
+            {
+              name = "c_smallint"
+              type = "smallint"
+            },
+            {
+              name = "c_int"
+              type = "int"
+            },
+            {
+              name = "c_bigint"
+              type = "bigint"
+            },
+            {
+              name = "c_float"
+              type = "float"
+            },
+            {
+              name = "c_double"
+              type = "double"
+            },
+            {
+              name = "c_decimal"
+              type = "decimal(2,1)"
+            },
+            {
+              name = "c_bytes"
+              type = "bytes"
+            },
+            {
+              name = "c_date"
+              type = "date"
+            },
+            {
+              name = "c_timestamp"
+              type = "timestamp"
+            }
+          ]
+        }
+  }
+}
+
+sink {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    key = "cluster-set-value-check"
+    value_field = "id"
+    data_type = set
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-zset.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-zset.conf
new file mode 100644
index 0000000000..fbdefbb458
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-zset.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    keys = "key_test*"
+    data_type = key
+    batch_size = 33
+    format = "json"
+        schema = {
+          table = "RedisDatabase.RedisTable"
+          columns = [
+            {
+              name = "id"
+              type = "bigint"
+            },
+            {
+              name = "c_map"
+              type = "map<string, smallint>"
+            },
+            {
+              name = "c_array"
+              type = "array<tinyint>"
+            },
+            {
+              name = "c_string"
+              type = "string"
+            },
+            {
+              name = "c_boolean"
+              type = "boolean"
+            },
+            {
+              name = "c_tinyint"
+              type = "tinyint"
+            },
+            {
+              name = "c_smallint"
+              type = "smallint"
+            },
+            {
+              name = "c_int"
+              type = "int"
+            },
+            {
+              name = "c_bigint"
+              type = "bigint"
+            },
+            {
+              name = "c_float"
+              type = "float"
+            },
+            {
+              name = "c_double"
+              type = "double"
+            },
+            {
+              name = "c_decimal"
+              type = "decimal(2,1)"
+            },
+            {
+              name = "c_bytes"
+              type = "bytes"
+            },
+            {
+              name = "c_date"
+              type = "date"
+            },
+            {
+              name = "c_timestamp"
+              type = "timestamp"
+            }
+          ]
+        }
+  }
+}
+
+sink {
+  Redis {
+    nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", 
"redis-cluster-2:6379"]
+    mode = "CLUSTER"
+    auth = "SeaTunnel"
+    key = "cluster-zset-value-check"
+    value_field = "id"
+    data_type = zset
+    batch_size = 33
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 2c1ef8076a..207e793cec 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -424,7 +424,9 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
                 || s.startsWith("LeaseRenewer")
                 // The read of hdfs which has the thread that is all in 
running status
                 || s.startsWith("org.apache.hadoop.hdfs.PeerCache")
-                || s.startsWith("java-sdk-progress-listener-callback-thread");
+                || s.startsWith("java-sdk-progress-listener-callback-thread")
+                // redis pool evictor daemon thread
+                || s.startsWith("commons-pool-evictor");
     }
 
     private void classLoaderObjectCheck(Integer maxSize) throws IOException, 
InterruptedException {


Reply via email to