This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 462cb7a847b Remove AbstractInventoryIncrementalJobAPIImpl (#29080)
462cb7a847b is described below

commit 462cb7a847b8b98a354c33e14a4606049692c467
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 19 10:48:27 2023 +0800

    Remove AbstractInventoryIncrementalJobAPIImpl (#29080)
---
 .../job/service/InventoryIncrementalJobAPI.java    | 10 ---
 .../service/InventoryIncrementalJobManager.java    | 42 ++++++++++++-
 .../AbstractInventoryIncrementalJobAPIImpl.java    | 72 ----------------------
 .../query/ShowStreamingJobStatusExecutor.java      |  3 +-
 .../handler/query/ShowStreamingRuleExecutor.java   |  3 +-
 .../ShowMigrationCheckAlgorithmsExecutor.java      |  3 +-
 .../query/ShowMigrationJobStatusExecutor.java      |  3 +-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  4 +-
 .../api/impl/ConsistencyCheckJobAPI.java           |  3 +-
 .../migration/api/impl/MigrationJobAPI.java        |  4 +-
 .../ral/queryable/ShowMigrationRuleExecutor.java   |  3 +-
 .../AlterInventoryIncrementalRuleUpdater.java      |  3 +-
 .../migration/api/impl/MigrationJobAPITest.java    |  4 +-
 13 files changed, 61 insertions(+), 96 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 0569315a40f..76a66b00e6b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -23,14 +23,12 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelinePro
 import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 
 import java.sql.SQLException;
-import java.util.List;
 
 /**
  * Inventory incremental job API.
@@ -77,14 +75,6 @@ public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
      */
     void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
     
-    /**
-     * Get job infos.
-     *
-     * @param jobId job ID
-     * @return job item infos
-     */
-    List<InventoryIncrementalJobItemInfo> getJobItemInfos(String jobId);
-    
     /**
      * Build pipeline data consistency checker.
      *
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index efe7ce8af46..e31ee4796b2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -23,11 +23,14 @@ import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConf
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -41,7 +44,9 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -52,7 +57,7 @@ import java.util.stream.IntStream;
 @RequiredArgsConstructor
 public final class InventoryIncrementalJobManager {
     
-    private final PipelineJobAPI jobAPI;
+    private final InventoryIncrementalJobAPI jobAPI;
     
     private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
@@ -77,6 +82,41 @@ public final class InventoryIncrementalJobManager {
         return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobAPI.getType()));
     }
     
+    /**
+     * Get job infos.
+     *
+     * @param jobId job ID
+     * @return job item infos
+     */
+    public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
+        PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
+        PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobConfigPOJO);
+        long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
+        InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI);
+        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig);
+        List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
+        PipelineJobItemManager<InventoryIncrementalJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+        for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
+            int shardingItem = entry.getKey();
+            TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) jobAPI.getJobInfo(jobId);
+            InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
+            String errorMessage = jobItemManager.getErrorMessage(jobId, shardingItem);
+            if (null == jobItemProgress) {
+                result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
+                continue;
+            }
+            int inventoryFinishedPercentage = 0;
+            if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus() || JobStatus.FINISHED == jobItemProgress.getStatus()) {
+                inventoryFinishedPercentage = 100;
+            } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
+                inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
+            }
+            result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
+        }
+        return result;
+    }
+    
     /**
      * Get job progress.
      *
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
deleted file mode 100644
index 8562706aafa..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
-
-import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-
-/**
- * Abstract inventory incremental job API implementation.
- */
-public abstract class AbstractInventoryIncrementalJobAPIImpl implements InventoryIncrementalJobAPI {
-    
-    @Override
-    public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String jobId) {
-        PipelineJobManager jobManager = new PipelineJobManager(this);
-        JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        PipelineJobConfiguration jobConfig = jobManager.getJobConfiguration(jobConfigPOJO);
-        long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
-        InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(this);
-        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = inventoryIncrementalJobManager.getJobProgress(jobConfig);
-        List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
-        PipelineJobItemManager<InventoryIncrementalJobItemProgress> jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
-        for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
-            int shardingItem = entry.getKey();
-            TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId);
-            InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
-            String errorMessage = jobItemManager.getErrorMessage(jobId, shardingItem);
-            if (null == jobItemProgress) {
-                result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
-                continue;
-            }
-            int inventoryFinishedPercentage = 0;
-            if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus() || JobStatus.FINISHED == jobItemProgress.getStatus()) {
-                inventoryFinishedPercentage = 100;
-            } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 != jobItemProgress.getInventoryRecordsCount()) {
-                inventoryFinishedPercentage = (int) Math.min(100, jobItemProgress.getProcessedRecordsCount() * 100 / jobItemProgress.getInventoryRecordsCount());
-            }
-            result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), jobItemProgress, startTimeMillis, inventoryFinishedPercentage, errorMessage));
-        }
-        return result;
-    }
-}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 87fab7477a4..60bed768d22 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -41,7 +42,7 @@ public final class ShowStreamingJobStatusExecutor implements QueryableRALExecuto
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingStatusStatement sqlStatement) {
         InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType());
-        List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
+        List<InventoryIncrementalJobItemInfo> jobItemInfos = new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
     }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
index 7e63268e8ba..78a02f6ea1f 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
 import org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -39,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowStreamingRuleStatement sqlStatement) {
-        PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING"))
+        PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING"))
                 .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
index 40c7949c6fa..425d94e993b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -36,7 +37,7 @@ public final class ShowMigrationCheckAlgorithmsExecutor implements QueryableRALE
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationCheckAlgorithmsStatement sqlStatement) {
-        InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"));
+        InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"));
         return inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map(
                 each -> new LocalDataQueryResultRow(each.getType(), each.getTypeAliases(),
                         each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")), each.getDescription()))
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index b92d9efb73e..79684dc0eb1 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.migration.distsql.handler.query;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -40,7 +41,7 @@ public final class ShowMigrationJobStatusExecutor implements QueryableRALExecuto
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationStatusStatement sqlStatement) {
         InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
-        List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(sqlStatement.getJobId());
+        List<InventoryIncrementalJobItemInfo> jobItemInfos = new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> generateResultRow(each, currentTimeMillis)).collect(Collectors.toList());
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 2484d75d9ca..135d62f7710 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -67,11 +67,11 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -98,7 +98,7 @@ import java.util.stream.Collectors;
  * CDC job API.
  */
 @Slf4j
-public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
+public final class CDCJobAPI implements InventoryIncrementalJobAPI {
     
     private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
     
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index d2689235c2c..93520db26f8 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -31,6 +31,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPi
 import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
@@ -280,7 +281,7 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
             result.setCheckSuccess(null);
         } else {
             InventoryIncrementalJobManager inventoryIncrementalJobManager = new InventoryIncrementalJobManager(
-                    TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType()));
+                    (InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(parentJobId).getType()));
             result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
         }
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index c29a961158d..4f7a023d21b 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -53,10 +53,10 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
@@ -110,7 +110,7 @@ import java.util.stream.Collectors;
  * Migration job API.
  */
 @Slf4j
-public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
+public final class MigrationJobAPI implements InventoryIncrementalJobAPI {
     
     private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
     
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index 4100c69bc74..d9e53912613 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.queryable;
 
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -39,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements QueryableRALExecutor<Sho
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationRuleStatement sqlStatement) {
-        PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"))
+        PipelineProcessConfiguration processConfig = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"))
                 .showProcessConfiguration(new PipelineContextKey(InstanceType.PROXY));
         Collection<LocalDataQueryResultRow> result = new LinkedList<>();
         result.add(new LocalDataQueryResultRow(getString(processConfig.getRead()), getString(processConfig.getWrite()), getString(processConfig.getStreamChannel())));
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
index b07cae2401f..52648f2cc95 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
 
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
@@ -34,7 +35,7 @@ public final class AlterInventoryIncrementalRuleUpdater implements RALUpdater<Al
     
     @Override
     public void executeUpdate(final String databaseName, final AlterInventoryIncrementalRuleStatement sqlStatement) {
-        InventoryIncrementalJobManager jobManager = new InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName()));
+        InventoryIncrementalJobManager jobManager = new InventoryIncrementalJobManager((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName()));
         PipelineProcessConfiguration processConfig = InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
         jobManager.alterProcessConfiguration(new PipelineContextKey(InstanceType.PROXY), processConfig);
     }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 4cd6fa38082..e70b05acc01 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -326,7 +326,7 @@ class MigrationJobAPITest {
         yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
         yamlJobItemProgress.setSourceDatabaseType("MySQL");
         PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress));
-        List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(jobId.get());
+        List<InventoryIncrementalJobItemInfo> jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
         assertThat(jobItemInfos.size(), is(1));
         InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
         assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.RUNNING));
@@ -343,7 +343,7 @@ class MigrationJobAPITest {
         yamlJobItemProgress.setProcessedRecordsCount(100);
         yamlJobItemProgress.setInventoryRecordsCount(50);
         PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(), 0, YamlEngine.marshal(yamlJobItemProgress));
-        List<InventoryIncrementalJobItemInfo> jobItemInfos = jobAPI.getJobItemInfos(jobId.get());
+        List<InventoryIncrementalJobItemInfo> jobItemInfos = inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
         InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
         assertThat(jobItemInfo.getJobItemProgress().getStatus(), is(JobStatus.EXECUTE_INCREMENTAL_TASK));
         assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100));
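
For reference, a minimal caller-side sketch of the API change this commit applies across the executors above: getJobItemInfos() now lives on InventoryIncrementalJobManager, which wraps an InventoryIncrementalJobAPI, instead of being inherited from the removed AbstractInventoryIncrementalJobAPIImpl. The helper class and method names below are illustrative only (not part of the commit); the imports match those visible in the diff.

import java.util.List;

import org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;

public final class JobItemInfosUsageSketch {
    
    /**
     * Query job item infos for a job.
     * Before this commit: jobAPI.getJobItemInfos(jobId), inherited from the abstract base class.
     * After this commit: wrap the job API in InventoryIncrementalJobManager and call it there.
     *
     * @param jobAPI inventory incremental job API (e.g. loaded via TypedSPILoader, as in the executors above)
     * @param jobId job ID
     * @return job item infos
     */
    public static List<InventoryIncrementalJobItemInfo> queryJobItemInfos(final InventoryIncrementalJobAPI jobAPI, final String jobId) {
        return new InventoryIncrementalJobManager(jobAPI).getJobItemInfos(jobId);
    }
}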

