This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 959aeb3769c Refactor usage of ShardingSphereRuleMetaData.findRules for
cluster-mode-core module (#23746)
959aeb3769c is described below
commit 959aeb3769c51c0d475d433abed7e6f9d68d4a0d
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Jan 27 23:20:42 2023 +0800
Refactor usage of ShardingSphereRuleMetaData.findRules for
cluster-mode-core module (#23746)
* Refactor usage of ShardingSphereRuleMetaData.findRules for
ConfigurationChangedSubscriber
* Refactor usage of ShardingSphereRuleMetaData.findRules for
StateChangedSubscriber
---
.../subscriber/ConfigurationChangedSubscriber.java | 14 ++++++------
.../subscriber/StateChangedSubscriber.java | 25 ++++++++++------------
.../subscriber/StateChangedSubscriberTest.java | 3 ++-
3 files changed, 20 insertions(+), 22 deletions(-)
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
index 024952a7c40..f2c582e8d63 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
import
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -36,6 +37,7 @@ import
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSour
import java.util.Collection;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
@@ -119,17 +121,15 @@ public final class ConfigurationChangedSubscriber {
}
private void disableDataSources() {
-
contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key,
value) -> value.getRuleMetaData().getRules().forEach(each -> {
- if (each instanceof StaticDataSourceContainedRule) {
- disableDataSources((StaticDataSourceContainedRule) each);
- }
- }));
+ for (Entry<String, ShardingSphereDatabase> entry :
contextManager.getMetaDataContexts().getMetaData().getDatabases().entrySet()) {
+
entry.getValue().getRuleMetaData().findRules(StaticDataSourceContainedRule.class).forEach(this::disableDataSources);
+ }
}
private void disableDataSources(final StaticDataSourceContainedRule rule) {
Map<String, StorageNodeDataSource> storageNodes =
registryCenter.getStorageNodeStatusService().loadStorageNodes();
- Map<String, StorageNodeDataSource> disableDataSources =
storageNodes.entrySet().stream().filter(entry ->
StorageNodeStatus.isDisable(entry.getValue().getStatus()))
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ Map<String, StorageNodeDataSource> disableDataSources =
storageNodes.entrySet()
+ .stream().filter(entry ->
StorageNodeStatus.isDisable(entry.getValue().getStatus())).collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
disableDataSources.forEach((key, value) -> rule.updateStatus(new
StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index 20390036f02..2961faddd68 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -22,7 +22,6 @@ import
org.apache.shardingsphere.infra.datasource.state.DataSourceState;
import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -30,8 +29,8 @@ import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
import
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
@@ -66,16 +65,15 @@ public final class StateChangedSubscriber {
if
(!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName()))
{
return;
}
- Optional<ShardingSphereRule> dynamicDataSourceRule =
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
- .getRules().stream().filter(each -> each instanceof
DynamicDataSourceContainedRule).findFirst();
+ Optional<DynamicDataSourceContainedRule> dynamicDataSourceRule =
contextManager.getMetaDataContexts()
+
.getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class);
if (dynamicDataSourceRule.isPresent()) {
- ((DynamicDataSourceContainedRule)
dynamicDataSourceRule.get()).updateStatus(new
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
+ dynamicDataSourceRule.get().updateStatus(new
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
return;
}
- Optional<ShardingSphereRule> staticDataSourceRule =
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
- .getRules().stream().filter(each -> each instanceof
StaticDataSourceContainedRule).findFirst();
- staticDataSourceRule.ifPresent(optional ->
((StaticDataSourceContainedRule) optional)
- .updateStatus(new
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+ Optional<StaticDataSourceContainedRule> staticDataSourceRule =
contextManager.getMetaDataContexts()
+
.getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class);
+ staticDataSourceRule.ifPresent(optional -> optional.updateStatus(new
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
DataSourceStateManager.getInstance().updateState(
qualifiedDatabase.getDatabaseName(),
qualifiedDatabase.getDataSourceName(),
DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase()));
}
@@ -91,11 +89,10 @@ public final class StateChangedSubscriber {
return;
}
QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
-
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
- .stream()
- .filter(each -> each instanceof DynamicDataSourceContainedRule)
- .forEach(each -> ((DynamicDataSourceContainedRule) each)
- .restartHeartBeatJob(new
PrimaryDataSourceChangedEvent(qualifiedDatabase)));
+ for (DynamicDataSourceContainedRule each :
contextManager.getMetaDataContexts()
+
.getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findRules(DynamicDataSourceContainedRule.class))
{
+ each.restartHeartBeatJob(new
PrimaryDataSourceChangedEvent(qualifiedDatabase));
+ }
}
/**
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index dbe8752829b..cb772b9fa98 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -67,6 +67,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
@@ -119,7 +120,7 @@ public final class StateChangedSubscriberTest {
@Test
public void assertRenewForDisableStateChanged() {
StaticDataSourceContainedRule staticDataSourceRule =
mock(StaticDataSourceContainedRule.class);
-
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(staticDataSourceRule));
+
when(database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class)).thenReturn(Optional.of(staticDataSourceRule));
StorageNodeChangedEvent event = new StorageNodeChangedEvent(new
QualifiedDatabase("db.readwrite_ds.ds_0"), new
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
subscriber.renew(event);
verify(staticDataSourceRule).updateStatus(argThat(