This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d428210659 Add InventoryIncrementalJobManager (#29079)
5d428210659 is described below

commit 5d428210659a826e82157805f5b632714d9c9812
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 23:02:15 2023 +0800

    Add InventoryIncrementalJobManager (#29079)
---
 .../job/service/InventoryIncrementalJobAPI.java    |  62 ----------
 ...pl.java => InventoryIncrementalJobManager.java} | 128 +++++++++------------
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  99 +---------------
 .../handler/query/ShowStreamingRuleExecutor.java   |   4 +-
 .../ShowMigrationCheckAlgorithmsExecutor.java      |   6 +-
 .../handler/update/CheckMigrationJobUpdater.java   |   4 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |   4 +-
 .../api/impl/ConsistencyCheckJobAPI.java           |   8 +-
 .../migration/api/impl/MigrationJobAPI.java        |   3 +-
 .../MigrationDataConsistencyChecker.java           |   3 +-
 .../migration/prepare/MigrationJobPreparer.java    |   7 +-
 .../ral/queryable/ShowMigrationRuleExecutor.java   |   4 +-
 .../AlterInventoryIncrementalRuleUpdater.java      |   6 +-
 .../migration/api/impl/MigrationJobAPITest.java    |  14 ++-
 14 files changed, 99 insertions(+), 253 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index fa4594f7eab..0569315a40f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -22,21 +22,15 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipeli
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
-import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Inventory incremental job API.
@@ -83,46 +77,6 @@ public interface InventoryIncrementalJobAPI extends 
PipelineJobAPI {
      */
     void extendYamlJobConfiguration(PipelineContextKey contextKey, 
YamlPipelineJobConfiguration yamlJobConfig);
     
-    /**
-     * Alter process configuration.
-     *
-     * @param contextKey context key
-     * @param processConfig process configuration
-     */
-    void alterProcessConfiguration(PipelineContextKey contextKey, 
PipelineProcessConfiguration processConfig);
-    
-    /**
-     * Show process configuration.
-     *
-     * @param contextKey context key
-     * @return process configuration, non-null
-     */
-    PipelineProcessConfiguration showProcessConfiguration(PipelineContextKey 
contextKey);
-    
-    /**
-     * Persist job offset info.
-     *
-     * @param jobId job ID
-     * @param jobOffsetInfo job offset info
-     */
-    void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo);
-    
-    /**
-     * Get job offset info.
-     *
-     * @param jobId job ID
-     * @return job offset progress
-     */
-    JobOffsetInfo getJobOffsetInfo(String jobId);
-    
-    /**
-     * Get job progress.
-     *
-     * @param pipelineJobConfig job configuration
-     * @return each sharding item progress
-     */
-    Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(PipelineJobConfiguration pipelineJobConfig);
-    
     /**
      * Get job infos.
      *
@@ -131,13 +85,6 @@ public interface InventoryIncrementalJobAPI extends 
PipelineJobAPI {
      */
     List<InventoryIncrementalJobItemInfo> getJobItemInfos(String jobId);
     
-    /**
-     * List all data consistency check algorithms from SPI.
-     *
-     * @return data consistency check algorithms
-     */
-    Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms();
-    
     /**
      * Build pipeline data consistency checker.
      *
@@ -149,15 +96,6 @@ public interface InventoryIncrementalJobAPI extends 
PipelineJobAPI {
     PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, 
InventoryIncrementalProcessContext processContext,
                                                                        
ConsistencyCheckJobItemProgressContext progressContext);
     
-    /**
-     * Aggregate data consistency check results.
-     *
-     * @param jobId job ID
-     * @param checkResults check results
-     * @return check success or not
-     */
-    boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, 
TableDataConsistencyCheckResult> checkResults);
-    
     /**
      * Commit pipeline job.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
similarity index 56%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index c3d0b1d689a..efe7ce8af46 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -15,28 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
+package org.apache.shardingsphere.data.pipeline.core.job.service;
 
-import lombok.extern.slf4j.Slf4j;
+import com.google.common.base.Preconditions;
+import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -47,37 +41,50 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
- * Abstract inventory incremental job API implementation.
+ * Inventory incremental job manager.
  */
-@Slf4j
-public abstract class AbstractInventoryIncrementalJobAPIImpl implements 
InventoryIncrementalJobAPI {
+@RequiredArgsConstructor
+public final class InventoryIncrementalJobManager {
     
-    private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+    private final PipelineJobAPI jobAPI;
     
-    private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new 
YamlJobOffsetInfoSwapper();
+    private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
-    @Override
+    /**
+     * Alter process configuration.
+     *
+     * @param contextKey context key
+     * @param processConfig process configuration
+     */
     public void alterProcessConfiguration(final PipelineContextKey contextKey, 
final PipelineProcessConfiguration processConfig) {
         // TODO check rateLimiter type match or not
-        processConfigPersistService.persist(contextKey, getType(), 
processConfig);
+        processConfigPersistService.persist(contextKey, jobAPI.getType(), 
processConfig);
     }
     
-    @Override
+    /**
+     * Show process configuration.
+     *
+     * @param contextKey context key
+     * @return process configuration, non-null
+     */
     public PipelineProcessConfiguration showProcessConfiguration(final 
PipelineContextKey contextKey) {
-        return 
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
 getType()));
+        return 
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
 jobAPI.getType()));
     }
     
-    @Override
+    /**
+     * Get job progress.
+     *
+     * @param jobConfig pipeline job configuration
+     * @return each sharding item progress
+     */
     public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final PipelineJobConfiguration jobConfig) {
-        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
         String jobId = jobConfig.getJobId();
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         return IntStream.range(0, 
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, 
each) -> {
@@ -87,52 +94,33 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
         }, LinkedHashMap::putAll);
     }
     
-    @Override
-    public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String 
jobId) {
-        PipelineJobManager jobManager = new PipelineJobManager(this);
-        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        PipelineJobConfiguration jobConfig = 
jobManager.getJobConfiguration(jobConfigPOJO);
-        long startTimeMillis = 
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
-        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
-        List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
-        for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
jobProgress.entrySet()) {
-            int shardingItem = entry.getKey();
-            TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) 
getJobInfo(jobId);
-            InventoryIncrementalJobItemProgress jobItemProgress = 
entry.getValue();
-            String errorMessage = jobItemManager.getErrorMessage(jobId, 
shardingItem);
-            if (null == jobItemProgress) {
-                result.add(new InventoryIncrementalJobItemInfo(shardingItem, 
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
-                continue;
-            }
-            int inventoryFinishedPercentage = 0;
-            if (JobStatus.EXECUTE_INCREMENTAL_TASK == 
jobItemProgress.getStatus() || JobStatus.FINISHED == 
jobItemProgress.getStatus()) {
-                inventoryFinishedPercentage = 100;
-            } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != 
jobItemProgress.getInventoryRecordsCount()) {
-                inventoryFinishedPercentage = (int) Math.min(100, 
jobItemProgress.getProcessedRecordsCount() * 100 / 
jobItemProgress.getInventoryRecordsCount());
-            }
-            result.add(new InventoryIncrementalJobItemInfo(shardingItem, 
jobInfo.getTable(), jobItemProgress, startTimeMillis, 
inventoryFinishedPercentage, errorMessage));
-        }
-        return result;
-    }
-    
-    @Override
+    /**
+     * Persist job offset info.
+     *
+     * @param jobId job ID
+     * @param jobOffsetInfo job offset info
+     */
     public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo 
jobOffsetInfo) {
-        String value = 
YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
+        String value = YamlEngine.marshal(new 
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo));
         
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
 value);
     }
     
-    @Override
+    /**
+     * Get job offset info.
+     *
+     * @param jobId job ID
+     * @return job offset progress
+     */
     public JobOffsetInfo getJobOffsetInfo(final String jobId) {
         Optional<String> offsetInfo = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
-        if (offsetInfo.isPresent()) {
-            YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), 
YamlJobOffsetInfo.class);
-            return jobOffsetInfoSwapper.swapToObject(info);
-        }
-        return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
+        return new 
YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ? 
YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new 
YamlJobOffsetInfo());
     }
     
-    @Override
+    /**
+     * List all data consistency check algorithms from SPI.
+     *
+     * @return data consistency check algorithms
+     */
     public Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms() {
         Collection<DataConsistencyCheckAlgorithmInfo> result = new 
LinkedList<>();
         for (TableDataConsistencyChecker each : 
ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class))
 {
@@ -147,17 +135,15 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
         return supportedDatabaseTypes.isEmpty() ? 
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : 
supportedDatabaseTypes;
     }
     
-    @Override
+    /**
+     * Aggregate data consistency check results.
+     *
+     * @param jobId job ID, only used in the error message
+     * @param checkResults check results, must not be empty
+     * @return true if every check result is matched, otherwise false
+     */
     public boolean aggregateDataConsistencyCheckResults(final String jobId, 
final Map<String, TableDataConsistencyCheckResult> checkResults) {
-        if (checkResults.isEmpty()) {
-            throw new IllegalArgumentException("checkResults empty, jobId:" + 
jobId);
-        }
-        for (Entry<String, TableDataConsistencyCheckResult> entry : 
checkResults.entrySet()) {
-            TableDataConsistencyCheckResult checkResult = entry.getValue();
-            if (!checkResult.isMatched()) {
-                return false;
-            }
-        }
-        return true;
+        Preconditions.checkArgument(!checkResults.isEmpty(), "checkResults 
empty, jobId:%s", jobId);
+        return 
checkResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index c3d0b1d689a..8562706aafa 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,85 +17,39 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
 
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
-import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
-import java.util.Collection;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * Abstract inventory incremental job API implementation.
  */
-@Slf4j
 public abstract class AbstractInventoryIncrementalJobAPIImpl implements 
InventoryIncrementalJobAPI {
     
-    private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
-    
-    private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new 
YamlJobOffsetInfoSwapper();
-    
-    @Override
-    public void alterProcessConfiguration(final PipelineContextKey contextKey, 
final PipelineProcessConfiguration processConfig) {
-        // TODO check rateLimiter type match or not
-        processConfigPersistService.persist(contextKey, getType(), 
processConfig);
-    }
-    
-    @Override
-    public PipelineProcessConfiguration showProcessConfiguration(final 
PipelineContextKey contextKey) {
-        return 
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
 getType()));
-    }
-    
-    @Override
-    public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final PipelineJobConfiguration jobConfig) {
-        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
-        String jobId = jobConfig.getJobId();
-        JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        return IntStream.range(0, 
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map, 
each) -> {
-            Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobId, each);
-            jobItemProgress.ifPresent(optional -> 
optional.setActive(!jobConfigPOJO.isDisabled()));
-            map.put(each, jobItemProgress.orElse(null));
-        }, LinkedHashMap::putAll);
-    }
-    
     @Override
     public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String 
jobId) {
         PipelineJobManager jobManager = new PipelineJobManager(this);
-        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         PipelineJobConfiguration jobConfig = 
jobManager.getJobConfiguration(jobConfigPOJO);
         long startTimeMillis = 
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
-        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
+        InventoryIncrementalJobManager inventoryIncrementalJobManager = new 
InventoryIncrementalJobManager(this);
+        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = 
inventoryIncrementalJobManager.getJobProgress(jobConfig);
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
+        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) 
getJobInfo(jobId);
@@ -115,49 +69,4 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
         }
         return result;
     }
-    
-    @Override
-    public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo 
jobOffsetInfo) {
-        String value = 
YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
 value);
-    }
-    
-    @Override
-    public JobOffsetInfo getJobOffsetInfo(final String jobId) {
-        Optional<String> offsetInfo = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
-        if (offsetInfo.isPresent()) {
-            YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(), 
YamlJobOffsetInfo.class);
-            return jobOffsetInfoSwapper.swapToObject(info);
-        }
-        return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
-    }
-    
-    @Override
-    public Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms() {
-        Collection<DataConsistencyCheckAlgorithmInfo> result = new 
LinkedList<>();
-        for (TableDataConsistencyChecker each : 
ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class))
 {
-            SPIDescription description = 
each.getClass().getAnnotation(SPIDescription.class);
-            String typeAliases = 
each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(","));
-            result.add(new DataConsistencyCheckAlgorithmInfo(each.getType(), 
typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null 
== description ? "" : description.value()));
-        }
-        return result;
-    }
-    
-    private Collection<DatabaseType> getSupportedDatabaseTypes(final 
Collection<DatabaseType> supportedDatabaseTypes) {
-        return supportedDatabaseTypes.isEmpty() ? 
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) : 
supportedDatabaseTypes;
-    }
-    
-    @Override
-    public boolean aggregateDataConsistencyCheckResults(final String jobId, 
final Map<String, TableDataConsistencyCheckResult> checkResults) {
-        if (checkResults.isEmpty()) {
-            throw new IllegalArgumentException("checkResults empty, jobId:" + 
jobId);
-        }
-        for (Entry<String, TableDataConsistencyCheckResult> entry : 
checkResults.entrySet()) {
-            TableDataConsistencyCheckResult checkResult = entry.getValue();
-            if (!checkResult.isMatched()) {
-                return false;
-            }
-        }
-        return true;
-    }
 }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
index f646e82611b..7e63268e8ba 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -39,7 +39,7 @@ public final class ShowStreamingRuleExecutor implements 
QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingRuleStatement sqlStatement) {
-        PipelineProcessConfiguration processConfig = 
((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, 
"STREAMING"))
+        PipelineProcessConfiguration processConfig = new 
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
"STREAMING"))
                 .showProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new 
LocalDataQueryResultRow(getString(processConfig.getRead()), 
getString(processConfig.getWrite()), 
getString(processConfig.getStreamChannel())));
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
index 6b99ab54353..40c7949c6fa 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -36,8 +36,8 @@ public final class ShowMigrationCheckAlgorithmsExecutor 
implements QueryableRALE
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationCheckAlgorithmsStatement sqlStatement) {
-        InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
-        return jobAPI.listDataConsistencyCheckAlgorithms().stream().map(
+        InventoryIncrementalJobManager inventoryIncrementalJobManager = new 
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
"MIGRATION"));
+        return 
inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map(
                 each -> new LocalDataQueryResultRow(each.getType(), 
each.getTypeAliases(),
                         
each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")),
 each.getDescription()))
                 .collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index 0789b0519b0..86d91508712 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.migration.distsql.handler.update;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -54,7 +55,8 @@ public final class CheckMigrationJobUpdater implements 
RALUpdater<CheckMigration
     }
     
     private void verifyInventoryFinished(final MigrationJobConfiguration 
jobConfig) {
-        
ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(),
 migrationJobAPI.getJobProgress(jobConfig).values()),
+        InventoryIncrementalJobManager inventoryIncrementalJobManager = new 
InventoryIncrementalJobManager(migrationJobAPI);
+        
ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(),
 inventoryIncrementalJobManager.getJobProgress(jobConfig).values()),
                 () -> new PipelineInvalidParameterException("Inventory is not 
finished."));
     }
     
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 9f90858e590..2484d75d9ca 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -67,6 +67,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -277,7 +278,8 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     
     @Override
     public CDCProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
-        return new CDCProcessContext(pipelineJobConfig.getJobId(), 
showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
+        InventoryIncrementalJobManager jobManager = new 
InventoryIncrementalJobManager(this);
+        return new CDCProcessContext(pipelineJobConfig.getJobId(), 
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index ff0e63ce7af..d2689235c2c 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -31,7 +31,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPi
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -279,9 +279,9 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         if (checkJobResult.isEmpty()) {
             result.setCheckSuccess(null);
         } else {
-            InventoryIncrementalJobAPI inventoryIncrementalJobAPI = 
(InventoryIncrementalJobAPI) TypedSPILoader.getService(
-                    PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(parentJobId).getType());
-            
result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId,
 checkJobResult));
+            InventoryIncrementalJobManager inventoryIncrementalJobManager = 
new InventoryIncrementalJobManager(
+                    TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(parentJobId).getType()));
+            
result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId,
 checkJobResult));
         }
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 3d12a4363bc..c29a961158d 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -53,6 +53,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
@@ -277,7 +278,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     
     @Override
     public MigrationProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
-        PipelineProcessConfiguration processConfig = 
showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
+        PipelineProcessConfiguration processConfig = new 
InventoryIncrementalJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
         return new MigrationProcessContext(pipelineJobConfig.getJobId(), 
processConfig);
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index addf2dee11a..c189075c554 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -41,6 +41,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
@@ -98,7 +99,7 @@ public final class MigrationDataConsistencyChecker implements 
PipelineDataConsis
     }
     
     private long getRecordsCount() {
-        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new 
MigrationJobAPI().getJobProgress(jobConfig);
+        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new 
InventoryIncrementalJobManager(new MigrationJobAPI()).getJobProgress(jobConfig);
         return 
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 2857e63f58e..358f3405180 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -45,6 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
@@ -84,6 +85,8 @@ public final class MigrationJobPreparer {
     
     private final PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
     
+    private final InventoryIncrementalJobManager 
inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI);
+    
     /**
      * Do prepare work.
      *
@@ -133,12 +136,12 @@ public final class MigrationJobPreparer {
         if (lockContext.tryLock(lockDefinition, 600000)) {
             log.info("try lock success, jobId={}, shardingItem={}, cost {} 
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
             try {
-                JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
+                JobOffsetInfo offsetInfo = 
inventoryIncrementalJobManager.getJobOffsetInfo(jobId);
                 if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
                     jobItemManager.updateStatus(jobId, 
jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
-                    jobAPI.persistJobOffsetInfo(jobId, new 
JobOffsetInfo(true));
+                    inventoryIncrementalJobManager.persistJobOffsetInfo(jobId, 
new JobOffsetInfo(true));
                 }
             } finally {
                 log.info("unlock, jobId={}, shardingItem={}, cost {} ms", 
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() - 
startTimeMillis);
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index e592dd7ee12..4100c69bc74 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.queryable;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import 
org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement;
@@ -39,7 +39,7 @@ public final class ShowMigrationRuleExecutor implements 
QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationRuleStatement sqlStatement) {
-        PipelineProcessConfiguration processConfig = 
((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, 
"MIGRATION"))
+        PipelineProcessConfiguration processConfig = new 
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
"MIGRATION"))
                 .showProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new 
LocalDataQueryResultRow(getString(processConfig.getRead()), 
getString(processConfig.getWrite()), 
getString(processConfig.getStreamChannel())));
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
index 4931418c499..b07cae2401f 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import 
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
@@ -34,9 +34,9 @@ public final class AlterInventoryIncrementalRuleUpdater 
implements RALUpdater<Al
     
     @Override
     public void executeUpdate(final String databaseName, final 
AlterInventoryIncrementalRuleStatement sqlStatement) {
-        InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName());
+        InventoryIncrementalJobManager jobManager = new 
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, 
sqlStatement.getJobTypeName()));
         PipelineProcessConfiguration processConfig = 
InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
-        jobAPI.alterProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY), processConfig);
+        jobManager.alterProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY), processConfig);
     }
     
     @Override
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 187448cd273..4cd6fa38082 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -92,6 +93,8 @@ class MigrationJobAPITest {
     
     private static PipelineJobManager jobManager;
     
+    private static InventoryIncrementalJobManager 
inventoryIncrementalJobManager;
+    
     private static PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager;
     
     private static DatabaseType databaseType;
@@ -101,6 +104,7 @@ class MigrationJobAPITest {
         PipelineContextUtils.mockModeConfigAndContextManager();
         jobAPI = new MigrationJobAPI();
         jobManager = new PipelineJobManager(jobAPI);
+        inventoryIncrementalJobManager = new 
InventoryIncrementalJobManager(jobAPI);
         jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
         String jdbcUrl = 
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
         databaseType = DatabaseTypeFactory.get(jdbcUrl);
@@ -171,7 +175,7 @@ class MigrationJobAPITest {
         MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
         Optional<String> jobId = jobManager.start(jobConfig);
         assertTrue(jobId.isPresent());
-        Map<Integer, InventoryIncrementalJobItemProgress> jobProgressMap = 
jobAPI.getJobProgress(jobConfig);
+        Map<Integer, InventoryIncrementalJobItemProgress> jobProgressMap = 
inventoryIncrementalJobManager.getJobProgress(jobConfig);
         assertThat(jobProgressMap.size(), is(1));
     }
     
@@ -191,7 +195,7 @@ class MigrationJobAPITest {
     
     @Test
     void assertAggregateEmptyDataConsistencyCheckResults() {
-        assertThrows(IllegalArgumentException.class, () -> 
jobAPI.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap()));
+        assertThrows(IllegalArgumentException.class, () -> 
inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", 
Collections.emptyMap()));
     }
     
     @Test
@@ -199,7 +203,7 @@ class MigrationJobAPITest {
         Map<String, TableDataConsistencyCheckResult> checkResults = new 
LinkedHashMap<>(2, 1F);
         checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
         checkResults.put("bar_tbl", new 
TableDataConsistencyCheckResult(false));
-        assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job", 
checkResults));
+        
assertFalse(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job",
 checkResults));
     }
     
     @Test
@@ -207,7 +211,7 @@ class MigrationJobAPITest {
         Map<String, TableDataConsistencyCheckResult> checkResults = new 
LinkedHashMap<>(2, 1F);
         checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
         checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(true));
-        assertTrue(jobAPI.aggregateDataConsistencyCheckResults("foo_job", 
checkResults));
+        
assertTrue(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job",
 checkResults));
     }
     
     @Test
@@ -218,7 +222,7 @@ class MigrationJobAPITest {
         MigrationJobItemContext jobItemContext = 
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
         jobItemManager.persistProgress(jobItemContext);
         jobItemManager.updateStatus(jobId.get(), 
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
-        Map<Integer, InventoryIncrementalJobItemProgress> progress = 
jobAPI.getJobProgress(jobConfig);
+        Map<Integer, InventoryIncrementalJobItemProgress> progress = 
inventoryIncrementalJobManager.getJobProgress(jobConfig);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
progress.entrySet()) {
             assertThat(entry.getValue().getStatus(), 
is(JobStatus.EXECUTE_INVENTORY_TASK));
         }

Reply via email to