This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f3de771e03 [Improve][E2E][connector-redis-e2e] Transfer e2e test cases 
to unit tests (#10160)
f3de771e03 is described below

commit f3de771e033cf1a8c439a295866859aceffac935
Author: 老王 <[email protected]>
AuthorDate: Thu Jan 1 10:06:43 2026 +0800

    [Improve][E2E][connector-redis-e2e] Transfer e2e test cases to unit tests 
(#10160)
---
 .../seatunnel/redis/RedisTemplateTest.java         | 229 +++++++--------------
 .../seatunnel/redis/row/TestForDeleteRows.java     | 149 ++++++++++++++
 .../redis/row/TestKeyOrValueIsNullRows.java        | 149 ++++++++++++++
 .../connector/redis/RedisTestCaseTemplateIT.java   | 225 --------------------
 .../fake-to-redis-test-custom-key-is-null.conf     |  87 --------
 ...is-test-custom-value-when-hash-key-is-null.conf |  87 --------
 ...-test-custom-value-when-hash-value-is-null.conf |  88 --------
 ...-test-custom-value-when-other-type-is-null.conf |  87 --------
 8 files changed, 375 insertions(+), 726 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
index b7e03962c5..b7dab161c5 100644
--- 
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
+++ 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
@@ -25,10 +25,10 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
 import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.row.TestForDeleteRows;
+import 
org.apache.seatunnel.connectors.seatunnel.redis.row.TestKeyOrValueIsNullRows;
 import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
 import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
 
@@ -49,11 +49,8 @@ import lombok.extern.slf4j.Slf4j;
 import redis.clients.jedis.Jedis;
 
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.time.Duration;
-import java.time.LocalDateTime;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -138,13 +135,7 @@ public abstract class RedisTemplateTest {
                 getCatalogTable(0, key),
                 getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
                 new RedisSinkFactory(),
-                Arrays.asList(
-                        getSeaTunnelRowInsert1(),
-                        getSeaTunnelRowInsert2(),
-                        getSeaTunnelRowInsert3(),
-                        getSeaTunnelRowUpdateBefore(),
-                        getSeaTunnelRowUpdateAfter(),
-                        getSeaTunnelRowDelete()));
+                TestForDeleteRows.getRows());
         Assertions.assertEquals(2, jedis.hlen(key));
         jedis.del(key);
     }
@@ -158,13 +149,7 @@ public abstract class RedisTemplateTest {
                 getCatalogTable(0, key),
                 getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams),
                 new RedisSinkFactory(),
-                Arrays.asList(
-                        getSeaTunnelRowInsert1(),
-                        getSeaTunnelRowInsert2(),
-                        getSeaTunnelRowInsert3(),
-                        getSeaTunnelRowUpdateBefore(),
-                        getSeaTunnelRowUpdateAfter(),
-                        getSeaTunnelRowDelete()));
+                TestForDeleteRows.getRows());
         int count = 0;
         for (int i = 1; i <= 3; i++) {
             String data = jedis.get("key_check:" + i);
@@ -185,13 +170,7 @@ public abstract class RedisTemplateTest {
                 getCatalogTable(0, key),
                 getDefaultReadonlyConfig(RedisDataType.LIST, key, new 
HashMap<>()),
                 new RedisSinkFactory(),
-                Arrays.asList(
-                        getSeaTunnelRowInsert1(),
-                        getSeaTunnelRowInsert2(),
-                        getSeaTunnelRowInsert3(),
-                        getSeaTunnelRowUpdateBefore(),
-                        getSeaTunnelRowUpdateAfter(),
-                        getSeaTunnelRowDelete()));
+                TestForDeleteRows.getRows());
         Assertions.assertEquals(2, jedis.llen(key));
         jedis.del(key);
     }
@@ -203,35 +182,92 @@ public abstract class RedisTemplateTest {
                 getCatalogTable(0, key),
                 getDefaultReadonlyConfig(RedisDataType.SET, key, new 
HashMap<>()),
                 new RedisSinkFactory(),
-                Arrays.asList(
-                        getSeaTunnelRowInsert1(),
-                        getSeaTunnelRowInsert2(),
-                        getSeaTunnelRowInsert3(),
-                        getSeaTunnelRowUpdateBefore(),
-                        getSeaTunnelRowUpdateAfter(),
-                        getSeaTunnelRowDelete()));
+                TestForDeleteRows.getRows());
         Assertions.assertEquals(2, jedis.scard(key));
         jedis.del(key);
     }
 
     @Test
-    public void testFakeToToRedisDeleteZSetTest() throws IOException {
+    public void testFakeToRedisDeleteZSetTest() throws IOException {
         String key = "zset_check";
         SinkFlowTestUtils.runBatchWithCheckpointDisabled(
                 getCatalogTable(0, key),
                 getDefaultReadonlyConfig(RedisDataType.ZSET, key, new 
HashMap<>()),
                 new RedisSinkFactory(),
-                Arrays.asList(
-                        getSeaTunnelRowInsert1(),
-                        getSeaTunnelRowInsert2(),
-                        getSeaTunnelRowInsert3(),
-                        getSeaTunnelRowUpdateBefore(),
-                        getSeaTunnelRowUpdateAfter(),
-                        getSeaTunnelRowDelete()));
+                TestForDeleteRows.getRows());
         Assertions.assertEquals(2, jedis.zcard(key));
         jedis.del(key);
     }
 
+    @Test
+    public void testFakeToRedisCustomKeyIsNullTest() throws IOException {
+        String key = "key_check:{val_string}";
+        Map<String, Object> otherParams = new HashMap<>();
+        otherParams.put("support_custom_key", true);
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams),
+                new RedisSinkFactory(),
+                TestKeyOrValueIsNullRows.getRows());
+        int count = 0;
+        String data = jedis.get("key_check:");
+        if (data != null) {
+            count++;
+            jedis.del("key_check:");
+        }
+        for (int i = 2; i <= 3; i++) {
+            data = jedis.get("key_check:NEW" + i);
+            if (data != null) {
+                count++;
+                jedis.del("key_check:NEW" + i);
+            }
+        }
+        Assertions.assertEquals(2, count);
+    }
+
+    @Test
+    public void testFakeToRedisOtherTypeValueIsNullTest() throws IOException {
+        String key = "list_check";
+        Map<String, Object> otherParams = new HashMap<>();
+        otherParams.put("value_field", "val_string");
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.LIST, key, otherParams),
+                new RedisSinkFactory(),
+                TestKeyOrValueIsNullRows.getRows());
+        Assertions.assertEquals(2, jedis.llen(key));
+        jedis.del(key);
+    }
+
+    @Test
+    public void testFakeToRedisHashTypeKeyIsNullTest() throws IOException {
+        String key = "hash_check";
+        Map<String, Object> otherParams = new HashMap<>();
+        otherParams.put("hash_key_field", "val_string");
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
+                new RedisSinkFactory(),
+                TestKeyOrValueIsNullRows.getRows());
+        Assertions.assertEquals(2, jedis.hlen(key));
+        jedis.del(key);
+    }
+
+    @Test
+    public void testFakeToRedisHashTypeValueIsNullTest() throws IOException {
+        String key = "hash_check";
+        Map<String, Object> otherParams = new HashMap<>();
+        otherParams.put("hash_key_field", "id");
+        otherParams.put("hash_value_field", "val_string");
+        SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+                getCatalogTable(0, key),
+                getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
+                new RedisSinkFactory(),
+                TestKeyOrValueIsNullRows.getRows());
+        Assertions.assertEquals(2, jedis.hlen(key));
+        jedis.del(key);
+    }
+
     private ReadonlyConfig getDefaultReadonlyConfig(
             RedisDataType dataType, String key, Map<String, Object> 
otherParams) {
         Map<String, Object> map = new HashMap<>(otherParams);
@@ -245,117 +281,6 @@ public abstract class RedisTemplateTest {
         return ReadonlyConfig.fromMap(map);
     }
 
-    private SeaTunnelRow getSeaTunnelRowInsert1() {
-        return new SeaTunnelRow(
-                new Object[] {
-                    1,
-                    true,
-                    (byte) 1,
-                    (short) 2,
-                    3,
-                    4L,
-                    4.3f,
-                    5.3d,
-                    BigDecimal.valueOf(6.3).setScale(1),
-                    "NEW",
-                    LocalDateTime.parse("2020-02-02T02:02:02")
-                });
-    }
-
-    private SeaTunnelRow getSeaTunnelRowInsert2() {
-        return new SeaTunnelRow(
-                new Object[] {
-                    2,
-                    true,
-                    (byte) 1,
-                    (short) 2,
-                    3,
-                    4L,
-                    4.3f,
-                    5.3d,
-                    BigDecimal.valueOf(6.3).setScale(1),
-                    "NEW",
-                    LocalDateTime.parse("2020-02-02T02:02:02")
-                });
-    }
-
-    private SeaTunnelRow getSeaTunnelRowInsert3() {
-        return new SeaTunnelRow(
-                new Object[] {
-                    3,
-                    true,
-                    (byte) 1,
-                    (short) 2,
-                    3,
-                    4L,
-                    4.3f,
-                    5.3d,
-                    BigDecimal.valueOf(6.3).setScale(1),
-                    "NEW",
-                    LocalDateTime.parse("2020-02-02T02:02:02")
-                });
-    }
-
-    private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
-        final SeaTunnelRow seaTunnelRow =
-                new SeaTunnelRow(
-                        new Object[] {
-                            1,
-                            true,
-                            (byte) 1,
-                            (short) 2,
-                            3,
-                            4L,
-                            4.3f,
-                            5.3d,
-                            BigDecimal.valueOf(6.3).setScale(1),
-                            "NEW",
-                            LocalDateTime.parse("2020-02-02T02:02:02")
-                        });
-        seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
-        return seaTunnelRow;
-    }
-
-    private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
-        final SeaTunnelRow seaTunnelRow =
-                new SeaTunnelRow(
-                        new Object[] {
-                            1,
-                            true,
-                            (byte) 2,
-                            (short) 2,
-                            3,
-                            4L,
-                            4.3f,
-                            5.3d,
-                            BigDecimal.valueOf(6.3).setScale(1),
-                            "NEW",
-                            LocalDateTime.parse("2020-02-02T02:02:02")
-                        });
-        seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
-        return seaTunnelRow;
-    }
-
-    private SeaTunnelRow getSeaTunnelRowDelete() {
-        final SeaTunnelRow seaTunnelRow =
-                new SeaTunnelRow(
-                        new Object[] {
-                            2,
-                            true,
-                            (byte) 1,
-                            (short) 2,
-                            3,
-                            4L,
-                            4.3f,
-                            5.3d,
-                            BigDecimal.valueOf(6.3).setScale(1),
-                            "NEW",
-                            LocalDateTime.parse("2020-02-02T02:02:02")
-                        });
-        seaTunnelRow.setRowKind(RowKind.DELETE);
-        return seaTunnelRow;
-    }
-
     private CatalogTable getCatalogTable(Integer dbNum, String key) {
         return CatalogTable.of(
                 TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java
 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java
new file mode 100644
index 0000000000..32a9d8b498
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.redis.row;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestForDeleteRows {
+
+    public static List<SeaTunnelRow> getRows() {
+        return Arrays.asList(
+                getSeaTunnelRowInsert1(),
+                getSeaTunnelRowInsert2(),
+                getSeaTunnelRowInsert3(),
+                getSeaTunnelRowUpdateBefore(),
+                getSeaTunnelRowUpdateAfter(),
+                getSeaTunnelRowDelete());
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowInsert1() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    1,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowInsert2() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    2,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowInsert3() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    3,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowUpdateBefore() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 1,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
+        return seaTunnelRow;
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowUpdateAfter() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 2,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
+        return seaTunnelRow;
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowDelete() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            2,
+                            true,
+                            (byte) 1,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            "NEW",
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.DELETE);
+        return seaTunnelRow;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java
 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java
new file mode 100644
index 0000000000..be83768b35
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.redis.row;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestKeyOrValueIsNullRows {
+
+    public static List<SeaTunnelRow> getRows() {
+        return Arrays.asList(
+                getSeaTunnelRowWithStringNullInsert1(),
+                getSeaTunnelRowInsert2(),
+                getSeaTunnelRowInsert3(),
+                getSeaTunnelRowWithStringNullUpdateBefore(),
+                getSeaTunnelRowWithStringNullUpdateAfter(),
+                getSeaTunnelRowWithStringNullDelete());
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowWithStringNullInsert1() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    1,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    null,
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowInsert2() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    2,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW2",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowInsert3() {
+        return new SeaTunnelRow(
+                new Object[] {
+                    3,
+                    true,
+                    (byte) 1,
+                    (short) 2,
+                    3,
+                    4L,
+                    4.3f,
+                    5.3d,
+                    BigDecimal.valueOf(6.3).setScale(1),
+                    "NEW3",
+                    LocalDateTime.parse("2020-02-02T02:02:02")
+                });
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowWithStringNullUpdateBefore() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 1,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            null,
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
+        return seaTunnelRow;
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowWithStringNullUpdateAfter() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 2,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            null,
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
+        return seaTunnelRow;
+    }
+
+    private static SeaTunnelRow getSeaTunnelRowWithStringNullDelete() {
+        final SeaTunnelRow seaTunnelRow =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            true,
+                            (byte) 1,
+                            (short) 2,
+                            3,
+                            4L,
+                            4.3f,
+                            5.3d,
+                            BigDecimal.valueOf(6.3).setScale(1),
+                            null,
+                            LocalDateTime.parse("2020-02-02T02:02:02")
+                        });
+        seaTunnelRow.setRowKind(RowKind.DELETE);
+        return seaTunnelRow;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 0124681abb..77b0733db5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -18,25 +18,17 @@ package org.apache.seatunnel.e2e.connector.redis;
 
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.JsonUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -81,7 +73,6 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions.CONNECTOR_IDENTITY;
 import static org.awaitility.Awaitility.await;
 
 @Slf4j
@@ -706,221 +697,5 @@ public abstract class RedisTestCaseTemplateIT extends 
TestSuiteBase implements T
         Assertions.assertEquals(2, count);
     }
 
-    @TestTemplate
-    public void testFakeToRedisCustomKeyIsNullTest(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                
container.executeJob("/fake-to-redis-test-custom-key-is-null.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        int count = 0;
-        String data = jedis.get("key_check:");
-        if (data != null) {
-            count++;
-            jedis.del("key_check:");
-        }
-        for (int i = 2; i <= 3; i++) {
-            data = jedis.get("key_check:NEW" + i);
-            if (data != null) {
-                count++;
-                jedis.del("key_check:NEW" + i);
-            }
-        }
-        Assertions.assertEquals(2, count);
-    }
-
-    @TestTemplate
-    public void testFakeToRedisOtherTypeValueIsNullTest(TestContainer 
container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                container.executeJob(
-                        
"/fake-to-redis-test-custom-value-when-other-type-is-null.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertEquals(2, jedis.llen("list_check"));
-        jedis.del("list_check");
-    }
-
-    @TestTemplate
-    public void testFakeToRedisHashTypeKeyIsNullTest(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                
container.executeJob("/fake-to-redis-test-custom-value-when-hash-key-is-null.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertEquals(2, jedis.hlen("hash_check"));
-        jedis.del("hash_check");
-    }
-
-    @TestTemplate
-    public void testFakeToRedisHashTypeValueIsNullTest(TestContainer container)
-            throws IOException, InterruptedException {
-        Container.ExecResult execResult =
-                container.executeJob(
-                        
"/fake-to-redis-test-custom-value-when-hash-value-is-null.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
-        Assertions.assertEquals(2, jedis.hlen("hash_check"));
-        jedis.del("hash_check");
-    }
-
     public abstract RedisContainerInfo getRedisContainerInfo();
-
-    private ReadonlyConfig getDefaultReadonlyConfig(
-            RedisDataType dataType, String key, Map<String, Object> 
otherParams) {
-        Map<String, Object> map = new HashMap<>(otherParams);
-        map.put("host", redisContainer.getHost());
-        map.put("port", redisContainer.getFirstMappedPort());
-        map.put("db_num", 0);
-        map.put("auth", password);
-        map.put("key", key);
-        map.put("data_type", dataType.name());
-        map.put("batch_size", 33);
-        return ReadonlyConfig.fromMap(map);
-    }
-
-    private SeaTunnelRow getSeaTunnelRowInsert1() {
-        return new SeaTunnelRow(
-                new Object[] {
-                    1,
-                    true,
-                    (byte) 1,
-                    (short) 2,
-                    3,
-                    4L,
-                    4.3f,
-                    5.3d,
-                    BigDecimal.valueOf(6.3).setScale(1),
-                    "NEW",
-                    LocalDateTime.parse("2020-02-02T02:02:02")
-                });
-    }
-
-    private SeaTunnelRow getSeaTunnelRowInsert2() {
-        return new SeaTunnelRow(
-                new Object[] {
-                    2,
-                    true,
-                    (byte) 1,
-                    (short) 2,
-                    3,
-                    4L,
-                    4.3f,
-                    5.3d,
-                    BigDecimal.valueOf(6.3).setScale(1),
-                    "NEW",
-                    LocalDateTime.parse("2020-02-02T02:02:02")
-                });
-    }
-
-    private SeaTunnelRow getSeaTunnelRowInsert3() {
-        return new SeaTunnelRow(
-                new Object[] {
-                    3,
-                    true,
-                    (byte) 1,
-                    (short) 2,
-                    3,
-                    4L,
-                    4.3f,
-                    5.3d,
-                    BigDecimal.valueOf(6.3).setScale(1),
-                    "NEW",
-                    LocalDateTime.parse("2020-02-02T02:02:02")
-                });
-    }
-
-    private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
-        final SeaTunnelRow seaTunnelRow =
-                new SeaTunnelRow(
-                        new Object[] {
-                            1,
-                            true,
-                            (byte) 1,
-                            (short) 2,
-                            3,
-                            4L,
-                            4.3f,
-                            5.3d,
-                            BigDecimal.valueOf(6.3).setScale(1),
-                            "NEW",
-                            LocalDateTime.parse("2020-02-02T02:02:02")
-                        });
-        seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
-        return seaTunnelRow;
-    }
-
-    private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
-        final SeaTunnelRow seaTunnelRow =
-                new SeaTunnelRow(
-                        new Object[] {
-                            1,
-                            true,
-                            (byte) 2,
-                            (short) 2,
-                            3,
-                            4L,
-                            4.3f,
-                            5.3d,
-                            BigDecimal.valueOf(6.3).setScale(1),
-                            "NEW",
-                            LocalDateTime.parse("2020-02-02T02:02:02")
-                        });
-        seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
-        return seaTunnelRow;
-    }
-
-    private SeaTunnelRow getSeaTunnelRowDelete() {
-        final SeaTunnelRow seaTunnelRow =
-                new SeaTunnelRow(
-                        new Object[] {
-                            2,
-                            true,
-                            (byte) 1,
-                            (short) 2,
-                            3,
-                            4L,
-                            4.3f,
-                            5.3d,
-                            BigDecimal.valueOf(6.3).setScale(1),
-                            "NEW",
-                            LocalDateTime.parse("2020-02-02T02:02:02")
-                        });
-        seaTunnelRow.setRowKind(RowKind.DELETE);
-        return seaTunnelRow;
-    }
-
-    private CatalogTable getCatalogTable(Integer dbNum, String key) {
-        return CatalogTable.of(
-                TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
-                getTableSchema(),
-                new HashMap<>(),
-                new ArrayList<>(),
-                "");
-    }
-
-    private TableSchema getTableSchema() {
-        return new TableSchema(getColumns(), null, null);
-    }
-
-    private List<Column> getColumns() {
-        List<Column> columns = new ArrayList<>();
-        columns.add(new PhysicalColumn("id", BasicType.INT_TYPE, 32L, 0, true, 
"", ""));
-        columns.add(new PhysicalColumn("val_bool", BasicType.BOOLEAN_TYPE, 1L, 
0, true, "", ""));
-        columns.add(new PhysicalColumn("val_int8", BasicType.BYTE_TYPE, 8L, 0, 
true, "", ""));
-        columns.add(new PhysicalColumn("val_int16", BasicType.SHORT_TYPE, 16L, 
0, true, "", ""));
-        columns.add(new PhysicalColumn("val_int32", BasicType.INT_TYPE, 32L, 
0, true, "", ""));
-        columns.add(new PhysicalColumn("val_int64", BasicType.LONG_TYPE, 64L, 
0, true, "", ""));
-        columns.add(new PhysicalColumn("val_float", BasicType.FLOAT_TYPE, 32L, 
0, true, "", ""));
-        columns.add(new PhysicalColumn("val_double", BasicType.DOUBLE_TYPE, 
64L, 0, true, "", ""));
-        columns.add(
-                new PhysicalColumn("val_decimal", new DecimalType(16, 1), 16L, 
1, true, "", ""));
-        columns.add(new PhysicalColumn("val_string", BasicType.STRING_TYPE, 
0L, 0, true, "", ""));
-        columns.add(
-                new PhysicalColumn(
-                        "val_unixtime_micros",
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
-                        64L,
-                        6,
-                        true,
-                        "",
-                        ""));
-        return columns;
-    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
deleted file mode 100644
index 20ec18450a..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "key_check:{val_string}"
-    data_type = key
-    support_custom_key = true
-    batch_size = 33
-  }
-}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
deleted file mode 100644
index 774b4aa379..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "hash_check"
-    data_type = hash
-    hash_key_field = "val_string"
-    batch_size = 33
-  }
-}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
deleted file mode 100644
index 8d3c2fee7a..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "hash_check"
-    data_type = hash
-    hash_key_field = "id"
-    hash_value_field = "val_string"
-    batch_size = 33
-  }
-}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
deleted file mode 100644
index 1eab8030d9..0000000000
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
-  parallelism = 1
-  job.mode = "BATCH"
-  shade.identifier = "base64"
-
-  #spark config
-  spark.app.name = "SeaTunnel"
-  spark.executor.instances = 2
-  spark.executor.cores = 1
-  spark.executor.memory = "1g"
-  spark.master = local
-}
-
-source {
-  FakeSource {
-    schema = {
-      fields {
-                id = int
-                val_bool = boolean
-                val_int8 = tinyint
-                val_int16 = smallint
-                val_int32 = int
-                val_int64 = bigint
-                val_float = float
-                val_double = double
-                val_decimal = "decimal(16, 1)"
-                val_string = string
-                val_unixtime_micros = timestamp
-      }
-    }
-    rows = [
-      {
-        kind = INSERT
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = INSERT
-        fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3", 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_BEFORE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = UPDATE_AFTER
-        fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      },
-      {
-        kind = DELETE
-        fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null, 
"2020-02-02T02:02:02"]
-      }
-    ]
-  }
-}
-
-sink {
-  Redis {
-    host = "redis-e2e"
-    port = 6379
-    auth = "U2VhVHVubmVs"
-    key = "list_check"
-    data_type = list
-    value_field = "val_string"
-    batch_size = 33
-  }
-}
\ No newline at end of file

Reply via email to