This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 720ed0c756 [Feature][Connector-redis] fix redis cluster bug and add
cluster e2e (#9869)
720ed0c756 is described below
commit 720ed0c756a483b096c43ecdd752499f09927ff5
Author: JeremyXin <[email protected]>
AuthorDate: Sat Oct 18 19:46:33 2025 +0800
[Feature][Connector-redis] fix redis cluster bug and add cluster e2e (#9869)
Co-authored-by: JeremyXin <[email protected]>
---
docs/en/connector-v2/sink/Redis.md | 14 +-
docs/en/connector-v2/source/Redis.md | 34 +-
docs/zh/connector-v2/sink/Redis.md | 49 ++-
docs/zh/connector-v2/source/Redis.md | 34 +-
seatunnel-connectors-v2/connector-redis/pom.xml | 6 +
.../seatunnel/redis/client/RedisClient.java | 15 +-
.../seatunnel/redis/client/RedisClusterClient.java | 86 +++-
.../seatunnel/redis/client/RedisSingleClient.java | 16 +
.../seatunnel/redis/config/JedisWrapper.java | 79 ++++
.../seatunnel/redis/config/RedisBaseOptions.java | 9 +-
.../seatunnel/redis/config/RedisParameters.java | 17 +-
.../seatunnel/redis/exception/RedisErrorCode.java | 5 +-
.../seatunnel/redis/sink/RedisSinkWriter.java | 115 +++--
.../seatunnel/redis/source/RedisSource.java | 52 ++-
.../seatunnel/redis/sink/RedisSinkWriterTest.java | 114 +++++
.../e2e/connector/redis/RedisClusterIT.java | 473 +++++++++++++++++++++
.../resources/cluster-redis-to-redis-scan.conf | 43 ++
.../cluster-redis-to-redis-type-hash.conf | 111 +++++
.../resources/cluster-redis-to-redis-type-key.conf | 111 +++++
.../cluster-redis-to-redis-type-list.conf | 110 +++++
.../resources/cluster-redis-to-redis-type-set.conf | 110 +++++
.../cluster-redis-to-redis-type-zset.conf | 110 +++++
.../container/seatunnel/SeaTunnelContainer.java | 4 +-
23 files changed, 1623 insertions(+), 94 deletions(-)
diff --git a/docs/en/connector-v2/sink/Redis.md
b/docs/en/connector-v2/sink/Redis.md
index 1e6a2861a7..f05eaca3ec 100644
--- a/docs/en/connector-v2/sink/Redis.md
+++ b/docs/en/connector-v2/sink/Redis.md
@@ -32,6 +32,7 @@ Used to write data to Redis.
| value_field | string | no | - |
| hash_key_field | string | no | - |
| hash_value_field | string | no | - |
+| field_delimiter | string | no | ',' |
| common-options | | no | - |
### host [string]
@@ -119,7 +120,7 @@ redis nodes information, used in cluster mode, must like as
the following format
### format [string]
-The format of upstream data, now only support `json`, `text` will be supported
later, default `json`.
+The format of upstream data, currently support `json`, `text` format, default
`json`.
When you assign format is `json`, for example:
@@ -134,9 +135,18 @@ Connector will generate data as the following and write it
to redis:
```json
{"code": 200, "data": "get success", "success": "true"}
+```
+when you assign format is `text`, and set field_delimiter to `#`, connector
will generate data as the following and write it to redis:
+```text
+200#get success#true
```
+### field_delimiter [string]
+Field delimiter, used to tell connector how to slice and dice fields.
+
+Currently, only need to be configured when format is `text`. default is ",".
+
### expire [long]
Set redis expiration time, the unit is second. The default value is -1, keys
do not automatically expire by default.
@@ -219,7 +229,7 @@ custom key:
Redis {
host = localhost
port = 6379
- key = "name:{name}"
+ key = "name:${name}"
support_custom_key = true
data_type = key
}
diff --git a/docs/en/connector-v2/source/Redis.md
b/docs/en/connector-v2/source/Redis.md
index 99b3678c8d..f1daa06c9a 100644
--- a/docs/en/connector-v2/source/Redis.md
+++ b/docs/en/connector-v2/source/Redis.md
@@ -37,6 +37,7 @@ Used to read data from Redis.
| schema | config | yes when format=json | -
|
| format | string | no | json
|
| single_field_name | string | yes when read_key_enabled=true | -
|
+| field_delimiter | string | no | ','
|
| common-options | | no | -
|
### host [string]
@@ -252,21 +253,44 @@ connector will generate data as the following:
| ---- | ----------- | ------- |
| 200 | get success | true |
-when you assign format is `text`, connector will do nothing for upstream data,
for example:
+when you assign format is `text`, you can choose to specify the schema
information or not.
-upstream data is the following:
-
-```json
-{"code": 200, "data": "get success", "success": true}
+For example, upstream data is the following:
+```text
+200#get success#true
```
+If you do not assign data schema connector will treat the upstream data as the
following:
+
+| content |
+| -------------------------------------------------------- |
+| 200#get success#true |
+
+If you assign data schema, you should also assign the option `schema` and
`field_delimiter` as following:
+
+```hocon
+field_delimiter = "#"
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
+
+```
connector will generate data as the following:
| content |
| -------------------------------------------------------- |
| {"code": 200, "data": "get success", "success": true} |
+### field_delimiter [string]
+Field delimiter, used to tell connector how to slice and dice fields.
+
+Currently, only need to be configured when format is text. default is ",".
+
### schema [config]
#### fields [config]
diff --git a/docs/zh/connector-v2/sink/Redis.md
b/docs/zh/connector-v2/sink/Redis.md
index 8391c5c0be..72e36aa003 100644
--- a/docs/zh/connector-v2/sink/Redis.md
+++ b/docs/zh/connector-v2/sink/Redis.md
@@ -17,22 +17,23 @@ import ChangeLog from '../changelog/connector-redis.md';
| name | type | required | default value |
|--------------------|---------|-----------------------|---------------|
| host | string | `mode=single`时必须 | - |
-| port | int | no | 6379 |
-| key | string | yes | - |
-| data_type | string | yes | - |
-| batch_size | int | no | 10 |
-| user | string | no | - |
-| auth | string | no | - |
-| db_num | int | no | 0 |
-| mode | string | no | single |
-| nodes | list | yes when mode=cluster | - |
-| format | string | no | json |
-| expire | long | no | -1 |
-| support_custom_key | boolean | no | false |
-| value_field | string | no | - |
-| hash_key_field | string | no | - |
-| hash_value_field | string | no | - |
-| common-options | | no | - |
+| port | int | 否 | 6379 |
+| key | string | 是 | - |
+| data_type | string | 是 | - |
+| batch_size | int | 否 | 10 |
+| user | string | 否 | - |
+| auth | string | 否 | - |
+| db_num | int | 否 | 0 |
+| mode | string | 否 | single |
+| nodes | list | `mode=cluster`时必须 | - |
+| format | string | 否 | json |
+| expire | long | 否 | -1 |
+| support_custom_key | boolean | 否 | false |
+| value_field | string | 否 | - |
+| hash_key_field | string | 否 | - |
+| hash_value_field | string | 否 | - |
+| field_delimiter | string | 否 | "," |
+| common-options | | 否 | - |
### host [string]
@@ -114,7 +115,7 @@ Redis 节点信息,在集群模式下使用,必须按如下格式:
### format [string]
-上游数据的格式,目前只支持 `json`,以后会支持 `text`,默认 `json`。
+上游数据的格式,目前只支持 `json`,`text`,默认 `json`。
当你指定格式为 `json` 时,例如:
@@ -130,6 +131,18 @@ Redis 节点信息,在集群模式下使用,必须按如下格式:
{"code": 200, "data": "获取成功", "success": "true"}
```
+当你指定format为`text`,并设置field_delimiter为`#`时,连接器将生成如下数据并将其写入redis:
+
+```text
+200#get success#true
+```
+
+### field_delimiter [string]
+字段分隔符,用于告诉连接器如何分割字段。
+
+目前仅当格式为text时需要配置。默认为","。
+
+
### expire [long]
设置 Redis 的过期时间,单位为秒。默认值为 -1,表示键不会自动过期。
@@ -210,7 +223,7 @@ Redis {
Redis {
host = localhost
port = 6379
- key = "name:{name}"
+ key = "name:${name}"
support_custom_key = true
data_type = key
}
diff --git a/docs/zh/connector-v2/source/Redis.md
b/docs/zh/connector-v2/source/Redis.md
index b2edca5ab2..6f9ce5f947 100644
--- a/docs/zh/connector-v2/source/Redis.md
+++ b/docs/zh/connector-v2/source/Redis.md
@@ -34,6 +34,7 @@ import ChangeLog from '../changelog/connector-redis.md';
| nodes | list | `mode=cluster` 时必须 | - |
| schema | config | `format=json` 时必须 | - |
| format | string | 否 | json |
+| field_delimiter | string | 否 | ',' |
| common-options | | 否 | - |
### host [string]
@@ -203,21 +204,44 @@ schema {
| ---- | ----------- | ------- |
| 200 | get success | true |
-当指定格式为 `text` 时,连接器不会对上游数据做任何处理,例如:
+当指定格式为 `text` 时,可以选择是否指定schema参数。
-当上游数据如下时:
+例如, 当上游数据如下时:
-```json
-{"code": 200, "data": "get success", "success": true}
+```text
+200#get success#true
+```
+
+如果不指定schema参数,连接器将按照以下方式处理上游数据:
+
+| content |
+| -------------------------------------------------------- |
+| 200#get success#true |
+
+如果指定schema参数,此时需要同时配置`schema`和`field_delimiter`,如下所示:
+```hocon
+field_delimiter = "#"
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
```
-连接器将会生成如下格式数据:
+连接器将生成如下数据:
| content |
| -------------------------------------------------------- |
| {"code": 200, "data": "get success", "success": true} |
+### field_delimiter [string]
+字段分隔符,用于告诉连接器如何分割字段。
+
+目前仅当格式为text时需要配置。默认为","。
+
### schema [config]
#### fields [config]
diff --git a/seatunnel-connectors-v2/connector-redis/pom.xml
b/seatunnel-connectors-v2/connector-redis/pom.xml
index 56e08122d9..755d1fb509 100644
--- a/seatunnel-connectors-v2/connector-redis/pom.xml
+++ b/seatunnel-connectors-v2/connector-redis/pom.xml
@@ -47,6 +47,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-text</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
index af7894795d..39362d27cc 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClient.java
@@ -30,7 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public abstract class RedisClient extends Jedis {
+public abstract class RedisClient {
protected final RedisParameters redisParameters;
@@ -62,14 +62,14 @@ public abstract class RedisClient extends Jedis {
if (redisVersion <= REDIS_5) {
return scanOnRedis5(cursor, scanParams, type);
} else {
- return jedis.scan(cursor, scanParams, type.name());
+ return scanKeyResult(cursor, scanParams, type);
}
}
// When the version is earlier than redis5, scan command does not support
type
private ScanResult<String> scanOnRedis5(
String cursor, ScanParams scanParams, RedisDataType type) {
- ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
+ ScanResult<String> scanResult = scanKeyResult(cursor, scanParams,
null);
String resultCursor = scanResult.getCursor();
List<String> keys = scanResult.getResult();
List<String> typeKeys = new ArrayList<>(keys.size());
@@ -82,6 +82,15 @@ public abstract class RedisClient extends Jedis {
return new ScanResult<>(resultCursor, typeKeys);
}
+ public void close() {
+ if (jedis != null) {
+ jedis.close();
+ }
+ }
+
+ public abstract ScanResult<String> scanKeyResult(
+ String cursor, ScanParams scanParams, RedisDataType type);
+
public abstract List<String> batchGetString(List<String> keys);
public abstract List<List<String>> batchGetList(List<String> keys);
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
index 6b12e0d40c..40dac98dae 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisClusterClient.java
@@ -18,12 +18,16 @@
package org.apache.seatunnel.connectors.seatunnel.redis.client;
import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.JedisWrapper;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.commons.collections4.CollectionUtils;
+import redis.clients.jedis.ConnectionPool;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
import java.util.ArrayList;
import java.util.List;
@@ -31,8 +35,14 @@ import java.util.Map;
import java.util.Set;
public class RedisClusterClient extends RedisClient {
+ private final List<Map.Entry<String, ConnectionPool>> nodes;
+ private final JedisWrapper jedisWrapper;
+
public RedisClusterClient(RedisParameters redisParameters, Jedis jedis,
int redisVersion) {
super(redisParameters, jedis, redisVersion);
+
+ this.jedisWrapper = (JedisWrapper) jedis;
+ this.nodes = new
ArrayList<>(jedisWrapper.getClusterNodes().entrySet());
}
@Override
@@ -103,9 +113,9 @@ public class RedisClusterClient extends RedisClient {
int size = keys.size();
for (int i = 0; i < size; i++) {
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
- RedisDataType.STRING.del(this, keys.get(i), values.get(i));
+ RedisDataType.STRING.del(jedis, keys.get(i), values.get(i));
} else {
- RedisDataType.STRING.set(this, keys.get(i), values.get(i),
expireSeconds);
+ RedisDataType.STRING.set(jedis, keys.get(i), values.get(i),
expireSeconds);
}
}
}
@@ -116,9 +126,9 @@ public class RedisClusterClient extends RedisClient {
int size = keys.size();
for (int i = 0; i < size; i++) {
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
- RedisDataType.LIST.del(this, keys.get(i), values.get(i));
+ RedisDataType.LIST.del(jedis, keys.get(i), values.get(i));
} else {
- RedisDataType.LIST.set(this, keys.get(i), values.get(i),
expireSeconds);
+ RedisDataType.LIST.set(jedis, keys.get(i), values.get(i),
expireSeconds);
}
}
}
@@ -129,9 +139,9 @@ public class RedisClusterClient extends RedisClient {
int size = keys.size();
for (int i = 0; i < size; i++) {
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
- RedisDataType.SET.del(this, keys.get(i), values.get(i));
+ RedisDataType.SET.del(jedis, keys.get(i), values.get(i));
} else {
- RedisDataType.SET.set(this, keys.get(i), values.get(i),
expireSeconds);
+ RedisDataType.SET.set(jedis, keys.get(i), values.get(i),
expireSeconds);
}
}
}
@@ -142,9 +152,9 @@ public class RedisClusterClient extends RedisClient {
int size = keys.size();
for (int i = 0; i < size; i++) {
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
- RedisDataType.HASH.del(this, keys.get(i), values.get(i));
+ RedisDataType.HASH.del(jedis, keys.get(i), values.get(i));
} else {
- RedisDataType.HASH.set(this, keys.get(i), values.get(i),
expireSeconds);
+ RedisDataType.HASH.set(jedis, keys.get(i), values.get(i),
expireSeconds);
}
}
}
@@ -155,10 +165,66 @@ public class RedisClusterClient extends RedisClient {
int size = keys.size();
for (int i = 0; i < size; i++) {
if (rowKinds.get(i) == RowKind.DELETE || rowKinds.get(i) ==
RowKind.UPDATE_BEFORE) {
- RedisDataType.ZSET.del(this, keys.get(i), values.get(i));
+ RedisDataType.ZSET.del(jedis, keys.get(i), values.get(i));
+ } else {
+ RedisDataType.ZSET.set(jedis, keys.get(i), values.get(i),
expireSeconds);
+ }
+ }
+ }
+
+ /** In cluster mode, traverse and scan each node key */
+ @Override
+ public ScanResult<String> scanKeyResult(
+ final String cursor, final ScanParams params, final RedisDataType
type) {
+ // Create a composite cursor to traverse the cluster nodes
+ // the format is "Node Index:Node cursor"
+ int nodeIndex = 0;
+ String nodeCursor = cursor;
+ boolean isFirstScan = !cursor.contains(":");
+
+ if (!ScanParams.SCAN_POINTER_START.equals(cursor) &&
cursor.contains(":")) {
+ String[] parts = cursor.split(":", 2);
+ nodeIndex = Integer.parseInt(parts[0]);
+ nodeCursor = parts[1];
+ }
+
+ // All nodes have been scanned
+ if (nodeIndex >= nodes.size()) {
+ return new ScanResult<>(ScanParams.SCAN_POINTER_START, new
ArrayList<>());
+ }
+
+ List<String> resultKeys;
+ String nextCursor;
+
+ Map.Entry<String, ConnectionPool> connectionPoolEntry =
nodes.get(nodeIndex);
+ Jedis jedis = jedisWrapper.getJedis(connectionPoolEntry.getKey());
+
+ // Perform the scan operation
+ ScanResult<String> scanResult;
+ if (type != null) {
+ // redis 7
+ scanResult = jedis.scan(nodeCursor, params, type.name());
+ } else {
+ // redis 5
+ scanResult = jedis.scan(nodeCursor, params);
+ }
+
+ resultKeys = new ArrayList<>(scanResult.getResult());
+
+ // Generate the next cursor
+ if (!isFirstScan &&
ScanParams.SCAN_POINTER_START.equals(scanResult.getCursor())) {
+ // The current node scan has been completed. Move to the next node
+ nodeIndex++;
+ if (nodeIndex < nodes.size()) {
+ nextCursor = nodeIndex + ":" + ScanParams.SCAN_POINTER_START;
} else {
- RedisDataType.ZSET.set(this, keys.get(i), values.get(i),
expireSeconds);
+ nextCursor = ScanParams.SCAN_POINTER_START;
}
+ } else {
+ // The current node has not been fully scanned. Update the
composite cursor
+ nextCursor = nodeIndex + ":" + scanResult.getCursor();
}
+
+ return new ScanResult<>(nextCursor, resultKeys);
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
index 07efefe337..c9148fc46f 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/client/RedisSingleClient.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.redis.client;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode;
@@ -29,6 +30,8 @@ import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.exceptions.JedisException;
+import redis.clients.jedis.params.ScanParams;
+import redis.clients.jedis.resps.ScanResult;
import java.util.ArrayList;
import java.util.List;
@@ -260,6 +263,19 @@ public class RedisSingleClient extends RedisClient {
processResponses(responses);
}
+ @Override
+ public ScanResult<String> scanKeyResult(
+ String cursor, ScanParams scanParams, RedisDataType type) {
+
+ if (type == null) {
+ // redis 5
+ return jedis.scan(cursor, scanParams);
+ } else {
+ // redis 7
+ return jedis.scan(cursor, scanParams, type.name());
+ }
+ }
+
private void processResponses(List<Response<?>> responseList) {
try {
for (Response<?> response : responseList) {
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
index 8348f3561f..cac0441b16 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java
@@ -17,16 +17,26 @@
package org.apache.seatunnel.connectors.seatunnel.redis.config;
+import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode;
+
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.ConnectionPool;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.GET_REDIS_INFO_ERROR;
+@Slf4j
public class JedisWrapper extends Jedis {
private final JedisCluster jedisCluster;
+ private final Map<String, Jedis> jedisPoolMap = new ConcurrentHashMap<>();
public JedisWrapper(@NonNull JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
@@ -82,8 +92,77 @@ public class JedisWrapper extends Jedis {
return jedisCluster.zrange(key, start, stop);
}
+ @Override
+ public String info() {
+ Map<String, ConnectionPool> nodes = jedisCluster.getClusterNodes();
+ if (nodes.isEmpty()) {
+ throw new RedisConnectorException(
+ GET_REDIS_INFO_ERROR, "No available nodes in cluster");
+ }
+
+ // Traverse all nodes and try to obtain the info
+ for (Map.Entry<String, ConnectionPool> entry : nodes.entrySet()) {
+ try {
+ Jedis jedis = getJedis(entry.getKey());
+ return jedis.info();
+ } catch (Exception e) {
+ log.warn("Failed to get info from node: {}", entry.getKey(),
e);
+ }
+ }
+
+ throw new RedisConnectorException(
+ GET_REDIS_INFO_ERROR, "Failed to get redis info from all node
in cluster");
+ }
+
+ @Override
+ public String type(String key) {
+ return jedisCluster.type(key);
+ }
+
+ public Map<String, ConnectionPool> getClusterNodes() {
+ return jedisCluster.getClusterNodes();
+ }
+
+ @Override
+ public long expire(final String key, final long seconds) {
+ return jedisCluster.expire(key, seconds);
+ }
+
@Override
public void close() {
jedisCluster.close();
+ jedisPoolMap.values().forEach(Jedis::close);
+ jedisPoolMap.clear();
+ }
+
+ public Jedis getJedis(String node) {
+ Jedis jedis = jedisPoolMap.get(node);
+ if (jedis != null) {
+ return jedis;
+ }
+
+ // Lazy initialization
+ Map<String, ConnectionPool> clusterNodes =
jedisCluster.getClusterNodes();
+ ConnectionPool connectionPool = clusterNodes.get(node);
+ if (connectionPool == null) {
+ throw new RedisConnectorException(
+ RedisErrorCode.REDIS_CONNECTION_ERROR, "Node not found in
cluster: " + node);
+ }
+
+ return getOrCreateJedis(node, connectionPool);
+ }
+
+ private Jedis getOrCreateJedis(String node, ConnectionPool connectionPool)
{
+ return jedisPoolMap.computeIfAbsent(
+ node,
+ k -> {
+ try {
+ return new Jedis(connectionPool.getResource());
+ } catch (Exception e) {
+ throw new RedisConnectorException(
+ RedisErrorCode.REDIS_CONNECTION_ERROR,
+ "Redis connection error. node: " + node);
+ }
+ });
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
index 7f398c9bcd..2694c61b9d 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisBaseOptions.java
@@ -109,8 +109,15 @@ public class RedisBaseOptions {
"batch_size is used to control the size of a batch
of data during read and write operations"
+ ",default 10");
+ public static final Option<String> FIELD_DELIMITER =
+ Options.key("field_delimiter")
+ .stringType()
+ .defaultValue(",")
+ .withDescription(
+ "The separator between columns in a row of data.
Only needed by `text` file format. default is ','");
+
public enum Format {
JSON,
- // TEXT will be supported later
+ TEXT,
}
}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
index 5f4fee58b9..0ca8d0bfb3 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java
@@ -26,8 +26,6 @@ import
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClusterClient
import
org.apache.seatunnel.connectors.seatunnel.redis.client.RedisSingleClient;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
-import org.apache.logging.log4j.core.util.Assert;
-
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.ConnectionPoolConfig;
@@ -42,6 +40,7 @@ import java.util.List;
import static
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.GET_REDIS_VERSION_INFO_FAILED;
import static
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.INVALID_CONFIG;
+import static
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisErrorCode.REDIS_NODE_EMPTY_ERROR;
@Data
@Slf4j
@@ -66,6 +65,8 @@ public class RedisParameters implements Serializable {
private String valueField;
private String hashKeyField;
private String hashValueField;
+ private String fieldDelimiter;
+ private RedisBaseOptions.Format format;
private int redisVersion;
@@ -138,6 +139,12 @@ public class RedisParameters implements Serializable {
if (config.getOptional(RedisSinkOptions.HASH_VALUE_FIELD).isPresent())
{
this.hashValueField =
config.get(RedisSinkOptions.HASH_VALUE_FIELD);
}
+
+ // set format, default json
+ this.format = config.get(RedisBaseOptions.FORMAT);
+
+ // set field delimiter, only need when format is TEXT
+ this.fieldDelimiter = config.get(RedisBaseOptions.FIELD_DELIMITER);
}
public RedisClient buildRedisClient() {
@@ -192,7 +199,10 @@ public class RedisParameters implements Serializable {
return jedis;
case CLUSTER:
HashSet<HostAndPort> nodes = new HashSet<>();
- Assert.requireNonEmpty(redisNodes, "nodes parameter must not
be empty");
+ if (redisNodes.isEmpty()) {
+ throw new RedisConnectorException(
+ REDIS_NODE_EMPTY_ERROR, "Redis nodes parameter
must not be empty");
+ }
for (String redisNode : redisNodes) {
String[] splits = redisNode.split(":");
if (splits.length != 2) {
@@ -220,7 +230,6 @@ public class RedisParameters implements Serializable {
jedisCluster = new JedisCluster(nodes);
}
JedisWrapper jedisWrapper = new JedisWrapper(jedisCluster);
- jedisWrapper.select(dbNum);
return jedisWrapper;
default:
// do nothing
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
index a82a1425de..076a1a69c2 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/exception/RedisErrorCode.java
@@ -21,7 +21,10 @@ import
org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum RedisErrorCode implements SeaTunnelErrorCode {
GET_REDIS_VERSION_INFO_FAILED("RedisErrorCode-01", "Failed to get the
redis version"),
INVALID_CONFIG("RedisErrorCode-02", "Invalid redis Config"),
- GET_RESPONSE_FAILED("RedisErrorCode-03", "Failed to get the write
response");
+ GET_RESPONSE_FAILED("RedisErrorCode-03", "Failed to get the write
response"),
+ GET_REDIS_INFO_ERROR("RedisErrorCode-04", "Failed to get redis info in
cluster mode."),
+ REDIS_NODE_EMPTY_ERROR("RedisErrorCode-05", "Redis nodes parameter is
empty"),
+ REDIS_CONNECTION_ERROR("RedisErrorCode-06", "Redis connection error");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
index 9207477d84..0adab4300f 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriter.java
@@ -19,19 +19,26 @@ package
org.apache.seatunnel.connectors.seatunnel.redis.sink;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.PlaceholderUtils;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,12 +47,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+@Slf4j
public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
- private static final String REDIS_GROUP_DELIMITER = ":";
- private static final String LEFT_PLACEHOLDER_MARKER = "{";
- private static final String RIGHT_PLACEHOLDER_MARKER = "}";
+ private static final Pattern LEGACY_PLACEHOLDER_PATTERN =
+ Pattern.compile("(?<!\\$)\\{([^}]+)\\}");
+ private static final Pattern PLACEHOLDER_PATTERN =
Pattern.compile("\\$\\{([^}]+)\\}");
private final SeaTunnelRowType seaTunnelRowType;
private final RedisParameters redisParameters;
private final SerializationSchema serializationSchema;
@@ -60,9 +70,7 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters
redisParameters) {
this.seaTunnelRowType = seaTunnelRowType;
this.redisParameters = redisParameters;
- // TODO according to format to initialize serializationSchema
- // Now temporary using json serializationSchema
- this.serializationSchema = new
JsonSerializationSchema(seaTunnelRowType);
+ this.serializationSchema = createSerializationSchema(redisParameters,
seaTunnelRowType);
this.redisClient = redisParameters.buildRedisClient();
this.batchSize = redisParameters.getBatchSize();
this.rowKinds = new ArrayList<>(batchSize);
@@ -81,6 +89,8 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
if (keyBuffer.size() >= batchSize) {
flush();
}
+
+ log.debug("write redis key: {}, value: {}, rowKind: {}", key, value,
element.getRowKind());
}
private String getKey(SeaTunnelRow element, List<String> fields) {
@@ -101,28 +111,40 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
}
}
- private String getCustomKey(SeaTunnelRow element, List<String> fields,
String keyField) {
- String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER);
- StringBuilder key = new StringBuilder();
- for (int i = 0; i < keyFieldSegments.length; i++) {
- String keyFieldSegment = keyFieldSegments[i];
- if (keyFieldSegment.startsWith(LEFT_PLACEHOLDER_MARKER)
- && keyFieldSegment.endsWith(RIGHT_PLACEHOLDER_MARKER)) {
- String realKeyField = keyFieldSegment.substring(1,
keyFieldSegment.length() - 1);
- if (fields.contains(realKeyField)) {
- Object realFieldValue =
element.getField(fields.indexOf(realKeyField));
- key.append(realFieldValue == null ? "" :
realFieldValue.toString());
- } else {
- key.append(keyFieldSegment);
- }
- } else {
- key.append(keyFieldSegment);
- }
- if (i != keyFieldSegments.length - 1) {
- key.append(REDIS_GROUP_DELIMITER);
- }
+ protected String getCustomKey(SeaTunnelRow element, List<String> fields,
String keyField) {
+ // First, detect and convert the old format placeholders to the new
format
+ String normalizedKeyField = normalizePlaceholders(keyField);
+
+ Matcher matcher = PLACEHOLDER_PATTERN.matcher(normalizedKeyField);
+
+ Map<String, String> placeholderValues = new HashMap<>();
+
+ while (matcher.find()) {
+ String fieldName = matcher.group(1);
+ String fieldValue = getFieldValue(element, fields, fieldName);
+ placeholderValues.put(fieldName, fieldValue);
+ }
+
+ return placeholderValues.keySet().stream()
+ .reduce(
+ normalizedKeyField,
+ (result, placeholderName) -> {
+ return PlaceholderUtils.replacePlaceholders(
+ result,
+ placeholderName,
+ placeholderValues.get(placeholderName),
+ null);
+ });
+ }
+
+ private String getFieldValue(SeaTunnelRow element, List<String> fields,
String fieldName) {
+ if (fields.contains(fieldName)) {
+ Object fieldValue = element.getField(fields.indexOf(fieldName));
+ return fieldValue == null ? "" : fieldValue.toString();
+ } else {
+ // If the field does not exist, return the original field name
+ return fieldName;
}
- return key.toString();
}
private String getValue(SeaTunnelRow element, List<String> fields) {
@@ -219,6 +241,45 @@ public class RedisSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void>
"UnSupport redisDataType,only support
string,list,hash,set,zset");
}
+ private SerializationSchema createSerializationSchema(
+ RedisParameters redisParameters, SeaTunnelRowType rowType) {
+
+ RedisBaseOptions.Format format = redisParameters.getFormat();
+
+ switch (format) {
+ case JSON:
+ return new JsonSerializationSchema(rowType);
+ case TEXT:
+ String fieldDelimiter = redisParameters.getFieldDelimiter();
+ return TextSerializationSchema.builder()
+ .seaTunnelRowType(rowType)
+ .delimiter(fieldDelimiter)
+ .build();
+ default:
+ throw new RedisConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ RedisBaseOptions.CONNECTOR_IDENTITY,
+ PluginType.SINK,
+ "Unsupported format: " + format));
+ }
+ }
+
+ private String normalizePlaceholders(String input) {
+ if (input == null) {
+ return input;
+ }
+
+ Matcher legacyMatcher = LEGACY_PLACEHOLDER_PATTERN.matcher(input);
+ if (legacyMatcher.find()) {
+ // Convert legacy format {fieldName} to ${fieldName}
+ return legacyMatcher.replaceAll("\\$\\{$1\\}");
+ }
+
+ return input;
+ }
+
@Override
public void close() throws IOException {
flush();
diff --git
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
index 27deb707d8..caf972b211 100644
---
a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
+++
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSource.java
@@ -21,7 +21,7 @@ import
org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
+import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -36,6 +36,7 @@ import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import
org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
import java.util.List;
@@ -54,25 +55,42 @@ public class RedisSource extends
AbstractSingleSplitSource<SeaTunnelRow> {
public RedisSource(ReadonlyConfig readonlyConfig) {
this.redisParameters.buildWithConfig(readonlyConfig);
+
+ createCatalogTableAndDeserializationSchema(readonlyConfig);
+ }
+
+ private void createCatalogTableAndDeserializationSchema(ReadonlyConfig
readonlyConfig) {
// TODO: use format SPI
// default use json format
- if (readonlyConfig.getOptional(RedisBaseOptions.FORMAT).isPresent()) {
- if
(!readonlyConfig.getOptional(SinkConnectorCommonOptions.SCHEMA).isPresent()) {
- throw new RedisConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(),
- PluginType.SOURCE,
- "Must config schema when format parameter been
config"));
- }
+ RedisBaseOptions.Format format =
readonlyConfig.get(RedisBaseOptions.FORMAT);
+
+ // if config schema, create deserialization schema and catalog table
by config
+ // else create catalog with simple text
+ if
(readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
+ this.catalogTable =
CatalogTableUtil.buildWithConfig(readonlyConfig);
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
- RedisBaseOptions.Format format =
readonlyConfig.get(RedisBaseOptions.FORMAT);
- if (RedisBaseOptions.Format.JSON.equals(format)) {
- this.catalogTable =
CatalogTableUtil.buildWithConfig(readonlyConfig);
- this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
- this.deserializationSchema =
- new JsonDeserializationSchema(catalogTable, false,
false);
+ switch (format) {
+ case JSON:
+ this.deserializationSchema =
+ new JsonDeserializationSchema(catalogTable, false,
false);
+ break;
+ case TEXT:
+ String fieldDelimiter =
readonlyConfig.get(RedisBaseOptions.FIELD_DELIMITER);
+ this.deserializationSchema =
+ TextDeserializationSchema.builder()
+ .seaTunnelRowType(seaTunnelRowType)
+ .delimiter(fieldDelimiter)
+ .build();
+ break;
+ default:
+ throw new RedisConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message:
%s",
+ getPluginName(),
+ PluginType.SOURCE,
+ "Unsupported format: " + format));
}
} else {
this.catalogTable = CatalogTableUtil.buildSimpleTextTable();
diff --git
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriterTest.java
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriterTest.java
new file mode 100644
index 0000000000..b3f2259189
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkWriterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.redis.sink;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+
+import static org.mockito.Mockito.when;
+
+public class RedisSinkWriterTest {
+
+ private RedisClient mockRedisClient;
+
+ private RedisParameters mockRedisParameters;
+
+ private SeaTunnelRowType rowType;
+ private RedisSinkWriter redisSinkWriter;
+
+ @BeforeEach
+ void setUp() {
+ rowType =
+ new SeaTunnelRowType(
+ new String[] {"id", "name", "age", "email"},
+ new SeaTunnelDataType<?>[] {
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.STRING_TYPE
+ });
+
+ mockRedisParameters = Mockito.mock(RedisParameters.class);
+ mockRedisClient = Mockito.mock(RedisClient.class);
+
+
when(mockRedisParameters.buildRedisClient()).thenReturn(mockRedisClient);
+ when(mockRedisParameters.getBatchSize()).thenReturn(3);
+
when(mockRedisParameters.getFormat()).thenReturn(RedisBaseOptions.Format.JSON);
+ when(mockRedisParameters.getFieldDelimiter()).thenReturn(",");
+ }
+
+ @Test
+ void testGetCustomKey() {
+ // Set custom key mode
+
when(mockRedisParameters.getKeyField()).thenReturn("user:${id}:profile");
+ when(mockRedisParameters.getSupportCustomKey()).thenReturn(true);
+
when(mockRedisParameters.getRedisDataType()).thenReturn(RedisDataType.STRING);
+ when(mockRedisParameters.getExpire()).thenReturn(3600L);
+
+ redisSinkWriter = new RedisSinkWriter(rowType, mockRedisParameters);
+
+ // create test data
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "Alice", 25,
"[email protected]"});
+ row.setRowKind(RowKind.INSERT);
+
+ String customKey =
+ redisSinkWriter.getCustomKey(
+ row,
+ Arrays.asList(rowType.getFieldNames()),
+ mockRedisParameters.getKeyField());
+
+ Assertions.assertEquals("user:1:profile", customKey);
+ }
+
+ @Test
+ public void testLegacyCustomKey() {
+
when(mockRedisParameters.getKeyField()).thenReturn("user:{id}:profile");
+
+ when(mockRedisParameters.getSupportCustomKey()).thenReturn(true);
+
when(mockRedisParameters.getRedisDataType()).thenReturn(RedisDataType.STRING);
+ when(mockRedisParameters.getExpire()).thenReturn(3600L);
+
+ redisSinkWriter = new RedisSinkWriter(rowType, mockRedisParameters);
+
+ // create test data
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "Alice", 25,
"[email protected]"});
+ row.setRowKind(RowKind.INSERT);
+
+ String customKey =
+ redisSinkWriter.getCustomKey(
+ row,
+ Arrays.asList(rowType.getFieldNames()),
+ mockRedisParameters.getKeyField());
+
+ Assertions.assertEquals("user:1:profile", customKey);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
new file mode 100644
index 0000000000..f57214ae0c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.redis;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.lang3.tuple.Pair;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.ConnectionPoolConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+@Slf4j
+public class RedisClusterIT extends TestSuiteBase implements TestResource {
+
+ private static final int REDIS_CLUSTER_SIZE = 3;
+
+ private GenericContainer<?>[] redisClusterNodes;
+ private JedisCluster jedisCluster;
+
+ private RedisContainerInfo redisContainerInfo =
+ new RedisContainerInfo("redis-cluster-e2e", 6379, "SeaTunnel",
"redis:7");
+
+ private static final int[] REDIS_PORTS = {6379, 6380, 6381};
+ private static final int[] REDIS_BUS_PORTS = {16379, 16380, 16381};
+
+ @BeforeAll
+ @Override
+ public void startUp() {
+ setupRedisContainer();
+ createRedisCluster();
+ waitForRedisClusterReady();
+ initJedisCluster();
+ initSourceData();
+ }
+
+ private void setupRedisContainer() {
+ redisClusterNodes = new GenericContainer[REDIS_CLUSTER_SIZE];
+
+ for (int i = 0; i < REDIS_CLUSTER_SIZE; i++) {
+ String nodeName = "redis-cluster-" + (i + 1);
+ int redisPort = REDIS_PORTS[i];
+ int busPort = REDIS_BUS_PORTS[i];
+
+ // Get the host machine's IP address
+ String hostIp = getHostIpAddress();
+ String redisCommand =
+ String.format(
+ "redis-server --cluster-enabled yes --port %d
--protected-mode no "
+ + "--bind 0.0.0.0 --cluster-announce-ip %s
--cluster-announce-port %d "
+ + "--cluster-announce-bus-port %d
--requirepass %s",
+ redisPort,
+ hostIp,
+ redisPort,
+ busPort,
+ redisContainerInfo.getPassword());
+
+ redisClusterNodes[i] =
+ new
GenericContainer<>(DockerImageName.parse(redisContainerInfo.getImageName()))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(nodeName)
+ .withExposedPorts(redisPort, busPort)
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(
+
redisContainerInfo.getImageName())))
+ .withCommand("sh", "-c", redisCommand)
+ .waitingFor(
+ new HostPortWaitStrategy()
+
.withStartupTimeout(Duration.ofMinutes(2)));
+
+ // Set the fixed port mapping
+ redisClusterNodes[i].setPortBindings(
+ Arrays.asList(redisPort + ":" + redisPort, busPort + ":" +
busPort));
+ }
+
+ Startables.deepStart(Stream.of(redisClusterNodes)).join();
+ log.info("Redis cluster nodes started with ports: {}",
Arrays.toString(REDIS_PORTS));
+ }
+
+ private void createRedisCluster() {
+ try {
+ String hostIp = getHostIpAddress();
+ StringBuilder clusterCreateCmd =
+ new StringBuilder(
+ "redis-cli --cluster create --cluster-replicas 0
--cluster-yes ");
+
+ for (int port : REDIS_PORTS) {
+
clusterCreateCmd.append(hostIp).append(":").append(port).append(" ");
+ }
+
+ clusterCreateCmd.append("-a
").append(redisContainerInfo.getPassword());
+
+ log.info("Creating cluster with command: {}", clusterCreateCmd);
+
+ Container.ExecResult result =
+ redisClusterNodes[0].execInContainer("sh", "-c",
clusterCreateCmd.toString());
+
+ // Wait for the cluster to be created
+ Thread.sleep(5000);
+
+ if (result.getExitCode() != 0) {
+ throw new RuntimeException("Failed to create Redis cluster: "
+ result.getStderr());
+ }
+
+ log.info("Redis cluster created successfully");
+ } catch (Exception e) {
+ throw new RuntimeException("Error creating Redis cluster", e);
+ }
+ }
+
+ private void waitForRedisClusterReady() {
+ log.info("Waiting for Redis cluster to be ready...");
+
+ int maxRetries = 10;
+ int retryCount = 0;
+
+ while (retryCount < maxRetries) {
+ try {
+ boolean allReady = true;
+
+ for (int i = 0; i < REDIS_CLUSTER_SIZE; i++) {
+ Container.ExecResult result =
+ redisClusterNodes[i].execInContainer(
+ "redis-cli",
+ "-p",
+ String.valueOf(REDIS_PORTS[i]),
+ "-a",
+ redisContainerInfo.getPassword(),
+ "ping");
+
+ if (!"PONG".equals(result.getStdout().trim())) {
+ allReady = false;
+ break;
+ }
+ }
+
+ if (allReady) {
+ log.info("All Redis nodes are ready after {} attempts",
retryCount + 1);
+ return;
+ }
+
+ } catch (Exception e) {
+ log.debug(
+ "Redis readiness check failed, attempt {}: {}",
+ retryCount + 1,
+ e.getMessage());
+ }
+
+ retryCount++;
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ throw new RuntimeException("Redis cluster failed to become ready
within timeout");
+ }
+
+ private void initJedisCluster() {
+ Set<HostAndPort> jedisClusterNodes = new HashSet<>();
+
+ String hostIp = getHostIpAddress();
+ for (int port : REDIS_PORTS) {
+ jedisClusterNodes.add(new HostAndPort(hostIp, port));
+ }
+
+ ConnectionPoolConfig poolConfig = new ConnectionPoolConfig();
+
+ try {
+ this.jedisCluster =
+ new JedisCluster(
+ jedisClusterNodes,
+ 10000,
+ 10000,
+ 3,
+ redisContainerInfo.getPassword(),
+ poolConfig);
+
+ log.info("JedisCluster initialized successfully");
+
+ } catch (Exception e) {
+ log.error("Failed to create JedisCluster", e);
+ throw e;
+ }
+ }
+
+ private void initSourceData() {
+ JsonSerializationSchema jsonSerializationSchema =
+ new JsonSerializationSchema(generateTestDataSet().getKey());
+ List<SeaTunnelRow> rows = generateTestDataSet().getValue();
+
+ for (int i = 0; i < rows.size(); i++) {
+ jedisCluster.set(
+ "key_test" + i, new
String(jsonSerializationSchema.serialize(rows.get(i))));
+ }
+
+ log.info("Initialized {} test records in Redis cluster", rows.size());
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (jedisCluster != null) {
+ try {
+ jedisCluster.close();
+
+ log.info("JedisCluster closed successfully");
+ } catch (Exception e) {
+ log.warn("Error closing JedisCluster", e);
+ }
+ }
+
+ if (redisClusterNodes != null) {
+ for (GenericContainer<?> container : redisClusterNodes) {
+ if (container != null) {
+ try {
+ container.close();
+ } catch (Exception e) {
+ log.warn("Error stopping container", e);
+ }
+ }
+ }
+ }
+ }
+
+ @TestTemplate
+ public void testRedisClusterScan(TestContainer container)
+ throws IOException, InterruptedException {
+ try {
+ Container.ExecResult execResult =
+ container.executeJob("/cluster-redis-to-redis-scan.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ long listLength = jedisCluster.llen("key_list");
+ Assertions.assertEquals(100, listLength);
+ } finally {
+ jedisCluster.del("key_list");
+ Assertions.assertEquals(0, jedisCluster.llen("key_list"));
+ }
+ }
+
+ @TestTemplate
+ public void testRedisClusterCustomValueWithKeyType(TestContainer container)
+ throws IOException, InterruptedException {
+ try {
+ Container.ExecResult execResult =
+
container.executeJob("/cluster-redis-to-redis-type-key.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ int count = 0;
+ for (int i = 0; i < 100; i++) {
+ String data = jedisCluster.get("cluster-key-value-check-" + i);
+ if (data != null) {
+ Assertions.assertEquals("string", data);
+ count++;
+ }
+ }
+ Assertions.assertEquals(100, count);
+ } finally {
+ for (int i = 0; i < 100; i++) {
+ jedisCluster.del("cluster-key-value-check-" + i);
+ }
+ }
+ }
+
+ @TestTemplate
+ public void testRedisClusterCustomValueWithSetType(TestContainer container)
+ throws IOException, InterruptedException {
+ try {
+ Container.ExecResult execResult =
+
container.executeJob("/cluster-redis-to-redis-type-set.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ long amount = jedisCluster.scard("cluster-set-value-check");
+ Assertions.assertEquals(100, amount);
+ } finally {
+ jedisCluster.del("cluster-set-value-check");
+ }
+ }
+
+ @TestTemplate
+ public void testRedisClusterCustomValueWithListType(TestContainer
container)
+ throws IOException, InterruptedException {
+ try {
+ Container.ExecResult execResult =
+
container.executeJob("/cluster-redis-to-redis-type-list.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ long amount = jedisCluster.llen("cluster-list-value-check");
+ Assertions.assertEquals(100, amount);
+ } finally {
+ jedisCluster.del("cluster-list-value-check");
+ }
+ }
+
+ @TestTemplate
+ public void testRedisClusterCustomValueWithZSetType(TestContainer
container)
+ throws IOException, InterruptedException {
+ try {
+ Container.ExecResult execResult =
+
container.executeJob("/cluster-redis-to-redis-type-zset.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ long amount = jedisCluster.zcard("cluster-zset-value-check");
+ Assertions.assertEquals(100, amount);
+ } finally {
+ jedisCluster.del("cluster-zset-value-check");
+ }
+ }
+
+ @TestTemplate
+ public void testRedisClusterCustomValueWithHashType(TestContainer
container)
+ throws IOException, InterruptedException {
+ try {
+ Container.ExecResult execResult =
+
container.executeJob("/cluster-redis-to-redis-type-hash.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ long amount = jedisCluster.hlen("cluster-hash-value-check");
+ Assertions.assertEquals(100, amount);
+ for (int i = 0; i < 100; i++) {
+ Assertions.assertEquals(
+ "string",
jedisCluster.hget("cluster-hash-value-check", String.valueOf(i)));
+ }
+ } finally {
+ jedisCluster.del("cluster-hash-value-check");
+ }
+ }
+
+ protected Pair<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet()
{
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {
+ "id",
+ "c_map",
+ "c_array",
+ "c_string",
+ "c_boolean",
+ "c_tinyint",
+ "c_smallint",
+ "c_int",
+ "c_bigint",
+ "c_float",
+ "c_double",
+ "c_decimal",
+ "c_bytes",
+ "c_date",
+ "c_timestamp"
+ },
+ new SeaTunnelDataType[] {
+ BasicType.LONG_TYPE,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.SHORT_TYPE),
+ ArrayType.BYTE_ARRAY_TYPE,
+ BasicType.STRING_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ new DecimalType(2, 1),
+ PrimitiveByteArrayType.INSTANCE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_DATE_TIME_TYPE
+ });
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ Long.valueOf(i),
+ Collections.singletonMap("key",
Short.parseShort("1")),
+ new Byte[] {Byte.parseByte("1")},
+ "string",
+ Boolean.FALSE,
+ Byte.parseByte("1"),
+ Short.parseShort("1"),
+ Integer.parseInt("1"),
+ Long.parseLong("1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ BigDecimal.valueOf(11, 1),
+ "test".getBytes(),
+ LocalDate.now(),
+ LocalDateTime.now()
+ });
+ rows.add(row);
+ }
+ return Pair.of(rowType, rows);
+ }
+
+ private String getHostIpAddress() {
+ String ip = "";
+ try {
+ Enumeration<NetworkInterface> networkInterfaces =
+ NetworkInterface.getNetworkInterfaces();
+ while (networkInterfaces.hasMoreElements()) {
+ NetworkInterface networkInterface =
networkInterfaces.nextElement();
+ Enumeration<InetAddress> inetAddresses =
networkInterface.getInetAddresses();
+ while (inetAddresses.hasMoreElements()) {
+ InetAddress inetAddress = inetAddresses.nextElement();
+ if (!inetAddress.isLoopbackAddress() && inetAddress
instanceof Inet4Address) {
+ ip = inetAddress.getHostAddress();
+ }
+ }
+ }
+ } catch (SocketException ex) {
+ ex.printStackTrace();
+ }
+ return ip;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-scan.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-scan.conf
new file mode 100644
index 0000000000..71bdf85b9f
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-scan.conf
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379",
"redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ keys = "key_test*"
+ data_type = string
+ batch_size = 33
+ }
+}
+
+sink {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379",
"redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ key = "key_list"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-hash.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-hash.conf
new file mode 100644
index 0000000000..17bc037794
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-hash.conf
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379",
"redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ keys = "key_test*"
+ data_type = key
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379",
"redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ key = "cluster-hash-value-check"
+ hash_key_field = "id"
+ hash_value_field = "c_string"
+ data_type = hash
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-key.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-key.conf
new file mode 100644
index 0000000000..efce13dbb1
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-key.conf
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", "redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ keys = "key_test*"
+ data_type = key
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", "redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ key = "cluster-key-value-check-${id}"
+ support_custom_key = true
+ value_field = "c_string"
+ data_type = key
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-list.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-list.conf
new file mode 100644
index 0000000000..b55a5f925b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-list.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", "redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ keys = "key_test*"
+ data_type = key
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", "redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ key = "cluster-list-value-check"
+ value_field = "id"
+ data_type = list
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-set.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-set.conf
new file mode 100644
index 0000000000..9eceea2f8b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-set.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", "redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ keys = "key_test*"
+ data_type = key
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", "redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ key = "cluster-set-value-check"
+ value_field = "id"
+ data_type = set
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-zset.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-zset.conf
new file mode 100644
index 0000000000..fbdefbb458
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/cluster-redis-to-redis-type-zset.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", "redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ keys = "key_test*"
+ data_type = key
+ batch_size = 33
+ format = "json"
+ schema = {
+ table = "RedisDatabase.RedisTable"
+ columns = [
+ {
+ name = "id"
+ type = "bigint"
+ },
+ {
+ name = "c_map"
+ type = "map<string, smallint>"
+ },
+ {
+ name = "c_array"
+ type = "array<tinyint>"
+ },
+ {
+ name = "c_string"
+ type = "string"
+ },
+ {
+ name = "c_boolean"
+ type = "boolean"
+ },
+ {
+ name = "c_tinyint"
+ type = "tinyint"
+ },
+ {
+ name = "c_smallint"
+ type = "smallint"
+ },
+ {
+ name = "c_int"
+ type = "int"
+ },
+ {
+ name = "c_bigint"
+ type = "bigint"
+ },
+ {
+ name = "c_float"
+ type = "float"
+ },
+ {
+ name = "c_double"
+ type = "double"
+ },
+ {
+ name = "c_decimal"
+ type = "decimal(2,1)"
+ },
+ {
+ name = "c_bytes"
+ type = "bytes"
+ },
+ {
+ name = "c_date"
+ type = "date"
+ },
+ {
+ name = "c_timestamp"
+ type = "timestamp"
+ }
+ ]
+ }
+ }
+}
+
+sink {
+ Redis {
+ nodes = ["redis-cluster-0:6379", "redis-cluster-1:6379", "redis-cluster-2:6379"]
+ mode = "CLUSTER"
+ auth = "SeaTunnel"
+ key = "cluster-zset-value-check"
+ value_field = "id"
+ data_type = zset
+ batch_size = 33
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 2c1ef8076a..207e793cec 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -424,7 +424,9 @@ public class SeaTunnelContainer extends AbstractTestContainer {
|| s.startsWith("LeaseRenewer")
// The read of hdfs which has the thread that is all in running status
|| s.startsWith("org.apache.hadoop.hdfs.PeerCache")
- || s.startsWith("java-sdk-progress-listener-callback-thread");
+ || s.startsWith("java-sdk-progress-listener-callback-thread")
+ // redis pool evictor daemon thread
+ || s.startsWith("commons-pool-evictor");
}
private void classLoaderObjectCheck(Integer maxSize) throws IOException,
InterruptedException {