This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ae4afbd602a Refactor RuleAlteredJobWorker and MigrationContext (#20244)
ae4afbd602a is described below
commit ae4afbd602aa606819d3d3cf1277b6e643bf189c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Aug 17 20:03:23 2022 +0800
Refactor RuleAlteredJobWorker and MigrationContext (#20244)
---
...hardingRuleAlteredJobConfigurationPreparer.java | 1 -
.../execute}/PipelineJobWorker.java | 18 ++++++------
.../data/pipeline/core/job/FinishedCheckJob.java | 8 +++---
.../PipelineContextManagerLifecycleListener.java | 4 +--
.../scenario/migration/MigrationJobAPI.java | 12 ++++++++
.../scenario/migration/MigrationJobAPIImpl.java | 29 ++++++++++++--------
.../migration/MigrationJobItemContext.java | 4 +--
...onContext.java => MigrationProcessContext.java} | 6 ++--
.../scenario/rulealtered/RuleAlteredJobWorker.java | 32 ++++++++++------------
.../core/fixture/MigrationJobAPIFixture.java | 7 +++++
10 files changed, 71 insertions(+), 50 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index bb0aabdcfcc..36d60d5262b 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -151,7 +151,6 @@ public final class ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return result;
}
- // TODO use jobConfig as parameter, jobShardingItem
@Override
public TaskConfiguration createTaskConfiguration(final
MigrationJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
ShardingSpherePipelineDataSourceConfiguration sourceConfig =
getSourceConfiguration(jobConfig);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
similarity index 75%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
index b4507e923f1..860fad8976f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/PipelineJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobWorker.java
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.core.execute;
-import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import
org.apache.shardingsphere.data.pipeline.core.execute.FinishedCheckJobExecutor;
-import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
+import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Pipeline job worker.
@@ -29,14 +30,12 @@ import
org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobExecutor;
@Slf4j
public final class PipelineJobWorker {
- private static final RuleAlteredJobWorker INSTANCE = new
RuleAlteredJobWorker();
-
private static final AtomicBoolean WORKER_INITIALIZED = new
AtomicBoolean(false);
/**
- * Initialize job worker if necessary.
+ * Initialize job worker.
*/
- public static void initWorkerIfNecessary() {
+ public static void initialize() {
if (WORKER_INITIALIZED.get()) {
return;
}
@@ -45,7 +44,8 @@ public final class PipelineJobWorker {
return;
}
log.info("start worker initialization");
-
PipelineContext.getContextManager().getInstanceContext().getEventBusContext().register(INSTANCE);
+ EventBusContext eventBusContext =
PipelineContext.getContextManager().getInstanceContext().getEventBusContext();
+ eventBusContext.register(RuleAlteredJobWorker.getInstance());
new FinishedCheckJobExecutor().start();
new PipelineJobExecutor().start();
WORKER_INITIALIZED.set(true);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index 01e2dbff8a8..79347621bc7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -24,9 +24,9 @@ import org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCo
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationContext;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -64,13 +64,13 @@ public final class FinishedCheckJob implements SimpleJob {
try {
// TODO refactor: dispatch to different job types
MigrationJobConfiguration jobConfig =
YamlMigrationJobConfigurationSwapper.swapToObject(jobInfo.getJobParameter());
- MigrationContext migrationContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- if (null == migrationContext.getCompletionDetectAlgorithm()) {
+ MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ if (null == processContext.getCompletionDetectAlgorithm()) {
log.info("completionDetector not configured, auto switch will not be enabled. You could query job progress and switch config manually with DistSQL.");
continue;
}
RuleAlteredJobAlmostCompletedParameter parameter = new
RuleAlteredJobAlmostCompletedParameter(jobInfo.getShardingTotalCount(),
jobAPI.getJobProgress(jobConfig).values());
- if
(!migrationContext.getCompletionDetectAlgorithm().isAlmostCompleted(parameter))
{
+ if
(!processContext.getCompletionDetectAlgorithm().isAlmostCompleted(parameter)) {
continue;
}
log.info("scaling job {} almost finished.", jobId);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index 26bdfe40989..768a0d06dac 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.PipelineJobWorker;
+import org.apache.shardingsphere.data.pipeline.core.execute.PipelineJobWorker;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
@@ -42,6 +42,6 @@ public final class PipelineContextManagerLifecycleListener
implements ContextMan
PipelineContext.initModeConfig(modeConfig);
PipelineContext.initContextManager(contextManager);
// TODO init worker only if necessary, e.g. 1) rule altered action configured, 2) enabled job exists, 3) stopped job restarted
- PipelineJobWorker.initWorkerIfNecessary();
+ PipelineJobWorker.initialize();
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
index b945b00c1f2..05cc6397c99 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPI.java
@@ -19,9 +19,11 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration;
import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
@@ -36,6 +38,16 @@ public interface MigrationJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI,
@Override
MigrationJobConfiguration getJobConfiguration(String jobId);
+ /**
+ * Build task configuration.
+ *
+ * @param jobConfig job configuration
+ * @param jobShardingItem job sharding item
+ * @param pipelineProcessConfig pipeline process configuration
+ * @return task configuration
+ */
+ TaskConfiguration buildTaskConfiguration(MigrationJobConfiguration
jobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
+
/**
* Get job progress.
*
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index a2da0118ea7..6e5ff4de841 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -22,6 +22,7 @@ import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
@@ -53,6 +54,7 @@ import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobCon
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.lock.LockContext;
import org.apache.shardingsphere.infra.lock.LockDefinition;
import org.apache.shardingsphere.mode.lock.ExclusiveLockDefinition;
@@ -129,6 +131,11 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
return
YamlMigrationJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
}
+ @Override
+ public TaskConfiguration buildTaskConfiguration(final
MigrationJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
+ return
RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig,
jobShardingItem, pipelineProcessConfig);
+ }
+
@Override
public List<JobInfo> list() {
checkModeConfig();
@@ -193,8 +200,8 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
}
private void verifyManualMode(final MigrationJobConfiguration jobConfig) {
- MigrationContext migrationContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- if (null != migrationContext.getCompletionDetectAlgorithm()) {
+ MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ if (null != processContext.getCompletionDetectAlgorithm()) {
throw new PipelineVerifyFailedException("It's not necessary to do
it in auto mode.");
}
}
@@ -277,12 +284,12 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
@Override
public boolean isDataConsistencyCheckNeeded(final
MigrationJobConfiguration jobConfig) {
- MigrationContext migrationContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- return isDataConsistencyCheckNeeded(migrationContext);
+ MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ return isDataConsistencyCheckNeeded(processContext);
}
- private boolean isDataConsistencyCheckNeeded(final MigrationContext
migrationContext) {
- return null != migrationContext.getDataConsistencyCalculateAlgorithm();
+ private boolean isDataConsistencyCheckNeeded(final MigrationProcessContext
processContext) {
+ return null != processContext.getDataConsistencyCalculateAlgorithm();
}
@Override
@@ -296,12 +303,12 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
@Override
public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final
MigrationJobConfiguration jobConfig) {
- MigrationContext migrationContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
- if (!isDataConsistencyCheckNeeded(migrationContext)) {
+ MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ if (!isDataConsistencyCheckNeeded(processContext)) {
log.info("DataConsistencyCalculatorAlgorithm is not configured,
data consistency check is ignored.");
return Collections.emptyMap();
}
- return dataConsistencyCheck(jobConfig,
migrationContext.getDataConsistencyCalculateAlgorithm());
+ return dataConsistencyCheck(jobConfig,
processContext.getDataConsistencyCalculateAlgorithm());
}
@Override
@@ -357,9 +364,9 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
@Override
public void switchClusterConfiguration(final MigrationJobConfiguration
jobConfig) {
String jobId = jobConfig.getJobId();
- MigrationContext migrationContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
+ MigrationProcessContext processContext =
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
- if (isDataConsistencyCheckNeeded(migrationContext)) {
+ if (isDataConsistencyCheckNeeded(processContext)) {
Optional<Boolean> checkResult =
repositoryAPI.getJobCheckResult(jobId);
if (!checkResult.isPresent() || !checkResult.get()) {
throw new PipelineVerifyFailedException("Data consistency
check is not finished or failed.");
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index c58ed2d740b..b1ea5766ef5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -64,7 +64,7 @@ public final class MigrationJobItemContext implements InventoryIncrementalJobIte
private final MigrationJobConfiguration jobConfig;
- private final MigrationContext jobProcessContext;
+ private final MigrationProcessContext jobProcessContext;
private final PipelineDataSourceManager dataSourceManager;
@@ -93,7 +93,7 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
this.shardingItem = jobShardingItem;
this.initProgress = initProgress;
this.dataSourceManager = dataSourceManager;
- taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig,
jobShardingItem, jobProcessContext.getPipelineProcessConfig());
+ taskConfig =
MigrationJobAPIFactory.getInstance().buildTaskConfiguration(jobConfig,
jobShardingItem, jobProcessContext.getPipelineProcessConfig());
}
/**
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
similarity index 93%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
index 3386acb1ae4..4d960e30305 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationProcessContext.java
@@ -31,11 +31,11 @@ import org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActi
import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
/**
- * Migration context.
+ * Migration process context.
*/
@Getter
@Slf4j
-public final class MigrationContext extends AbstractPipelineProcessContext {
+public final class MigrationProcessContext extends
AbstractPipelineProcessContext {
private static final YamlOnRuleAlteredActionConfigurationSwapper SWAPPER =
new YamlOnRuleAlteredActionConfigurationSwapper();
@@ -44,7 +44,7 @@ public final class MigrationContext extends
AbstractPipelineProcessContext {
private final DataConsistencyCalculateAlgorithm
dataConsistencyCalculateAlgorithm;
@SuppressWarnings("unchecked")
- public MigrationContext(final String jobId, final
OnRuleAlteredActionConfiguration actionConfig) {
+ public MigrationProcessContext(final String jobId, final
OnRuleAlteredActionConfiguration actionConfig) {
super(jobId, new PipelineProcessConfiguration(actionConfig.getInput(),
actionConfig.getOutput(), actionConfig.getStreamChannel()));
AlgorithmConfiguration completionDetector =
actionConfig.getCompletionDetector();
completionDetectAlgorithm = null != completionDetector ?
JobCompletionDetectAlgorithmFactory.newInstance(completionDetector) : null;
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 9e702a66c19..1ffee2397fa 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
@@ -35,13 +34,11 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationContext;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetector;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
-import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
@@ -67,8 +64,19 @@ import java.util.stream.Collectors;
@Slf4j
public final class RuleAlteredJobWorker {
+ private static final RuleAlteredJobWorker INSTANCE = new
RuleAlteredJobWorker();
+
private static final YamlRuleConfigurationSwapperEngine SWAPPER_ENGINE =
new YamlRuleConfigurationSwapperEngine();
+ /**
+ * Get instance.
+ *
+ * @return instance
+ */
+ public static RuleAlteredJobWorker getInstance() {
+ return INSTANCE;
+ }
+
/**
* Is on rule altered action enabled.
*
@@ -89,7 +97,7 @@ public final class RuleAlteredJobWorker {
* @param jobConfig job configuration
* @return rule altered context
*/
- public static MigrationContext createRuleAlteredContext(final
MigrationJobConfiguration jobConfig) {
+ public static MigrationProcessContext createRuleAlteredContext(final
MigrationJobConfiguration jobConfig) {
YamlRootConfiguration targetRootConfig = getYamlRootConfig(jobConfig);
YamlRuleConfiguration yamlRuleConfig = null;
for (YamlRuleConfiguration each : targetRootConfig.getRules()) {
@@ -109,7 +117,7 @@ public final class RuleAlteredJobWorker {
log.error("rule altered action enabled but actor is not
configured, ignored, ruleConfig={}", ruleConfig);
throw new PipelineJobCreationException("rule altered actor not
configured");
}
- return new MigrationContext(jobConfig.getJobId(),
onRuleAlteredActionConfig.get());
+ return new MigrationProcessContext(jobConfig.getJobId(),
onRuleAlteredActionConfig.get());
}
/**
@@ -220,18 +228,6 @@ public final class RuleAlteredJobWorker {
return result;
}
- /**
- * Build task configuration.
- *
- * @param jobConfig job configuration
- * @param jobShardingItem job sharding item
- * @param pipelineProcessConfig pipeline process configuration
- * @return task configuration
- */
- public static TaskConfiguration buildTaskConfig(final
MigrationJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
- return
RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig,
jobShardingItem, pipelineProcessConfig);
- }
-
private boolean hasUncompletedJobOfSameDatabaseName(final String
databaseName) {
boolean result = false;
for (JobInfo each : MigrationJobAPIFactory.getInstance().list()) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 75887302fdf..599721022a6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
@@ -28,6 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import java.util.Collection;
import java.util.List;
@@ -146,6 +148,11 @@ public final class MigrationJobAPIFixture implements
MigrationJobAPI {
return null;
}
+ @Override
+ public TaskConfiguration buildTaskConfiguration(final
MigrationJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
+ return null;
+ }
+
@Override
public boolean isDefault() {
return MigrationJobAPI.super.isDefault();