This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1198efc677e Move PipelineJobAPI.getJobInfo() to 
InventoryIncrementalJobAPI (#29058)
1198efc677e is described below

commit 1198efc677e4fa191a547c26d750493e4a5473c7
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 16 23:14:37 2023 +0800

    Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI (#29058)
    
    * Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI
    
    * Move PipelineJobAPI.getJobInfo() to InventoryIncrementalJobAPI
    
    * Remove useless codes
---
 .../job/service/InventoryIncrementalJobAPI.java    | 27 +++++++++++++++-
 .../pipeline/core/job/service/PipelineJobAPI.java  | 30 ------------------
 .../core/job/service/PipelineJobManager.java       |  6 +++-
 .../api/impl/ConsistencyCheckJobAPI.java           | 36 ++--------------------
 .../api/impl/ConsistencyCheckJobAPITest.java       | 15 ---------
 5 files changed, 34 insertions(+), 80 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 3d7c9b4fbcb..487555edcb5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -26,9 +26,11 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -41,7 +43,30 @@ import java.util.Optional;
  */
 public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
     
-    @Override
+    /**
+     * Get pipeline job info.
+     *
+     * @param jobId job ID
+     * @return pipeline job info
+     */
+    PipelineJobInfo getJobInfo(String jobId);
+    
+    /**
+     * Build task configuration.
+     *
+     * @param pipelineJobConfig pipeline job configuration
+     * @param jobShardingItem job sharding item
+     * @param pipelineProcessConfig pipeline process configuration
+     * @return task configuration
+     */
+    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration 
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration 
pipelineProcessConfig);
+    
+    /**
+     * Build pipeline process context.
+     *
+     * @param pipelineJobConfig pipeline job configuration
+     * @return pipeline process context
+     */
     InventoryIncrementalProcessContext 
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 40fb5d8a3f0..b823bba76bf 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -18,14 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -38,24 +34,6 @@ import java.util.Optional;
 @SingletonSPI
 public interface PipelineJobAPI extends TypedSPI {
     
-    /**
-     * Build task configuration.
-     *
-     * @param pipelineJobConfig pipeline job configuration
-     * @param jobShardingItem job sharding item
-     * @param pipelineProcessConfig pipeline process configuration
-     * @return task configuration
-     */
-    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration 
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration 
pipelineProcessConfig);
-    
-    /**
-     * Build pipeline process context.
-     *
-     * @param pipelineJobConfig pipeline job configuration
-     * @return pipeline process context
-     */
-    PipelineProcessContext 
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
-    
     /**
      * Get job configuration.
      *
@@ -99,14 +77,6 @@ public interface PipelineJobAPI extends TypedSPI {
         return Optional.empty();
     }
     
-    /**
-     * Get pipeline job info.
-     * 
-     * @param jobId job ID
-     * @return pipeline job info
-     */
-    PipelineJobInfo getJobInfo(String jobId);
-    
     /**
      * Persist job item progress.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 47d8eecad2e..47860f1400c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -39,6 +39,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -178,7 +179,10 @@ public final class PipelineJobManager {
      * @return jobs info
      */
     public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey 
contextKey) {
-        return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each 
-> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList());
+        if (pipelineJobAPI instanceof InventoryIncrementalJobAPI) {
+            return getJobBriefInfos(contextKey, 
pipelineJobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI) 
pipelineJobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+        }
+        return Collections.emptyList();
     }
     
     private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey 
contextKey, final String jobType) {
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index e2662e1340d..ac99a1917c8 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -19,17 +19,13 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.im
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
@@ -42,7 +38,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -122,19 +117,9 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         
ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()),
 () -> new 
UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType()));
     }
     
-    /**
-     * Get latest data consistency check result.
-     *
-     * @param parentJobId parent job id
-     * @return latest data consistency check result
-     */
-    public Map<String, TableDataConsistencyCheckResult> 
getLatestDataConsistencyCheckResult(final String parentJobId) {
-        GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
-        Optional<String> latestCheckJobId = 
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
-        if (!latestCheckJobId.isPresent()) {
-            return Collections.emptyMap();
-        }
-        return governanceRepositoryAPI.getCheckJobResult(parentJobId, 
latestCheckJobId.get());
+    @Override
+    public boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+        return true;
     }
     
     @Override
@@ -354,21 +339,6 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         return new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
-    @Override
-    public PipelineTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
-        throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public PipelineProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
-        throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public PipelineJobInfo getJobInfo(final String jobId) {
-        throw new UnsupportedOperationException();
-    }
-    
     @Override
     public Class<ConsistencyCheckJob> getPipelineJobClass() {
         return ConsistencyCheckJob.class;
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index cb6f6f6a35d..6e90b0b2224 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -73,21 +73,6 @@ class ConsistencyCheckJobAPITest {
         assertThat(sequence, is(expectedSequence));
     }
     
-    @Test
-    void assertGetLatestDataConsistencyCheckResult() {
-        MigrationJobConfiguration parentJobConfig = 
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
-        String parentJobId = parentJobConfig.getJobId();
-        String checkJobId = checkJobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
-                parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
-        GovernanceRepositoryAPI governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
-        governanceRepositoryAPI.persistLatestCheckJobId(parentJobId, 
checkJobId);
-        Map<String, TableDataConsistencyCheckResult> expectedCheckResult = 
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(true));
-        governanceRepositoryAPI.persistCheckJobResult(parentJobId, checkJobId, 
expectedCheckResult);
-        Map<String, TableDataConsistencyCheckResult> actualCheckResult = 
checkJobAPI.getLatestDataConsistencyCheckResult(parentJobId);
-        assertThat(actualCheckResult.size(), is(expectedCheckResult.size()));
-        assertThat(actualCheckResult.get("t_order").isMatched(), 
is(expectedCheckResult.get("t_order").isMatched()));
-    }
-    
     @Test
     void assertDropByParentJobId() {
         MigrationJobConfiguration parentJobConfig = 
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());

Reply via email to