This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3930010386 [Improve][E2E] Improve redis e2e for delete (#10018)
3930010386 is described below

commit 39300103860e0ece4bc97cd2c6fef1092a461d2b
Author: 老王 <[email protected]>
AuthorDate: Tue Dec 2 17:46:58 2025 +0800

    [Improve][E2E] Improve redis e2e for delete (#10018)
---
 seatunnel-connectors-v2/connector-redis/pom.xml    |  14 +
 .../redis/config}/RedisContainerInfo.java          |   5 +-
 .../connectors/seatunnel/redis/Redis5Test.java     |  12 +-
 .../connectors/seatunnel/redis/Redis7Test.java     |  12 +-
 .../seatunnel/redis/RedisTemplateTest.java         | 396 +++++++++++++++++++++
 .../seatunnel/e2e/connector/redis/Redis5IT.java    |   2 +
 .../seatunnel/e2e/connector/redis/Redis7IT.java    |   2 +
 .../e2e/connector/redis/RedisClusterIT.java        |   1 +
 .../e2e/connector/redis/RedisMasterAndSlaveIT.java |   1 +
 .../connector/redis/RedisTestCaseTemplateIT.java   |  77 +---
 .../resources/fake-to-redis-test-delete-key.conf   |  87 -----
 .../resources/fake-to-redis-test-delete-list.conf  |  86 -----
 .../resources/fake-to-redis-test-delete-set.conf   |  86 -----
 .../resources/fake-to-redis-test-delete-zset.conf  |  86 -----
 14 files changed, 444 insertions(+), 423 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-redis/pom.xml 
b/seatunnel-connectors-v2/connector-redis/pom.xml
index 755d1fb509..eb260775c0 100644
--- a/seatunnel-connectors-v2/connector-redis/pom.xml
+++ b/seatunnel-connectors-v2/connector-redis/pom.xml
@@ -59,6 +59,20 @@
             <version>${jedis.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisContainerInfo.java
similarity index 89%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
rename to 
seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisContainerInfo.java
index 61b55a6594..f06def7dce 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisContainerInfo.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisContainerInfo.java
@@ -14,8 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.e2e.connector.redis;
+package org.apache.seatunnel.connectors.seatunnel.redis.config;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
+@VisibleForTesting
 public class RedisContainerInfo {
     private final String host;
     private final int port;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis5Test.java
similarity index 69%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
copy to 
seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis5Test.java
index 6df6e80d6e..1601f20191 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis5Test.java
@@ -14,9 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.e2e.connector.redis;
+package org.apache.seatunnel.connectors.seatunnel.redis;
 
-public class Redis5IT extends RedisTestCaseTemplateIT {
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+@DisabledOnOs(
+        value = OS.WINDOWS,
+        disabledReason = "There is no docker environment on the windows test 
system")
+public class Redis5Test extends RedisTemplateTest {
 
     @Override
     public RedisContainerInfo getRedisContainerInfo() {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis7Test.java
similarity index 69%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
copy to 
seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis7Test.java
index dfa46e886a..a2f008e799 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/Redis7Test.java
@@ -14,9 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.e2e.connector.redis;
+package org.apache.seatunnel.connectors.seatunnel.redis;
 
-public class Redis7IT extends RedisTestCaseTemplateIT {
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+@DisabledOnOs(
+        value = OS.WINDOWS,
+        disabledReason = "There is no docker environment on the windows test 
system")
+public class Redis7Test extends RedisTemplateTest {
 
     @Override
     public RedisContainerInfo getRedisContainerInfo() {
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
new file mode 100644
index 0000000000..b7e03962c5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.redis;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions.CONNECTOR_IDENTITY;
+
+@Slf4j
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class RedisTemplateTest {
+
+    protected String host;
+    protected int port;
+    protected String password;
+    protected String imageName;
+    protected Jedis jedis;
+    protected GenericContainer<?> redisContainer;
+
+    @BeforeAll
+    public void startUp() {
+        initContainerInfo();
+        Network NETWORK =
+                Network.builder()
+                        .createNetworkCmdModifier(
+                                cmd -> cmd.withName("SEATUNNEL-" + 
UUID.randomUUID()))
+                        .enableIpv6(false)
+                        .build();
+
+        this.redisContainer =
+                new GenericContainer<>(DockerImageName.parse(imageName))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(host)
+                        .withExposedPorts(port)
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName)))
+                        .withCommand(String.format("redis-server --requirepass 
%s", password))
+                        .waitingFor(
+                                new HostPortWaitStrategy()
+                                        
.withStartupTimeout(Duration.ofMinutes(2)));
+
+        Startables.deepStart(Stream.of(redisContainer)).join();
+        log.info("Redis container started");
+        this.initJedis();
+        this.initSourceData();
+    }
+
+    protected void initSourceData() {}
+
+    protected abstract RedisContainerInfo getRedisContainerInfo();
+
+    private void initJedis() {
+        Jedis jedis = new Jedis(redisContainer.getHost(), 
redisContainer.getFirstMappedPort());
+        jedis.auth(password);
+        jedis.ping();
+        this.jedis = jedis;
+    }
+
+    protected void initContainerInfo() {
+        RedisContainerInfo redisContainerInfo = getRedisContainerInfo();
+        this.host = redisContainerInfo.getHost();
+        this.port = redisContainerInfo.getPort();
+        this.password = redisContainerInfo.getPassword();
+        this.imageName = redisContainerInfo.getImageName();
+    }
+
+    @AfterAll
+    public void tearDown() {
+        if (Objects.nonNull(jedis)) {
+            jedis.close();
+        }
+        redisContainer.close();
+    }
+
+    @Test
+    public void testFakeToRedisDeleteHashTest() throws IOException {
+        String key = "hash_check";
+        Map<String, Object> otherParams = new HashMap<>();
+        otherParams.put("hash_key_field", "id");
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
+                new RedisSinkFactory(),
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete()));
+        Assertions.assertEquals(2, jedis.hlen(key));
+        jedis.del(key);
+    }
+
+    @Test
+    public void testFakeToRedisDeleteKeyTest() throws IOException {
+        String key = "key_check:{id}";
+        Map<String, Object> otherParams = new HashMap<>();
+        otherParams.put("support_custom_key", true);
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams),
+                new RedisSinkFactory(),
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete()));
+        int count = 0;
+        for (int i = 1; i <= 3; i++) {
+            String data = jedis.get("key_check:" + i);
+            if (data != null) {
+                count++;
+            }
+        }
+        Assertions.assertEquals(2, count);
+        for (int i = 1; i <= 3; i++) {
+            jedis.del("key_check:" + i);
+        }
+    }
+
+    @Test
+    public void testFakeToRedisDeleteListTest() throws IOException {
+        String key = "list_check";
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.LIST, key, new 
HashMap<>()),
+                new RedisSinkFactory(),
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete()));
+        Assertions.assertEquals(2, jedis.llen(key));
+        jedis.del(key);
+    }
+
+    @Test
+    public void testFakeToRedisDeleteSetTest() throws IOException {
+        String key = "set_check";
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.SET, key, new 
HashMap<>()),
+                new RedisSinkFactory(),
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete()));
+        Assertions.assertEquals(2, jedis.scard(key));
+        jedis.del(key);
+    }
+
+    @Test
+    public void testFakeToToRedisDeleteZSetTest() throws IOException {
+        String key = "zset_check";
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.ZSET, key, new 
HashMap<>()),
+                new RedisSinkFactory(),
+                Arrays.asList(
+                        getSeaTunnelRowInsert1(),
+                        getSeaTunnelRowInsert2(),
+                        getSeaTunnelRowInsert3(),
+                        getSeaTunnelRowUpdateBefore(),
+                        getSeaTunnelRowUpdateAfter(),
+                        getSeaTunnelRowDelete()));
+        Assertions.assertEquals(2, jedis.zcard(key));
+        jedis.del(key);
+    }
+
+    private ReadonlyConfig getDefaultReadonlyConfig(
+            RedisDataType dataType, String key, Map<String, Object> 
otherParams) {
+        Map<String, Object> map = new HashMap<>(otherParams);
+        map.put("host", redisContainer.getHost());
+        map.put("port", redisContainer.getFirstMappedPort());
+        map.put("db_num", 0);
+        map.put("auth", password);
+        map.put("key", key);
+        map.put("data_type", dataType.name());
+        map.put("batch_size", 33);
+        return ReadonlyConfig.fromMap(map);
+    }
+
+    private SeaTunnelRow getSeaTunnelRowInsert1() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    1,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private SeaTunnelRow getSeaTunnelRowInsert2() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    2,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private SeaTunnelRow getSeaTunnelRowInsert3() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    3,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 1,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
+        return seaTunnelRow;
+    }
+
+    private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 2,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
+        return seaTunnelRow;
+    }
+
+    private SeaTunnelRow getSeaTunnelRowDelete() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            2,
+                            true,
+                            (byte) 1,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.DELETE);
+        return seaTunnelRow;
+    }
+
+    private CatalogTable getCatalogTable(Integer dbNum, String key) {
+        return CatalogTable.of(
+                TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
+                getTableSchema(),
+                new HashMap<>(),
+                new ArrayList<>(),
+                "");
+    }
+
+    private TableSchema getTableSchema() {
+        return new TableSchema(getColumns(), null, null);
+    }
+
+    private List<Column> getColumns() {
+        List<Column> columns = new ArrayList<>();
+        columns.add(new PhysicalColumn("id", BasicType.INT_TYPE, 32L, 0, true, 
"", ""));
+        columns.add(new PhysicalColumn("val_bool", BasicType.BOOLEAN_TYPE, 1L, 
0, true, "", ""));
+        columns.add(new PhysicalColumn("val_int8", BasicType.BYTE_TYPE, 8L, 0, 
true, "", ""));
+        columns.add(new PhysicalColumn("val_int16", BasicType.SHORT_TYPE, 16L, 
0, true, "", ""));
+        columns.add(new PhysicalColumn("val_int32", BasicType.INT_TYPE, 32L, 
0, true, "", ""));
+        columns.add(new PhysicalColumn("val_int64", BasicType.LONG_TYPE, 64L, 
0, true, "", ""));
+        columns.add(new PhysicalColumn("val_float", BasicType.FLOAT_TYPE, 32L, 
0, true, "", ""));
+        columns.add(new PhysicalColumn("val_double", BasicType.DOUBLE_TYPE, 
64L, 0, true, "", ""));
+        columns.add(
+                new PhysicalColumn("val_decimal", new DecimalType(16, 1), 16L, 
1, true, "", ""));
+        columns.add(new PhysicalColumn("val_string", BasicType.STRING_TYPE, 
0L, 0, true, "", ""));
+        columns.add(
+                new PhysicalColumn(
+                        "val_unixtime_micros",
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                        64L,
+                        6,
+                        true,
+                        "",
+                        ""));
+        return columns;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
index 6df6e80d6e..b20700577e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis5IT.java
@@ -16,6 +16,8 @@
  */
 package org.apache.seatunnel.e2e.connector.redis;
 
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+
 public class Redis5IT extends RedisTestCaseTemplateIT {
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
index dfa46e886a..e6fb8a7989 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/Redis7IT.java
@@ -16,6 +16,8 @@
  */
 package org.apache.seatunnel.e2e.connector.redis;
 
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
+
 public class Redis7IT extends RedisTestCaseTemplateIT {
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
index b762ad60dc..fb4139f303 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisClusterIT.java
@@ -26,6 +26,7 @@ import 
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
index 9c84bb3bf8..48ba3163f9 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisMasterAndSlaveIT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.seatunnel.e2e.connector.redis;
 
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 9a84ac7734..0124681abb 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -35,9 +35,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
-import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -48,7 +47,6 @@ import 
org.apache.seatunnel.format.json.JsonSerializationSchema;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -623,73 +621,6 @@ public abstract class RedisTestCaseTemplateIT extends 
TestSuiteBase implements T
         jedis.del("custom-hash-check");
     }
 
-    @Test
-    public void testFakeToRedisDeleteHashTest() throws IOException {
-        String key = "hash_check";
-        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
-                getCatalogTable(0, key),
-                getReadonlyConfig(RedisDataType.HASH, key),
-                new RedisSinkFactory(),
-                Arrays.asList(
-                        getSeaTunnelRowInsert1(),
-                        getSeaTunnelRowInsert2(),
-                        getSeaTunnelRowInsert3(),
-                        getSeaTunnelRowUpdateBefore(),
-                        getSeaTunnelRowUpdateAfter(),
-                        getSeaTunnelRowDelete()));
-        Assertions.assertEquals(2, jedis.hlen(key));
-        jedis.del(key);
-    }
-
-    @TestTemplate
-    public void testFakeToRedisDeleteKeyTest(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                container.executeJob("/fake-to-redis-test-delete-key.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        int count = 0;
-        for (int i = 1; i <= 3; i++) {
-            String data = jedis.get("key_check:" + i);
-            if (data != null) {
-                count++;
-            }
-        }
-        Assertions.assertEquals(2, count);
-        for (int i = 1; i <= 3; i++) {
-            jedis.del("key_check:" + i);
-        }
-    }
-
-    @TestTemplate
-    public void testFakeToRedisDeleteListTest(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                container.executeJob("/fake-to-redis-test-delete-list.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertEquals(2, jedis.llen("list_check"));
-        jedis.del("list_check");
-    }
-
-    @TestTemplate
-    public void testFakeToRedisDeleteSetTest(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                container.executeJob("/fake-to-redis-test-delete-set.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertEquals(2, jedis.scard("set_check"));
-        jedis.del("set_check");
-    }
-
-    @TestTemplate
-    public void testFakeToToRedisDeleteZSetTest(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                container.executeJob("/fake-to-redis-test-delete-zset.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertEquals(2, jedis.zcard("zset_check"));
-        jedis.del("zset_check");
-    }
-
     @TestTemplate
     @DisabledOnContainer(
             value = {},
@@ -831,15 +762,15 @@ public abstract class RedisTestCaseTemplateIT extends 
TestSuiteBase implements T
 
     public abstract RedisContainerInfo getRedisContainerInfo();
 
-    private ReadonlyConfig getReadonlyConfig(RedisDataType dataType, String 
key) {
-        Map<String, Object> map = new HashMap<>();
+    private ReadonlyConfig getDefaultReadonlyConfig(
+            RedisDataType dataType, String key, Map<String, Object> 
otherParams) {
+        Map<String, Object> map = new HashMap<>(otherParams);
         map.put("host", redisContainer.getHost());
         map.put("port", redisContainer.getFirstMappedPort());
         map.put("db_num", 0);
         map.put("auth", password);
         map.put("key", key);
         map.put("data_type", dataType.name());
-        map.put("hash_key_field", "id");
         map.put("batch_size", 33);
         return ReadonlyConfig.fromMap(map);
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
deleted file mode 100644
index 5be915889e..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-key.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "key_check:{id}"
-    data_type = key
-    support_custom_key = true
-    batch_size = 33
-  }
-}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
deleted file mode 100644
index 55deb18754..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-list.conf
+++ /dev/null
@@ -1,86 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "list_check"
-    data_type = list
-    batch_size = 33
-  }
-}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
deleted file mode 100644
index bd1c71128e..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-set.conf
+++ /dev/null
@@ -1,86 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "set_check"
-    data_type = set
-    batch_size = 33
-  }
-}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
deleted file mode 100644
index cf80d3b00c..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-delete-zset.conf
+++ /dev/null
@@ -1,86 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", 
"2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "zset_check"
-    data_type = zset
-    batch_size = 33
-  }
-}
\ No newline at end of file


Reply via email to