This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 865d138  Switch dataSources and rules configuration after scaling task 
triggered by http api finished (#12228)
865d138 is described below

commit 865d138d9b5b5168857ecd5e5633497a2d3d8d4f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Sep 5 20:47:11 2021 +0800

    Switch dataSources and rules configuration after scaling task triggered by 
http api finished (#12228)
    
    * Add schemaName for /scaling/job/start
    
    * Switch configuration in governance when scaling task finished
---
 .../subscriber/ScalingRegistrySubscriber.java      | 40 ++++++++++++++++++++++
 .../rule/ClusterSwitchConfigurationEvent.java      | 40 ++++++++++++++++++++++
 .../event/rule/ScalingTaskFinishedEvent.java       | 33 ++++++++++++++++++
 .../ScalingDataSourceConfigurationWrap.java        |  2 ++
 .../scaling/core/job/FinishedCheckJob.java         | 11 ++++--
 5 files changed, 123 insertions(+), 3 deletions(-)

diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/cache/subscriber/ScalingRegistrySubscriber.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/cache/subscriber/ScalingRegistrySubscriber.java
index 84c13c2..011c00f 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -18,10 +18,16 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.cache.subscriber;
 
 import com.google.common.eventbus.Subscribe;
+import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.cache.RegistryCacheManager;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.cache.event.StartScalingEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.ClusterSwitchConfigurationEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationCachedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.ScalingTaskFinishedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.SwitchRuleConfigurationEvent;
+import 
org.apache.shardingsphere.mode.persist.service.impl.DataSourcePersistService;
 import 
org.apache.shardingsphere.mode.persist.service.impl.SchemaRulePersistService;
 import org.apache.shardingsphere.mode.persist.node.SchemaMetadataNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -31,6 +37,10 @@ import 
org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
 
 import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
  * Scaling registry subscriber.
@@ -42,11 +52,14 @@ public final class ScalingRegistrySubscriber {
     
     private final SchemaRulePersistService persistService;
     
+    private final DataSourcePersistService dataSourcePersistService;
+    
     private final RegistryCacheManager registryCacheManager;
     
     public ScalingRegistrySubscriber(final ClusterPersistRepository 
repository) {
         this.repository = repository;
         this.persistService = new SchemaRulePersistService(repository);
+        dataSourcePersistService = new DataSourcePersistService(repository);
         registryCacheManager = new RegistryCacheManager(repository);
         ShardingSphereEventBus.getInstance().register(this);
     }
@@ -81,4 +94,31 @@ public final class ScalingRegistrySubscriber {
                 
registryCacheManager.loadCache(SchemaMetadataNode.getRulePath(event.getSchemaName()),
 event.getCacheId()), event.getCacheId());
         ShardingSphereEventBus.getInstance().post(startScalingEvent);
     }
+    
+    /**
+     * Scaling task finished.
+     *
+     * @param event scaling task finished event
+     */
+    @Subscribe
+    public void scalingTaskFinished(final ScalingTaskFinishedEvent event) {
+        YamlRootConfiguration yamlRootConfiguration = 
YamlEngine.unmarshal(event.getTargetParameter(), YamlRootConfiguration.class);
+        Map<String, DataSourceConfiguration> dataSourceConfigs = 
yamlRootConfiguration.getDataSources().entrySet().stream().collect(Collectors.toMap(
+                Entry::getKey, entry -> new 
YamlDataSourceConfigurationSwapper().swapToDataSourceConfiguration(entry.getValue()),
 (oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+        Collection<RuleConfiguration> ruleConfigs = new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfiguration.getRules());
+        ClusterSwitchConfigurationEvent switchEvent = new 
ClusterSwitchConfigurationEvent(event.getTargetSchemaName(), dataSourceConfigs, 
ruleConfigs);
+        ShardingSphereEventBus.getInstance().post(switchEvent);
+    }
+    
+    /**
+     * Cluster switch configuration.
+     *
+     * @param event cluster switch configuration event
+     */
+    @Subscribe
+    public void clusterSwitchConfiguration(final 
ClusterSwitchConfigurationEvent event) {
+        String schemaName = event.getTargetSchemaName();
+        dataSourcePersistService.persist(schemaName, 
event.getTargetDataSourceConfigs());
+        persistService.persist(schemaName, event.getTargetRuleConfigs());
+    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ClusterSwitchConfigurationEvent.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ClusterSwitchConfigurationEvent.java
new file mode 100644
index 0000000..ac5be3c
--- /dev/null
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ClusterSwitchConfigurationEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.RuleConfiguration;
+import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Cluster switch configuration event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ClusterSwitchConfigurationEvent {
+    
+    private final String targetSchemaName;
+    
+    private final Map<String, DataSourceConfiguration> targetDataSourceConfigs;
+    
+    private final Collection<RuleConfiguration> targetRuleConfigs;
+}
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ScalingTaskFinishedEvent.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ScalingTaskFinishedEvent.java
new file mode 100644
index 0000000..7cf94c6
--- /dev/null
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ScalingTaskFinishedEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Scaling task finished event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ScalingTaskFinishedEvent {
+    
+    private final String targetSchemaName;
+    
+    private final String targetParameter;
+}
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ScalingDataSourceConfigurationWrap.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ScalingDataSourceConfigurationWrap.java
index 1a6dd7a..02e3983 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ScalingDataSourceConfigurationWrap.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ScalingDataSourceConfigurationWrap.java
@@ -29,6 +29,8 @@ import java.util.Map;
 @Setter
 public class ScalingDataSourceConfigurationWrap {
     
+    private String schemaName;
+    
     private String type;
     
     private String parameter;
diff --git 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 22ae919..8edfe4c 100644
--- 
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++ 
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.scaling.core.job;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.ScalingTaskFinishedEvent;
 import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
 import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
@@ -27,6 +29,7 @@ import 
org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlg
 import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
 import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
 import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import 
org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
 import 
org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
 
@@ -49,7 +52,7 @@ public final class FinishedCheckJob implements SimpleJob {
                         JobConfiguration jobConfig = 
scalingAPI.getJobConfig(jobId);
                         if 
(ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId), 
jobConfig.getHandleConfig())) {
                             log.info("scaling job {} almost finished.", jobId);
-                            trySwitch(jobId);
+                            trySwitch(jobId, jobConfig);
                         }
                         // CHECKSTYLE:OFF
                     } catch (final Exception ex) {
@@ -59,7 +62,7 @@ public final class FinishedCheckJob implements SimpleJob {
                 });
     }
     
-    private void trySwitch(final long jobId) {
+    private void trySwitch(final long jobId, final JobConfiguration jobConfig) 
{
         // TODO lock proxy
         ScalingDataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm = 
ScalingContext.getInstance().getDataConsistencyCheckAlgorithm();
         if (null != dataConsistencyCheckAlgorithm) {
@@ -71,7 +74,9 @@ public final class FinishedCheckJob implements SimpleJob {
             log.info("dataConsistencyCheckAlgorithm is not configured, data 
consistency check will be ignored.");
         }
         scalingAPI.stop(jobId);
-        //TODO auto switch configuration
+        ScalingDataSourceConfigurationWrap targetConfig = 
jobConfig.getRuleConfig().getTarget();
+        ScalingTaskFinishedEvent taskFinishedEvent = new 
ScalingTaskFinishedEvent(targetConfig.getSchemaName(), 
targetConfig.getParameter());
+        ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
     }
     
     private boolean dataConsistencyCheck(final long jobId) {

Reply via email to