This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5d428210659 Add InventoryIncrementalJobManager (#29079)
5d428210659 is described below
commit 5d428210659a826e82157805f5b632714d9c9812
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 23:02:15 2023 +0800
Add InventoryIncrementalJobManager (#29079)
---
.../job/service/InventoryIncrementalJobAPI.java | 62 ----------
...pl.java => InventoryIncrementalJobManager.java} | 128 +++++++++------------
.../AbstractInventoryIncrementalJobAPIImpl.java | 99 +---------------
.../handler/query/ShowStreamingRuleExecutor.java | 4 +-
.../ShowMigrationCheckAlgorithmsExecutor.java | 6 +-
.../handler/update/CheckMigrationJobUpdater.java | 4 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 +-
.../api/impl/ConsistencyCheckJobAPI.java | 8 +-
.../migration/api/impl/MigrationJobAPI.java | 3 +-
.../MigrationDataConsistencyChecker.java | 3 +-
.../migration/prepare/MigrationJobPreparer.java | 7 +-
.../ral/queryable/ShowMigrationRuleExecutor.java | 4 +-
.../AlterInventoryIncrementalRuleUpdater.java | 6 +-
.../migration/api/impl/MigrationJobAPITest.java | 14 ++-
14 files changed, 99 insertions(+), 253 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index fa4594f7eab..0569315a40f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -22,21 +22,15 @@ import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipeli
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
-import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
/**
* Inventory incremental job API.
@@ -83,46 +77,6 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
*/
void extendYamlJobConfiguration(PipelineContextKey contextKey,
YamlPipelineJobConfiguration yamlJobConfig);
- /**
- * Alter process configuration.
- *
- * @param contextKey context key
- * @param processConfig process configuration
- */
- void alterProcessConfiguration(PipelineContextKey contextKey,
PipelineProcessConfiguration processConfig);
-
- /**
- * Show process configuration.
- *
- * @param contextKey context key
- * @return process configuration, non-null
- */
- PipelineProcessConfiguration showProcessConfiguration(PipelineContextKey
contextKey);
-
- /**
- * Persist job offset info.
- *
- * @param jobId job ID
- * @param jobOffsetInfo job offset info
- */
- void persistJobOffsetInfo(String jobId, JobOffsetInfo jobOffsetInfo);
-
- /**
- * Get job offset info.
- *
- * @param jobId job ID
- * @return job offset progress
- */
- JobOffsetInfo getJobOffsetInfo(String jobId);
-
- /**
- * Get job progress.
- *
- * @param pipelineJobConfig job configuration
- * @return each sharding item progress
- */
- Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(PipelineJobConfiguration pipelineJobConfig);
-
/**
* Get job infos.
*
@@ -131,13 +85,6 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
*/
List<InventoryIncrementalJobItemInfo> getJobItemInfos(String jobId);
- /**
- * List all data consistency check algorithms from SPI.
- *
- * @return data consistency check algorithms
- */
- Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms();
-
/**
* Build pipeline data consistency checker.
*
@@ -149,15 +96,6 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig,
InventoryIncrementalProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);
- /**
- * Aggregate data consistency check results.
- *
- * @param jobId job ID
- * @param checkResults check results
- * @return check success or not
- */
- boolean aggregateDataConsistencyCheckResults(String jobId, Map<String,
TableDataConsistencyCheckResult> checkResults);
-
/**
* Commit pipeline job.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
similarity index 56%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index c3d0b1d689a..efe7ce8af46 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -15,28 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
+package org.apache.shardingsphere.data.pipeline.core.job.service;
-import lombok.extern.slf4j.Slf4j;
+import com.google.common.base.Preconditions;
+import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
-import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
-import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -47,37 +41,50 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
- * Abstract inventory incremental job API implementation.
+ * Inventory incremental job manager.
*/
-@Slf4j
-public abstract class AbstractInventoryIncrementalJobAPIImpl implements
InventoryIncrementalJobAPI {
+@RequiredArgsConstructor
+public final class InventoryIncrementalJobManager {
- private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+ private final PipelineJobAPI jobAPI;
- private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new
YamlJobOffsetInfoSwapper();
+ private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
- @Override
+ /**
+ * Alter process configuration.
+ *
+ * @param contextKey context key
+ * @param processConfig process configuration
+ */
public void alterProcessConfiguration(final PipelineContextKey contextKey,
final PipelineProcessConfiguration processConfig) {
// TODO check rateLimiter type match or not
- processConfigPersistService.persist(contextKey, getType(),
processConfig);
+ processConfigPersistService.persist(contextKey, jobAPI.getType(),
processConfig);
}
- @Override
+ /**
+ * Show process configuration.
+ *
+ * @param contextKey context key
+ * @return process configuration, non-null
+ */
public PipelineProcessConfiguration showProcessConfiguration(final
PipelineContextKey contextKey) {
- return
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
getType()));
+ return
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
jobAPI.getType()));
}
- @Override
+ /**
+ * Get job progress.
+ *
+ * @param jobConfig pipeline job configuration
+ * @return each sharding item progress
+ */
public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final PipelineJobConfiguration jobConfig) {
- PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+ PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
@@ -87,52 +94,33 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
}, LinkedHashMap::putAll);
}
- @Override
- public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String
jobId) {
- PipelineJobManager jobManager = new PipelineJobManager(this);
- PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
- JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobConfigPOJO);
- long startTimeMillis =
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
getJobProgress(jobConfig);
- List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
- for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
jobProgress.entrySet()) {
- int shardingItem = entry.getKey();
- TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo)
getJobInfo(jobId);
- InventoryIncrementalJobItemProgress jobItemProgress =
entry.getValue();
- String errorMessage = jobItemManager.getErrorMessage(jobId,
shardingItem);
- if (null == jobItemProgress) {
- result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
- continue;
- }
- int inventoryFinishedPercentage = 0;
- if (JobStatus.EXECUTE_INCREMENTAL_TASK ==
jobItemProgress.getStatus() || JobStatus.FINISHED ==
jobItemProgress.getStatus()) {
- inventoryFinishedPercentage = 100;
- } else if (0 != jobItemProgress.getProcessedRecordsCount() && 0 !=
jobItemProgress.getInventoryRecordsCount()) {
- inventoryFinishedPercentage = (int) Math.min(100,
jobItemProgress.getProcessedRecordsCount() * 100 /
jobItemProgress.getInventoryRecordsCount());
- }
- result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), jobItemProgress, startTimeMillis,
inventoryFinishedPercentage, errorMessage));
- }
- return result;
- }
-
- @Override
+ /**
+ * Persist job offset info.
+ *
+ * @param jobId job ID
+ * @param jobOffsetInfo job offset info
+ */
public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo
jobOffsetInfo) {
- String value =
YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
+ String value = YamlEngine.marshal(new
YamlJobOffsetInfoSwapper().swapToYamlConfiguration(jobOffsetInfo));
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
value);
}
- @Override
+ /**
+ * Get job offset info.
+ *
+ * @param jobId job ID
+ * @return job offset progress
+ */
public JobOffsetInfo getJobOffsetInfo(final String jobId) {
Optional<String> offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
- if (offsetInfo.isPresent()) {
- YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(),
YamlJobOffsetInfo.class);
- return jobOffsetInfoSwapper.swapToObject(info);
- }
- return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
+ return new
YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ?
YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new
YamlJobOffsetInfo());
}
- @Override
+ /**
+ * List all data consistency check algorithms from SPI.
+ *
+ * @return data consistency check algorithms
+ */
public Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms() {
Collection<DataConsistencyCheckAlgorithmInfo> result = new
LinkedList<>();
for (TableDataConsistencyChecker each :
ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class))
{
@@ -147,17 +135,15 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
return supportedDatabaseTypes.isEmpty() ?
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) :
supportedDatabaseTypes;
}
- @Override
+ /**
+ * Aggregate data consistency check results.
+ *
+ * @param jobId job ID
+ * @param checkResults check results
+ * @return check success or not
+ */
public boolean aggregateDataConsistencyCheckResults(final String jobId,
final Map<String, TableDataConsistencyCheckResult> checkResults) {
- if (checkResults.isEmpty()) {
- throw new IllegalArgumentException("checkResults empty, jobId:" +
jobId);
- }
- for (Entry<String, TableDataConsistencyCheckResult> entry :
checkResults.entrySet()) {
- TableDataConsistencyCheckResult checkResult = entry.getValue();
- if (!checkResult.isMatched()) {
- return false;
- }
- }
- return true;
+ Preconditions.checkArgument(!checkResults.isEmpty(), "checkResults
empty, jobId:", jobId);
+ return
checkResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched);
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index c3d0b1d689a..8562706aafa 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,85 +17,39 @@
package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
-import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
-import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import java.util.Collection;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* Abstract inventory incremental job API implementation.
*/
-@Slf4j
public abstract class AbstractInventoryIncrementalJobAPIImpl implements
InventoryIncrementalJobAPI {
- private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
-
- private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new
YamlJobOffsetInfoSwapper();
-
- @Override
- public void alterProcessConfiguration(final PipelineContextKey contextKey,
final PipelineProcessConfiguration processConfig) {
- // TODO check rateLimiter type match or not
- processConfigPersistService.persist(contextKey, getType(),
processConfig);
- }
-
- @Override
- public PipelineProcessConfiguration showProcessConfiguration(final
PipelineContextKey contextKey) {
- return
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
getType()));
- }
-
- @Override
- public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final PipelineJobConfiguration jobConfig) {
- PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
- String jobId = jobConfig.getJobId();
- JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
- Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobId, each);
- jobItemProgress.ifPresent(optional ->
optional.setActive(!jobConfigPOJO.isDisabled()));
- map.put(each, jobItemProgress.orElse(null));
- }, LinkedHashMap::putAll);
- }
-
@Override
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String
jobId) {
PipelineJobManager jobManager = new PipelineJobManager(this);
- PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobConfigPOJO);
long startTimeMillis =
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
getJobProgress(jobConfig);
+ InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(this);
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
inventoryIncrementalJobManager.getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
+ PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo)
getJobInfo(jobId);
@@ -115,49 +69,4 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
}
return result;
}
-
- @Override
- public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo
jobOffsetInfo) {
- String value =
YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobOffsetInfo(jobId,
value);
- }
-
- @Override
- public JobOffsetInfo getJobOffsetInfo(final String jobId) {
- Optional<String> offsetInfo =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobOffsetInfo(jobId);
- if (offsetInfo.isPresent()) {
- YamlJobOffsetInfo info = YamlEngine.unmarshal(offsetInfo.get(),
YamlJobOffsetInfo.class);
- return jobOffsetInfoSwapper.swapToObject(info);
- }
- return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
- }
-
- @Override
- public Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms() {
- Collection<DataConsistencyCheckAlgorithmInfo> result = new
LinkedList<>();
- for (TableDataConsistencyChecker each :
ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class))
{
- SPIDescription description =
each.getClass().getAnnotation(SPIDescription.class);
- String typeAliases =
each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(","));
- result.add(new DataConsistencyCheckAlgorithmInfo(each.getType(),
typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null
== description ? "" : description.value()));
- }
- return result;
- }
-
- private Collection<DatabaseType> getSupportedDatabaseTypes(final
Collection<DatabaseType> supportedDatabaseTypes) {
- return supportedDatabaseTypes.isEmpty() ?
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) :
supportedDatabaseTypes;
- }
-
- @Override
- public boolean aggregateDataConsistencyCheckResults(final String jobId,
final Map<String, TableDataConsistencyCheckResult> checkResults) {
- if (checkResults.isEmpty()) {
- throw new IllegalArgumentException("checkResults empty, jobId:" +
jobId);
- }
- for (Entry<String, TableDataConsistencyCheckResult> entry :
checkResults.entrySet()) {
- TableDataConsistencyCheckResult checkResult = entry.getValue();
- if (!checkResult.isMatched()) {
- return false;
- }
- }
- return true;
- }
}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
index f646e82611b..7e63268e8ba 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -39,7 +39,7 @@ public final class ShowStreamingRuleExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingRuleStatement sqlStatement) {
- PipelineProcessConfiguration processConfig =
((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class,
"STREAMING"))
+ PipelineProcessConfiguration processConfig = new
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
"STREAMING"))
.showProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new
LocalDataQueryResultRow(getString(processConfig.getRead()),
getString(processConfig.getWrite()),
getString(processConfig.getStreamChannel())));
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
index 6b99ab54353..40c7949c6fa 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -36,8 +36,8 @@ public final class ShowMigrationCheckAlgorithmsExecutor
implements QueryableRALE
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationCheckAlgorithmsStatement sqlStatement) {
- InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION");
- return jobAPI.listDataConsistencyCheckAlgorithms().stream().map(
+ InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
"MIGRATION"));
+ return
inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map(
each -> new LocalDataQueryResultRow(each.getType(),
each.getTypeAliases(),
each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")),
each.getDescription()))
.collect(Collectors.toList());
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index 0789b0519b0..86d91508712 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.migration.distsql.handler.update;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -54,7 +55,8 @@ public final class CheckMigrationJobUpdater implements
RALUpdater<CheckMigration
}
private void verifyInventoryFinished(final MigrationJobConfiguration
jobConfig) {
-
ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(),
migrationJobAPI.getJobProgress(jobConfig).values()),
+ InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(migrationJobAPI);
+
ShardingSpherePreconditions.checkState(PipelineJobProgressDetector.isInventoryFinished(jobConfig.getJobShardingCount(),
inventoryIncrementalJobManager.getJobProgress(jobConfig).values()),
() -> new PipelineInvalidParameterException("Inventory is not
finished."));
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 9f90858e590..2484d75d9ca 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -67,6 +67,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -277,7 +278,8 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
@Override
public CDCProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
- return new CDCProcessContext(pipelineJobConfig.getJobId(),
showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
+ InventoryIncrementalJobManager jobManager = new
InventoryIncrementalJobManager(this);
+ return new CDCProcessContext(pipelineJobConfig.getJobId(),
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index ff0e63ce7af..d2689235c2c 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -31,7 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPi
import
org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
@@ -279,9 +279,9 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
if (checkJobResult.isEmpty()) {
result.setCheckSuccess(null);
} else {
- InventoryIncrementalJobAPI inventoryIncrementalJobAPI =
(InventoryIncrementalJobAPI) TypedSPILoader.getService(
- PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(parentJobId).getType());
- result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
+ InventoryIncrementalJobManager inventoryIncrementalJobManager =
new InventoryIncrementalJobManager(
+ TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(parentJobId).getType()));
+ result.setCheckSuccess(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults(parentJobId, checkJobResult));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 3d12a4363bc..c29a961158d 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -53,6 +53,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
@@ -277,7 +278,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
@Override
public MigrationProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
- PipelineProcessConfiguration processConfig =
showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
+ PipelineProcessConfiguration processConfig = new
InventoryIncrementalJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
return new MigrationProcessContext(pipelineJobConfig.getJobId(),
processConfig);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index addf2dee11a..c189075c554 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -41,6 +41,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
@@ -98,7 +99,7 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
}
private long getRecordsCount() {
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new
MigrationJobAPI().getJobProgress(jobConfig);
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new
InventoryIncrementalJobManager(new MigrationJobAPI()).getJobProgress(jobConfig);
return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 2857e63f58e..358f3405180 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -45,6 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
@@ -84,6 +85,8 @@ public final class MigrationJobPreparer {
private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+ private final InventoryIncrementalJobManager
inventoryIncrementalJobManager = new InventoryIncrementalJobManager(jobAPI);
+
/**
* Do prepare work.
*
@@ -133,12 +136,12 @@ public final class MigrationJobPreparer {
if (lockContext.tryLock(lockDefinition, 600000)) {
log.info("try lock success, jobId={}, shardingItem={}, cost {}
ms", jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
try {
- JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
+ JobOffsetInfo offsetInfo =
inventoryIncrementalJobManager.getJobOffsetInfo(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
- jobAPI.persistJobOffsetInfo(jobId, new
JobOffsetInfo(true));
+ inventoryIncrementalJobManager.persistJobOffsetInfo(jobId,
new JobOffsetInfo(true));
}
} finally {
log.info("unlock, jobId={}, shardingItem={}, cost {} ms",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index e592dd7ee12..4100c69bc74 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.queryable;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import
org.apache.shardingsphere.distsql.statement.ral.queryable.ShowMigrationRuleStatement;
@@ -39,7 +39,7 @@ public final class ShowMigrationRuleExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationRuleStatement sqlStatement) {
- PipelineProcessConfiguration processConfig =
((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class,
"MIGRATION"))
+ PipelineProcessConfiguration processConfig = new
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
"MIGRATION"))
.showProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new
LocalDataQueryResultRow(getString(processConfig.getRead()),
getString(processConfig.getWrite()),
getString(processConfig.getStreamChannel())));
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
index 4931418c499..b07cae2401f 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterInventoryIncrementalRuleUpdater.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
@@ -34,9 +34,9 @@ public final class AlterInventoryIncrementalRuleUpdater
implements RALUpdater<Al
@Override
public void executeUpdate(final String databaseName, final
AlterInventoryIncrementalRuleStatement sqlStatement) {
- InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, sqlStatement.getJobTypeName());
+ InventoryIncrementalJobManager jobManager = new
InventoryIncrementalJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
sqlStatement.getJobTypeName()));
PipelineProcessConfiguration processConfig =
InventoryIncrementalProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
- jobAPI.alterProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY), processConfig);
+ jobManager.alterProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY), processConfig);
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 187448cd273..4cd6fa38082 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
@@ -92,6 +93,8 @@ class MigrationJobAPITest {
private static PipelineJobManager jobManager;
+ private static InventoryIncrementalJobManager
inventoryIncrementalJobManager;
+
private static PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager;
private static DatabaseType databaseType;
@@ -101,6 +104,7 @@ class MigrationJobAPITest {
PipelineContextUtils.mockModeConfigAndContextManager();
jobAPI = new MigrationJobAPI();
jobManager = new PipelineJobManager(jobAPI);
+ inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(jobAPI);
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
String jdbcUrl =
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
databaseType = DatabaseTypeFactory.get(jdbcUrl);
@@ -171,7 +175,7 @@ class MigrationJobAPITest {
MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgressMap =
jobAPI.getJobProgress(jobConfig);
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgressMap =
inventoryIncrementalJobManager.getJobProgress(jobConfig);
assertThat(jobProgressMap.size(), is(1));
}
@@ -191,7 +195,7 @@ class MigrationJobAPITest {
@Test
void assertAggregateEmptyDataConsistencyCheckResults() {
- assertThrows(IllegalArgumentException.class, () ->
jobAPI.aggregateDataConsistencyCheckResults("foo_job", Collections.emptyMap()));
+ assertThrows(IllegalArgumentException.class, () ->
inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job",
Collections.emptyMap()));
}
@Test
@@ -199,7 +203,7 @@ class MigrationJobAPITest {
Map<String, TableDataConsistencyCheckResult> checkResults = new
LinkedHashMap<>(2, 1F);
checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
checkResults.put("bar_tbl", new
TableDataConsistencyCheckResult(false));
- assertFalse(jobAPI.aggregateDataConsistencyCheckResults("foo_job",
checkResults));
+ assertFalse(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", checkResults));
}
@Test
@@ -207,7 +211,7 @@ class MigrationJobAPITest {
Map<String, TableDataConsistencyCheckResult> checkResults = new
LinkedHashMap<>(2, 1F);
checkResults.put("foo_tbl", new TableDataConsistencyCheckResult(true));
checkResults.put("bar_tbl", new TableDataConsistencyCheckResult(true));
- assertTrue(jobAPI.aggregateDataConsistencyCheckResults("foo_job",
checkResults));
+ assertTrue(inventoryIncrementalJobManager.aggregateDataConsistencyCheckResults("foo_job", checkResults));
}
@Test
@@ -218,7 +222,7 @@ class MigrationJobAPITest {
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobItemManager.persistProgress(jobItemContext);
jobItemManager.updateStatus(jobId.get(),
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
- Map<Integer, InventoryIncrementalJobItemProgress> progress =
jobAPI.getJobProgress(jobConfig);
+ Map<Integer, InventoryIncrementalJobItemProgress> progress =
inventoryIncrementalJobManager.getJobProgress(jobConfig);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
progress.entrySet()) {
assertThat(entry.getValue().getStatus(),
is(JobStatus.EXECUTE_INVENTORY_TASK));
}