This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b24ae72 Fix switchClusterConfiguration method does not modify task
status in proxy cluster (#15408)
b24ae72 is described below
commit b24ae727e4141e6607508c873439d25af83d5436
Author: ReyYang <[email protected]>
AuthorDate: Thu Feb 17 19:46:17 2022 +0800
Fix switchClusterConfiguration method does not modify task status in proxy
cluster (#15408)
---
.../user-manual/shardingsphere-scaling/usage.en.md | 1 -
.../pipeline/core/api/GovernanceRepositoryAPI.java | 8 +++++++
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 13 +++++++++++
.../core/api/impl/RuleAlteredJobAPIImpl.java | 20 +++++++++++------
.../rulealtered/RuleAlteredJobScheduler.java | 8 -------
.../rulealtered/RuleAlteredJobSchedulerCenter.java | 26 ----------------------
.../data/pipeline/api/job/JobStatus.java | 5 -----
7 files changed, 34 insertions(+), 47 deletions(-)
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index 027b2be4..5d34402 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -253,7 +253,6 @@ Current scaling job is finished, new sharding rule should
take effect, and not i
| RUNNING | running
|
| EXECUTE_INVENTORY_TASK | inventory task running
|
| EXECUTE_INCREMENTAL_TASK | incremental task running
|
-| ALMOST_FINISHED | almost finished
|
| FINISHED | finished (The whole
process is completed, and the new rules have been taken effect) |
| PREPARING_FAILURE | preparation failed
|
| EXECUTE_INVENTORY_TASK_FAILURE | inventory task failed
|
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index 1f18563..0866ffc 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.api;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -98,4 +99,11 @@ public interface GovernanceRepositoryAPI {
* @param value value of data
*/
void persist(String key, String value);
+
+ /**
+ * Renew job status.
+ * @param status status
+ * @param jobId job id
+ */
+ void renewJobStatus(JobStatus status, String jobId);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 0e3aec5..338567a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -18,8 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
@@ -129,6 +131,17 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
repository.persist(key, value);
}
+ @Override
+ public void renewJobStatus(final JobStatus status, final String jobId) {
+ List<String> offsetKeys =
getChildrenKeys(String.format("%s/%s/offset",
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId));
+ Map<Integer, JobProgress> progressMap = Maps.newHashMap();
+ offsetKeys.forEach(each -> progressMap.put(Integer.parseInt(each),
getJobProgress(jobId, Integer.parseInt(each))));
+ progressMap.forEach((key, value) -> {
+ value.setStatus(status);
+ persist(getOffsetPath(jobId, key),
YamlEngine.marshal(JOB_PROGRESS_YAML_SWAPPER.swapToYaml(value)));
+ });
+ }
+
private String getOffsetPath(final String jobId, final int shardingItem) {
return String.format("%s/%s/offset/%d",
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId, shardingItem);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 6b8203a..f9e35d8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -39,7 +39,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoun
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobSchedulerCenter;
+import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobPreparer;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -67,6 +67,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -311,19 +312,24 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
throw new PipelineDataConsistencyCheckFailedException("Data
consistency check not finished or failed.");
}
}
- Optional<Collection<RuleAlteredJobContext>> optionalJobContexts =
RuleAlteredJobSchedulerCenter.getJobContexts(jobId);
- optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each
-> each.setStatus(JobStatus.ALMOST_FINISHED)));
YamlRootConfiguration yamlRootConfig =
YamlEngine.unmarshal(jobConfig.getPipelineConfig().getTarget().getParameter(),
YamlRootConfiguration.class, true);
WorkflowConfiguration workflowConfig = jobConfig.getWorkflowConfig();
String schemaName = workflowConfig.getSchemaName();
String ruleCacheId = workflowConfig.getRuleCacheId();
ScalingTaskFinishedEvent taskFinishedEvent = new
ScalingTaskFinishedEvent(schemaName, yamlRootConfig, ruleCacheId);
ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
- optionalJobContexts.ifPresent(jobContexts -> jobContexts.forEach(each
-> {
- each.setStatus(JobStatus.FINISHED);
- RuleAlteredJobSchedulerCenter.persistJobProgress(each);
- }));
+
PipelineAPIFactory.getGovernanceRepositoryAPI().renewJobStatus(JobStatus.FINISHED,
jobId);
stop(jobId);
+ // TODO clean up should be done after the task is complete.
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (final InterruptedException ex) {
+ log.error(ex.getMessage());
+ }
+ RuleAlteredJobContext jobContext = new
RuleAlteredJobContext(jobConfig);
+ RuleAlteredJobPreparer jobPreparer = new RuleAlteredJobPreparer();
+ jobPreparer.cleanup(jobContext);
+ jobContext.close();
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
index f89292b..253a979 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobScheduler.java
@@ -51,14 +51,6 @@ public final class RuleAlteredJobScheduler implements
Runnable {
*/
public void stop() {
log.info("stop job {}", jobContext.getJobId());
- final boolean almostFinished = jobContext.getStatus() ==
JobStatus.ALMOST_FINISHED;
- if (almostFinished) {
- log.info("almost finished, preparer cleanup, job {}",
jobContext.getJobId());
- RuleAlteredJobPreparer jobPreparer = jobContext.getJobPreparer();
- if (null != jobPreparer) {
- jobPreparer.cleanup(jobContext);
- }
- }
for (InventoryTask each : jobContext.getInventoryTasks()) {
log.info("stop inventory task {} - {}", jobContext.getJobId(),
each.getTaskId());
each.stop();
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
index 8f970c1..243fe5b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobSchedulerCenter.java
@@ -25,14 +25,11 @@ import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
* Rule altered job scheduler center.
@@ -87,29 +84,6 @@ public final class RuleAlteredJobSchedulerCenter {
}
}
- /**
- * Get job contexts.
- *
- * @param jobId job id
- * @return job context
- */
- public static Optional<Collection<RuleAlteredJobContext>>
getJobContexts(final String jobId) {
- Map<Integer, RuleAlteredJobScheduler> schedulerMap =
JOB_SCHEDULER_MAP.get(jobId);
- if (null == schedulerMap) {
- return Optional.empty();
- }
- return
Optional.of(schedulerMap.values().stream().map(RuleAlteredJobScheduler::getJobContext).collect(Collectors.toList()));
- }
-
- /**
- * Persist job progress.
- *
- * @param jobContext job context
- */
- public static void persistJobProgress(final RuleAlteredJobContext
jobContext) {
- REGISTRY_REPOSITORY_API.persistJobProgress(jobContext);
- }
-
private static final class PersistJobContextRunnable implements Runnable {
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index 56d2ea9..da423a4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -48,11 +48,6 @@ public enum JobStatus {
EXECUTE_INCREMENTAL_TASK(true),
/**
- * Job is almost finished.
- */
- ALMOST_FINISHED(true),
-
- /**
* Job is finished.
*/
FINISHED(false),