This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 438a4b51737 Add YamlStorageNodeDataSource (#23881)
438a4b51737 is described below
commit 438a4b517371e31c695d2e5ec6a9040dbc64356c
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Feb 1 01:18:46 2023 +0800
Add YamlStorageNodeDataSource (#23881)
---
.../algorithm/DatabaseDiscoveryEngine.java | 33 +++++++++--------
.../dbdiscovery/rule/DatabaseDiscoveryRule.java | 2 +-
.../rule/ReadwriteSplittingRule.java | 2 +-
.../infra/datasource/state/DataSourceState.java | 2 +-
.../mode/metadata/MetaDataContextsFactory.java | 2 +-
.../metadata/storage/StorageNodeDataSource.java | 22 +++--------
.../mode/metadata/storage/StorageNodeStatus.java | 22 +----------
.../storage/service/StorageNodeStatusService.java | 6 ++-
.../subscriber/StorageNodeStatusSubscriber.java | 8 ++--
.../watcher/StorageNodeStateChangedWatcher.java | 12 +++---
.../storage/yaml/YamlStorageNodeDataSource.java | 18 +++++++--
.../yaml/YamlStorageNodeDataSourceSwapper.java | 43 ++++++++++++++++++++++
.../subscriber/ConfigurationChangedSubscriber.java | 2 +-
.../subscriber/StateChangedSubscriber.java | 2 +-
.../StorageNodeStatusSubscriberTest.java | 15 +++++---
.../StorageNodeStateChangedWatcherTest.java | 16 ++++----
...owStatusFromReadwriteSplittingRulesHandler.java | 9 +++--
...SplittingStorageUnitStatusStatementHandler.java | 2 +-
18 files changed, 127 insertions(+), 91 deletions(-)
diff --git
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
index 25e9c39eac8..2551b30187b 100644
---
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
+++
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
@@ -97,22 +97,23 @@ public final class DatabaseDiscoveryEngine {
final Map<String,
DataSource> dataSourceMap, final Collection<String> disabledDataSourceNames) {
int enabledReplicasCount = dataSourceMap.size() -
disabledDataSourceNames.size() - 1;
for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
- if (!entry.getKey().equals(primaryDataSourceName)) {
- StorageNodeDataSource storageNodeDataSource =
createStorageNodeDataSource(loadReplicaStatus(entry.getValue()));
- if
(StorageNodeStatus.isEnable(storageNodeDataSource.getStatus())) {
- enabledReplicasCount +=
disabledDataSourceNames.contains(entry.getKey()) ? 1 : 0;
- eventBusContext.post(new
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(),
storageNodeDataSource));
- continue;
- }
- if
(Strings.isNullOrEmpty(databaseDiscoveryProviderAlgorithm.getProps().getProperty("min-enabled-replicas")))
{
- eventBusContext.post(new
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(),
storageNodeDataSource));
- continue;
- }
- if (!(databaseDiscoveryProviderAlgorithm instanceof
MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm)
- || enabledReplicasCount >
Integer.parseInt(databaseDiscoveryProviderAlgorithm.getProps().getProperty("min-enabled-replicas",
"0"))) {
- enabledReplicasCount -=
disabledDataSourceNames.contains(entry.getKey()) ? 0 : 1;
- eventBusContext.post(new
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(),
storageNodeDataSource));
- }
+ if (entry.getKey().equals(primaryDataSourceName)) {
+ continue;
+ }
+ StorageNodeDataSource storageNodeDataSource =
createStorageNodeDataSource(loadReplicaStatus(entry.getValue()));
+ if (StorageNodeStatus.ENABLED ==
storageNodeDataSource.getStatus()) {
+ enabledReplicasCount +=
disabledDataSourceNames.contains(entry.getKey()) ? 1 : 0;
+ eventBusContext.post(new DataSourceDisabledEvent(databaseName,
groupName, entry.getKey(), storageNodeDataSource));
+ continue;
+ }
+ if
(Strings.isNullOrEmpty(databaseDiscoveryProviderAlgorithm.getProps().getProperty("min-enabled-replicas")))
{
+ eventBusContext.post(new DataSourceDisabledEvent(databaseName,
groupName, entry.getKey(), storageNodeDataSource));
+ continue;
+ }
+ if (!(databaseDiscoveryProviderAlgorithm instanceof
MySQLNormalReplicationDatabaseDiscoveryProviderAlgorithm)
+ || enabledReplicasCount >
Integer.parseInt(databaseDiscoveryProviderAlgorithm.getProps().getProperty("min-enabled-replicas",
"0"))) {
+ enabledReplicasCount -=
disabledDataSourceNames.contains(entry.getKey()) ? 0 : 1;
+ eventBusContext.post(new DataSourceDisabledEvent(databaseName,
groupName, entry.getKey(), storageNodeDataSource));
}
}
}
diff --git
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index 9b2fe053cdc..fb97d5870a2 100644
---
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -198,7 +198,7 @@ public final class DatabaseDiscoveryRule implements
DatabaseRule, DataSourceCont
StorageNodeDataSourceChangedEvent dataSourceChangedEvent =
(StorageNodeDataSourceChangedEvent) event;
DatabaseDiscoveryDataSourceRule dataSourceRule =
dataSourceRules.get(dataSourceChangedEvent.getQualifiedDatabase().getGroupName());
Preconditions.checkNotNull(dataSourceRule, "Can not find database
discovery data source rule in database `%s`", databaseName);
- if
(StorageNodeStatus.isDisable(dataSourceChangedEvent.getDataSource().getStatus()))
{
+ if (StorageNodeStatus.DISABLED ==
dataSourceChangedEvent.getDataSource().getStatus()) {
dataSourceRule.disableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
} else {
dataSourceRule.enableDataSource(dataSourceChangedEvent.getQualifiedDatabase().getDataSourceName());
diff --git
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
index 4ed6677a198..b5cfe03ac63 100644
---
a/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
+++
b/features/readwrite-splitting/core/src/main/java/org/apache/shardingsphere/readwritesplitting/rule/ReadwriteSplittingRule.java
@@ -169,7 +169,7 @@ public final class ReadwriteSplittingRule implements
DatabaseRule, DataSourceCon
QualifiedDatabase qualifiedDatabase =
dataSourceEvent.getQualifiedDatabase();
ReadwriteSplittingDataSourceRule dataSourceRule =
dataSourceRules.get(qualifiedDatabase.getGroupName());
Preconditions.checkNotNull(dataSourceRule, "Can not find
readwrite-splitting data source rule in database `%s`",
qualifiedDatabase.getDatabaseName());
-
dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(),
StorageNodeStatus.isDisable(dataSourceEvent.getDataSource().getStatus()));
+
dataSourceRule.updateDisabledDataSourceNames(dataSourceEvent.getQualifiedDatabase().getDataSourceName(),
StorageNodeStatus.DISABLED == dataSourceEvent.getDataSource().getStatus());
}
@Override
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/state/DataSourceState.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/state/DataSourceState.java
index 8671cb625a1..35c8f12e506 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/state/DataSourceState.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/state/DataSourceState.java
@@ -22,5 +22,5 @@ package org.apache.shardingsphere.infra.datasource.state;
*/
public enum DataSourceState {
- DISABLED, ENABLED
+ ENABLED, DISABLED
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java
index 2d374b40d3e..715569f9929 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/MetaDataContextsFactory.java
@@ -122,7 +122,7 @@ public final class MetaDataContextsFactory {
Preconditions.checkArgument(3 == values.size(), "Illegal data
source of storage node.");
String databaseName = values.get(0);
String dataSourceName = values.get(2);
- result.put(databaseName + "." + dataSourceName,
DataSourceState.valueOf(value.getStatus().toUpperCase()));
+ result.put(databaseName + "." + dataSourceName,
DataSourceState.valueOf(value.getStatus().name()));
});
return result;
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/StorageNodeDataSource.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/StorageNodeDataSource.java
index 028ca48e040..d9646bd0e54 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/StorageNodeDataSource.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/StorageNodeDataSource.java
@@ -18,32 +18,22 @@
package org.apache.shardingsphere.mode.metadata.storage;
import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
/**
* Data source of storage node.
*/
-@NoArgsConstructor
+@RequiredArgsConstructor
@Getter
-@Setter
public final class StorageNodeDataSource {
- private String role;
+ private final StorageNodeRole role;
- private String status;
+ private final StorageNodeStatus status;
- private long replicationDelayMilliseconds;
+ private final long replicationDelayMilliseconds;
public StorageNodeDataSource(final StorageNodeRole role, final
StorageNodeStatus status) {
- this.role = role.name().toLowerCase();
- this.status = status.name().toLowerCase();
- replicationDelayMilliseconds = 0L;
- }
-
- public StorageNodeDataSource(final StorageNodeRole role, final
StorageNodeStatus status, final long replicationDelayMilliseconds) {
- this.role = role.name().toLowerCase();
- this.status = status.name().toLowerCase();
- this.replicationDelayMilliseconds = replicationDelayMilliseconds;
+ this(role, status, 0L);
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/StorageNodeStatus.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/StorageNodeStatus.java
index cdbc2af99a3..edd177fb31d 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/StorageNodeStatus.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/storage/StorageNodeStatus.java
@@ -22,25 +22,5 @@ package org.apache.shardingsphere.mode.metadata.storage;
*/
public enum StorageNodeStatus {
- DISABLED, ENABLED;
-
- /**
- * Storage node disable or enable.
- *
- * @param status storage node status
- * @return disable or enable
- */
- public static boolean isDisable(final String status) {
- return DISABLED.name().toLowerCase().equals(status);
- }
-
- /**
- * Storage node disable or enable.
- *
- * @param status storage node status
- * @return disable or enable
- */
- public static boolean isEnable(final String status) {
- return ENABLED.name().toLowerCase().equals(status);
- }
+ ENABLED, DISABLED
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
index 31862d8cd7d..6fad75f3e2d 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/service/StorageNodeStatusService.java
@@ -19,9 +19,11 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.node.StorageNode;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.yaml.YamlStorageNodeDataSource;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.yaml.YamlStorageNodeDataSourceSwapper;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Collection;
@@ -47,7 +49,7 @@ public final class StorageNodeStatusService {
storageNodes.forEach(each -> {
String yamlContext =
repository.getDirectly(StorageNode.getStorageNodesDataSourcePath(each));
if (!Strings.isNullOrEmpty(yamlContext)) {
- result.put(each, YamlEngine.unmarshal(yamlContext,
StorageNodeDataSource.class));
+ result.put(each, new
YamlStorageNodeDataSourceSwapper().swapToObject(YamlEngine.unmarshal(yamlContext,
YamlStorageNodeDataSource.class)));
}
});
return result;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
index 6567c35bf81..2cdab3503e5 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriber.java
@@ -18,10 +18,11 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.node.StorageNode;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.yaml.YamlStorageNodeDataSourceSwapper;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
@@ -50,7 +51,7 @@ public final class StorageNodeStatusSubscriber {
@Subscribe
public void update(final DataSourceDisabledEvent event) {
repository.persist(StorageNode.getStatusPath(new
QualifiedDatabase(event.getDatabaseName(), event.getGroupName(),
event.getDataSourceName())),
- YamlEngine.marshal(event.getStorageNodeDataSource()));
+ YamlEngine.marshal(new
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(event.getStorageNodeDataSource())));
}
/**
@@ -60,6 +61,7 @@ public final class StorageNodeStatusSubscriber {
*/
@Subscribe
public void update(final PrimaryDataSourceChangedEvent event) {
-
repository.persist(StorageNode.getStatusPath(event.getQualifiedDatabase()),
YamlEngine.marshal(new StorageNodeDataSource(StorageNodeRole.PRIMARY,
StorageNodeStatus.ENABLED)));
+
repository.persist(StorageNode.getStatusPath(event.getQualifiedDatabase()),
+ YamlEngine.marshal(new
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new
StorageNodeDataSource(StorageNodeRole.PRIMARY, StorageNodeStatus.ENABLED))));
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
index d06e7878e3e..b2e5889a1b2 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
@@ -19,14 +19,16 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import com.google.common.base.Strings;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.node.StorageNode;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.yaml.YamlStorageNodeDataSource;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.yaml.YamlStorageNodeDataSourceSwapper;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -58,8 +60,8 @@ public final class StorageNodeStateChangedWatcher implements
GovernanceWatcher<G
Optional<QualifiedDatabase> qualifiedDatabase =
StorageNode.extractQualifiedDatabase(event.getKey());
if (qualifiedDatabase.isPresent()) {
QualifiedDatabase database = qualifiedDatabase.get();
- StorageNodeDataSource storageNodeDataSource =
YamlEngine.unmarshal(event.getValue(), StorageNodeDataSource.class);
- if
(StorageNodeRole.PRIMARY.name().toLowerCase().equals(storageNodeDataSource.getRole()))
{
+ StorageNodeDataSource storageNodeDataSource = new
YamlStorageNodeDataSourceSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(),
YamlStorageNodeDataSource.class));
+ if (StorageNodeRole.PRIMARY == storageNodeDataSource.getRole()) {
return Optional.of(new PrimaryStateChangedEvent(database));
}
return Optional.of(new StorageNodeChangedEvent(database,
storageNodeDataSource));
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/state/DataSourceState.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/yaml/YamlStorageNodeDataSource.java
similarity index 64%
copy from
infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/state/DataSourceState.java
copy to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/yaml/YamlStorageNodeDataSource.java
index 8671cb625a1..6d2132eed6f 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/datasource/state/DataSourceState.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/yaml/YamlStorageNodeDataSource.java
@@ -15,12 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.datasource.state;
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.yaml;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Data source state.
+ * YAML storage node data source.
*/
-public enum DataSourceState {
+@Getter
+@Setter
+public final class YamlStorageNodeDataSource implements YamlConfiguration {
+
+ private String role;
+
+ private String status;
- DISABLED, ENABLED
+ private long replicationDelayMilliseconds;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/yaml/YamlStorageNodeDataSourceSwapper.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/yaml/YamlStorageNodeDataSourceSwapper.java
new file mode 100644
index 00000000000..c909d56830b
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/yaml/YamlStorageNodeDataSourceSwapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.yaml;
+
+import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
+
+/**
+ * YAML storage node data source swapper.
+ */
+public final class YamlStorageNodeDataSourceSwapper implements
YamlConfigurationSwapper<YamlStorageNodeDataSource, StorageNodeDataSource> {
+
+ @Override
+ public YamlStorageNodeDataSource swapToYamlConfiguration(final
StorageNodeDataSource data) {
+ YamlStorageNodeDataSource result = new YamlStorageNodeDataSource();
+ result.setRole(data.getRole().name());
+ result.setStatus(data.getStatus().name());
+
result.setReplicationDelayMilliseconds(data.getReplicationDelayMilliseconds());
+ return result;
+ }
+
+ @Override
+ public StorageNodeDataSource swapToObject(final YamlStorageNodeDataSource
yamlConfig) {
+ return new
StorageNodeDataSource(StorageNodeRole.valueOf(yamlConfig.getRole()),
StorageNodeStatus.valueOf(yamlConfig.getStatus()),
yamlConfig.getReplicationDelayMilliseconds());
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
index f2c582e8d63..abebb53ac9c 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
@@ -129,7 +129,7 @@ public final class ConfigurationChangedSubscriber {
private void disableDataSources(final StaticDataSourceContainedRule rule) {
Map<String, StorageNodeDataSource> storageNodes =
registryCenter.getStorageNodeStatusService().loadStorageNodes();
Map<String, StorageNodeDataSource> disableDataSources =
storageNodes.entrySet()
- .stream().filter(entry ->
StorageNodeStatus.isDisable(entry.getValue().getStatus())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ .stream().filter(entry -> StorageNodeStatus.DISABLED ==
entry.getValue().getStatus()).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
disableDataSources.forEach((key, value) -> rule.updateStatus(new
StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index 2961faddd68..c65e0916761 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -75,7 +75,7 @@ public final class StateChangedSubscriber {
.getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class);
staticDataSourceRule.ifPresent(optional -> optional.updateStatus(new
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
DataSourceStateManager.getInstance().updateState(
- qualifiedDatabase.getDatabaseName(),
qualifiedDatabase.getDataSourceName(),
DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase()));
+ qualifiedDatabase.getDatabaseName(),
qualifiedDatabase.getDataSourceName(),
DataSourceState.valueOf(event.getDataSource().getStatus().name()));
}
/**
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
index c4a5b51b21e..5dc9eafb96b 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/StorageNodeStatusSubscriberTest.java
@@ -17,13 +17,14 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.node.StorageNode;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.yaml.YamlStorageNodeDataSourceSwapper;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.node.StorageNode;
import
org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
import
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -50,7 +51,8 @@ public final class StorageNodeStatusSubscriberTest {
StorageNodeDataSource storageNodeDataSource = new
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED);
DataSourceDisabledEvent dataSourceDisabledEvent = new
DataSourceDisabledEvent(databaseName, groupName, dataSourceName,
storageNodeDataSource);
new StorageNodeStatusSubscriber(repository,
eventBusContext).update(dataSourceDisabledEvent);
- verify(repository).persist(StorageNode.getStatusPath(new
QualifiedDatabase(databaseName, groupName, dataSourceName)),
YamlEngine.marshal(storageNodeDataSource));
+ verify(repository).persist(StorageNode.getStatusPath(new
QualifiedDatabase(databaseName, groupName, dataSourceName)),
+ YamlEngine.marshal(new
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED))));
}
@Test
@@ -61,7 +63,8 @@ public final class StorageNodeStatusSubscriberTest {
StorageNodeDataSource storageNodeDataSource = new
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.ENABLED);
DataSourceDisabledEvent dataSourceDisabledEvent = new
DataSourceDisabledEvent(databaseName, groupName, dataSourceName,
storageNodeDataSource);
new StorageNodeStatusSubscriber(repository,
eventBusContext).update(dataSourceDisabledEvent);
- verify(repository).persist(StorageNode.getStatusPath(new
QualifiedDatabase(databaseName, groupName, dataSourceName)),
YamlEngine.marshal(storageNodeDataSource));
+ verify(repository).persist(StorageNode.getStatusPath(new
QualifiedDatabase(databaseName, groupName, dataSourceName)),
+ YamlEngine.marshal(new
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(storageNodeDataSource)));
}
@Test
@@ -72,6 +75,6 @@ public final class StorageNodeStatusSubscriberTest {
PrimaryDataSourceChangedEvent event = new
PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName,
dataSourceName));
new StorageNodeStatusSubscriber(repository,
eventBusContext).update(event);
verify(repository).persist(StorageNode.getStatusPath(new
QualifiedDatabase(databaseName, groupName, dataSourceName)),
- YamlEngine.marshal(new
StorageNodeDataSource(StorageNodeRole.PRIMARY, StorageNodeStatus.ENABLED)));
+ YamlEngine.marshal(new
YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(new
StorageNodeDataSource(StorageNodeRole.PRIMARY, StorageNodeStatus.ENABLED))));
}
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcherTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcherTest.java
index 527964ea3f9..29112de08fd 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcherTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcherTest.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
import org.junit.Test;
@@ -36,7 +38,7 @@ public final class StorageNodeStateChangedWatcherTest {
@Test
public void assertCreatePrimaryStateChangedEvent() {
Optional<GovernanceEvent> actual = new
StorageNodeStateChangedWatcher().createGovernanceEvent(
- new
DataChangedEvent("/nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0",
"role: primary\nstatus: enable\n", Type.ADDED));
+ new
DataChangedEvent("/nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0",
"role: PRIMARY\nstatus: ENABLED\n", Type.ADDED));
assertTrue(actual.isPresent());
PrimaryStateChangedEvent actualEvent = (PrimaryStateChangedEvent)
actual.get();
assertThat(actualEvent.getQualifiedDatabase().getDatabaseName(),
is("replica_query_db"));
@@ -47,27 +49,27 @@ public final class StorageNodeStateChangedWatcherTest {
@Test
public void assertCreateEnabledStorageNodeChangedEvent() {
Optional<GovernanceEvent> actual = new
StorageNodeStateChangedWatcher().createGovernanceEvent(
- new
DataChangedEvent("/nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0",
"role: member\nstatus: enable\n", Type.ADDED));
+ new
DataChangedEvent("/nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0",
"role: MEMBER\nstatus: ENABLED\n", Type.ADDED));
assertTrue(actual.isPresent());
StorageNodeChangedEvent actualEvent = (StorageNodeChangedEvent)
actual.get();
assertThat(actualEvent.getQualifiedDatabase().getDatabaseName(),
is("replica_query_db"));
assertThat(actualEvent.getQualifiedDatabase().getGroupName(),
is("readwrite_ds"));
assertThat(actualEvent.getQualifiedDatabase().getDataSourceName(),
is("replica_ds_0"));
- assertThat(actualEvent.getDataSource().getRole(), is("member"));
- assertThat(actualEvent.getDataSource().getStatus(), is("enable"));
+ assertThat(actualEvent.getDataSource().getRole(),
is(StorageNodeRole.MEMBER));
+ assertThat(actualEvent.getDataSource().getStatus(),
is(StorageNodeStatus.ENABLED));
}
@Test
public void assertCreateDisabledStorageNodeChangedEvent() {
Optional<GovernanceEvent> actual = new
StorageNodeStateChangedWatcher().createGovernanceEvent(
- new
DataChangedEvent("/nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0",
"role: member\nstatus: disable\n", Type.DELETED));
+ new
DataChangedEvent("/nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0",
"role: MEMBER\nstatus: DISABLED\n", Type.DELETED));
assertTrue(actual.isPresent());
StorageNodeChangedEvent actualEvent = (StorageNodeChangedEvent)
actual.get();
assertThat(actualEvent.getQualifiedDatabase().getDatabaseName(),
is("replica_query_db"));
assertThat(actualEvent.getQualifiedDatabase().getGroupName(),
is("readwrite_ds"));
assertThat(actualEvent.getQualifiedDatabase().getDataSourceName(),
is("replica_ds_0"));
- assertThat(actualEvent.getDataSource().getRole(), is("member"));
- assertThat(actualEvent.getDataSource().getStatus(), is("disable"));
+ assertThat(actualEvent.getDataSource().getRole(),
is(StorageNodeRole.MEMBER));
+ assertThat(actualEvent.getDataSource().getStatus(),
is(StorageNodeStatus.DISABLED));
}
@Test
diff --git
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowStatusFromReadwriteSplittingRulesHandler.java
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowStatusFromReadwriteSplittingRulesHandler.java
index 1654e5dfaf5..068a3b675dc 100644
---
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowStatusFromReadwriteSplittingRulesHandler.java
+++
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowStatusFromReadwriteSplittingRulesHandler.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -106,7 +107,7 @@ public final class
ShowStatusFromReadwriteSplittingRulesHandler extends Abstract
}
Map<String, StorageNodeDataSource> storageNodes = new
StorageNodeStatusService((ClusterPersistRepository)
persistService.getRepository()).loadStorageNodes();
Map<String, StorageNodeDataSource> result = new HashMap<>();
- storageNodes.entrySet().stream().filter(entry ->
"member".equalsIgnoreCase(entry.getValue().getRole())).forEach(entry -> {
+ storageNodes.entrySet().stream().filter(entry ->
StorageNodeRole.MEMBER == entry.getValue().getRole()).forEach(entry -> {
QualifiedDatabase qualifiedDatabase = new
QualifiedDatabase(entry.getKey());
if
(databaseName.equalsIgnoreCase(qualifiedDatabase.getDatabaseName())) {
result.put(qualifiedDatabase.getDataSourceName(),
entry.getValue());
@@ -117,7 +118,7 @@ public final class
ShowStatusFromReadwriteSplittingRulesHandler extends Abstract
private Collection<LocalDataQueryResultRow> buildRows(final
Collection<String> readResources, final Map<String, StorageNodeDataSource>
persistentReadResources) {
Map<String, Map<String, StorageNodeDataSource>>
persistentReadResourceGroup = persistentReadResources.entrySet().stream()
- .collect(Collectors.groupingBy(each ->
each.getValue().getStatus().toUpperCase(), Collectors.toMap(Entry::getKey,
Entry::getValue)));
+ .collect(Collectors.groupingBy(each ->
each.getValue().getStatus().name(), Collectors.toMap(Entry::getKey,
Entry::getValue)));
Map<String, StorageNodeDataSource> disabledReadResources =
persistentReadResourceGroup.getOrDefault(StorageNodeStatus.DISABLED.name(),
Collections.emptyMap());
Map<String, StorageNodeDataSource> enabledReadResources =
persistentReadResourceGroup.getOrDefault(StorageNodeStatus.ENABLED.name(),
Collections.emptyMap());
readResources.removeIf(disabledReadResources::containsKey);
@@ -132,10 +133,10 @@ public final class
ShowStatusFromReadwriteSplittingRulesHandler extends Abstract
private LocalDataQueryResultRow buildRow(final String resource, final
StorageNodeDataSource storageNodeDataSource) {
if (null == storageNodeDataSource) {
- return new LocalDataQueryResultRow(resource,
StorageNodeStatus.ENABLED.name().toLowerCase(), "0");
+ return new LocalDataQueryResultRow(resource,
StorageNodeStatus.ENABLED.name(), "0");
}
long replicationDelayMilliseconds =
storageNodeDataSource.getReplicationDelayMilliseconds();
- String status =
StorageNodeStatus.valueOf(storageNodeDataSource.getStatus().toUpperCase()).name().toLowerCase();
+ String status = storageNodeDataSource.getStatus().name();
return new LocalDataQueryResultRow(resource, status,
Long.toString(replicationDelayMilliseconds));
}
}
diff --git
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterReadwriteSplittingStorageUnitStatusStatementHandler.java
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterReadwriteSplittingStorageUnitStatusStatementHandler.java
index 67a25c3d807..bfd2db0fe22 100644
---
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterReadwriteSplittingStorageUnitStatusStatementHandler.java
+++
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterReadwriteSplittingStorageUnitStatusStatementHandler.java
@@ -186,7 +186,7 @@ public final class
AlterReadwriteSplittingStorageUnitStatusStatementHandler exte
private Collection<QualifiedDatabase> getDisabledStorageNodes(final String
databaseName, final MetaDataPersistService persistService) {
Map<String, StorageNodeDataSource> storageNodes = new
StorageNodeStatusService((ClusterPersistRepository)
persistService.getRepository()).loadStorageNodes();
- return storageNodes.entrySet().stream().filter(each ->
StorageNodeStatus.DISABLED.name().equalsIgnoreCase(each.getValue().getStatus()))
+ return storageNodes.entrySet().stream().filter(each ->
StorageNodeStatus.DISABLED == each.getValue().getStatus())
.map(each -> new QualifiedDatabase(each.getKey())).filter(each
->
databaseName.equalsIgnoreCase(each.getDatabaseName())).collect(Collectors.toList());
}