This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b24ae72  Fix switchClusterConfiguration method does not modify task status in proxy cluster (#15408)
b24ae72 is described below

commit b24ae727e4141e6607508c873439d25af83d5436
Author: ReyYang <[email protected]>
AuthorDate: Thu Feb 17 19:46:17 2022 +0800

    Fix switchClusterConfiguration method does not modify task status in proxy cluster (#15408)
---
 .../user-manual/shardingsphere-scaling/usage.en.md |  1 -
 .../pipeline/core/api/GovernanceRepositoryAPI.java |  8 +++++++
 .../core/api/impl/GovernanceRepositoryAPIImpl.java | 13 +++++++++++
 .../core/api/impl/RuleAlteredJobAPIImpl.java       | 20 +++++++++++------
 .../rulealtered/RuleAlteredJobScheduler.java       |  8 -------
 .../rulealtered/RuleAlteredJobSchedulerCenter.java | 26 ----------------------
 .../data/pipeline/api/job/JobStatus.java           |  5 -----
 7 files changed, 34 insertions(+), 47 deletions(-)

diff --git 
a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md 
b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index 027b2be4..5d34402 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -253,7 +253,6 @@ Current scaling job is finished, new sharding rule should 
take effect, and not i
 | RUNNING                                           | running                  
                                    |
 | EXECUTE_INVENTORY_TASK                            | inventory task running   
                                    |
 | EXECUTE_INCREMENTAL_TASK                          | incremental task running 
                                    |
-| ALMOST_FINISHED                                   | almost finished          
                                    |
 | FINISHED                                          | finished (The whole 
process is completed, and the new rules have been taken effect) |
 | PREPARING_FAILURE                                 | preparation failed       
                                    |
 | EXECUTE_INVENTORY_TASK_FAILURE                    | inventory task failed    
                                    |
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 1f18563..0866ffc 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.api;
 
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -98,4 +99,11 @@ public interface GovernanceRepositoryAPI {
      * @param value value of data
      */
     void persist(String key, String value);
+    
+    /**
+     * Renew job status.
+     * @param status status
+     * @param jobId job id
+     */
+    void renewJobStatus(JobStatus status, String jobId);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 0e3aec5..338567a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -18,8 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
@@ -129,6 +131,17 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
         repository.persist(key, value);
     }
     
+    @Override
+    public void renewJobStatus(final JobStatus status, final String jobId) {
+        List<String> offsetKeys = 
getChildrenKeys(String.format("%s/%s/offset", 
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId));
+        Map<Integer, JobProgress> progressMap = Maps.newHashMap();
+        offsetKeys.forEach(each -> progressMap.put(Integer.parseInt(each), 
getJobProgress(jobId, Integer.parseInt(each))));
+        progressMap.forEach((key, value) -> {
+            value.setStatus(status);
+            persist(getOffsetPath(jobId, key), 
YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(value)));
+        });
+    }
+    
     private String getOffsetPath(final String jobId, final int shardingItem) {
         return String.format("%s/%s/offset/%d", 
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, shardingItem);
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 6b8203a..f9e35d8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -39,7 +39,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoun
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
+import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -67,6 +67,7 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -311,19 +312,24 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
                 throw new PipelineDataConsistencyCheckFailedException("Data 
consistency check not finished or failed.");
             }
         }
-        Optional<Collection<RuleAlteredJobContext>> optionalJobContexts = 
RuleAlteredJobSchedulerCenter.getJobContexts(jobId);
-        optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each 
-> each.setStatus(JobStatus.ALMOST_FINISHED)));
         YamlRootConfiguration yamlRootConfig = 
YamlEngine.unmarshal(jobConfig.getPipelineConfig().getTarget().getParameter(), 
YamlRootConfiguration.class, true);
         WorkflowConfiguration workflowConfig = jobConfig.getWorkflowConfig();
         String schemaName = workflowConfig.getSchemaName();
         String ruleCacheId = workflowConfig.getRuleCacheId();
         ScalingTaskFinishedEvent taskFinishedEvent = new 
ScalingTaskFinishedEvent(schemaName, yamlRootConfig, ruleCacheId);
         ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
-        optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each 
-> {
-            each.setStatus(JobStatus.FINISHED);
-            RuleAlteredJobSchedulerCenter.persistJobProgress(each);
-        }));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().renewJobStatus(JobStatus.FINISHED,
 jobId);
         stop(jobId);
+        // TODO clean up should be done after the task is complete.
+        try {
+            TimeUnit.SECONDS.sleep(1);
+        } catch (final InterruptedException ex) {
+            log.error(ex.getMessage());
+        }
+        RuleAlteredJobContext jobContext = new 
RuleAlteredJobContext(jobConfig);
+        RuleAlteredJobPreparer jobPreparer = new RuleAlteredJobPreparer();
+        jobPreparer.cleanup(jobContext);
+        jobContext.close();
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index f89292b..253a979 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -51,14 +51,6 @@ public final class RuleAlteredJobScheduler implements 
Runnable {
      */
     public void stop() {
         log.info("stop job {}", jobContext.getJobId());
-        final boolean almostFinished = jobContext.getStatus() == 
JobStatus.ALMOST_FINISHED;
-        if (almostFinished) {
-            log.info("almost finished, preparer cleanup, job {}", 
jobContext.getJobId());
-            RuleAlteredJobPreparer jobPreparer = jobContext.getJobPreparer();
-            if (null != jobPreparer) {
-                jobPreparer.cleanup(jobContext);
-            }
-        }
         for (InventoryTask each : jobContext.getInventoryTasks()) {
             log.info("stop inventory task {} - {}", jobContext.getJobId(), 
each.getTaskId());
             each.stop();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 8f970c1..243fe5b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -25,14 +25,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * Rule altered job scheduler center.
@@ -87,29 +84,6 @@ public final class RuleAlteredJobSchedulerCenter {
         }
     }
     
-    /**
-     * Get job contexts.
-     *
-     * @param jobId job id
-     * @return job context
-     */
-    public static Optional<Collection<RuleAlteredJobContext>> 
getJobContexts(final String jobId) {
-        Map<Integer, RuleAlteredJobScheduler> schedulerMap = 
JOB_SCHEDULER_MAP.get(jobId);
-        if (null == schedulerMap) {
-            return Optional.empty();
-        }
-        return 
Optional.of(schedulerMap.values().stream().map(RuleAlteredJobScheduler::getJobContext).collect(Collectors.toList()));
-    }
-    
-    /**
-     * Persist job progress.
-     *
-     * @param jobContext job context
-     */
-    public static void persistJobProgress(final RuleAlteredJobContext 
jobContext) {
-        REGISTRY_REPOSITORY_API.persistJobProgress(jobContext);
-    }
-    
     private static final class PersistJobContextRunnable implements Runnable {
         
         @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index 56d2ea9..da423a4 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -48,11 +48,6 @@ public enum JobStatus {
     EXECUTE_INCREMENTAL_TASK(true),
     
     /**
-     * Job is almost finished.
-     */
-    ALMOST_FINISHED(true),
-    
-    /**
      * Job is finished.
      */
     FINISHED(false),

Reply via email to