This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 279458aad7c Improve PipelineProcessConfigurationPersistServiceTest code coverage rate (#37739)
279458aad7c is described below

commit 279458aad7c26a44e408083b6644c4ef7ab41373
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 14 20:33:52 2026 +0800

    Improve PipelineProcessConfigurationPersistServiceTest code coverage rate (#37739)
---
 ...lineProcessConfigurationPersistServiceTest.java | 150 ++++++++++++++++++---
 1 file changed, 132 insertions(+), 18 deletions(-)

diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
index ceab68d907e..013a4f07d5d 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
@@ -17,48 +17,162 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata;
 
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineReadConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.config.YamlPipelineWriteConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
 import 
org.apache.shardingsphere.infra.algorithm.core.yaml.YamlAlgorithmConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.util.Properties;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 class PipelineProcessConfigurationPersistServiceTest {
     
+    private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new 
YamlPipelineProcessConfigurationSwapper();
+    
     @BeforeAll
     static void beforeClass() {
         PipelineContextUtils.initPipelineContextManager();
     }
     
     @Test
-    void assertLoadAndPersist() {
-        YamlPipelineProcessConfiguration yamlProcessConfig = 
createYamlPipelineProcessConfiguration();
-        String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
-        PipelineProcessConfiguration processConfig = new 
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
-        PipelineProcessConfigurationPersistService persistService = new 
PipelineProcessConfigurationPersistService();
-        persistService.persist(PipelineContextUtils.getContextKey(), 
"MIGRATION", processConfig);
-        String actualYamlText = YamlEngine.marshal(new 
YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
 "MIGRATION")));
-        assertThat(actualYamlText, is(expectedYamlText));
+    void assertPersistAndLoadReadConfiguration() {
+        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("READ");
+        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
+        assertYamlConfiguration(actual.getRead(), expected.getRead());
+        assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
+        assertNull(actual.getStreamChannel());
+    }
+    
+    @Test
+    void assertPersistAndLoadWriteConfiguration() {
+        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("WRITE");
+        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
+        assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
+        assertYamlConfiguration(actual.getWrite(), expected.getWrite());
+        assertNull(actual.getStreamChannel());
+    }
+    
+    @Test
+    void assertPersistAndLoadStreamChannelConfiguration() {
+        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("STREAM_CHANNEL");
+        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
+        assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
+        assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
+        assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
+    }
+    
+    @Test
+    void assertPersistAndLoadReadWriteConfiguration() {
+        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("READ,WRITE");
+        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
+        assertYamlConfiguration(actual.getRead(), expected.getRead());
+        assertYamlConfiguration(actual.getWrite(), expected.getWrite());
+        assertNull(actual.getStreamChannel());
+    }
+    
+    @Test
+    void assertPersistAndLoadReadStreamChannelConfiguration() {
+        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("READ,STREAM_CHANNEL");
+        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
+        assertYamlConfiguration(actual.getRead(), expected.getRead());
+        assertYamlConfiguration(actual.getWrite(), new 
YamlPipelineWriteConfiguration());
+        assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
+    }
+    
+    @Test
+    void assertPersistAndLoadWriteStreamChannelConfiguration() {
+        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("WRITE,STREAM_CHANNEL");
+        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
+        assertYamlConfiguration(actual.getRead(), new 
YamlPipelineReadConfiguration());
+        assertYamlConfiguration(actual.getWrite(), expected.getWrite());
+        assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
     }
     
-    private YamlPipelineProcessConfiguration 
createYamlPipelineProcessConfiguration() {
+    @Test
+    void assertPersistAndLoadAllConfiguration() {
+        YamlPipelineProcessConfiguration expected = 
createYamlPipelineProcessConfiguration("READ,WRITE,STREAM_CHANNEL");
+        YamlPipelineProcessConfiguration actual = persistAndLoad(expected);
+        assertYamlConfiguration(actual.getRead(), expected.getRead());
+        assertYamlConfiguration(actual.getWrite(), expected.getWrite());
+        assertYamlConfiguration(actual.getStreamChannel(), 
expected.getStreamChannel());
+    }
+    
+    private YamlPipelineProcessConfiguration 
createYamlPipelineProcessConfiguration(final String types) {
         YamlPipelineProcessConfiguration result = new 
YamlPipelineProcessConfiguration();
-        YamlPipelineReadConfiguration yamlReadConfig = new 
YamlPipelineReadConfiguration();
-        yamlReadConfig.setShardingSize(10);
-        result.setRead(yamlReadConfig);
-        YamlPipelineWriteConfiguration yamlWriteConfig = new 
YamlPipelineWriteConfiguration();
-        result.setWrite(yamlWriteConfig);
-        YamlAlgorithmConfiguration yamlStreamChannel = new 
YamlAlgorithmConfiguration();
-        yamlStreamChannel.setType("MEMORY");
-        result.setStreamChannel(yamlStreamChannel);
+        // Ref AlterTransmissionRuleExecutor
+        result.setRead(null);
+        result.setWrite(null);
+        result.setStreamChannel(null);
+        for (String each : types.split(",")) {
+            switch (each.trim()) {
+                case "READ":
+                    result.setRead(createYamlPipelineReadConfiguration());
+                    break;
+                case "WRITE":
+                    result.setWrite(createYamlPipelineWriteConfiguration());
+                    break;
+                case "STREAM_CHANNEL":
+                    
result.setStreamChannel(createYamlStreamChannelConfiguration());
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown type: " + 
each);
+            }
+        }
+        return result;
+    }
+    
+    private YamlPipelineReadConfiguration 
createYamlPipelineReadConfiguration() {
+        YamlPipelineReadConfiguration result = new 
YamlPipelineReadConfiguration();
+        result.setWorkerThread(10);
+        result.setBatchSize(1000);
+        result.setShardingSize(10000000);
+        YamlAlgorithmConfiguration rateLimiter = new 
YamlAlgorithmConfiguration();
+        result.setRateLimiter(rateLimiter);
+        rateLimiter.setType("QPS");
+        Properties props = new Properties();
+        rateLimiter.setProps(props);
+        props.setProperty("qps", "500");
+        return result;
+    }
+    
+    private YamlPipelineWriteConfiguration 
createYamlPipelineWriteConfiguration() {
+        YamlPipelineWriteConfiguration result = new 
YamlPipelineWriteConfiguration();
+        result.setWorkerThread(10);
+        result.setBatchSize(1000);
+        YamlAlgorithmConfiguration rateLimiter = new 
YamlAlgorithmConfiguration();
+        result.setRateLimiter(rateLimiter);
+        rateLimiter.setType("TPS");
+        Properties props = new Properties();
+        rateLimiter.setProps(props);
+        props.setProperty("tps", "2000");
         return result;
     }
+    
+    private YamlAlgorithmConfiguration createYamlStreamChannelConfiguration() {
+        YamlAlgorithmConfiguration result = new YamlAlgorithmConfiguration();
+        result.setType("MEMORY");
+        Properties props = new Properties();
+        result.setProps(props);
+        props.setProperty("block-queue-size", "1000");
+        return result;
+    }
+    
+    YamlPipelineProcessConfiguration persistAndLoad(final 
YamlPipelineProcessConfiguration yamlProcessConfig) {
+        PipelineProcessConfigurationPersistService persistService = new 
PipelineProcessConfigurationPersistService();
+        persistService.persist(PipelineContextUtils.getContextKey(), 
"MIGRATION", SWAPPER.swapToObject(yamlProcessConfig));
+        return 
SWAPPER.swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
 "MIGRATION"));
+    }
+    
+    private void assertYamlConfiguration(final YamlConfiguration actual, final 
YamlConfiguration expected) {
+        assertThat(YamlEngine.marshal(actual), 
is(YamlEngine.marshal(expected)));
+    }
 }

Reply via email to