This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1198efc677e Move PipelineJobAPI.getJobInfo() to
InventoryIncrementalJobAPI (#29058)
1198efc677e is described below
commit 1198efc677e4fa191a547c26d750493e4a5473c7
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 16 23:14:37 2023 +0800
Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI (#29058)
* Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI
* Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI
* Remove useless codes
---
.../job/service/InventoryIncrementalJobAPI.java | 27 +++++++++++++++-
.../pipeline/core/job/service/PipelineJobAPI.java | 30 ------------------
.../core/job/service/PipelineJobManager.java | 6 +++-
.../api/impl/ConsistencyCheckJobAPI.java | 36 ++--------------------
.../api/impl/ConsistencyCheckJobAPITest.java | 15 ---------
5 files changed, 34 insertions(+), 80 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 3d7c9b4fbcb..487555edcb5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -26,9 +26,11 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import java.sql.SQLException;
import java.util.Collection;
@@ -41,7 +43,30 @@ import java.util.Optional;
*/
public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
- @Override
+ /**
+ * Get pipeline job info.
+ *
+ * @param jobId job ID
+ * @return pipeline job info
+ */
+ PipelineJobInfo getJobInfo(String jobId);
+
+ /**
+ * Build task configuration.
+ *
+ * @param pipelineJobConfig pipeline job configuration
+ * @param jobShardingItem job sharding item
+ * @param pipelineProcessConfig pipeline process configuration
+ * @return task configuration
+ */
+ PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
+
+ /**
+ * Build pipeline process context.
+ *
+ * @param pipelineJobConfig pipeline job configuration
+ * @return pipeline process context
+ */
InventoryIncrementalProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 40fb5d8a3f0..b823bba76bf 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -18,14 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -38,24 +34,6 @@ import java.util.Optional;
@SingletonSPI
public interface PipelineJobAPI extends TypedSPI {
- /**
- * Build task configuration.
- *
- * @param pipelineJobConfig pipeline job configuration
- * @param jobShardingItem job sharding item
- * @param pipelineProcessConfig pipeline process configuration
- * @return task configuration
- */
- PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
-
- /**
- * Build pipeline process context.
- *
- * @param pipelineJobConfig pipeline job configuration
- * @return pipeline process context
- */
- PipelineProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-
/**
* Get job configuration.
*
@@ -99,14 +77,6 @@ public interface PipelineJobAPI extends TypedSPI {
return Optional.empty();
}
- /**
- * Get pipeline job info.
- *
- * @param jobId job ID
- * @return pipeline job info
- */
- PipelineJobInfo getJobInfo(String jobId);
-
/**
* Persist job item progress.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 47d8eecad2e..47860f1400c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -39,6 +39,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -178,7 +179,10 @@ public final class PipelineJobManager {
* @return jobs info
*/
public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey
contextKey) {
- return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each
-> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList());
+ if (pipelineJobAPI instanceof InventoryIncrementalJobAPI) {
+ return getJobBriefInfos(contextKey,
pipelineJobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI)
pipelineJobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+ }
+ return Collections.emptyList();
}
private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey
contextKey, final String jobType) {
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index e2662e1340d..ac99a1917c8 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -19,17 +19,13 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.im
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
@@ -42,7 +38,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -122,19 +117,9 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()),
() -> new
UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType()));
}
- /**
- * Get latest data consistency check result.
- *
- * @param parentJobId parent job id
- * @return latest data consistency check result
- */
- public Map<String, TableDataConsistencyCheckResult>
getLatestDataConsistencyCheckResult(final String parentJobId) {
- GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
- Optional<String> latestCheckJobId =
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
- if (!latestCheckJobId.isPresent()) {
- return Collections.emptyMap();
- }
- return governanceRepositoryAPI.getCheckJobResult(parentJobId,
latestCheckJobId.get());
+ @Override
+ public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+ return true;
}
@Override
@@ -354,21 +339,6 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
return new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
- @Override
- public PipelineTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public PipelineProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- throw new UnsupportedOperationException();
- }
-
@Override
public Class<ConsistencyCheckJob> getPipelineJobClass() {
return ConsistencyCheckJob.class;
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index cb6f6f6a35d..6e90b0b2224 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -73,21 +73,6 @@ class ConsistencyCheckJobAPITest {
assertThat(sequence, is(expectedSequence));
}
- @Test
- void assertGetLatestDataConsistencyCheckResult() {
- MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
- String parentJobId = parentJobConfig.getJobId();
- String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
- parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
- GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
- governanceRepositoryAPI.persistLatestCheckJobId(parentJobId,
checkJobId);
- Map<String, TableDataConsistencyCheckResult> expectedCheckResult =
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
- governanceRepositoryAPI.persistCheckJobResult(parentJobId, checkJobId,
expectedCheckResult);
- Map<String, TableDataConsistencyCheckResult> actualCheckResult =
checkJobAPI.getLatestDataConsistencyCheckResult(parentJobId);
- assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
- assertThat(actualCheckResult.get("t_order").isMatched(),
is(expectedCheckResult.get("t_order").isMatched()));
- }
-
@Test
void assertDropByParentJobId() {
MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());