This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 13901915820 Refactor RuleAlteredContext for common usage (#19903)
13901915820 is described below

commit 139019158209d8f02eadfd52979b996933dccb7e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 5 18:45:30 2022 +0800

    Refactor RuleAlteredContext for common usage (#19903)
    
    * Add PipelineProcessConfiguration and yaml swapper
    
    * Add PipelineProcessContext
    
    * Refactor RuleAlteredJobConfigurationPreparer
    
    * Add test
---
 ...hardingRuleAlteredJobConfigurationPreparer.java |  12 +--
 .../pipeline/PipelineProcessConfiguration.java     |  38 +++++++
 .../pipeline/YamlPipelineProcessConfiguration.java |  39 +++++++
 .../YamlPipelineProcessConfigurationSwapper.java   |  58 +++++++++++
 ...amlPipelineProcessConfigurationSwapperTest.java |  68 ++++++++++++
 .../RuleAlteredJobConfigurationPreparer.java       |   6 +-
 ...RuleAlteredJobConfigurationPreparerFixture.java |   4 +-
 .../context/AbstractPipelineProcessContext.java}   | 115 +++++++++++++--------
 .../core/context/PipelineProcessContext.java       |  24 +++++
 .../scenario/rulealtered/RuleAlteredContext.java   |  75 ++------------
 .../rulealtered/RuleAlteredJobContext.java         |   2 +-
 .../scenario/rulealtered/RuleAlteredJobWorker.java |   7 +-
 .../rulealtered/prepare/InventoryTaskSplitter.java |   4 +-
 .../rulealtered/RuleAlteredJobWorkerTest.java      |   2 +-
 14 files changed, 322 insertions(+), 132 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 1e6b5bca95f..981150126c2 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -35,12 +35,12 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
@@ -153,7 +153,7 @@ public final class 
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
     
     // TODO use jobConfig as parameter, jobShardingItem
     @Override
-    public TaskConfiguration createTaskConfiguration(final 
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final 
OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+    public TaskConfiguration createTaskConfiguration(final 
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         ShardingSpherePipelineDataSourceConfiguration sourceConfig = 
getSourceConfiguration(jobConfig);
         ShardingRuleConfiguration sourceRuleConfig = 
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
         Map<String, DataSourceProperties> dataSourcePropsMap = new 
YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceConfig.getRootConfig());
@@ -171,7 +171,7 @@ public final class 
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         Optional<ShardingRuleConfiguration> targetRuleConfig = 
getTargetRuleConfiguration(jobConfig);
         Set<LogicTableName> reShardNeededTables = 
jobConfig.splitLogicTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
         Map<LogicTableName, Set<String>> shardingColumnsMap = 
getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig), 
reShardNeededTables);
-        ImporterConfiguration importerConfig = 
createImporterConfiguration(jobConfig, onRuleAlteredActionConfig, 
shardingColumnsMap, tableNameSchemaNameMapping);
+        ImporterConfiguration importerConfig = 
createImporterConfiguration(jobConfig, pipelineProcessConfig, 
shardingColumnsMap, tableNameSchemaNameMapping);
         TaskConfiguration result = new TaskConfiguration(dumperConfig, 
importerConfig);
         log.info("createTaskConfiguration, dataSourceName={}, result={}", 
dataSourceName, result);
         return result;
@@ -237,10 +237,10 @@ public final class 
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         return result;
     }
     
-    private static ImporterConfiguration createImporterConfiguration(final 
RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration 
onRuleAlteredActionConfig,
+    private static ImporterConfiguration createImporterConfiguration(final 
RuleAlteredJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
                                                                      final 
Map<LogicTableName, Set<String>> shardingColumnsMap, final 
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
 jobConfig.getTarget().getParameter());
-        int batchSize = onRuleAlteredActionConfig.getOutput().getBatchSize();
+        int batchSize = pipelineProcessConfig.getOutput().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
         return new ImporterConfiguration(dataSourceConfig, 
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize, 
retryTimes, concurrency);
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
new file mode 100644
index 00000000000..928960d3026
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.rule.data.pipeline;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+
+/**
+ * Pipeline process configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class PipelineProcessConfiguration {
+    
+    private final PipelineInputConfiguration input;
+    
+    private final PipelineOutputConfiguration output;
+    
+    private final AlgorithmConfiguration streamChannel;
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
new file mode 100644
index 00000000000..71bb47943e6
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+
+/**
+ * YAML pipeline process configuration.
+ */
+@Getter
+@Setter
+@ToString
+public final class YamlPipelineProcessConfiguration implements 
YamlConfiguration {
+    
+    private YamlPipelineInputConfiguration input;
+    
+    private YamlPipelineOutputConfiguration output;
+    
+    private YamlAlgorithmConfiguration streamChannel;
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
new file mode 100644
index 00000000000..cd6a37812d5
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
+
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
+
+/**
+ * YAML pipeline process configuration swapper.
+ */
+public final class YamlPipelineProcessConfigurationSwapper implements 
YamlConfigurationSwapper<YamlPipelineProcessConfiguration, 
PipelineProcessConfiguration> {
+    
+    private static final YamlAlgorithmConfigurationSwapper 
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
+    
+    private static final YamlPipelineInputConfigurationSwapper 
INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
+    
+    private static final YamlPipelineOutputConfigurationSwapper 
OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
+    
+    @Override
+    public YamlPipelineProcessConfiguration swapToYamlConfiguration(final 
PipelineProcessConfiguration data) {
+        if (null == data) {
+            return null;
+        }
+        YamlPipelineProcessConfiguration result = new 
YamlPipelineProcessConfiguration();
+        
result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
+        
result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+        
result.setStreamChannel(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
+        return result;
+    }
+    
+    @Override
+    public PipelineProcessConfiguration swapToObject(final 
YamlPipelineProcessConfiguration yamlConfig) {
+        if (null == yamlConfig) {
+            return null;
+        }
+        return new PipelineProcessConfiguration(
+                INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
+                OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+                
ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getStreamChannel()));
+    }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
new file mode 100644
index 00000000000..8558f0197c1
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
+
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class YamlPipelineProcessConfigurationSwapperTest {
+    
+    private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new 
YamlPipelineProcessConfigurationSwapper();
+    
+    @Test
+    public void assertSwap() {
+        YamlPipelineProcessConfiguration yamlConfig = new 
YamlPipelineProcessConfiguration();
+        Properties rateLimiterProps = new Properties();
+        rateLimiterProps.setProperty("batch-size", "1000");
+        rateLimiterProps.setProperty("qps", "50");
+        YamlPipelineInputConfiguration yamlInputConfig = 
YamlPipelineInputConfiguration.buildWithDefaultValue();
+        yamlConfig.setInput(yamlInputConfig);
+        yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT", 
rateLimiterProps));
+        YamlPipelineOutputConfiguration yamlOutputConfig = 
YamlPipelineOutputConfiguration.buildWithDefaultValue();
+        yamlOutputConfig.setRateLimiter(new 
YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
+        yamlConfig.setOutput(yamlOutputConfig);
+        Properties streamChannelProps = new Properties();
+        streamChannelProps.setProperty("block-queue-size", "10000");
+        yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY", 
streamChannelProps));
+        YamlPipelineProcessConfigurationSwapper 
onRuleAlteredActionConfigSwapper = new 
YamlPipelineProcessConfigurationSwapper();
+        PipelineProcessConfiguration actualConfig = 
onRuleAlteredActionConfigSwapper.swapToObject(yamlConfig);
+        YamlPipelineProcessConfiguration actualYamlConfig = 
onRuleAlteredActionConfigSwapper.swapToYamlConfiguration(actualConfig);
+        assertThat(YamlEngine.marshal(actualYamlConfig), 
is(YamlEngine.marshal(yamlConfig)));
+    }
+    
+    @Test
+    public void assertYamlConfigNull() {
+        assertNull(SWAPPER.swapToYamlConfiguration(null));
+    }
+    
+    @Test
+    public void assertConfigNull() {
+        assertNull(SWAPPER.swapToObject(null));
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index b400ad8fac8..809fa2fa6e7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.spi.rulealtered;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.spi.type.required.RequiredSPI;
 
 /**
@@ -40,8 +40,8 @@ public interface RuleAlteredJobConfigurationPreparer extends 
RequiredSPI {
      *
      * @param jobConfig job configuration
      * @param jobShardingItem job sharding item
-     * @param onRuleAlteredActionConfig action configuration
+     * @param pipelineProcessConfig pipeline process configuration
      * @return task configuration
      */
-    TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration 
jobConfig, int jobShardingItem, OnRuleAlteredActionConfiguration 
onRuleAlteredActionConfig);
+    TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration 
jobConfig, int jobShardingItem, PipelineProcessConfiguration 
pipelineProcessConfig);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
index 8649d74ab42..e454bd44c4a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 
 public final class RuleAlteredJobConfigurationPreparerFixture implements 
RuleAlteredJobConfigurationPreparer {
     
@@ -30,7 +30,7 @@ public final class RuleAlteredJobConfigurationPreparerFixture 
implements RuleAlt
     }
     
     @Override
-    public TaskConfiguration createTaskConfiguration(final 
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final 
OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+    public TaskConfiguration createTaskConfiguration(final 
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         return null;
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
similarity index 50%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
index b6e0c69070d..aafe6962271 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
@@ -15,17 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.core.context;
 
 import lombok.Getter;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
-import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -33,32 +31,25 @@ import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
 
 import java.util.Properties;
 
 /**
- * Rule altered context.
+ * Abstract pipeline process context.
  */
 @Getter
 @Slf4j
-// TODO extract Pipeline Context
-public final class RuleAlteredContext {
+public abstract class AbstractPipelineProcessContext {
     
-    private static final String INVENTORY_THREAD_PREFIX = "Inventory-";
+    private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new 
YamlPipelineProcessConfigurationSwapper();
     
-    private static final String INCREMENTAL_THREAD_PREFIX = "Incremental-";
-    
-    private static final String IMPORTER_THREAD_PREFIX = "Importer-";
-    
-    private static final YamlOnRuleAlteredActionConfigurationSwapper SWAPPER = 
new YamlOnRuleAlteredActionConfigurationSwapper();
-    
-    private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
+    private final PipelineProcessConfiguration pipelineProcessConfig;
     
     private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
     
@@ -66,41 +57,45 @@ public final class RuleAlteredContext {
     
     private final PipelineChannelCreator pipelineChannelCreator;
     
-    private final 
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> 
completionDetectAlgorithm;
+    private final LazyInitializer<ExecuteEngine> 
inventoryDumperExecuteEngineLazyInitializer;
     
-    private final DataConsistencyCalculateAlgorithm 
dataConsistencyCalculateAlgorithm;
+    private final LazyInitializer<ExecuteEngine> 
incrementalDumperExecuteEngineLazyInitializer;
     
-    private final ExecuteEngine inventoryDumperExecuteEngine;
+    private final LazyInitializer<ExecuteEngine> 
importerExecuteEngineLazyInitializer;
     
-    private final ExecuteEngine incrementalDumperExecuteEngine;
-    
-    private final ExecuteEngine importerExecuteEngine;
-    
-    @SuppressWarnings("unchecked")
-    public RuleAlteredContext(final String jobId, final 
OnRuleAlteredActionConfiguration actionConfig) {
-        OnRuleAlteredActionConfiguration onRuleAlteredActionConfig = 
convertActionConfig(actionConfig);
-        this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
-        PipelineInputConfiguration inputConfig = 
onRuleAlteredActionConfig.getInput();
+    public AbstractPipelineProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
+        PipelineProcessConfiguration processConfig = 
convertProcessConfig(originalProcessConfig);
+        this.pipelineProcessConfig = processConfig;
+        PipelineInputConfiguration inputConfig = processConfig.getInput();
         AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
         inputRateLimitAlgorithm = null != inputRateLimiter ? 
JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
-        PipelineOutputConfiguration outputConfig = 
onRuleAlteredActionConfig.getOutput();
+        PipelineOutputConfiguration outputConfig = processConfig.getOutput();
         AlgorithmConfiguration outputRateLimiter = 
outputConfig.getRateLimiter();
         outputRateLimitAlgorithm = null != outputRateLimiter ? 
JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
-        AlgorithmConfiguration streamChannel = 
onRuleAlteredActionConfig.getStreamChannel();
+        AlgorithmConfiguration streamChannel = 
processConfig.getStreamChannel();
         pipelineChannelCreator = 
PipelineChannelCreatorFactory.newInstance(streamChannel);
-        AlgorithmConfiguration completionDetector = 
onRuleAlteredActionConfig.getCompletionDetector();
-        completionDetectAlgorithm = null != completionDetector ? 
JobCompletionDetectAlgorithmFactory.newInstance(completionDetector) : null;
-        AlgorithmConfiguration dataConsistencyCheckerConfig = 
onRuleAlteredActionConfig.getDataConsistencyCalculator();
-        dataConsistencyCalculateAlgorithm = null != 
dataConsistencyCheckerConfig
-                ? 
DataConsistencyCalculateAlgorithmFactory.newInstance(dataConsistencyCheckerConfig.getType(),
 dataConsistencyCheckerConfig.getProps())
-                : null;
-        inventoryDumperExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(), 
INVENTORY_THREAD_PREFIX + jobId);
-        incrementalDumperExecuteEngine = 
ExecuteEngine.newCachedThreadInstance(INCREMENTAL_THREAD_PREFIX + jobId);
-        importerExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(), 
IMPORTER_THREAD_PREFIX + jobId);
+        inventoryDumperExecuteEngineLazyInitializer = new 
LazyInitializer<ExecuteEngine>() {
+            @Override
+            protected ExecuteEngine initialize() {
+                return 
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(), 
"Inventory-" + jobId);
+            }
+        };
+        incrementalDumperExecuteEngineLazyInitializer = new 
LazyInitializer<ExecuteEngine>() {
+            @Override
+            protected ExecuteEngine initialize() {
+                return ExecuteEngine.newCachedThreadInstance("Incremental-" + 
jobId);
+            }
+        };
+        importerExecuteEngineLazyInitializer = new 
LazyInitializer<ExecuteEngine>() {
+            @Override
+            protected ExecuteEngine initialize() {
+                return 
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(), 
"Importer-" + jobId);
+            }
+        };
     }
     
-    private OnRuleAlteredActionConfiguration convertActionConfig(final 
OnRuleAlteredActionConfiguration actionConfig) {
-        YamlOnRuleAlteredActionConfiguration yamlActionConfig = 
SWAPPER.swapToYamlConfiguration(actionConfig);
+    private PipelineProcessConfiguration convertProcessConfig(final 
PipelineProcessConfiguration originalProcessConfig) {
+        YamlPipelineProcessConfiguration yamlActionConfig = 
SWAPPER.swapToYamlConfiguration(originalProcessConfig);
         if (null == yamlActionConfig.getInput()) {
             
yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
         } else {
@@ -116,4 +111,34 @@ public final class RuleAlteredContext {
         }
         return SWAPPER.swapToObject(yamlActionConfig);
     }
+    
+    /**
+     * Get inventory dumper execute engine.
+     *
+     * @return inventory dumper execute engine
+     */
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getInventoryDumperExecuteEngine() {
+        return inventoryDumperExecuteEngineLazyInitializer.get();
+    }
+    
+    /**
+     * Get incremental dumper execute engine.
+     *
+     * @return incremental dumper execute engine
+     */
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getIncrementalDumperExecuteEngine() {
+        return incrementalDumperExecuteEngineLazyInitializer.get();
+    }
+    
+    /**
+     * Get importer execute engine.
+     *
+     * @return importer execute engine
+     */
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getImporterExecuteEngine() {
+        return importerExecuteEngineLazyInitializer.get();
+    }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java
new file mode 100644
index 00000000000..866173c06b4
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+/**
+ * Pipeline process context.
+ */
+public interface PipelineProcessContext {
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index b6e0c69070d..81991f5b800 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -21,99 +21,36 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.context.AbstractPipelineProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
-import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
 import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
 
-import java.util.Properties;
-
 /**
  * Rule altered context.
  */
 @Getter
 @Slf4j
-// TODO extract Pipeline Context
-public final class RuleAlteredContext {
-    
-    private static final String INVENTORY_THREAD_PREFIX = "Inventory-";
-    
-    private static final String INCREMENTAL_THREAD_PREFIX = "Incremental-";
-    
-    private static final String IMPORTER_THREAD_PREFIX = "Importer-";
+public final class RuleAlteredContext extends AbstractPipelineProcessContext {
     
     private static final YamlOnRuleAlteredActionConfigurationSwapper SWAPPER = 
new YamlOnRuleAlteredActionConfigurationSwapper();
     
-    private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
-    
-    private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
-    
-    private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
-    
-    private final PipelineChannelCreator pipelineChannelCreator;
-    
     private final 
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter> 
completionDetectAlgorithm;
     
     private final DataConsistencyCalculateAlgorithm 
dataConsistencyCalculateAlgorithm;
     
-    private final ExecuteEngine inventoryDumperExecuteEngine;
-    
-    private final ExecuteEngine incrementalDumperExecuteEngine;
-    
-    private final ExecuteEngine importerExecuteEngine;
-    
     @SuppressWarnings("unchecked")
     public RuleAlteredContext(final String jobId, final OnRuleAlteredActionConfiguration actionConfig) {
-        OnRuleAlteredActionConfiguration onRuleAlteredActionConfig = 
convertActionConfig(actionConfig);
-        this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
-        PipelineInputConfiguration inputConfig = 
onRuleAlteredActionConfig.getInput();
-        AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
-        inputRateLimitAlgorithm = null != inputRateLimiter ? 
JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
-        PipelineOutputConfiguration outputConfig = 
onRuleAlteredActionConfig.getOutput();
-        AlgorithmConfiguration outputRateLimiter = 
outputConfig.getRateLimiter();
-        outputRateLimitAlgorithm = null != outputRateLimiter ? 
JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
-        AlgorithmConfiguration streamChannel = 
onRuleAlteredActionConfig.getStreamChannel();
-        pipelineChannelCreator = 
PipelineChannelCreatorFactory.newInstance(streamChannel);
-        AlgorithmConfiguration completionDetector = 
onRuleAlteredActionConfig.getCompletionDetector();
+        super(jobId, new PipelineProcessConfiguration(actionConfig.getInput(), actionConfig.getOutput(), actionConfig.getStreamChannel()));
+        AlgorithmConfiguration completionDetector = actionConfig.getCompletionDetector();
         completionDetectAlgorithm = null != completionDetector ? JobCompletionDetectAlgorithmFactory.newInstance(completionDetector) : null;
-        AlgorithmConfiguration dataConsistencyCheckerConfig = onRuleAlteredActionConfig.getDataConsistencyCalculator();
+        AlgorithmConfiguration dataConsistencyCheckerConfig = actionConfig.getDataConsistencyCalculator();
         dataConsistencyCalculateAlgorithm = null != dataConsistencyCheckerConfig
                 ? DataConsistencyCalculateAlgorithmFactory.newInstance(dataConsistencyCheckerConfig.getType(), dataConsistencyCheckerConfig.getProps())
                 : null;
-        inventoryDumperExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(), 
INVENTORY_THREAD_PREFIX + jobId);
-        incrementalDumperExecuteEngine = 
ExecuteEngine.newCachedThreadInstance(INCREMENTAL_THREAD_PREFIX + jobId);
-        importerExecuteEngine = 
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(), 
IMPORTER_THREAD_PREFIX + jobId);
-    }
-    
-    private OnRuleAlteredActionConfiguration convertActionConfig(final 
OnRuleAlteredActionConfiguration actionConfig) {
-        YamlOnRuleAlteredActionConfiguration yamlActionConfig = 
SWAPPER.swapToYamlConfiguration(actionConfig);
-        if (null == yamlActionConfig.getInput()) {
-            
yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
-        } else {
-            yamlActionConfig.getInput().fillInNullFieldsWithDefaultValue();
-        }
-        if (null == yamlActionConfig.getOutput()) {
-            
yamlActionConfig.setOutput(YamlPipelineOutputConfiguration.buildWithDefaultValue());
-        } else {
-            yamlActionConfig.getOutput().fillInNullFieldsWithDefaultValue();
-        }
-        if (null == yamlActionConfig.getStreamChannel()) {
-            yamlActionConfig.setStreamChannel(new 
YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new 
Properties()));
-        }
-        return SWAPPER.swapToObject(yamlActionConfig);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index a8c968b4aea..65f9b35d133 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -94,7 +94,7 @@ public final class RuleAlteredJobContext {
         this.initProgress = initProgress;
         this.dataSourceManager = dataSourceManager;
         this.jobPreparer = jobPreparer;
-        taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, jobShardingItem, ruleAlteredContext.getOnRuleAlteredActionConfig());
+        taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, jobShardingItem, ruleAlteredContext.getPipelineProcessConfig());
     }
     
     /**
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 24493a664d3..c7e8633ab8e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -40,6 +40,7 @@ import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetect
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
@@ -248,11 +249,11 @@ public final class RuleAlteredJobWorker {
      *
      * @param jobConfig job configuration
      * @param jobShardingItem job sharding item
-     * @param onRuleAlteredActionConfig action configuration
+     * @param pipelineProcessConfig pipeline process configuration
      * @return task configuration
      */
-    public static TaskConfiguration buildTaskConfig(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
-        return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig, jobShardingItem, onRuleAlteredActionConfig);
+    public static TaskConfiguration buildTaskConfig(final RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
+        return RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig, jobShardingItem, pipelineProcessConfig);
     }
     
     private boolean hasUncompletedJobOfSameDatabaseName(final String 
databaseName) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 635711c39c8..fb4609f1dec 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -111,7 +111,7 @@ public final class InventoryTaskSplitter {
                                                                        final 
InventoryDumperConfiguration dumperConfig) {
         Collection<InventoryDumperConfiguration> result = new LinkedList<>();
         RuleAlteredContext ruleAlteredContext = 
jobContext.getRuleAlteredContext();
-        PipelineInputConfiguration inputConfig = ruleAlteredContext.getOnRuleAlteredActionConfig().getInput();
+        PipelineInputConfiguration inputConfig = ruleAlteredContext.getPipelineProcessConfig().getInput();
         int batchSize = inputConfig.getBatchSize();
         JobRateLimitAlgorithm rateLimitAlgorithm = 
ruleAlteredContext.getInputRateLimitAlgorithm();
         Collection<IngestPosition<?>> inventoryPositions = 
getInventoryPositions(jobContext, dumperConfig, dataSource, metaDataLoader);
@@ -192,7 +192,7 @@ public final class InventoryTaskSplitter {
         RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
         String sql = 
PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType())
                 .buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName())), 
dumperConfig.getActualTableName(), dumperConfig.getUniqueKey());
-        int shardingSize = jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getInput().getShardingSize();
+        int shardingSize = jobContext.getRuleAlteredContext().getPipelineProcessConfig().getInput().getShardingSize();
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement ps = connection.prepareStatement(sql)) {
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 920cedc9669..d51586c71d9 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -68,7 +68,7 @@ public final class RuleAlteredJobWorkerTest {
     
     @Test
     public void assertCreateRuleAlteredContextSuccess() {
-        assertNotNull(RuleAlteredJobWorker.createRuleAlteredContext(JobConfigurationBuilder.createJobConfiguration()).getOnRuleAlteredActionConfig());
+        assertNotNull(RuleAlteredJobWorker.createRuleAlteredContext(JobConfigurationBuilder.createJobConfiguration()).getPipelineProcessConfig());
     }
     
     @Test

Reply via email to