This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dd568356ebb Extract PipelineJobItemAPI and impl for common usage (#20122)
dd568356ebb is described below
commit dd568356ebb74bf708aa60840f1d504438ebaba5
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 12 20:10:32 2022 +0800
Extract PipelineJobItemAPI and impl for common usage (#20122)
---
.../data/pipeline/core/api/PipelineJobAPI.java | 31 +------
...PipelineJobAPI.java => PipelineJobItemAPI.java} | 36 +-------
.../impl/InventoryIncrementalJobItemAPIImpl.java | 97 ++++++++++++++++++++++
.../core/api/impl/RuleAlteredJobAPIImpl.java | 79 ++++--------------
.../rulealtered/RuleAlteredJobPreparer.java | 21 +++--
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 6 +-
6 files changed, 136 insertions(+), 134 deletions(-)
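In short, this commit moves the per-item progress operations (persist progress, get progress, update status) out of PipelineJobAPI into a new PipelineJobItemAPI interface, with InventoryIncrementalJobItemAPIImpl as the shared implementation; RuleAlteredJobAPIImpl now delegates to it instead of carrying its own copy. A minimal sketch of the resulting delegation pattern follows (the class name ExampleJobAPIImpl is hypothetical and only for illustration; the interface and implementation names come from the diff below):

    import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
    import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
    import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
    import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
    import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;

    // Hypothetical job API that reuses the extracted job item API instead of duplicating
    // the progress persistence logic (this is what RuleAlteredJobAPIImpl does in this commit).
    public final class ExampleJobAPIImpl implements PipelineJobItemAPI {

        private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();

        @Override
        public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
            jobItemAPI.persistJobItemProgress(jobItemContext);
        }

        @Override
        public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
            // May return null when no progress has been persisted yet.
            return jobItemAPI.getJobItemProgress(jobId, shardingItem);
        }

        @Override
        public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
            jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
        }
    }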
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 36e16b2e8eb..cdcaed2104e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -20,10 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
 import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
@@ -31,7 +28,7 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
* Pipeline job API.
*/
@SingletonSPI
-public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
+public interface PipelineJobAPI extends PipelineJobPublicAPI, PipelineJobItemAPI, TypedSPI {
/**
* Marshal pipeline job id.
@@ -54,30 +51,6 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
* @param jobId job id
* @return job configuration
*/
+ // TODO rename
PipelineJobConfiguration getJobConfig(String jobId);
-
- /**
- * Persist job item progress.
- *
- * @param jobItemContext job item context
- */
- void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-
- /**
- * Get job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return job item progress
- */
- PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
-
- /**
- * Update job item status.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param status status
- */
- void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
similarity index 59%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
index 36e16b2e8eb..1a492b79c88 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
@@ -17,44 +17,14 @@
package org.apache.shardingsphere.data.pipeline.core.api;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
-import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
/**
- * Pipeline job API.
+ * Pipeline job item API.
*/
-@SingletonSPI
-public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
-
- /**
- * Marshal pipeline job id.
- *
- * @param pipelineJobId pipeline job id
- * @return marshaled text
- */
- String marshalJobId(PipelineJobId pipelineJobId);
-
- /**
- * Extend job configuration.
- *
- * @param yamlJobConfig yaml job configuration
- */
- void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
-
- /**
- * Get job configuration.
- *
- * @param jobId job id
- * @return job configuration
- */
- PipelineJobConfiguration getJobConfig(String jobId);
+public interface PipelineJobItemAPI {
/**
* Persist job item progress.
@@ -68,7 +38,7 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
*
* @param jobId job id
* @param shardingItem sharding item
- * @return job item progress
+ * @return job item progress, may be null
*/
PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
new file mode 100644
index 00000000000..1d1910bdcb5
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
+import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Inventory incremental job item API implementation.
+ */
+@Slf4j
+public final class InventoryIncrementalJobItemAPIImpl implements PipelineJobItemAPI {
+
+    private static final YamlInventoryIncrementalJobItemProgressSwapper SWAPPER = new YamlInventoryIncrementalJobItemProgressSwapper();
+
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+        InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
+        InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
+        jobItemProgress.setStatus(jobItemContext.getStatus());
+        jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
+        jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
+        jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
+        String value = YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
+    }
+
+    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
+        Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new HashMap<>();
+        for (IncrementalTask each : incrementalTasks) {
+            incrementalTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
+        }
+        return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
+    }
+
+    private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<InventoryTask> inventoryTasks) {
+        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
+        for (InventoryTask each : inventoryTasks) {
+            inventoryTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
+        }
+        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
+    }
+
+    @Override
+    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+        String data = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
+        if (StringUtils.isBlank(data)) {
+            return null;
+        }
+        return SWAPPER.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class));
+    }
+
+    @Override
+    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
+        InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
+        if (null == jobItemProgress) {
+            log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        jobItemProgress.setStatus(status);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress)));
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 6e17cf8a6f9..1f3576de972 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -21,8 +21,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
@@ -36,27 +34,20 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
@@ -75,7 +66,6 @@ import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -92,7 +82,7 @@ import java.util.stream.Stream;
@Slf4j
 public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl implements RuleAlteredJobAPI {
-    private static final YamlInventoryIncrementalJobItemProgressSwapper SWAPPER = new YamlInventoryIncrementalJobItemProgressSwapper();
+    private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
@@ -206,6 +196,21 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         }, LinkedHashMap::putAll);
     }
+    @Override
+    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
+        return (InventoryIncrementalJobItemProgress) jobItemAPI.getJobItemProgress(jobId, shardingItem);
+    }
+
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
+        jobItemAPI.persistJobItemProgress(jobItemContext);
+    }
+
+    @Override
+    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
+        jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
+    }
+
     private void verifyManualMode(final RuleAlteredJobConfiguration jobConfig) {
         RuleAlteredContext ruleAlteredContext = RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         if (null != ruleAlteredContext.getCompletionDetectAlgorithm()) {
@@ -406,54 +411,6 @@ public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl impl
         return RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
     }
-    @Override
-    public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) {
-        InventoryIncrementalJobItemContext context = (InventoryIncrementalJobItemContext) jobItemContext;
-        InventoryIncrementalJobItemProgress jobItemProgress = new InventoryIncrementalJobItemProgress();
-        jobItemProgress.setStatus(jobItemContext.getStatus());
-        jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
-        jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
-        jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
-        String value = YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem(), value);
-    }
-
-    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final Collection<IncrementalTask> incrementalTasks) {
-        Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new HashMap<>();
-        for (IncrementalTask each : incrementalTasks) {
-            incrementalTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
-        }
-        return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
-    }
-
-    private JobItemInventoryTasksProgress getInventoryTasksProgress(final Collection<InventoryTask> inventoryTasks) {
-        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new HashMap<>();
-        for (InventoryTask each : inventoryTasks) {
-            inventoryTaskProgressMap.put(each.getTaskId(), each.getTaskProgress());
-        }
-        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
-    }
-
-    @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) {
-        String data = PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, shardingItem);
-        if (StringUtils.isBlank(data)) {
-            return null;
-        }
-        return SWAPPER.swapToObject(YamlEngine.unmarshal(data, YamlInventoryIncrementalJobItemProgress.class));
-    }
-
-    @Override
-    public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) {
-        InventoryIncrementalJobItemProgress jobItemProgress = getJobItemProgress(jobId, shardingItem);
-        if (null == jobItemProgress) {
-            log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, shardingItem={}", jobId, shardingItem);
-            return;
-        }
-        jobItemProgress.setStatus(status);
-        PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, shardingItem, YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress)));
-    }
-
@Override
public String getType() {
return JobType.MIGRATION.getTypeName();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index ddd31ebe961..304bc4fcc32 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -18,14 +18,15 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
@@ -54,6 +55,8 @@ import java.util.Collections;
@Slf4j
public final class RuleAlteredJobPreparer {
+    private static final RuleAlteredJobAPI JOB_API = RuleAlteredJobAPIFactory.getInstance();
+
/**
* Do prepare work for scaling job.
*
@@ -88,21 +91,21 @@ public final class RuleAlteredJobPreparer {
String lockName = "prepare-" + jobConfig.getJobId();
         LockContext lockContext = PipelineContext.getContextManager().getInstanceContext().getLockContext();
         LockDefinition lockDefinition = new ExclusiveLockDefinition(lockName);
-        RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
+        JOB_API.persistJobItemProgress(jobItemContext);
         if (lockContext.tryLock(lockDefinition, 180000)) {
             log.info("try lock success, jobId={}, shardingItem={}", jobConfig.getJobId(), jobItemContext.getShardingItem());
             try {
-                InventoryIncrementalJobItemProgress jobItemProgress = RuleAlteredJobAPIFactory.getInstance().getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-                boolean prepareFlag = JobStatus.PREPARING.equals(jobItemProgress.getStatus()) || JobStatus.RUNNING.equals(jobItemProgress.getStatus())
+                InventoryIncrementalJobItemProgress jobItemProgress = JOB_API.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+                boolean prepareFlag = null == jobItemProgress || JobStatus.PREPARING.equals(jobItemProgress.getStatus()) || JobStatus.RUNNING.equals(jobItemProgress.getStatus())
                         || JobStatus.PREPARING_FAILURE.equals(jobItemProgress.getStatus());
                 if (prepareFlag) {
                     log.info("execute prepare, jobId={}, shardingItem={}", jobConfig.getJobId(), jobItemContext.getShardingItem());
                     jobItemContext.setStatus(JobStatus.PREPARING);
-                    RuleAlteredJobAPIFactory.getInstance().updateJobItemStatus(jobConfig.getJobId(), jobItemContext.getShardingItem(), JobStatus.PREPARING);
+                    JOB_API.updateJobItemStatus(jobConfig.getJobId(), jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
                     // TODO Loop insert zookeeper performance is not good
                     for (int i = 0; i <= jobItemContext.getJobConfig().getJobShardingCount(); i++) {
-                        RuleAlteredJobAPIFactory.getInstance().updateJobItemStatus(jobConfig.getJobId(), i, JobStatus.PREPARE_SUCCESS);
+                        JOB_API.updateJobItemStatus(jobConfig.getJobId(), i, JobStatus.PREPARE_SUCCESS);
                     }
                 }
             } finally {
@@ -159,8 +162,8 @@ public final class RuleAlteredJobPreparer {
         ExecuteEngine incrementalDumperExecuteEngine = jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
         TaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         PipelineDataSourceManager dataSourceManager = jobItemContext.getDataSourceManager();
-        JobItemIncrementalTasksProgress incrementalTasksProgress = jobItemContext.getInitProgress() == null ? null : jobItemContext.getInitProgress().getIncremental();
-        taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(incrementalTasksProgress, taskConfig.getDumperConfig(), dataSourceManager));
+        JobItemIncrementalTasksProgress initIncremental = jobItemContext.getInitProgress() == null ? null : jobItemContext.getInitProgress().getIncremental();
+        taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, taskConfig.getDumperConfig(), dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = jobItemContext.getSourceMetaDataLoader();
         DefaultPipelineJobProgressListener jobProgressListener = new DefaultPipelineJobProgressListener(jobItemContext.getJobId(), jobItemContext.getShardingItem());
         IncrementalTask incrementalTask = new IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 6d364a790ab..ccd3a70d520 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -55,6 +55,7 @@ import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -259,7 +260,8 @@ public final class RuleAlteredJobAPIImplTest {
         RuleAlteredJobContext jobItemContext = new RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), new PipelineDataSourceManager());
         ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
         ruleAlteredJobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, JobStatus.FINISHED);
-        InventoryIncrementalJobItemProgress jobProgress = ruleAlteredJobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
-        assertThat(jobProgress.getStatus(), is(JobStatus.FINISHED));
+        InventoryIncrementalJobItemProgress actual = ruleAlteredJobAPI.getJobItemProgress(jobItemContext.getJobId(), jobItemContext.getShardingItem());
+        assertNotNull(actual);
+        assertThat(actual.getStatus(), is(JobStatus.FINISHED));
}
}