This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 462cb7a847b Remove AbstractInventoryIncrementalJobAPIImpl (#29080)
462cb7a847b is described below
commit 462cb7a847b8b98a354c33e14a4606049692c467
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 19 10:48:27 2023 +0800
Remove AbstractInventoryIncrementalJobAPIImpl (#29080)
---
.../job/service/InventoryIncrementalJobAPI.java | 10 ---
.../service/InventoryIncrementalJobManager.java | 42 ++++++++++++-
.../AbstractInventoryIncrementalJobAPIImpl.java | 72 ----------------------
.../query/ShowStreamingJobStatusExecutor.java | 3 +-
.../handler/query/ShowStreamingRuleExecutor.java | 3 +-
.../ShowMigrationCheckAlgorithmsExecutor.java | 3 +-
.../query/ShowMigrationJobStatusExecutor.java | 3 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 +-
.../api/impl/ConsistencyCheckJobAPI.java | 3 +-
.../migration/api/impl/MigrationJobAPI.java | 4 +-
.../ral/queryable/ShowMigrationRuleExecutor.java | 3 +-
.../AlterInventoryIncrementalRuleUpdater.java | 3 +-
.../migration/api/impl/MigrationJobAPITest.java | 4 +-
13 files changed, 61 insertions(+), 96 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 0569315a40f..76a66b00e6b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -23,14 +23,12 @@ import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelinePro
import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
-import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import java.sql.SQLException;
-import java.util.List;
/**
* Inventory incremental job API.
@@ -77,14 +75,6 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
*/
void extendYamlJobConfiguration(PipelineContextKey contextKey,
YamlPipelineJobConfiguration yamlJobConfig);
- /**
- * Get job infos.
- *
- * @param jobId job ID
- * @return job item infos
- */
- List<InventoryIncrementalJobItemInfo> getJobItemInfos(String jobId);
-
/**
* Build pipeline data consistency checker.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index efe7ce8af46..e31ee4796b2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -23,11 +23,14 @@ import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConf
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
+import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
+import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -41,7 +44,9 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -52,7 +57,7 @@ import java.util.stream.IntStream;
@RequiredArgsConstructor
public final class InventoryIncrementalJobManager {
- private final PipelineJobAPI jobAPI;
+ private final InventoryIncrementalJobAPI jobAPI;
private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
@@ -77,6 +82,41 @@ public final class InventoryIncrementalJobManager {
return
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
jobAPI.getType()));
}
+ /**
+ * Get job infos.
+ *
+ * @param jobId job ID
+ * @return job item infos
+ */
+ public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String
jobId) {
+ PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+ JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
+ PipelineJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobConfigPOJO);
+ long startTimeMillis =
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
+ InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(jobAPI);
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
inventoryIncrementalJobManager.getJobProgress(jobConfig);
+ List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
+ PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+ for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
jobProgress.entrySet()) {
+ int shardingItem = entry.getKey();
+ TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo)
jobAPI.getJobInfo(jobId);
+ InventoryIncrementalJobItemProgress jobItemProgress =
entry.getValue();
+ String errorMessage = jobItemManager.getErrorMessage(jobId,
shardingItem);
+ if (null == jobItemProgress) {
+ result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
+ continue;
+ }
+ int inventoryFinishedPercentage = 0;
+ if (JobStatus.EXECUTE_INCREMENTAL_TASK ==
jobItemProgress.getStatus() || JobStatus.FINISHED ==
jobItemProgress.getStatus()) {
+ inventoryFinishedPercentage = 100;
+ } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 !=
jobItemProgress.getInventoryRecordsCount()) {
+ inventoryFinishedPercentage = (int) Math.min(100,
jobItemProgress.getProcessedRecordsCount() * 100 /
jobItemProgress.getInventoryRecordsCount());
+ }
+ result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), jobItemProgress, startTimeMillis,
inventoryFinishedPercentage, errorMessage));
+ }
+ return result;
+ }
+
/**
* Get job progress.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
deleted file mode 100644
index 8562706aafa..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
-
-import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
-import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-
-/**
- * Abstract inventory incremental job API implementation.
- */
-public abstract class AbstractInventoryIncrementalJobAPIImpl implements
InventoryIncrementalJobAPI {
-
- @Override
- public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String
jobId) {
- PipelineJobManager jobManager = new PipelineJobManager(this);
- JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobConfigPOJO);
- long startTimeMillis =
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
- InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(this);
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
inventoryIncrementalJobManager.getJobProgress(jobConfig);
- List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
- PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
- for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
jobProgress.entrySet()) {
- int shardingItem = entry.getKey();
- TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo)
getJobInfo(jobId);
- InventoryIncrementalJobItemProgress jobItemProgress =
entry.getValue();
- String errorMessage = jobItemManager.getErrorMessage(jobId,
shardingItem);
- if (null == jobItemProgress) {
- result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
- continue;
- }
- int inventoryFinishedPercentage = 0;
- if (JobStatus.EXECUTE_INCREMENTAL_TASK ==
jobItemProgress.getStatus() || JobStatus.FINISHED ==
jobItemProgress.getStatus()) {
- inventoryFinishedPercentage = 100;
- } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 !=
jobItemProgress.getInventoryRecordsCount()) {
- inventoryFinishedPercentage = (int) Math.min(100,
jobItemProgress.getProcessedRecordsCount() * 100 /
jobItemProgress.getInventoryRecordsCount());
- }
- result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), jobItemProgress, startTimeMillis,
inventoryFinishedPercentage, errorMessage));
- }
- return result;
- }
-}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index 87fab7477a4..60bed768d22 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -41,7 +42,7 @@ public final class ShowStreamingJobStatusExecutor implements
QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingStatusStatement sqlStatement) {
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType());
- List<InventoryIncrementalJobItemInfo> jobItemInfos =
jobAPI.getJobItemInfos(sqlStatement.getJobId());
+ List<InventoryIncrementalJobItemInfo> jobItemInfos = new
InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each,
currentTimeMillis)).collect(Collectors.toList());
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
index 7e63268e8ba..78a02f6ea1f 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -39,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingRuleStatement sqlStatement) {
- PipelineProcessConfiguration processConfig = new
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
"STREAMING"))
+ PipelineProcessConfiguration processConfig = new
InventoryIncrementalJobManager((InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, "STREAMING"))
.showProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new
LocalDataQueryResultRow(getString(processConfig.getRead()),
getString(processConfig.getWrite()),
getString(processConfig.getStreamChannel())));
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
index 40c7949c6fa..425d94e993b 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -36,7 +37,7 @@ public final class ShowMigrationCheckAlgorithmsExecutor
implements QueryableRALE
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationCheckAlgorithmsStatement sqlStatement) {
- InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
"MIGRATION"));
+ InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager((InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"));
return
inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map(
each -> new LocalDataQueryResultRow(each.getType(),
each.getTypeAliases(),
each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")),
each.getDescription()))
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index b92d9efb73e..79684dc0eb1 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.migration.distsql.handler.query;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -40,7 +41,7 @@ public final class ShowMigrationJobStatusExecutor implements
QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationStatusStatement sqlStatement) {
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
- List<InventoryIncrementalJobItemInfo> jobItemInfos =
jobAPI.getJobItemInfos(sqlStatement.getJobId());
+ List<InventoryIncrementalJobItemInfo> jobItemInfos = new
InventoryIncrementalJobManager(jobAPI).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each,
currentTimeMillis)).collect(Collectors.toList());
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 2484d75d9ca..135d62f7710 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -67,11 +67,11 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -98,7 +98,7 @@ import java.util.stream.Collectors;
* CDC job API.
*/
@Slf4j
-public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
+public final class CDCJobAPI implements InventoryIncrementalJobAPI {
private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper =
new YamlDataSourceConfigurationSwapper();
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index d2689235c2c..93520db26f8 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPi
import
org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
@@ -280,7 +281,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
result.setCheckSuccess(null);
} else {
InventoryIncrementalJobManager inventoryIncrementalJobManager =
new InventoryIncrementalJobManager(
- TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(parentJobId).getType()));
+ (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(parentJobId).getType()));
result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId,
checkJobResult));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index c29a961158d..4f7a023d21b 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -53,10 +53,10 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
@@ -110,7 +110,7 @@ import java.util.stream.Collectors;
* Migration job API.
*/
@Slf4j
-public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
+public final class MigrationJobAPI implements InventoryIncrementalJobAPI {
private final PipelineDataSourcePersistService dataSourcePersistService =
new PipelineDataSourcePersistService();
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index 4100c69bc74..d9e53912613 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.queryable;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -39,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationRuleStatement sqlStatement) {
- PipelineProcessConfiguration processConfig = new
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
"MIGRATION"))
+ PipelineProcessConfiguration processConfig = new
InventoryIncrementalJobManager((InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"))
.showProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new
LocalDataQueryResultRow(getString(processConfig.getRead()),
getString(processConfig.getWrite()),
getString(processConfig.getStreamChannel())));
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
index b07cae2401f..52648f2cc95 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
@@ -34,7 +35,7 @@ public final class AlterInventoryIncrementalRuleUpdater
implements RALUpdater<Al
@Override
public void executeUpdate(final String databaseName, final
AlterInventoryIncrementalRuleStatement sqlStatement) {
- InventoryIncrementalJobManager jobManager = new
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
sqlStatement.getJobTypeName()));
+ InventoryIncrementalJobManager jobManager = new
InventoryIncrementalJobManager((InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName()));
PipelineProcessConfiguration processConfig =
InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
jobManager.alterProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY), processConfig);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 4cd6fa38082..e70b05acc01 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -326,7 +326,7 @@ class MigrationJobAPITest {
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
yamlJobItemProgress.setSourceDatabaseType("MySQL");
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
- List<InventoryIncrementalJobItemInfo> jobItemInfos =
jobAPI.getJobItemInfos(jobId.get());
+ List<InventoryIncrementalJobItemInfo> jobItemInfos =
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
assertThat(jobItemInfos.size(), is(1));
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
assertThat(jobItemInfo.getJobItemProgress().getStatus(),
is(JobStatus.RUNNING));
@@ -343,7 +343,7 @@ class MigrationJobAPITest {
yamlJobItemProgress.setProcessedRecordsCount(100);
yamlJobItemProgress.setInventoryRecordsCount(50);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey()).persistJobItemProgress(jobId.get(),
0, YamlEngine.marshal(yamlJobItemProgress));
- List<InventoryIncrementalJobItemInfo> jobItemInfos =
jobAPI.getJobItemInfos(jobId.get());
+ List<InventoryIncrementalJobItemInfo> jobItemInfos =
inventoryIncrementalJobManager.getJobItemInfos(jobId.get());
InventoryIncrementalJobItemInfo jobItemInfo = jobItemInfos.get(0);
assertThat(jobItemInfo.getJobItemProgress().getStatus(),
is(JobStatus.EXECUTE_INCREMENTAL_TASK));
assertThat(jobItemInfo.getInventoryFinishedPercentage(), is(100));