This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b1facd6a75c Refactor PipelineProcessConfigurationPersistServiceTest to support more job types (#37741)
b1facd6a75c is described below
commit b1facd6a75c875024e166517328a813263e8db97
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jan 15 11:01:25 2026 +0800
Refactor PipelineProcessConfigurationPersistServiceTest to support more job types (#37741)
---
...lineProcessConfigurationPersistServiceTest.java | 121 +++++++++------------
1 file changed, 54 insertions(+), 67 deletions(-)
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
index 013a4f07d5d..0344a7aab45 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.metadata;
+import com.google.common.collect.ImmutableMap;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineWriteConfiguration;
@@ -26,9 +27,12 @@ import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import java.util.Map;
import java.util.Properties;
+import java.util.function.BiConsumer;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -36,74 +40,56 @@ import static org.junit.jupiter.api.Assertions.assertNull;
class PipelineProcessConfigurationPersistServiceTest {
- private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new
YamlPipelineProcessConfigurationSwapper();
+ private final Map<String, BiConsumer<YamlPipelineProcessConfiguration,
YamlPipelineProcessConfiguration>> typesAssertionMap = ImmutableMap.of(
+ "READ", (actual, expected) -> {
+ assertYamlConfiguration(actual.getRead(), expected.getRead());
+ assertYamlConfiguration(actual.getWrite(), new
YamlPipelineWriteConfiguration());
+ assertNull(actual.getStreamChannel());
+ },
+ "WRITE", (actual, expected) -> {
+ assertYamlConfiguration(actual.getRead(), new
YamlPipelineReadConfiguration());
+ assertYamlConfiguration(actual.getWrite(),
expected.getWrite());
+ assertNull(actual.getStreamChannel());
+ },
+ "STREAM_CHANNEL", (actual, expected) -> {
+ assertYamlConfiguration(actual.getRead(), new
YamlPipelineReadConfiguration());
+ assertYamlConfiguration(actual.getWrite(), new
YamlPipelineWriteConfiguration());
+ assertYamlConfiguration(actual.getStreamChannel(),
expected.getStreamChannel());
+ },
+ "READ-WRITE", (actual, expected) -> {
+ assertYamlConfiguration(actual.getRead(), expected.getRead());
+ assertYamlConfiguration(actual.getWrite(),
expected.getWrite());
+ assertNull(actual.getStreamChannel());
+ },
+ "READ-STREAM_CHANNEL", (actual, expected) -> {
+ assertYamlConfiguration(actual.getRead(), expected.getRead());
+ assertYamlConfiguration(actual.getWrite(), new
YamlPipelineWriteConfiguration());
+ assertYamlConfiguration(actual.getStreamChannel(),
expected.getStreamChannel());
+ },
+ "WRITE-STREAM_CHANNEL", (actual, expected) -> {
+ assertYamlConfiguration(actual.getRead(), new
YamlPipelineReadConfiguration());
+ assertYamlConfiguration(actual.getWrite(),
expected.getWrite());
+ assertYamlConfiguration(actual.getStreamChannel(),
expected.getStreamChannel());
+ },
+ "READ-WRITE-STREAM_CHANNEL", (actual, expected) -> {
+ assertYamlConfiguration(actual.getRead(), expected.getRead());
+ assertYamlConfiguration(actual.getWrite(),
expected.getWrite());
+ assertYamlConfiguration(actual.getStreamChannel(),
expected.getStreamChannel());
+ });
@BeforeAll
static void beforeClass() {
PipelineContextUtils.initPipelineContextManager();
}
- @Test
- void assertPersistAndLoadReadConfiguration() {
- YamlPipelineProcessConfiguration expected =
createYamlPipelineProcessConfiguration("READ");
- YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
- assertYamlConfiguration(actual.getRead(), expected.getRead());
- assertYamlConfiguration(actual.getWrite(), new
YamlPipelineWriteConfiguration());
- assertNull(actual.getStreamChannel());
- }
-
- @Test
- void assertPersistAndLoadWriteConfiguration() {
- YamlPipelineProcessConfiguration expected =
createYamlPipelineProcessConfiguration("WRITE");
- YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
- assertYamlConfiguration(actual.getRead(), new
YamlPipelineReadConfiguration());
- assertYamlConfiguration(actual.getWrite(), expected.getWrite());
- assertNull(actual.getStreamChannel());
- }
-
- @Test
- void assertPersistAndLoadStreamChannelConfiguration() {
- YamlPipelineProcessConfiguration expected =
createYamlPipelineProcessConfiguration("STREAM_CHANNEL");
- YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
- assertYamlConfiguration(actual.getRead(), new
YamlPipelineReadConfiguration());
- assertYamlConfiguration(actual.getWrite(), new
YamlPipelineWriteConfiguration());
- assertYamlConfiguration(actual.getStreamChannel(),
expected.getStreamChannel());
- }
-
- @Test
- void assertPersistAndLoadReadWriteConfiguration() {
- YamlPipelineProcessConfiguration expected =
createYamlPipelineProcessConfiguration("READ,WRITE");
- YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
- assertYamlConfiguration(actual.getRead(), expected.getRead());
- assertYamlConfiguration(actual.getWrite(), expected.getWrite());
- assertNull(actual.getStreamChannel());
- }
-
- @Test
- void assertPersistAndLoadReadStreamChannelConfiguration() {
- YamlPipelineProcessConfiguration expected =
createYamlPipelineProcessConfiguration("READ,STREAM_CHANNEL");
- YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
- assertYamlConfiguration(actual.getRead(), expected.getRead());
- assertYamlConfiguration(actual.getWrite(), new
YamlPipelineWriteConfiguration());
- assertYamlConfiguration(actual.getStreamChannel(),
expected.getStreamChannel());
- }
-
- @Test
- void assertPersistAndLoadWriteStreamChannelConfiguration() {
- YamlPipelineProcessConfiguration expected =
createYamlPipelineProcessConfiguration("WRITE,STREAM_CHANNEL");
- YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
- assertYamlConfiguration(actual.getRead(), new
YamlPipelineReadConfiguration());
- assertYamlConfiguration(actual.getWrite(), expected.getWrite());
- assertYamlConfiguration(actual.getStreamChannel(),
expected.getStreamChannel());
- }
-
- @Test
- void assertPersistAndLoadAllConfiguration() {
- YamlPipelineProcessConfiguration expected =
createYamlPipelineProcessConfiguration("READ,WRITE,STREAM_CHANNEL");
- YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
- assertYamlConfiguration(actual.getRead(), expected.getRead());
- assertYamlConfiguration(actual.getWrite(), expected.getWrite());
- assertYamlConfiguration(actual.getStreamChannel(),
expected.getStreamChannel());
+ @ParameterizedTest
+ @ValueSource(strings = {"MIGRATION", "STREAMING"})
+ void assertPersistAndLoad(final String jobType) {
+ for (String each : typesAssertionMap.keySet()) {
+ YamlPipelineProcessConfiguration expected =
createYamlPipelineProcessConfiguration(each);
+ YamlPipelineProcessConfiguration actual = persistAndLoad(jobType,
expected);
+ typesAssertionMap.get(each).accept(actual, expected);
+ }
}
private YamlPipelineProcessConfiguration
createYamlPipelineProcessConfiguration(final String types) {
@@ -112,7 +98,7 @@ class PipelineProcessConfigurationPersistServiceTest {
result.setRead(null);
result.setWrite(null);
result.setStreamChannel(null);
- for (String each : types.split(",")) {
+ for (String each : types.split("-")) {
switch (each.trim()) {
case "READ":
result.setRead(createYamlPipelineReadConfiguration());
@@ -166,10 +152,11 @@ class PipelineProcessConfigurationPersistServiceTest {
return result;
}
- YamlPipelineProcessConfiguration persistAndLoad(final
YamlPipelineProcessConfiguration yamlProcessConfig) {
+ YamlPipelineProcessConfiguration persistAndLoad(final String jobType,
final YamlPipelineProcessConfiguration yamlProcessConfig) {
PipelineProcessConfigurationPersistService persistService = new
PipelineProcessConfigurationPersistService();
- persistService.persist(PipelineContextUtils.getContextKey(),
"MIGRATION", SWAPPER.swapToObject(yamlProcessConfig));
- return
SWAPPER.swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
"MIGRATION"));
+ YamlPipelineProcessConfigurationSwapper swapper = new
YamlPipelineProcessConfigurationSwapper();
+ persistService.persist(PipelineContextUtils.getContextKey(), jobType,
swapper.swapToObject(yamlProcessConfig));
+ return
swapper.swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
jobType));
}
private void assertYamlConfiguration(final YamlConfiguration actual, final
YamlConfiguration expected) {