This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f1ef13950 Move updateJobItemStatus method from PipelineJobAPI to PipelineJobManager (#29076)
c4f1ef13950 is described below

commit c4f1ef13950c0155054139dd381cedd3f204782b
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 18:28:16 2023 +0800

    Move updateJobItemStatus method from PipelineJobAPI to PipelineJobManager (#29076)
---
 .../common/job/progress/PipelineJobItemProgress.java   |  7 +++++++
 .../data/pipeline/core/job/service/PipelineJobAPI.java | 10 ----------
 .../pipeline/core/job/service/PipelineJobManager.java  | 18 ++++++++++++++++++
 .../impl/AbstractInventoryIncrementalJobAPIImpl.java   | 12 ------------
 .../task/runner/InventoryIncrementalTasksRunner.java   |  2 +-
 .../data/pipeline/cdc/core/job/CDCJob.java             |  2 +-
 .../api/impl/ConsistencyCheckJobAPI.java               | 14 --------------
 .../migration/prepare/MigrationJobPreparer.java        |  2 +-
 .../migration/api/impl/MigrationJobAPITest.java        |  4 ++--
 9 files changed, 30 insertions(+), 41 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java
index 4153e740733..3c9770a369f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/PipelineJobItemProgress.java
@@ -30,4 +30,11 @@ public interface PipelineJobItemProgress {
      * @return job status
      */
     JobStatus getStatus();
+    
+    /**
+     * Set status.
+     * 
+     * @param jobStatus job status
+     */
+    void setStatus(JobStatus jobStatus);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 168b18d88ad..87ae49520dc 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
-import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
@@ -76,15 +75,6 @@ public interface PipelineJobAPI extends TypedSPI {
         return Optional.empty();
     }
     
-    /**
-     * Update job item status.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param status status
-     */
-    void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
-    
     /**
      * Get pipeline job class.
      * 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index e7fd3ad85e6..3eab31d08af 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -227,6 +227,24 @@ public final class PipelineJobManager {
                 .persistJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
     }
     
+    /**
+     * Update job item status.
+     *
+     * @param jobId job id
+     * @param shardingItem sharding item
+     * @param status status
+     */
+    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
+        Optional<PipelineJobItemProgress> jobItemProgress = 
getJobItemProgress(jobId, shardingItem);
+        if (!jobItemProgress.isPresent()) {
+            log.warn("updateJobItemStatus, jobProgress is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
+            return;
+        }
+        jobItemProgress.get().setStatus(status);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
 shardingItem,
+                
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
+    }
+    
     /**
      * Update job item progress.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 6b0cf4171e6..f0c85629db7 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -130,18 +130,6 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl implements Inventor
         return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
     }
     
-    @Override
-    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
-        PipelineJobManager jobManager = new PipelineJobManager(this);
-        Optional<InventoryIncrementalJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobId, shardingItem);
-        if (!jobItemProgress.isPresent()) {
-            return;
-        }
-        jobItemProgress.get().setStatus(status);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
 shardingItem,
-                
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
-    }
-    
     @Override
     public Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms() {
         Collection<DataConsistencyCheckAlgorithmInfo> result = new 
LinkedList<>();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index ad9e3ccb81b..4af5b5f3b27 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -106,7 +106,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     
     private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
-        jobAPI.updateJobItemStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
+        jobManager.updateJobItemStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
     private synchronized void executeIncrementalTask() {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 657b7b48ccd..14cdb71513a 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -151,7 +151,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private void updateLocalAndRemoteJobItemStatus(final 
PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
-        jobAPI.updateJobItemStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
+        jobManager.updateJobItemStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
     private void executeIncrementalTasks(final List<CDCJobItemContext> 
jobItemContexts) {
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 4ba4d05079d..a60fcb44d5e 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -46,7 +46,6 @@ import 
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.sql.Timestamp;
 import java.time.Duration;
@@ -117,19 +116,6 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         return true;
     }
     
-    @Override
-    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
-        PipelineJobManager jobManager = new PipelineJobManager(this);
-        Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobManager.getJobItemProgress(jobId, shardingItem);
-        if (!jobItemProgress.isPresent()) {
-            log.warn("updateJobItemStatus, jobProgress is null, jobId={}, 
shardingItem={}", jobId, shardingItem);
-            return;
-        }
-        jobItemProgress.get().setStatus(status);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
 shardingItem,
-                
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
-    }
-    
     /**
      * Start by parent job id.
      *
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index d473001b873..50ce83843f1 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -136,7 +136,7 @@ public final class MigrationJobPreparer {
                 JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
                 if (!offsetInfo.isTargetSchemaTableCreated()) {
                     jobItemContext.setStatus(JobStatus.PREPARING);
-                    jobAPI.updateJobItemStatus(jobId, 
jobItemContext.getShardingItem(), JobStatus.PREPARING);
+                    jobManager.updateJobItemStatus(jobId, 
jobItemContext.getShardingItem(), JobStatus.PREPARING);
                     prepareAndCheckTarget(jobItemContext);
                     jobAPI.persistJobOffsetInfo(jobId, new 
JobOffsetInfo(true));
                 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index a1a1c905dbd..6201a6cb507 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -213,7 +213,7 @@ class MigrationJobAPITest {
         assertTrue(jobId.isPresent());
         MigrationJobItemContext jobItemContext = 
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
         jobManager.persistJobItemProgress(jobItemContext);
-        jobAPI.updateJobItemStatus(jobId.get(), 
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
+        jobManager.updateJobItemStatus(jobId.get(), 
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
         Map<Integer, InventoryIncrementalJobItemProgress> progress = 
jobAPI.getJobProgress(jobConfig);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
progress.entrySet()) {
             assertThat(entry.getValue().getStatus(), 
is(JobStatus.EXECUTE_INVENTORY_TASK));
@@ -246,7 +246,7 @@ class MigrationJobAPITest {
         final MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
         MigrationJobItemContext jobItemContext = 
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
         jobManager.persistJobItemProgress(jobItemContext);
-        jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, 
JobStatus.FINISHED);
+        jobManager.updateJobItemStatus(jobConfig.getJobId(), 0, 
JobStatus.FINISHED);
         Optional<InventoryIncrementalJobItemProgress> actual = 
jobManager.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         assertTrue(actual.isPresent());
         assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));

Reply via email to