This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 959aeb3769c Refactor usage of ShardingSphereRuleMetaData.findRules for cluster-mode-core module (#23746)
959aeb3769c is described below

commit 959aeb3769c51c0d475d433abed7e6f9d68d4a0d
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Jan 27 23:20:42 2023 +0800

    Refactor usage of ShardingSphereRuleMetaData.findRules for cluster-mode-core module (#23746)
    
    * Refactor usage of ShardingSphereRuleMetaData.findRules for ConfigurationChangedSubscriber
    
    * Refactor usage of ShardingSphereRuleMetaData.findRules for StateChangedSubscriber
---
 .../subscriber/ConfigurationChangedSubscriber.java | 14 ++++++------
 .../subscriber/StateChangedSubscriber.java         | 25 ++++++++++------------
 .../subscriber/StateChangedSubscriberTest.java     |  3 ++-
 3 files changed, 20 insertions(+), 22 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
index 024952a7c40..f2c582e8d63 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 import com.google.common.eventbus.Subscribe;
 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -36,6 +37,7 @@ import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSour
 
 import java.util.Collection;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
 /**
@@ -119,17 +121,15 @@ public final class ConfigurationChangedSubscriber {
     }
     
     private void disableDataSources() {
-        
contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, 
value) -> value.getRuleMetaData().getRules().forEach(each -> {
-            if (each instanceof StaticDataSourceContainedRule) {
-                disableDataSources((StaticDataSourceContainedRule) each);
-            }
-        }));
+        for (Entry<String, ShardingSphereDatabase> entry : 
contextManager.getMetaDataContexts().getMetaData().getDatabases().entrySet()) {
+            
entry.getValue().getRuleMetaData().findRules(StaticDataSourceContainedRule.class).forEach(this::disableDataSources);
+        }
     }
     
     private void disableDataSources(final StaticDataSourceContainedRule rule) {
         Map<String, StorageNodeDataSource> storageNodes = 
registryCenter.getStorageNodeStatusService().loadStorageNodes();
-        Map<String, StorageNodeDataSource> disableDataSources = 
storageNodes.entrySet().stream().filter(entry -> 
StorageNodeStatus.isDisable(entry.getValue().getStatus()))
-                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+        Map<String, StorageNodeDataSource> disableDataSources = 
storageNodes.entrySet()
+                .stream().filter(entry -> 
StorageNodeStatus.isDisable(entry.getValue().getStatus())).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
         disableDataSources.forEach((key, value) -> rule.updateStatus(new 
StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
index 20390036f02..2961faddd68 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -22,7 +22,6 @@ import 
org.apache.shardingsphere.infra.datasource.state.DataSourceState;
 import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -30,8 +29,8 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
@@ -66,16 +65,15 @@ public final class StateChangedSubscriber {
         if 
(!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName()))
 {
             return;
         }
-        Optional<ShardingSphereRule> dynamicDataSourceRule = 
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
-                .getRules().stream().filter(each -> each instanceof 
DynamicDataSourceContainedRule).findFirst();
+        Optional<DynamicDataSourceContainedRule> dynamicDataSourceRule = 
contextManager.getMetaDataContexts()
+                
.getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findSingleRule(DynamicDataSourceContainedRule.class);
         if (dynamicDataSourceRule.isPresent()) {
-            ((DynamicDataSourceContainedRule) 
dynamicDataSourceRule.get()).updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
+            dynamicDataSourceRule.get().updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
             return;
         }
-        Optional<ShardingSphereRule> staticDataSourceRule = 
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
-                .getRules().stream().filter(each -> each instanceof 
StaticDataSourceContainedRule).findFirst();
-        staticDataSourceRule.ifPresent(optional -> 
((StaticDataSourceContainedRule) optional)
-                .updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+        Optional<StaticDataSourceContainedRule> staticDataSourceRule = 
contextManager.getMetaDataContexts()
+                
.getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class);
+        staticDataSourceRule.ifPresent(optional -> optional.updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
         DataSourceStateManager.getInstance().updateState(
                 qualifiedDatabase.getDatabaseName(), 
qualifiedDatabase.getDataSourceName(), 
DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase()));
     }
@@ -91,11 +89,10 @@ public final class StateChangedSubscriber {
             return;
         }
         QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
-        
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
-                .stream()
-                .filter(each -> each instanceof DynamicDataSourceContainedRule)
-                .forEach(each -> ((DynamicDataSourceContainedRule) each)
-                        .restartHeartBeatJob(new 
PrimaryDataSourceChangedEvent(qualifiedDatabase)));
+        for (DynamicDataSourceContainedRule each : 
contextManager.getMetaDataContexts()
+                
.getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().findRules(DynamicDataSourceContainedRule.class))
 {
+            each.restartHeartBeatJob(new 
PrimaryDataSourceChangedEvent(qualifiedDatabase));
+        }
     }
     
     /**
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index dbe8752829b..cb772b9fa98 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -67,6 +67,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -119,7 +120,7 @@ public final class StateChangedSubscriberTest {
     @Test
     public void assertRenewForDisableStateChanged() {
         StaticDataSourceContainedRule staticDataSourceRule = 
mock(StaticDataSourceContainedRule.class);
-        
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(staticDataSourceRule));
+        
when(database.getRuleMetaData().findSingleRule(StaticDataSourceContainedRule.class)).thenReturn(Optional.of(staticDataSourceRule));
         StorageNodeChangedEvent event = new StorageNodeChangedEvent(new 
QualifiedDatabase("db.readwrite_ds.ds_0"), new 
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
         subscriber.renew(event);
         verify(staticDataSourceRule).updateStatus(argThat(

Reply via email to