This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f3de771e03 [Improve][E2E][connector-redis-e2e] Transfer e2e test cases
to unit tests (#10160)
f3de771e03 is described below
commit f3de771e033cf1a8c439a295866859aceffac935
Author: 老王 <[email protected]>
AuthorDate: Thu Jan 1 10:06:43 2026 +0800
[Improve][E2E][connector-redis-e2e] Transfer e2e test cases to unit tests
(#10160)
---
.../seatunnel/redis/RedisTemplateTest.java | 229 +++++++--------------
.../seatunnel/redis/row/TestForDeleteRows.java | 149 ++++++++++++++
.../redis/row/TestKeyOrValueIsNullRows.java | 149 ++++++++++++++
.../connector/redis/RedisTestCaseTemplateIT.java | 225 --------------------
.../fake-to-redis-test-custom-key-is-null.conf | 87 --------
...is-test-custom-value-when-hash-key-is-null.conf | 87 --------
...-test-custom-value-when-hash-value-is-null.conf | 88 --------
...-test-custom-value-when-other-type-is-null.conf | 87 --------
8 files changed, 375 insertions(+), 726 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
index b7e03962c5..b7dab161c5 100644
---
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
+++
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/RedisTemplateTest.java
@@ -25,10 +25,10 @@ import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.RowKind;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
+import org.apache.seatunnel.connectors.seatunnel.redis.row.TestForDeleteRows;
+import
org.apache.seatunnel.connectors.seatunnel.redis.row.TestKeyOrValueIsNullRows;
import org.apache.seatunnel.connectors.seatunnel.redis.sink.RedisSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
@@ -49,11 +49,8 @@ import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import java.io.IOException;
-import java.math.BigDecimal;
import java.time.Duration;
-import java.time.LocalDateTime;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -138,13 +135,7 @@ public abstract class RedisTemplateTest {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
new RedisSinkFactory(),
- Arrays.asList(
- getSeaTunnelRowInsert1(),
- getSeaTunnelRowInsert2(),
- getSeaTunnelRowInsert3(),
- getSeaTunnelRowUpdateBefore(),
- getSeaTunnelRowUpdateAfter(),
- getSeaTunnelRowDelete()));
+ TestForDeleteRows.getRows());
Assertions.assertEquals(2, jedis.hlen(key));
jedis.del(key);
}
@@ -158,13 +149,7 @@ public abstract class RedisTemplateTest {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams),
new RedisSinkFactory(),
- Arrays.asList(
- getSeaTunnelRowInsert1(),
- getSeaTunnelRowInsert2(),
- getSeaTunnelRowInsert3(),
- getSeaTunnelRowUpdateBefore(),
- getSeaTunnelRowUpdateAfter(),
- getSeaTunnelRowDelete()));
+ TestForDeleteRows.getRows());
int count = 0;
for (int i = 1; i <= 3; i++) {
String data = jedis.get("key_check:" + i);
@@ -185,13 +170,7 @@ public abstract class RedisTemplateTest {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.LIST, key, new
HashMap<>()),
new RedisSinkFactory(),
- Arrays.asList(
- getSeaTunnelRowInsert1(),
- getSeaTunnelRowInsert2(),
- getSeaTunnelRowInsert3(),
- getSeaTunnelRowUpdateBefore(),
- getSeaTunnelRowUpdateAfter(),
- getSeaTunnelRowDelete()));
+ TestForDeleteRows.getRows());
Assertions.assertEquals(2, jedis.llen(key));
jedis.del(key);
}
@@ -203,35 +182,92 @@ public abstract class RedisTemplateTest {
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.SET, key, new
HashMap<>()),
new RedisSinkFactory(),
- Arrays.asList(
- getSeaTunnelRowInsert1(),
- getSeaTunnelRowInsert2(),
- getSeaTunnelRowInsert3(),
- getSeaTunnelRowUpdateBefore(),
- getSeaTunnelRowUpdateAfter(),
- getSeaTunnelRowDelete()));
+ TestForDeleteRows.getRows());
Assertions.assertEquals(2, jedis.scard(key));
jedis.del(key);
}
@Test
- public void testFakeToToRedisDeleteZSetTest() throws IOException {
+ public void testFakeToRedisDeleteZSetTest() throws IOException {
String key = "zset_check";
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
getCatalogTable(0, key),
getDefaultReadonlyConfig(RedisDataType.ZSET, key, new
HashMap<>()),
new RedisSinkFactory(),
- Arrays.asList(
- getSeaTunnelRowInsert1(),
- getSeaTunnelRowInsert2(),
- getSeaTunnelRowInsert3(),
- getSeaTunnelRowUpdateBefore(),
- getSeaTunnelRowUpdateAfter(),
- getSeaTunnelRowDelete()));
+ TestForDeleteRows.getRows());
Assertions.assertEquals(2, jedis.zcard(key));
jedis.del(key);
}
+ @Test
+ public void testFakeToRedisCustomKeyIsNullTest() throws IOException {
+ String key = "key_check:{val_string}";
+ Map<String, Object> otherParams = new HashMap<>();
+ otherParams.put("support_custom_key", true);
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ getCatalogTable(0, key),
+ getDefaultReadonlyConfig(RedisDataType.KEY, key, otherParams),
+ new RedisSinkFactory(),
+ TestKeyOrValueIsNullRows.getRows());
+ int count = 0;
+ String data = jedis.get("key_check:");
+ if (data != null) {
+ count++;
+ jedis.del("key_check:");
+ }
+ for (int i = 2; i <= 3; i++) {
+ data = jedis.get("key_check:NEW" + i);
+ if (data != null) {
+ count++;
+ jedis.del("key_check:NEW" + i);
+ }
+ }
+ Assertions.assertEquals(2, count);
+ }
+
+ @Test
+ public void testFakeToRedisOtherTypeValueIsNullTest() throws IOException {
+ String key = "list_check";
+ Map<String, Object> otherParams = new HashMap<>();
+ otherParams.put("value_field", "val_string");
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ getCatalogTable(0, key),
+ getDefaultReadonlyConfig(RedisDataType.LIST, key, otherParams),
+ new RedisSinkFactory(),
+ TestKeyOrValueIsNullRows.getRows());
+ Assertions.assertEquals(2, jedis.llen(key));
+ jedis.del(key);
+ }
+
+ @Test
+ public void testFakeToRedisHashTypeKeyIsNullTest() throws IOException {
+ String key = "hash_check";
+ Map<String, Object> otherParams = new HashMap<>();
+ otherParams.put("hash_key_field", "val_string");
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ getCatalogTable(0, key),
+ getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
+ new RedisSinkFactory(),
+ TestKeyOrValueIsNullRows.getRows());
+ Assertions.assertEquals(2, jedis.hlen(key));
+ jedis.del(key);
+ }
+
+ @Test
+ public void testFakeToRedisHashTypeValueIsNullTest() throws IOException {
+ String key = "hash_check";
+ Map<String, Object> otherParams = new HashMap<>();
+ otherParams.put("hash_key_field", "id");
+ otherParams.put("hash_value_field", "val_string");
+ SinkFlowTestUtils.runBatchWithCheckpointDisabled(
+ getCatalogTable(0, key),
+ getDefaultReadonlyConfig(RedisDataType.HASH, key, otherParams),
+ new RedisSinkFactory(),
+ TestKeyOrValueIsNullRows.getRows());
+ Assertions.assertEquals(2, jedis.hlen(key));
+ jedis.del(key);
+ }
+
private ReadonlyConfig getDefaultReadonlyConfig(
RedisDataType dataType, String key, Map<String, Object>
otherParams) {
Map<String, Object> map = new HashMap<>(otherParams);
@@ -245,117 +281,6 @@ public abstract class RedisTemplateTest {
return ReadonlyConfig.fromMap(map);
}
- private SeaTunnelRow getSeaTunnelRowInsert1() {
- return new SeaTunnelRow(
- new Object[] {
- 1,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- }
-
- private SeaTunnelRow getSeaTunnelRowInsert2() {
- return new SeaTunnelRow(
- new Object[] {
- 2,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- }
-
- private SeaTunnelRow getSeaTunnelRowInsert3() {
- return new SeaTunnelRow(
- new Object[] {
- 3,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- }
-
- private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
- final SeaTunnelRow seaTunnelRow =
- new SeaTunnelRow(
- new Object[] {
- 1,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
- return seaTunnelRow;
- }
-
- private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
- final SeaTunnelRow seaTunnelRow =
- new SeaTunnelRow(
- new Object[] {
- 1,
- true,
- (byte) 2,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
- return seaTunnelRow;
- }
-
- private SeaTunnelRow getSeaTunnelRowDelete() {
- final SeaTunnelRow seaTunnelRow =
- new SeaTunnelRow(
- new Object[] {
- 2,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- seaTunnelRow.setRowKind(RowKind.DELETE);
- return seaTunnelRow;
- }
-
private CatalogTable getCatalogTable(Integer dbNum, String key) {
return CatalogTable.of(
TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
diff --git
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java
new file mode 100644
index 0000000000..32a9d8b498
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestForDeleteRows.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.redis.row;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestForDeleteRows {
+
+ public static List<SeaTunnelRow> getRows() {
+ return Arrays.asList(
+ getSeaTunnelRowInsert1(),
+ getSeaTunnelRowInsert2(),
+ getSeaTunnelRowInsert3(),
+ getSeaTunnelRowUpdateBefore(),
+ getSeaTunnelRowUpdateAfter(),
+ getSeaTunnelRowDelete());
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowInsert1() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowInsert2() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 2,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowInsert3() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 3,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowUpdateBefore() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
+ return seaTunnelRow;
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowUpdateAfter() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 2,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
+ return seaTunnelRow;
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowDelete() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 2,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.DELETE);
+ return seaTunnelRow;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java
new file mode 100644
index 0000000000..be83768b35
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-redis/src/test/java/org/apache/seatunnel/connectors/seatunnel/redis/row/TestKeyOrValueIsNullRows.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.redis.row;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestKeyOrValueIsNullRows {
+
+ public static List<SeaTunnelRow> getRows() {
+ return Arrays.asList(
+ getSeaTunnelRowWithStringNullInsert1(),
+ getSeaTunnelRowInsert2(),
+ getSeaTunnelRowInsert3(),
+ getSeaTunnelRowWithStringNullUpdateBefore(),
+ getSeaTunnelRowWithStringNullUpdateAfter(),
+ getSeaTunnelRowWithStringNullDelete());
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowWithStringNullInsert1() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ null,
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowInsert2() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 2,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW2",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowInsert3() {
+ return new SeaTunnelRow(
+ new Object[] {
+ 3,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ "NEW3",
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowWithStringNullUpdateBefore() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ null,
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
+ return seaTunnelRow;
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowWithStringNullUpdateAfter() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 2,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ null,
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
+ return seaTunnelRow;
+ }
+
+ private static SeaTunnelRow getSeaTunnelRowWithStringNullDelete() {
+ final SeaTunnelRow seaTunnelRow =
+ new SeaTunnelRow(
+ new Object[] {
+ 1,
+ true,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 4.3f,
+ 5.3d,
+ BigDecimal.valueOf(6.3).setScale(1),
+ null,
+ LocalDateTime.parse("2020-02-02T02:02:02")
+ });
+ seaTunnelRow.setRowKind(RowKind.DELETE);
+ return seaTunnelRow;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
index 0124681abb..77b0733db5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisTestCaseTemplateIT.java
@@ -18,25 +18,17 @@ package org.apache.seatunnel.e2e.connector.redis;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
import
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisContainerInfo;
-import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -81,7 +73,6 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions.CONNECTOR_IDENTITY;
import static org.awaitility.Awaitility.await;
@Slf4j
@@ -706,221 +697,5 @@ public abstract class RedisTestCaseTemplateIT extends
TestSuiteBase implements T
Assertions.assertEquals(2, count);
}
- @TestTemplate
- public void testFakeToRedisCustomKeyIsNullTest(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
-
container.executeJob("/fake-to-redis-test-custom-key-is-null.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- int count = 0;
- String data = jedis.get("key_check:");
- if (data != null) {
- count++;
- jedis.del("key_check:");
- }
- for (int i = 2; i <= 3; i++) {
- data = jedis.get("key_check:NEW" + i);
- if (data != null) {
- count++;
- jedis.del("key_check:NEW" + i);
- }
- }
- Assertions.assertEquals(2, count);
- }
-
- @TestTemplate
- public void testFakeToRedisOtherTypeValueIsNullTest(TestContainer
container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
- container.executeJob(
-
"/fake-to-redis-test-custom-value-when-other-type-is-null.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertEquals(2, jedis.llen("list_check"));
- jedis.del("list_check");
- }
-
- @TestTemplate
- public void testFakeToRedisHashTypeKeyIsNullTest(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
-
container.executeJob("/fake-to-redis-test-custom-value-when-hash-key-is-null.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertEquals(2, jedis.hlen("hash_check"));
- jedis.del("hash_check");
- }
-
- @TestTemplate
- public void testFakeToRedisHashTypeValueIsNullTest(TestContainer container)
- throws IOException, InterruptedException {
- Container.ExecResult execResult =
- container.executeJob(
-
"/fake-to-redis-test-custom-value-when-hash-value-is-null.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- Assertions.assertEquals(2, jedis.hlen("hash_check"));
- jedis.del("hash_check");
- }
-
public abstract RedisContainerInfo getRedisContainerInfo();
-
- private ReadonlyConfig getDefaultReadonlyConfig(
- RedisDataType dataType, String key, Map<String, Object>
otherParams) {
- Map<String, Object> map = new HashMap<>(otherParams);
- map.put("host", redisContainer.getHost());
- map.put("port", redisContainer.getFirstMappedPort());
- map.put("db_num", 0);
- map.put("auth", password);
- map.put("key", key);
- map.put("data_type", dataType.name());
- map.put("batch_size", 33);
- return ReadonlyConfig.fromMap(map);
- }
-
- private SeaTunnelRow getSeaTunnelRowInsert1() {
- return new SeaTunnelRow(
- new Object[] {
- 1,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- }
-
- private SeaTunnelRow getSeaTunnelRowInsert2() {
- return new SeaTunnelRow(
- new Object[] {
- 2,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- }
-
- private SeaTunnelRow getSeaTunnelRowInsert3() {
- return new SeaTunnelRow(
- new Object[] {
- 3,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- }
-
- private SeaTunnelRow getSeaTunnelRowUpdateBefore() {
- final SeaTunnelRow seaTunnelRow =
- new SeaTunnelRow(
- new Object[] {
- 1,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- seaTunnelRow.setRowKind(RowKind.UPDATE_BEFORE);
- return seaTunnelRow;
- }
-
- private SeaTunnelRow getSeaTunnelRowUpdateAfter() {
- final SeaTunnelRow seaTunnelRow =
- new SeaTunnelRow(
- new Object[] {
- 1,
- true,
- (byte) 2,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- seaTunnelRow.setRowKind(RowKind.UPDATE_AFTER);
- return seaTunnelRow;
- }
-
- private SeaTunnelRow getSeaTunnelRowDelete() {
- final SeaTunnelRow seaTunnelRow =
- new SeaTunnelRow(
- new Object[] {
- 2,
- true,
- (byte) 1,
- (short) 2,
- 3,
- 4L,
- 4.3f,
- 5.3d,
- BigDecimal.valueOf(6.3).setScale(1),
- "NEW",
- LocalDateTime.parse("2020-02-02T02:02:02")
- });
- seaTunnelRow.setRowKind(RowKind.DELETE);
- return seaTunnelRow;
- }
-
- private CatalogTable getCatalogTable(Integer dbNum, String key) {
- return CatalogTable.of(
- TableIdentifier.of(CONNECTOR_IDENTITY, dbNum.toString(), key),
- getTableSchema(),
- new HashMap<>(),
- new ArrayList<>(),
- "");
- }
-
- private TableSchema getTableSchema() {
- return new TableSchema(getColumns(), null, null);
- }
-
- private List<Column> getColumns() {
- List<Column> columns = new ArrayList<>();
- columns.add(new PhysicalColumn("id", BasicType.INT_TYPE, 32L, 0, true,
"", ""));
- columns.add(new PhysicalColumn("val_bool", BasicType.BOOLEAN_TYPE, 1L,
0, true, "", ""));
- columns.add(new PhysicalColumn("val_int8", BasicType.BYTE_TYPE, 8L, 0,
true, "", ""));
- columns.add(new PhysicalColumn("val_int16", BasicType.SHORT_TYPE, 16L,
0, true, "", ""));
- columns.add(new PhysicalColumn("val_int32", BasicType.INT_TYPE, 32L,
0, true, "", ""));
- columns.add(new PhysicalColumn("val_int64", BasicType.LONG_TYPE, 64L,
0, true, "", ""));
- columns.add(new PhysicalColumn("val_float", BasicType.FLOAT_TYPE, 32L,
0, true, "", ""));
- columns.add(new PhysicalColumn("val_double", BasicType.DOUBLE_TYPE,
64L, 0, true, "", ""));
- columns.add(
- new PhysicalColumn("val_decimal", new DecimalType(16, 1), 16L,
1, true, "", ""));
- columns.add(new PhysicalColumn("val_string", BasicType.STRING_TYPE,
0L, 0, true, "", ""));
- columns.add(
- new PhysicalColumn(
- "val_unixtime_micros",
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- 64L,
- 6,
- true,
- "",
- ""));
- return columns;
- }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
deleted file mode 100644
index 20ec18450a..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-key-is-null.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "key_check:{val_string}"
- data_type = key
- support_custom_key = true
- batch_size = 33
- }
-}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
deleted file mode 100644
index 774b4aa379..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-key-is-null.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "hash_check"
- data_type = hash
- hash_key_field = "val_string"
- batch_size = 33
- }
-}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
deleted file mode 100644
index 8d3c2fee7a..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-hash-value-is-null.conf
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "hash_check"
- data_type = hash
- hash_key_field = "id"
- hash_value_field = "val_string"
- batch_size = 33
- }
-}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
deleted file mode 100644
index 1eab8030d9..0000000000
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/fake-to-redis-test-custom-value-when-other-type-is-null.conf
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-env {
- parallelism = 1
- job.mode = "BATCH"
- shade.identifier = "base64"
-
- #spark config
- spark.app.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
-}
-
-source {
- FakeSource {
- schema = {
- fields {
- id = int
- val_bool = boolean
- val_int8 = tinyint
- val_int16 = smallint
- val_int32 = int
- val_int64 = bigint
- val_float = float
- val_double = double
- val_decimal = "decimal(16, 1)"
- val_string = string
- val_unixtime_micros = timestamp
- }
- }
- rows = [
- {
- kind = INSERT
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW2",
"2020-02-02T02:02:02"]
- },
- {
- kind = INSERT
- fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW3",
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_BEFORE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = UPDATE_AFTER
- fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- },
- {
- kind = DELETE
- fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, null,
"2020-02-02T02:02:02"]
- }
- ]
- }
-}
-
-sink {
- Redis {
- host = "redis-e2e"
- port = 6379
- auth = "U2VhVHVubmVs"
- key = "list_check"
- data_type = list
- value_field = "val_string"
- batch_size = 33
- }
-}
\ No newline at end of file