This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 865d138 Switch dataSources and rules configuration after scaling task
triggered by http api finished (#12228)
865d138 is described below
commit 865d138d9b5b5168857ecd5e5633497a2d3d8d4f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Sep 5 20:47:11 2021 +0800
Switch dataSources and rules configuration after scaling task triggered by
http api finished (#12228)
* Add schemaName for /scaling/job/start
* Switch configuration in governance when scaling task finished
---
.../subscriber/ScalingRegistrySubscriber.java | 40 ++++++++++++++++++++++
.../rule/ClusterSwitchConfigurationEvent.java | 40 ++++++++++++++++++++++
.../event/rule/ScalingTaskFinishedEvent.java | 33 ++++++++++++++++++
.../ScalingDataSourceConfigurationWrap.java | 2 ++
.../scaling/core/job/FinishedCheckJob.java | 11 ++++--
5 files changed, 123 insertions(+), 3 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/cache/subscriber/ScalingRegistrySubscriber.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/cache/subscriber/ScalingRegistrySubscriber.java
index 84c13c2..011c00f 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/cache/subscriber/ScalingRegistrySubscriber.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/cache/subscriber/ScalingRegistrySubscriber.java
@@ -18,10 +18,16 @@
package
org.apache.shardingsphere.mode.manager.cluster.governance.registry.cache.subscriber;
import com.google.common.eventbus.Subscribe;
+import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlDataSourceConfigurationSwapper;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.cache.RegistryCacheManager;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.cache.event.StartScalingEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.ClusterSwitchConfigurationEvent;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationCachedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.ScalingTaskFinishedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.SwitchRuleConfigurationEvent;
+import
org.apache.shardingsphere.mode.persist.service.impl.DataSourcePersistService;
import
org.apache.shardingsphere.mode.persist.service.impl.SchemaRulePersistService;
import org.apache.shardingsphere.mode.persist.node.SchemaMetadataNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -31,6 +37,10 @@ import
org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlRuleConfigurationSwapperEngine;
import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
/**
* Scaling registry subscriber.
@@ -42,11 +52,14 @@ public final class ScalingRegistrySubscriber {
private final SchemaRulePersistService persistService;
+ private final DataSourcePersistService dataSourcePersistService;
+
private final RegistryCacheManager registryCacheManager;
public ScalingRegistrySubscriber(final ClusterPersistRepository
repository) {
this.repository = repository;
this.persistService = new SchemaRulePersistService(repository);
+ dataSourcePersistService = new DataSourcePersistService(repository);
registryCacheManager = new RegistryCacheManager(repository);
ShardingSphereEventBus.getInstance().register(this);
}
@@ -81,4 +94,31 @@ public final class ScalingRegistrySubscriber {
registryCacheManager.loadCache(SchemaMetadataNode.getRulePath(event.getSchemaName()),
event.getCacheId()), event.getCacheId());
ShardingSphereEventBus.getInstance().post(startScalingEvent);
}
+
+ /**
+ * Scaling task finished.
+ *
+ * @param event scaling task finished event
+ */
+ @Subscribe
+ public void scalingTaskFinished(final ScalingTaskFinishedEvent event) {
+ YamlRootConfiguration yamlRootConfiguration =
YamlEngine.unmarshal(event.getTargetParameter(), YamlRootConfiguration.class);
+ Map<String, DataSourceConfiguration> dataSourceConfigs =
yamlRootConfiguration.getDataSources().entrySet().stream().collect(Collectors.toMap(
+ Entry::getKey, entry -> new
YamlDataSourceConfigurationSwapper().swapToDataSourceConfiguration(entry.getValue()),
(oldValue, currentValue) -> oldValue, LinkedHashMap::new));
+ Collection<RuleConfiguration> ruleConfigs = new
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfiguration.getRules());
+ ClusterSwitchConfigurationEvent switchEvent = new
ClusterSwitchConfigurationEvent(event.getTargetSchemaName(), dataSourceConfigs,
ruleConfigs);
+ ShardingSphereEventBus.getInstance().post(switchEvent);
+ }
+
+ /**
+ * Cluster switch configuration.
+ *
+ * @param event cluster switch configuration event
+ */
+ @Subscribe
+ public void clusterSwitchConfiguration(final
ClusterSwitchConfigurationEvent event) {
+ String schemaName = event.getTargetSchemaName();
+ dataSourcePersistService.persist(schemaName,
event.getTargetDataSourceConfigs());
+ persistService.persist(schemaName, event.getTargetRuleConfigs());
+ }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ClusterSwitchConfigurationEvent.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ClusterSwitchConfigurationEvent.java
new file mode 100644
index 0000000..ac5be3c
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ClusterSwitchConfigurationEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.config.RuleConfiguration;
+import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Cluster switch configuration event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ClusterSwitchConfigurationEvent {
+
+ private final String targetSchemaName;
+
+ private final Map<String, DataSourceConfiguration> targetDataSourceConfigs;
+
+ private final Collection<RuleConfiguration> targetRuleConfigs;
+}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ScalingTaskFinishedEvent.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ScalingTaskFinishedEvent.java
new file mode 100644
index 0000000..7cf94c6
--- /dev/null
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/governance/registry/config/event/rule/ScalingTaskFinishedEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Scaling task finished event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class ScalingTaskFinishedEvent {
+
+ private final String targetSchemaName;
+
+ private final String targetParameter;
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ScalingDataSourceConfigurationWrap.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ScalingDataSourceConfigurationWrap.java
index 1a6dd7a..02e3983 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ScalingDataSourceConfigurationWrap.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/datasource/ScalingDataSourceConfigurationWrap.java
@@ -29,6 +29,8 @@ import java.util.Map;
@Setter
public class ScalingDataSourceConfigurationWrap {
+ private String schemaName;
+
private String type;
private String parameter;
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
index 22ae919..8edfe4c 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/FinishedCheckJob.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.scaling.core.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.ScalingTaskFinishedEvent;
import org.apache.shardingsphere.scaling.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPI;
import org.apache.shardingsphere.scaling.core.api.ScalingAPIFactory;
@@ -27,6 +29,7 @@ import
org.apache.shardingsphere.scaling.core.api.ScalingDataConsistencyCheckAlg
import org.apache.shardingsphere.scaling.core.common.constant.ScalingConstant;
import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
+import
org.apache.shardingsphere.scaling.core.config.datasource.ScalingDataSourceConfigurationWrap;
import
org.apache.shardingsphere.scaling.core.job.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.util.ScalingTaskUtil;
@@ -49,7 +52,7 @@ public final class FinishedCheckJob implements SimpleJob {
JobConfiguration jobConfig =
scalingAPI.getJobConfig(jobId);
if
(ScalingTaskUtil.almostFinished(scalingAPI.getProgress(jobId),
jobConfig.getHandleConfig())) {
log.info("scaling job {} almost finished.", jobId);
- trySwitch(jobId);
+ trySwitch(jobId, jobConfig);
}
// CHECKSTYLE:OFF
} catch (final Exception ex) {
@@ -59,7 +62,7 @@ public final class FinishedCheckJob implements SimpleJob {
});
}
- private void trySwitch(final long jobId) {
+ private void trySwitch(final long jobId, final JobConfiguration jobConfig)
{
// TODO lock proxy
ScalingDataConsistencyCheckAlgorithm dataConsistencyCheckAlgorithm =
ScalingContext.getInstance().getDataConsistencyCheckAlgorithm();
if (null != dataConsistencyCheckAlgorithm) {
@@ -71,7 +74,9 @@ public final class FinishedCheckJob implements SimpleJob {
log.info("dataConsistencyCheckAlgorithm is not configured, data
consistency check will be ignored.");
}
scalingAPI.stop(jobId);
- //TODO auto switch configuration
+ ScalingDataSourceConfigurationWrap targetConfig =
jobConfig.getRuleConfig().getTarget();
+ ScalingTaskFinishedEvent taskFinishedEvent = new
ScalingTaskFinishedEvent(targetConfig.getSchemaName(),
targetConfig.getParameter());
+ ShardingSphereEventBus.getInstance().post(taskFinishedEvent);
}
private boolean dataConsistencyCheck(final long jobId) {