This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b1facd6a75c Refactor PipelineProcessConfigurationPersistServiceTest to support more job types (#37741)
b1facd6a75c is described below

commit b1facd6a75c875024e166517328a813263e8db97
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jan 15 11:01:25 2026 +0800

    Refactor PipelineProcessConfigurationPersistServiceTest to support more job types (#37741)
---
 ...lineProcessConfigurationPersistServiceTest.java | 121 +++++++++------------
 1 file changed, 54 insertions(+), 67 deletions(-)

diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
index 013a4f07d5d..0344a7aab45 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata;
 
+import com.google.common.collect.ImmutableMap;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineReadConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineWriteConfiguration;
@@ -26,9 +27,12 @@ import 
org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Map;
 import java.util.Properties;
+import java.util.function.BiConsumer;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -36,74 +40,56 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 
 class PipelineProcessConfigurationPersistServiceTest {
     
-    private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new 
YamlPipelineProcessConfigurationSwapper();
+    private final Map<String, BiConsumer<YamlPipelineProcessConfiguration, 
YamlPipelineProcessConfiguration>> typesAssertionMap = ImmutableMap.of(
+            "READ", (actual, expected) -> {
+                assertYamlConfiguration(actual.getRead(), expected.getRead());
+                assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
+                assertNull(actual.getStreamChannel());
+            },
+            "WRITE", (actual, expected) -> {
+                assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
+                assertYamlConfiguration(actual.getWrite(), 
expected.getWrite());
+                assertNull(actual.getStreamChannel());
+            },
+            "STREAM_CHANNEL", (actual, expected) -> {
+                assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
+                assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
+                assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
+            },
+            "READ-WRITE", (actual, expected) -> {
+                assertYamlConfiguration(actual.getRead(), expected.getRead());
+                assertYamlConfiguration(actual.getWrite(), 
expected.getWrite());
+                assertNull(actual.getStreamChannel());
+            },
+            "READ-STREAM_CHANNEL", (actual, expected) -> {
+                assertYamlConfiguration(actual.getRead(), expected.getRead());
+                assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
+                assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
+            },
+            "WRITE-STREAM_CHANNEL", (actual, expected) -> {
+                assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
+                assertYamlConfiguration(actual.getWrite(), 
expected.getWrite());
+                assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
+            },
+            "READ-WRITE-STREAM_CHANNEL", (actual, expected) -> {
+                assertYamlConfiguration(actual.getRead(), expected.getRead());
+                assertYamlConfiguration(actual.getWrite(), 
expected.getWrite());
+                assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
+            });
     
     @BeforeAll
     static void beforeClass() {
         PipelineContextUtils.initPipelineContextManager();
     }
     
-    @Test
-    void assertPersistAndLoadReadConfiguration() {
-        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("READ");
-        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
-        assertYamlConfiguration(actual.getRead(), expected.getRead());
-        assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
-        assertNull(actual.getStreamChannel());
-    }
-    
-    @Test
-    void assertPersistAndLoadWriteConfiguration() {
-        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("WRITE");
-        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
-        assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
-        assertYamlConfiguration(actual.getWrite(), expected.getWrite());
-        assertNull(actual.getStreamChannel());
-    }
-    
-    @Test
-    void assertPersistAndLoadStreamChannelConfiguration() {
-        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("STREAM_CHANNEL");
-        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
-        assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
-        assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
-        assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
-    }
-    
-    @Test
-    void assertPersistAndLoadReadWriteConfiguration() {
-        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("READ,WRITE");
-        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
-        assertYamlConfiguration(actual.getRead(), expected.getRead());
-        assertYamlConfiguration(actual.getWrite(), expected.getWrite());
-        assertNull(actual.getStreamChannel());
-    }
-    
-    @Test
-    void assertPersistAndLoadReadStreamChannelConfiguration() {
-        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("READ,STREAM_CHANNEL");
-        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
-        assertYamlConfiguration(actual.getRead(), expected.getRead());
-        assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
-        assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
-    }
-    
-    @Test
-    void assertPersistAndLoadWriteStreamChannelConfiguration() {
-        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("WRITE,STREAM_CHANNEL");
-        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
-        assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
-        assertYamlConfiguration(actual.getWrite(), expected.getWrite());
-        assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
-    }
-    
-    @Test
-    void assertPersistAndLoadAllConfiguration() {
-        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("READ,WRITE,STREAM_CHANNEL");
-        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
-        assertYamlConfiguration(actual.getRead(), expected.getRead());
-        assertYamlConfiguration(actual.getWrite(), expected.getWrite());
-        assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
+    @ParameterizedTest
+    @ValueSource(strings = {"MIGRATION", "STREAMING"})
+    void assertPersistAndLoad(final String jobType) {
+        for (String each : typesAssertionMap.keySet()) {
+            YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration(each);
+            YamlPipelineProcessConfiguration actual = persistAndLoad(jobType, 
expected);
+            typesAssertionMap.get(each).accept(actual, expected);
+        }
     }
     
     private YamlPipelineProcessConfiguration 
createYamlPipelineProcessConfiguration(final String types) {
@@ -112,7 +98,7 @@ class PipelineProcessConfigurationPersistServiceTest {
         result.setRead(null);
         result.setWrite(null);
         result.setStreamChannel(null);
-        for (String each : types.split(",")) {
+        for (String each : types.split("-")) {
             switch (each.trim()) {
                 case "READ":
                     result.setRead(createYamlPipelineReadConfiguration());
@@ -166,10 +152,11 @@ class PipelineProcessConfigurationPersistServiceTest {
         return result;
     }
     
-    YamlPipelineProcessConfiguration persistAndLoad(final YamlPipelineProcessConfiguration yamlProcessConfig) {
+    YamlPipelineProcessConfiguration persistAndLoad(final String jobType, final YamlPipelineProcessConfiguration yamlProcessConfig) {
         PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();
-        persistService.persist(PipelineContextUtils.getContextKey(), "MIGRATION", SWAPPER.swapToObject(yamlProcessConfig));
-        return SWAPPER.swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION"));
+        YamlPipelineProcessConfigurationSwapper swapper = new YamlPipelineProcessConfigurationSwapper();
+        persistService.persist(PipelineContextUtils.getContextKey(), jobType, swapper.swapToObject(yamlProcessConfig));
+        return swapper.swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(), jobType));
     }
     
     private void assertYamlConfiguration(final YamlConfiguration actual, final 
YamlConfiguration expected) {

Reply via email to