This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ae4afbd602a Refactor RuleAlteredJobWorker and MigrationContext (#20244)
ae4afbd602a is described below

commit ae4afbd602aa606819d3d3cf1277b6e643bf189c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Aug 17 20:03:23 2022 +0800

    Refactor RuleAlteredJobWorker and MigrationContext (#20244)
---
 ...hardingRuleAlteredJobConfigurationPreparer.java |  1 -
 .../execute}/PipelineJobWorker.java                | 18 ++++++------
 .../data/pipeline/core/job/FinishedCheckJob.java   |  8 +++---
 .../PipelineContextManagerLifecycleListener.java   |  4 +--
 .../scenario/migration/MigrationJobAPI.java        | 12 ++++++++
 .../scenario/migration/MigrationJobAPIImpl.java    | 29 ++++++++++++--------
 .../migration/MigrationJobItemContext.java         |  4 +--
 ...onContext.java => MigrationProcessContext.java} |  6 ++--
 .../scenario/rulealtered/RuleAlteredJobWorker.java | 32 ++++++++++------------
 .../core/fixture/MigrationJobAPIFixture.java       |  7 +++++
 10 files changed, 71 insertions(+), 50 deletions(-)

diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index bb0aabdcfcc..36d60d5262b 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -151,7 +151,6 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
         return result;
     }
     
-    // TODO use jobConfig as parameter, jobShardingItem
     @Override
     public TaskConfiguration createTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         ShardingSpherePipelineDataSourceConfiguration sourceConfig = 
getSourceConfiguration(jobConfig);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
similarity index 75%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
index b4507e923f1..860fad8976f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.core.execute;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import 
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
-import 
org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
+import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Pipeline job worker.
@@ -29,14 +30,12 @@ import 
org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
 @Slf4j
 public final class PipelineJobWorker {
     
-    private static final RuleAlteredJobWorker INSTANCE = new 
RuleAlteredJobWorker();
-    
     private static final AtomicBoolean WORKER_INITIALIZED = new 
AtomicBoolean(false);
     
     /**
-     * Initialize job worker if necessary.
+     * Initialize job worker.
      */
-    public static void initWorkerIfNecessary() {
+    public static void initialize() {
         if (WORKER_INITIALIZED.get()) {
             return;
         }
@@ -45,7 +44,8 @@ public final class PipelineJobWorker {
                 return;
             }
             log.info("start worker initialization");
-            
PipelineContext.getContextManager().getInstanceContext().getEventBusContext().register(INSTANCE);
+            EventBusContext eventBusContext = 
PipelineContext.getContextManager().getInstanceContext().getEventBusContext();
+            eventBusContext.register(RuleAlteredJobWorker.getInstance());
             new FinishedCheckJobExecutor().start();
             new PipelineJobExecutor().start();
             WORKER_INITIALIZED.set(true);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 01e2dbff8a8..79347621bc7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -24,9 +24,9 @@ import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCo
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -64,13 +64,13 @@ public final class FinishedCheckJob implements SimpleJob {
             try {
                 // TODO refactor: dispatch to different job types
                 MigrationJobConfiguration jobConfig = 
YamlMigrationJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
-                MigrationContext migrationContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
-                if (null == migrationContext.getCompletionDetectAlgorithm()) {
+                MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+                if (null == processContext.getCompletionDetectAlgorithm()) {
                     log.info("completionDetector not configured, auto switch 
will not be enabled. You could query job progress and switch config manually 
with DistSQL.");
                     continue;
                 }
                 RuleAlteredJobAlmostCompletedParameter parameter = new 
RuleAlteredJobAlmostCompletedParameter(jobInfo.getShardingTotalCount(), 
jobAPI.getJobProgress(jobConfig).values());
-                if 
(!migrationContext.getCompletionDetectAlgorithm().isAlmostCompleted(parameter)) 
{
+                if 
(!processContext.getCompletionDetectAlgorithm().isAlmostCompleted(parameter)) {
                     continue;
                 }
                 log.info("scaling job {} almost finished.", jobId);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 26bdfe40989..768a0d06dac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.PipelineJobWorker;
+import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobWorker;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import 
org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
@@ -42,6 +42,6 @@ public final class PipelineContextManagerLifecycleListener 
implements ContextMan
         PipelineContext.initModeConfig(modeConfig);
         PipelineContext.initContextManager(contextManager);
         // TODO init worker only if necessary, e.g. 1) rule altered action 
configured, 2) enabled job exists, 3) stopped job restarted
-        PipelineJobWorker.initWorkerIfNecessary();
+        PipelineJobWorker.initialize();
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index b945b00c1f2..05cc6397c99 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
@@ -36,6 +38,16 @@ public interface MigrationJobAPI extends PipelineJobAPI, 
MigrationJobPublicAPI,
     @Override
     MigrationJobConfiguration getJobConfiguration(String jobId);
     
+    /**
+     * Build task configuration.
+     *
+     * @param jobConfig job configuration
+     * @param jobShardingItem job sharding item
+     * @param pipelineProcessConfig pipeline process configuration
+     * @return task configuration
+     */
+    TaskConfiguration buildTaskConfiguration(MigrationJobConfiguration 
jobConfig, int jobShardingItem, PipelineProcessConfiguration 
pipelineProcessConfig);
+    
     /**
      * Get job progress.
      *
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index a2da0118ea7..6e5ff4de841 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.binary.Hex;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
@@ -53,6 +54,7 @@ import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCon
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import org.apache.shardingsphere.infra.lock.LockContext;
 import org.apache.shardingsphere.infra.lock.LockDefinition;
 import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
@@ -129,6 +131,11 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         return 
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
     }
     
+    @Override
+    public TaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
+        return 
RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig,
 jobShardingItem, pipelineProcessConfig);
+    }
+    
     @Override
     public List<JobInfo> list() {
         checkModeConfig();
@@ -193,8 +200,8 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     }
     
     private void verifyManualMode(final MigrationJobConfiguration jobConfig) {
-        MigrationContext migrationContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
-        if (null != migrationContext.getCompletionDetectAlgorithm()) {
+        MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        if (null != processContext.getCompletionDetectAlgorithm()) {
             throw new PipelineVerifyFailedException("It's not necessary to do 
it in auto mode.");
         }
     }
@@ -277,12 +284,12 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     @Override
     public boolean isDataConsistencyCheckNeeded(final 
MigrationJobConfiguration jobConfig) {
-        MigrationContext migrationContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
-        return isDataConsistencyCheckNeeded(migrationContext);
+        MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        return isDataConsistencyCheckNeeded(processContext);
     }
     
-    private boolean isDataConsistencyCheckNeeded(final MigrationContext 
migrationContext) {
-        return null != migrationContext.getDataConsistencyCalculateAlgorithm();
+    private boolean isDataConsistencyCheckNeeded(final MigrationProcessContext 
processContext) {
+        return null != processContext.getDataConsistencyCalculateAlgorithm();
     }
     
     @Override
@@ -296,12 +303,12 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     
     @Override
     public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
MigrationJobConfiguration jobConfig) {
-        MigrationContext migrationContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
-        if (!isDataConsistencyCheckNeeded(migrationContext)) {
+        MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        if (!isDataConsistencyCheckNeeded(processContext)) {
             log.info("DataConsistencyCalculatorAlgorithm is not configured, 
data consistency check is ignored.");
             return Collections.emptyMap();
         }
-        return dataConsistencyCheck(jobConfig, 
migrationContext.getDataConsistencyCalculateAlgorithm());
+        return dataConsistencyCheck(jobConfig, 
processContext.getDataConsistencyCalculateAlgorithm());
     }
     
     @Override
@@ -357,9 +364,9 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     @Override
     public void switchClusterConfiguration(final MigrationJobConfiguration 
jobConfig) {
         String jobId = jobConfig.getJobId();
-        MigrationContext migrationContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+        MigrationProcessContext processContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        if (isDataConsistencyCheckNeeded(migrationContext)) {
+        if (isDataConsistencyCheckNeeded(processContext)) {
             Optional<Boolean> checkResult = 
repositoryAPI.getJobCheckResult(jobId);
             if (!checkResult.isPresent() || !checkResult.get()) {
                 throw new PipelineVerifyFailedException("Data consistency 
check is not finished or failed.");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index c58ed2d740b..b1ea5766ef5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -64,7 +64,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
     
     private final MigrationJobConfiguration jobConfig;
     
-    private final MigrationContext jobProcessContext;
+    private final MigrationProcessContext jobProcessContext;
     
     private final PipelineDataSourceManager dataSourceManager;
     
@@ -93,7 +93,7 @@ public final class MigrationJobItemContext implements 
InventoryIncrementalJobIte
         this.shardingItem = jobShardingItem;
         this.initProgress = initProgress;
         this.dataSourceManager = dataSourceManager;
-        taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig, 
jobShardingItem, jobProcessContext.getPipelineProcessConfig());
+        taskConfig = 
MigrationJobAPIFactory.getInstance().buildTaskConfiguration(jobConfig, 
jobShardingItem, jobProcessContext.getPipelineProcessConfig());
     }
     
     /**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index 3386acb1ae4..4d960e30305 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -31,11 +31,11 @@ import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActi
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
 
 /**
- * Migration context.
+ * Migration process context.
  */
 @Getter
 @Slf4j
-public final class MigrationContext extends AbstractPipelineProcessContext {
+public final class MigrationProcessContext extends 
AbstractPipelineProcessContext {
     
     private static final YamlOnRuleAlteredActionConfigurationSwapper SWAPPER = 
new YamlOnRuleAlteredActionConfigurationSwapper();
     
@@ -44,7 +44,7 @@ public final class MigrationContext extends 
AbstractPipelineProcessContext {
     private final DataConsistencyCalculateAlgorithm 
dataConsistencyCalculateAlgorithm;
     
     @SuppressWarnings("unchecked")
-    public MigrationContext(final String jobId, final 
OnRuleAlteredActionConfiguration actionConfig) {
+    public MigrationProcessContext(final String jobId, final 
OnRuleAlteredActionConfiguration actionConfig) {
         super(jobId, new PipelineProcessConfiguration(actionConfig.getInput(), 
actionConfig.getOutput(), actionConfig.getStreamChannel()));
         AlgorithmConfiguration completionDetector = 
actionConfig.getCompletionDetector();
         completionDetectAlgorithm = null != completionDetector ? 
JobCompletionDetectAlgorithmFactory.newInstance(completionDetector) : null;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 9e702a66c19..1ffee2397fa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
@@ -35,13 +34,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
-import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
@@ -67,8 +64,19 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class RuleAlteredJobWorker {
     
+    private static final RuleAlteredJobWorker INSTANCE = new 
RuleAlteredJobWorker();
+    
     private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE = 
new YamlRuleConfigurationSwapperEngine();
     
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static RuleAlteredJobWorker getInstance() {
+        return INSTANCE;
+    }
+    
     /**
      * Is on rule altered action enabled.
      *
@@ -89,7 +97,7 @@ public final class RuleAlteredJobWorker {
      * @param jobConfig job configuration
      * @return rule altered context
      */
-    public static MigrationContext createRuleAlteredContext(final 
MigrationJobConfiguration jobConfig) {
+    public static MigrationProcessContext createRuleAlteredContext(final 
MigrationJobConfiguration jobConfig) {
         YamlRootConfiguration targetRootConfig = getYamlRootConfig(jobConfig);
         YamlRuleConfiguration yamlRuleConfig = null;
         for (YamlRuleConfiguration each : targetRootConfig.getRules()) {
@@ -109,7 +117,7 @@ public final class RuleAlteredJobWorker {
             log.error("rule altered action enabled but actor is not 
configured, ignored, ruleConfig={}", ruleConfig);
             throw new PipelineJobCreationException("rule altered actor not 
configured");
         }
-        return new MigrationContext(jobConfig.getJobId(), 
onRuleAlteredActionConfig.get());
+        return new MigrationProcessContext(jobConfig.getJobId(), 
onRuleAlteredActionConfig.get());
     }
     
     /**
@@ -220,18 +228,6 @@ public final class RuleAlteredJobWorker {
         return result;
     }
     
-    /**
-     * Build task configuration.
-     *
-     * @param jobConfig job configuration
-     * @param jobShardingItem job sharding item
-     * @param pipelineProcessConfig pipeline process configuration
-     * @return task configuration
-     */
-    public static TaskConfiguration buildTaskConfig(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
-        return 
RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig,
 jobShardingItem, pipelineProcessConfig);
-    }
-    
     private boolean hasUncompletedJobOfSameDatabaseName(final String 
databaseName) {
         boolean result = false;
         for (JobInfo each : MigrationJobAPIFactory.getInstance().list()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 75887302fdf..599721022a6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
@@ -28,6 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
 
 import java.util.Collection;
 import java.util.List;
@@ -146,6 +148,11 @@ public final class MigrationJobAPIFixture implements 
MigrationJobAPI {
         return null;
     }
     
+    @Override
+    public TaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
+        return null;
+    }
+    
     @Override
     public boolean isDefault() {
         return MigrationJobAPI.super.isDefault();

Reply via email to