This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dd568356ebb Extract PipelineJobItemAPI and impl for common usage (#20122)
dd568356ebb is described below

commit dd568356ebb74bf708aa60840f1d504438ebaba5
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 12 20:10:32 2022 +0800

    Extract PipelineJobItemAPI and impl for common usage (#20122)
---
 .../data/pipeline/core/api/PipelineJobAPI.java     | 31 +------
 ...PipelineJobAPI.java => PipelineJobItemAPI.java} | 36 +-------
 .../impl/InventoryIncrementalJobItemAPIImpl.java   | 97 ++++++++++++++++++++++
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 79 ++++--------------
 .../rulealtered/RuleAlteredJobPreparer.java        | 21 +++--
 .../core/api/impl/RuleAlteredJobAPIImplTest.java   |  6 +-
 6 files changed, 136 insertions(+), 134 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 36e16b2e8eb..cdcaed2104e 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -20,10 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api;
 import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
@@ -31,7 +28,7 @@ import 
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
  * Pipeline job API.
  */
 @SingletonSPI
-public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
+public interface PipelineJobAPI extends PipelineJobPublicAPI, 
PipelineJobItemAPI, TypedSPI {
     
     /**
      * Marshal pipeline job id.
@@ -54,30 +51,6 @@ public interface PipelineJobAPI extends 
PipelineJobPublicAPI, TypedSPI {
      * @param jobId job id
      * @return job configuration
      */
+    // TODO rename
     PipelineJobConfiguration getJobConfig(String jobId);
-    
-    /**
-     * Persist job item progress.
-     *
-     * @param jobItemContext job item context
-     */
-    void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-    
-    /**
-     * Get job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return job item progress
-     */
-    PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
-    
-    /**
-     * Update job item status.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param status status
-     */
-    void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
similarity index 59%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
index 36e16b2e8eb..1a492b79c88 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobItemAPI.java
@@ -17,44 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api;
 
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 /**
- * Pipeline job API.
+ * Pipeline job item API.
  */
-@SingletonSPI
-public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
-    
-    /**
-     * Marshal pipeline job id.
-     *
-     * @param pipelineJobId pipeline job id
-     * @return marshaled text
-     */
-    String marshalJobId(PipelineJobId pipelineJobId);
-    
-    /**
-     * Extend job configuration.
-     *
-     * @param yamlJobConfig yaml job configuration
-     */
-    void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
-    
-    /**
-     * Get job configuration.
-     *
-     * @param jobId job id
-     * @return job configuration
-     */
-    PipelineJobConfiguration getJobConfig(String jobId);
+public interface PipelineJobItemAPI {
     
     /**
      * Persist job item progress.
@@ -68,7 +38,7 @@ public interface PipelineJobAPI extends PipelineJobPublicAPI, 
TypedSPI {
      *
      * @param jobId job id
      * @param shardingItem sharding item
-     * @return job item progress
+     * @return job item progress, may be null
      */
     PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
new file mode 100644
index 00000000000..1d1910bdcb5
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Inventory incremental job item API implementation.
+ */
+@Slf4j
+public final class InventoryIncrementalJobItemAPIImpl implements 
PipelineJobItemAPI {
+    
+    private static final YamlInventoryIncrementalJobItemProgressSwapper 
SWAPPER = new YamlInventoryIncrementalJobItemProgressSwapper();
+    
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        InventoryIncrementalJobItemContext context = 
(InventoryIncrementalJobItemContext) jobItemContext;
+        InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
+        jobItemProgress.setStatus(jobItemContext.getStatus());
+        
jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
+        
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
+        
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
+        String value = 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), value);
+    }
+    
+    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
Collection<IncrementalTask> incrementalTasks) {
+        Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new 
HashMap<>();
+        for (IncrementalTask each : incrementalTasks) {
+            incrementalTaskProgressMap.put(each.getTaskId(), 
each.getTaskProgress());
+        }
+        return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
+    }
+    
+    private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
Collection<InventoryTask> inventoryTasks) {
+        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new 
HashMap<>();
+        for (InventoryTask each : inventoryTasks) {
+            inventoryTaskProgressMap.put(each.getTaskId(), 
each.getTaskProgress());
+        }
+        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
+    }
+    
+    @Override
+    public InventoryIncrementalJobItemProgress getJobItemProgress(final String 
jobId, final int shardingItem) {
+        String data = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, 
shardingItem);
+        if (StringUtils.isBlank(data)) {
+            return null;
+        }
+        return SWAPPER.swapToObject(YamlEngine.unmarshal(data, 
YamlInventoryIncrementalJobItemProgress.class));
+    }
+    
+    @Override
+    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
+        InventoryIncrementalJobItemProgress jobItemProgress = 
getJobItemProgress(jobId, shardingItem);
+        if (null == jobItemProgress) {
+            log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        jobItemProgress.setStatus(status);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 
shardingItem, 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress)));
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 6e17cf8a6f9..1f3576de972 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -21,8 +21,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
@@ -36,27 +34,20 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemInventoryTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
-import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
@@ -75,7 +66,6 @@ import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -92,7 +82,7 @@ import java.util.stream.Stream;
 @Slf4j
 public final class RuleAlteredJobAPIImpl extends AbstractPipelineJobAPIImpl 
implements RuleAlteredJobAPI {
     
-    private static final YamlInventoryIncrementalJobItemProgressSwapper 
SWAPPER = new YamlInventoryIncrementalJobItemProgressSwapper();
+    private final PipelineJobItemAPI jobItemAPI = new 
InventoryIncrementalJobItemAPIImpl();
     
     @Override
     protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
@@ -206,6 +196,21 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         }, LinkedHashMap::putAll);
     }
     
+    @Override
+    public InventoryIncrementalJobItemProgress getJobItemProgress(final String 
jobId, final int shardingItem) {
+        return (InventoryIncrementalJobItemProgress) 
jobItemAPI.getJobItemProgress(jobId, shardingItem);
+    }
+    
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        jobItemAPI.persistJobItemProgress(jobItemContext);
+    }
+    
+    @Override
+    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
+        jobItemAPI.updateJobItemStatus(jobId, shardingItem, status);
+    }
+    
     private void verifyManualMode(final RuleAlteredJobConfiguration jobConfig) 
{
         RuleAlteredContext ruleAlteredContext = 
RuleAlteredJobWorker.createRuleAlteredContext(jobConfig);
         if (null != ruleAlteredContext.getCompletionDetectAlgorithm()) {
@@ -406,54 +411,6 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         return 
RuleAlteredJobConfigurationSwapper.swapToObject(jobConfigPOJO.getJobParameter());
     }
     
-    @Override
-    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        InventoryIncrementalJobItemContext context = 
(InventoryIncrementalJobItemContext) jobItemContext;
-        InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
-        jobItemProgress.setStatus(jobItemContext.getStatus());
-        
jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
-        
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
-        
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
-        String value = 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
-        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), value);
-    }
-    
-    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
Collection<IncrementalTask> incrementalTasks) {
-        Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new 
HashMap<>();
-        for (IncrementalTask each : incrementalTasks) {
-            incrementalTaskProgressMap.put(each.getTaskId(), 
each.getTaskProgress());
-        }
-        return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
-    }
-    
-    private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
Collection<InventoryTask> inventoryTasks) {
-        Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new 
HashMap<>();
-        for (InventoryTask each : inventoryTasks) {
-            inventoryTaskProgressMap.put(each.getTaskId(), 
each.getTaskProgress());
-        }
-        return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
-    }
-    
-    @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String 
jobId, final int shardingItem) {
-        String data = 
PipelineAPIFactory.getGovernanceRepositoryAPI().getJobItemProgress(jobId, 
shardingItem);
-        if (StringUtils.isBlank(data)) {
-            return null;
-        }
-        return SWAPPER.swapToObject(YamlEngine.unmarshal(data, 
YamlInventoryIncrementalJobItemProgress.class));
-    }
-    
-    @Override
-    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
-        InventoryIncrementalJobItemProgress jobItemProgress = 
getJobItemProgress(jobId, shardingItem);
-        if (null == jobItemProgress) {
-            log.warn("updateJobItemStatus, jobItemProgress is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
-            return;
-        }
-        jobItemProgress.setStatus(status);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 
shardingItem, 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress)));
-    }
-    
     @Override
     public String getType() {
         return JobType.MIGRATION.getTypeName();
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index ddd31ebe961..304bc4fcc32 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -18,14 +18,15 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
@@ -54,6 +55,8 @@ import java.util.Collections;
 @Slf4j
 public final class RuleAlteredJobPreparer {
     
+    private static final RuleAlteredJobAPI JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    
     /**
      * Do prepare work for scaling job.
      *
@@ -88,21 +91,21 @@ public final class RuleAlteredJobPreparer {
         String lockName = "prepare-" + jobConfig.getJobId();
         LockContext lockContext = 
PipelineContext.getContextManager().getInstanceContext().getLockContext();
         LockDefinition lockDefinition = new ExclusiveLockDefinition(lockName);
-        
RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
+        JOB_API.persistJobItemProgress(jobItemContext);
         if (lockContext.tryLock(lockDefinition, 180000)) {
             log.info("try lock success, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobItemContext.getShardingItem());
             try {
-                InventoryIncrementalJobItemProgress jobItemProgress = 
RuleAlteredJobAPIFactory.getInstance().getJobItemProgress(jobItemContext.getJobId(),
 jobItemContext.getShardingItem());
-                boolean prepareFlag = 
JobStatus.PREPARING.equals(jobItemProgress.getStatus()) || 
JobStatus.RUNNING.equals(jobItemProgress.getStatus())
+                InventoryIncrementalJobItemProgress jobItemProgress = 
JOB_API.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+                boolean prepareFlag = null == jobItemProgress || 
JobStatus.PREPARING.equals(jobItemProgress.getStatus()) || 
JobStatus.RUNNING.equals(jobItemProgress.getStatus())
                         || 
JobStatus.PREPARING_FAILURE.equals(jobItemProgress.getStatus());
                 if (prepareFlag) {
                     log.info("execute prepare, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobItemContext.getShardingItem());
                     jobItemContext.setStatus(JobStatus.PREPARING);
-                    
RuleAlteredJobAPIFactory.getInstance().updateJobItemStatus(jobConfig.getJobId(),
 jobItemContext.getShardingItem(), JobStatus.PREPARING);
+                    JOB_API.updateJobItemStatus(jobConfig.getJobId(), 
jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
                     // TODO Loop insert zookeeper performance is not good
                     for (int i = 0; i <= 
jobItemContext.getJobConfig().getJobShardingCount(); i++) {
-                        
RuleAlteredJobAPIFactory.getInstance().updateJobItemStatus(jobConfig.getJobId(),
 i, JobStatus.PREPARE_SUCCESS);
+                        JOB_API.updateJobItemStatus(jobConfig.getJobId(), i, 
JobStatus.PREPARE_SUCCESS);
                     }
                 }
             } finally {
@@ -159,8 +162,8 @@ public final class RuleAlteredJobPreparer {
         ExecuteEngine incrementalDumperExecuteEngine = 
jobItemContext.getJobProcessContext().getIncrementalDumperExecuteEngine();
         TaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
-        JobItemIncrementalTasksProgress incrementalTasksProgress = 
jobItemContext.getInitProgress() == null ? null : 
jobItemContext.getInitProgress().getIncremental();
-        
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(incrementalTasksProgress,
 taskConfig.getDumperConfig(), dataSourceManager));
+        JobItemIncrementalTasksProgress initIncremental = 
jobItemContext.getInitProgress() == null ? null : 
jobItemContext.getInitProgress().getIncremental();
+        
taskConfig.getDumperConfig().setPosition(PipelineJobPreparerUtils.getIncrementalPosition(initIncremental,
 taskConfig.getDumperConfig(), dataSourceManager));
         PipelineTableMetaDataLoader sourceMetaDataLoader = 
jobItemContext.getSourceMetaDataLoader();
         DefaultPipelineJobProgressListener jobProgressListener = new 
DefaultPipelineJobProgressListener(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         IncrementalTask incrementalTask = new 
IncrementalTask(taskConfig.getImporterConfig().getConcurrency(),
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 6d364a790ab..ccd3a70d520 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -55,6 +55,7 @@ import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -259,7 +260,8 @@ public final class RuleAlteredJobAPIImplTest {
         RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new PipelineDataSourceManager());
         ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
         ruleAlteredJobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, 
JobStatus.FINISHED);
-        InventoryIncrementalJobItemProgress jobProgress = 
ruleAlteredJobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
-        assertThat(jobProgress.getStatus(), is(JobStatus.FINISHED));
+        InventoryIncrementalJobItemProgress actual = 
ruleAlteredJobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+        assertNotNull(actual);
+        assertThat(actual.getStatus(), is(JobStatus.FINISHED));
     }
 }

Reply via email to