This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f1d2d480f17 Add ContextManagerSubscriberFacade (#21995)
f1d2d480f17 is described below

commit f1d2d480f172fa78825d561661cba25113e66c50
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 8 08:16:55 2022 +0800

    Add ContextManagerSubscriberFacade (#21995)
    
    * Add ResourceMetaDataCoordinator
    
    * Add coordinator.subscriber package
    
    * Add ContextManagerSubscriberFacade
    
    * Add ContextManagerSubscriberFacade
---
 .../cluster/ClusterContextManagerBuilder.java      |   4 +-
 .../coordinator/ContextManagerCoordinator.java     | 363 ---------------------
 .../subscriber/ConfigurationChangedSubscriber.java | 135 ++++++++
 .../subscriber/ContextManagerSubscriberFacade.java |  36 ++
 .../subscriber/DatabaseChangedSubscriber.java      |  91 ++++++
 .../subscriber/ProcessListChangedSubscriber.java   | 116 +++++++
 .../ResourceMetaDataChangedSubscriber.java         | 103 ++++++
 .../subscriber/StateChangedSubscriber.java         | 140 ++++++++
 .../ConfigurationChangedSubscriberTest.java}       | 157 +--------
 .../ProcessListChangedSubscriberTest.java          | 143 ++++++++
 .../ResourceMetaDataChangedSubscriberTest.java     | 163 +++++++++
 .../subscriber/StateChangedSubscriberTest.java     | 184 +++++++++++
 12 files changed, 1123 insertions(+), 512 deletions(-)

diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index e3247de01ff..0e47566a4f4 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -26,7 +26,7 @@ import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.ContextManagerCoordinator;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberFacade;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -86,7 +86,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
                                 final ContextManagerBuilderParameter 
parameter, final ContextManager contextManager) {
         
contextManager.getInstanceContext().getInstance().setLabels(parameter.getLabels());
         
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
-        new ContextManagerCoordinator(persistService, registryCenter, 
contextManager);
+        new ContextManagerSubscriberFacade(persistService, registryCenter, 
contextManager);
         
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java
deleted file mode 100644
index 8e8d76c3bbb..00000000000
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinator.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.mode.manager.cluster.coordinator;
-
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
-import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdUnitCompleteEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
-import 
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
-import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
-import org.apache.shardingsphere.mode.process.node.ProcessNode;
-
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Context manager coordinator.
- */
-@SuppressWarnings("UnstableApiUsage")
-public final class ContextManagerCoordinator {
-    
-    private final MetaDataPersistService persistService;
-    
-    private final RegistryCenter registryCenter;
-    
-    private final ContextManager contextManager;
-    
-    public ContextManagerCoordinator(final MetaDataPersistService 
persistService, final RegistryCenter registryCenter, final ContextManager 
contextManager) {
-        this.persistService = persistService;
-        this.registryCenter = registryCenter;
-        this.contextManager = contextManager;
-        
contextManager.getInstanceContext().getEventBusContext().register(this);
-        new ResourceMetaDataCoordinator(contextManager);
-        disableDataSources();
-    }
-    
-    /**
-     * Renew rule configurations.
-     *
-     * @param event rule configurations changed event
-     */
-    @Subscribe
-    public synchronized void renew(final RuleConfigurationsChangedEvent event) 
{
-        if 
(persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(),
 event.getDatabaseVersion())) {
-            contextManager.alterRuleConfiguration(event.getDatabaseName(), 
event.getRuleConfigurations());
-            disableDataSources();
-        }
-    }
-    
-    /**
-     * Renew data source configuration.
-     *
-     * @param event data source changed event.
-     */
-    @Subscribe
-    public synchronized void renew(final DataSourceChangedEvent event) {
-        if 
(persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(),
 event.getDatabaseVersion())) {
-            
contextManager.alterDataSourceConfiguration(event.getDatabaseName(), 
event.getDataSourcePropertiesMap());
-            disableDataSources();
-        }
-    }
-    
-    /**
-     * Renew disabled data source names.
-     *
-     * @param event Storage node changed event
-     */
-    @Subscribe
-    public synchronized void renew(final StorageNodeChangedEvent event) {
-        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
-        if 
(!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName()))
 {
-            return;
-        }
-        Optional<ShardingSphereRule> dynamicDataSourceRule = 
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
-                .getRules().stream().filter(each -> each instanceof 
DynamicDataSourceContainedRule).findFirst();
-        if (dynamicDataSourceRule.isPresent()) {
-            ((DynamicDataSourceContainedRule) 
dynamicDataSourceRule.get()).updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
-            return;
-        }
-        Optional<ShardingSphereRule> staticDataSourceRule = 
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData()
-                .getRules().stream().filter(each -> each instanceof 
StaticDataSourceContainedRule).findFirst();
-        staticDataSourceRule.ifPresent(optional -> 
((StaticDataSourceContainedRule) optional)
-                .updateStatus(new 
StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
-        DataSourceStateManager.getInstance().updateState(
-                qualifiedDatabase.getDatabaseName(), 
qualifiedDatabase.getDataSourceName(), 
DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase()));
-    }
-    
-    /**
-     * Renew primary data source names.
-     *
-     * @param event primary state changed event
-     */
-    @Subscribe
-    public synchronized void renew(final PrimaryStateChangedEvent event) {
-        if 
(!contextManager.getMetaDataContexts().getMetaData().containsDatabase(event.getQualifiedDatabase().getDatabaseName()))
 {
-            return;
-        }
-        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
-        
contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
-                .stream()
-                .filter(each -> each instanceof DynamicDataSourceContainedRule)
-                .forEach(each -> ((DynamicDataSourceContainedRule) each)
-                        .restartHeartBeatJob(new 
PrimaryDataSourceChangedEvent(qualifiedDatabase)));
-    }
-    
-    /**
-     * Renew global rule configurations.
-     *
-     * @param event global rule configurations changed event
-     */
-    @Subscribe
-    public synchronized void renew(final GlobalRuleConfigurationsChangedEvent 
event) {
-        
contextManager.alterGlobalRuleConfiguration(event.getRuleConfigurations());
-        disableDataSources();
-    }
-    
-    /**
-     * Renew instance status.
-     *
-     * @param event state event
-     */
-    @Subscribe
-    public synchronized void renew(final StateEvent event) {
-        
contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), 
event.getStatus());
-    }
-    
-    /**
-     * Renew instance labels.
-     * 
-     * @param event label event
-     */
-    @Subscribe
-    public synchronized void renew(final LabelsEvent event) {
-        // TODO labels may be empty
-        contextManager.getInstanceContext().updateLabel(event.getInstanceId(), 
event.getLabels());
-    }
-    
-    /**
-     * Renew instance list.
-     *
-     * @param event compute node online event
-     */
-    @Subscribe
-    public synchronized void renew(final InstanceOnlineEvent event) {
-        
contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
-    }
-    
-    /**
-     * Renew instance list.
-     *
-     * @param event compute node offline event
-     */
-    @Subscribe
-    public synchronized void renew(final InstanceOfflineEvent event) {
-        contextManager.getInstanceContext().deleteComputeNodeInstance(new 
ComputeNodeInstance(event.getInstanceMetaData()));
-    }
-    
-    /**
-     * Renew with new database version.
-     *
-     * @param event database version changed event
-     */
-    @Subscribe
-    public synchronized void renew(final DatabaseVersionChangedEvent event) {
-        Map<String, DataSourceProperties> dataSourcePropertiesMap = 
persistService.getDataSourceService().load(event.getDatabaseName(), 
event.getActiveVersion());
-        Collection<RuleConfiguration> ruleConfigs = 
persistService.getDatabaseRulePersistService().load(event.getDatabaseName(), 
event.getActiveVersion());
-        
contextManager.alterDataSourceAndRuleConfiguration(event.getDatabaseName(), 
dataSourcePropertiesMap, ruleConfigs);
-        disableDataSources();
-    }
-    
-    /**
-     * Renew properties.
-     *
-     * @param event properties changed event
-     */
-    @Subscribe
-    public synchronized void renew(final PropertiesChangedEvent event) {
-        contextManager.alterProperties(event.getProps());
-    }
-    
-    /**
-     * Renew to persist ShardingSphere database data.
-     *
-     * @param event database data added event
-     */
-    @Subscribe
-    public synchronized void renew(final DatabaseDataAddedEvent event) {
-        contextManager.addShardingSphereDatabaseData(event.getDatabaseName());
-    }
-    
-    /**
-     * Renew to delete ShardingSphere data database.
-     *
-     * @param event database delete event
-     */
-    @Subscribe
-    public synchronized void renew(final DatabaseDataDeletedEvent event) {
-        contextManager.dropShardingSphereDatabaseData(event.getDatabaseName());
-    }
-    
-    /**
-     * Renew to added ShardingSphere data schema.
-     *
-     * @param event schema added event
-     */
-    @Subscribe
-    public synchronized void renew(final SchemaDataAddedEvent event) {
-        contextManager.addShardingSphereSchemaData(event.getDatabaseName(), 
event.getSchemaName());
-    }
-    
-    /**
-     * Renew to delete ShardingSphere data schema.
-     *
-     * @param event schema delete event
-     */
-    @Subscribe
-    public synchronized void renew(final SchemaDataDeletedEvent event) {
-        contextManager.dropShardingSphereSchemaData(event.getDatabaseName(), 
event.getSchemaName());
-    }
-    
-    /**
-     * Renew ShardingSphere data of the table.
-     *
-     * @param event table data changed event
-     */
-    @Subscribe
-    public synchronized void renew(final TableDataChangedEvent event) {
-        contextManager.alterSchemaData(event.getDatabaseName(), 
event.getSchemaName(), event.getChangedTableData());
-        contextManager.alterSchemaData(event.getDatabaseName(), 
event.getSchemaName(), event.getDeletedTable());
-    }
-    
-    /**
-     * Trigger show process list.
-     *
-     * @param event show process list trigger event
-     */
-    @Subscribe
-    public synchronized void triggerShowProcessList(final 
ShowProcessListTriggerEvent event) {
-        if 
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
 {
-            return;
-        }
-        Collection<ExecuteProcessContext> processContexts = 
ShowProcessListManager.getInstance().getAllProcessContext();
-        if (!processContexts.isEmpty()) {
-            
registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(),
 event.getInstanceId()),
-                    YamlEngine.marshal(new 
BatchYamlExecuteProcessContext(processContexts)));
-        }
-        
registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(),
 event.getProcessListId()));
-    }
-    
-    /**
-     * Trigger show process list.
-     *
-     * @param event show process list trigger event
-     * @throws SQLException SQL exception
-     */
-    @Subscribe
-    public synchronized void killProcessListId(final KillProcessListIdEvent 
event) throws SQLException {
-        if 
(!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId()))
 {
-            return;
-        }
-        Collection<Statement> statements = 
ShowProcessListManager.getInstance().getProcessStatement(event.getProcessListId());
-        for (Statement statement : statements) {
-            statement.cancel();
-        }
-        
registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(),
 event.getProcessListId()));
-    }
-    
-    /**
-     * Complete unit show process list.
-     *
-     * @param event show process list unit complete event
-     */
-    @Subscribe
-    public synchronized void completeUnitShowProcessList(final 
ShowProcessListUnitCompleteEvent event) {
-        ShowProcessListSimpleLock simpleLock = 
ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
-        if (null != simpleLock) {
-            simpleLock.doNotify();
-        }
-    }
-    
-    /**
-     * Complete unit kill process list id.
-     *
-     * @param event kill process list id unit complete event
-     */
-    @Subscribe
-    public synchronized void completeUnitKillProcessListId(final 
KillProcessListIdUnitCompleteEvent event) {
-        ShowProcessListSimpleLock simpleLock = 
ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
-        if (null != simpleLock) {
-            simpleLock.doNotify();
-        }
-    }
-    
-    private void disableDataSources() {
-        
contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, 
value) -> value.getRuleMetaData().getRules().forEach(each -> {
-            if (each instanceof StaticDataSourceContainedRule) {
-                disableDataSources((StaticDataSourceContainedRule) each);
-            }
-        }));
-    }
-    
-    private void disableDataSources(final StaticDataSourceContainedRule rule) {
-        Map<String, StorageNodeDataSource> storageNodes = 
registryCenter.getStorageNodeStatusService().loadStorageNodes();
-        Map<String, StorageNodeDataSource> disableDataSources = 
storageNodes.entrySet().stream().filter(entry -> 
StorageNodeStatus.isDisable(entry.getValue().getStatus()))
-                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-        disableDataSources.forEach((key, value) -> rule.updateStatus(new 
StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
-    }
-}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
new file mode 100644
index 00000000000..b02dcea8072
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriber.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
+import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Configuration changed subscriber, registers itself on the event bus and applies configuration changed events to the context manager.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ConfigurationChangedSubscriber {
+    
+    private final MetaDataPersistService persistService;
+    
+    private final RegistryCenter registryCenter;
+    
+    private final ContextManager contextManager;
+    
+    public ConfigurationChangedSubscriber(final MetaDataPersistService 
persistService, final RegistryCenter registryCenter, final ContextManager 
contextManager) {
+        this.persistService = persistService;
+        this.registryCenter = registryCenter;
+        this.contextManager = contextManager;
+        
contextManager.getInstanceContext().getEventBusContext().register(this);
+        disableDataSources();
+    }
+    
+    /**
+     * Renew data source configuration if the event carries the active database version, then re-apply disabled storage node status to static data source rules.
+     *
+     * @param event data source changed event
+     */
+    @Subscribe
+    public synchronized void renew(final DataSourceChangedEvent event) {
+        if 
(persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(),
 event.getDatabaseVersion())) {
+            
contextManager.alterDataSourceConfiguration(event.getDatabaseName(), 
event.getDataSourcePropertiesMap());
+            disableDataSources();
+        }
+    }
+    
+    /**
+     * Renew rule configurations if the event carries the active database version, then re-apply disabled storage node status to static data source rules.
+     *
+     * @param event rule configurations changed event
+     */
+    @Subscribe
+    public synchronized void renew(final RuleConfigurationsChangedEvent event) 
{
+        if 
(persistService.getMetaDataVersionPersistService().isActiveVersion(event.getDatabaseName(),
 event.getDatabaseVersion())) {
+            contextManager.alterRuleConfiguration(event.getDatabaseName(), 
event.getRuleConfigurations());
+            disableDataSources();
+        }
+    }
+    
+    /**
+     * Renew global rule configurations, then re-apply disabled storage node status to static data source rules.
+     *
+     * @param event global rule configurations changed event
+     */
+    @Subscribe
+    public synchronized void renew(final GlobalRuleConfigurationsChangedEvent 
event) {
+        
contextManager.alterGlobalRuleConfiguration(event.getRuleConfigurations());
+        disableDataSources();
+    }
+    
+    /**
+     * Renew data sources and rules with the configurations loaded for the new active database version, then re-apply disabled storage node status to static data source rules.
+     *
+     * @param event database version changed event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseVersionChangedEvent event) {
+        Map<String, DataSourceProperties> dataSourcePropertiesMap = 
persistService.getDataSourceService().load(event.getDatabaseName(), 
event.getActiveVersion());
+        Collection<RuleConfiguration> ruleConfigs = 
persistService.getDatabaseRulePersistService().load(event.getDatabaseName(), 
event.getActiveVersion());
+        
contextManager.alterDataSourceAndRuleConfiguration(event.getDatabaseName(), 
dataSourcePropertiesMap, ruleConfigs);
+        disableDataSources();
+    }
+    
+    /**
+     * Renew properties of the context manager with the properties carried by the event.
+     *
+     * @param event properties changed event
+     */
+    @Subscribe
+    public synchronized void renew(final PropertiesChangedEvent event) {
+        contextManager.alterProperties(event.getProps());
+    }
+    
+    private void disableDataSources() {
+        
contextManager.getMetaDataContexts().getMetaData().getDatabases().forEach((key, 
value) -> value.getRuleMetaData().getRules().forEach(each -> {
+            if (each instanceof StaticDataSourceContainedRule) {
+                disableDataSources((StaticDataSourceContainedRule) each);
+            }
+        }));
+    }
+    
+    private void disableDataSources(final StaticDataSourceContainedRule rule) {
+        Map<String, StorageNodeDataSource> storageNodes = 
registryCenter.getStorageNodeStatusService().loadStorageNodes();
+        Map<String, StorageNodeDataSource> disableDataSources = 
storageNodes.entrySet().stream().filter(entry -> 
StorageNodeStatus.isDisable(entry.getValue().getStatus()))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+        disableDataSources.forEach((key, value) -> rule.updateStatus(new 
StorageNodeDataSourceChangedEvent(new QualifiedDatabase(key), value)));
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
new file mode 100644
index 00000000000..783d4301ca5
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+
+/**
+ * Context manager subscriber facade, instantiates the configuration, resource meta data, database, state and process list changed subscribers.
+ */
+public final class ContextManagerSubscriberFacade {
+    
+    public ContextManagerSubscriberFacade(final MetaDataPersistService 
persistService, final RegistryCenter registryCenter, final ContextManager 
contextManager) {
+        new ConfigurationChangedSubscriber(persistService, registryCenter, 
contextManager);
+        new ResourceMetaDataChangedSubscriber(contextManager);
+        new DatabaseChangedSubscriber(contextManager);
+        new StateChangedSubscriber(registryCenter, contextManager);
+        new ProcessListChangedSubscriber(registryCenter, contextManager);
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
new file mode 100644
index 00000000000..8e89618a5f8
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/DatabaseChangedSubscriber.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.SchemaDataDeletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.TableDataChangedEvent;
+
+/**
+ * Database changed subscriber.
+ *
+ * <p>Forwards ShardingSphere data events of databases, schemas and tables to the context manager.</p>
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class DatabaseChangedSubscriber {
+    
+    private final ContextManager contextManager;
+    
+    /**
+     * Create the subscriber and register it on the event bus of the instance context.
+     *
+     * @param contextManager context manager that receives the forwarded changes
+     */
+    public DatabaseChangedSubscriber(final ContextManager contextManager) {
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Renew to add ShardingSphere database data.
+     *
+     * @param event database data added event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseDataAddedEvent event) {
+        String databaseName = event.getDatabaseName();
+        contextManager.addShardingSphereDatabaseData(databaseName);
+    }
+    
+    /**
+     * Renew to drop ShardingSphere database data.
+     *
+     * @param event database data deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseDataDeletedEvent event) {
+        String databaseName = event.getDatabaseName();
+        contextManager.dropShardingSphereDatabaseData(databaseName);
+    }
+    
+    /**
+     * Renew to add ShardingSphere schema data.
+     *
+     * @param event schema data added event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaDataAddedEvent event) {
+        String databaseName = event.getDatabaseName();
+        String schemaName = event.getSchemaName();
+        contextManager.addShardingSphereSchemaData(databaseName, schemaName);
+    }
+    
+    /**
+     * Renew to drop ShardingSphere schema data.
+     *
+     * @param event schema data deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaDataDeletedEvent event) {
+        String databaseName = event.getDatabaseName();
+        String schemaName = event.getSchemaName();
+        contextManager.dropShardingSphereSchemaData(databaseName, schemaName);
+    }
+    
+    /**
+     * Renew ShardingSphere data of the table.
+     *
+     * <p>The changed table data is applied first, the deleted table afterwards.</p>
+     *
+     * @param event table data changed event
+     */
+    @Subscribe
+    public synchronized void renew(final TableDataChangedEvent event) {
+        String databaseName = event.getDatabaseName();
+        String schemaName = event.getSchemaName();
+        contextManager.alterSchemaData(databaseName, schemaName, event.getChangedTableData());
+        contextManager.alterSchemaData(databaseName, schemaName, event.getDeletedTable());
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
new file mode 100644
index 00000000000..1e1c774b078
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriber.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.yaml.BatchYamlExecuteProcessContext;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillProcessListIdUnitCompleteEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
+import org.apache.shardingsphere.mode.process.ShowProcessListManager;
+import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
+import org.apache.shardingsphere.mode.process.node.ProcessNode;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+
+/**
+ * Process list changed subscriber.
+ *
+ * <p>Handles show/kill process list triggers addressed to the current instance and the matching completion events.</p>
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ProcessListChangedSubscriber {
+    
+    private final RegistryCenter registryCenter;
+    
+    private final ContextManager contextManager;
+    
+    /**
+     * Create the subscriber and register it on the event bus of the instance context.
+     *
+     * @param registryCenter registry center whose repository is used to persist and delete process nodes
+     * @param contextManager context manager providing the current instance and the event bus
+     */
+    public ProcessListChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
+        this.registryCenter = registryCenter;
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Trigger show process list.
+     *
+     * <p>Ignored unless the event targets the current instance. Persists the process contexts of this instance
+     * when there are any, and always deletes the trigger node afterwards.</p>
+     *
+     * @param event show process list trigger event
+     */
+    @Subscribe
+    public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) {
+        boolean isTriggeredForCurrentInstance = event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId());
+        if (!isTriggeredForCurrentInstance) {
+            return;
+        }
+        Collection<ExecuteProcessContext> currentProcessContexts = ShowProcessListManager.getInstance().getAllProcessContext();
+        if (!currentProcessContexts.isEmpty()) {
+            String processListInstancePath = ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId());
+            String processListContent = YamlEngine.marshal(new BatchYamlExecuteProcessContext(currentProcessContexts));
+            registryCenter.getRepository().persist(processListInstancePath, processListContent);
+        }
+        registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
+    }
+    
+    /**
+     * Kill process list id.
+     *
+     * <p>Ignored unless the event targets the current instance. Cancels every statement returned for the process list id,
+     * then deletes the kill node.</p>
+     *
+     * @param event kill process list id event
+     * @throws SQLException SQL exception
+     */
+    @Subscribe
+    public synchronized void killProcessListId(final KillProcessListIdEvent event) throws SQLException {
+        boolean isKilledOnCurrentInstance = event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId());
+        if (!isKilledOnCurrentInstance) {
+            return;
+        }
+        for (Statement each : ShowProcessListManager.getInstance().getProcessStatement(event.getProcessListId())) {
+            each.cancel();
+        }
+        registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
+    }
+    
+    /**
+     * Complete unit show process list.
+     *
+     * <p>Notifies the lock stored for the process list id, if there is one.</p>
+     *
+     * @param event show process list unit complete event
+     */
+    @Subscribe
+    public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
+        ShowProcessListSimpleLock showProcessListLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
+        if (null == showProcessListLock) {
+            return;
+        }
+        showProcessListLock.doNotify();
+    }
+    
+    /**
+     * Complete unit kill process list id.
+     *
+     * <p>Notifies the lock stored for the process list id, if there is one.</p>
+     *
+     * @param event kill process list id unit complete event
+     */
+    @Subscribe
+    public synchronized void completeUnitKillProcessListId(final KillProcessListIdUnitCompleteEvent event) {
+        ShowProcessListSimpleLock killProcessListLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
+        if (null == killProcessListLock) {
+            return;
+        }
+        killProcessListLock.doNotify();
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
new file mode 100644
index 00000000000..340cb6ba862
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriber.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.TableMetaDataChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.ViewMetaDataChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
+
+/**
+ * Resource meta data changed subscriber.
+ *
+ * <p>Forwards database, schema, table and view meta data events to the context manager.</p>
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class ResourceMetaDataChangedSubscriber {
+    
+    private final ContextManager contextManager;
+    
+    /**
+     * Create the subscriber and register it on the event bus of the instance context.
+     *
+     * @param contextManager context manager that receives the forwarded changes
+     */
+    public ResourceMetaDataChangedSubscriber(final ContextManager contextManager) {
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Renew to add database.
+     *
+     * @param event database added event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseAddedEvent event) {
+        String databaseName = event.getDatabaseName();
+        contextManager.addDatabase(databaseName);
+    }
+    
+    /**
+     * Renew to drop database.
+     *
+     * @param event database deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final DatabaseDeletedEvent event) {
+        String databaseName = event.getDatabaseName();
+        contextManager.dropDatabase(databaseName);
+    }
+    
+    /**
+     * Renew to add schema.
+     *
+     * @param event schema added event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaAddedEvent event) {
+        String databaseName = event.getDatabaseName();
+        String schemaName = event.getSchemaName();
+        contextManager.addSchema(databaseName, schemaName);
+    }
+    
+    /**
+     * Renew to drop schema.
+     *
+     * @param event schema deleted event
+     */
+    @Subscribe
+    public synchronized void renew(final SchemaDeletedEvent event) {
+        String databaseName = event.getDatabaseName();
+        String schemaName = event.getSchemaName();
+        contextManager.dropSchema(databaseName, schemaName);
+    }
+    
+    /**
+     * Renew meta data of the table.
+     *
+     * <p>The changed table meta data is applied first, the deleted table afterwards; the view argument is always null here.</p>
+     *
+     * @param event table meta data changed event
+     */
+    @Subscribe
+    public synchronized void renew(final TableMetaDataChangedEvent event) {
+        String databaseName = event.getDatabaseName();
+        String schemaName = event.getSchemaName();
+        contextManager.alterSchema(databaseName, schemaName, event.getChangedTableMetaData(), null);
+        contextManager.alterSchema(databaseName, schemaName, event.getDeletedTable(), null);
+    }
+    
+    /**
+     * Renew meta data of the view.
+     *
+     * <p>The changed view meta data is applied first, the deleted view afterwards; the table argument is always null here.</p>
+     *
+     * @param event view meta data changed event
+     */
+    @Subscribe
+    public synchronized void renew(final ViewMetaDataChangedEvent event) {
+        String databaseName = event.getDatabaseName();
+        String schemaName = event.getSchemaName();
+        contextManager.alterSchema(databaseName, schemaName, null, event.getChangedViewMetaData());
+        contextManager.alterSchema(databaseName, schemaName, null, event.getDeletedView());
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
new file mode 100644
index 00000000000..794e189ff51
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriber.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
+import org.apache.shardingsphere.infra.datasource.state.DataSourceStateManager;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import 
org.apache.shardingsphere.mode.metadata.storage.event.PrimaryDataSourceChangedEvent;
+import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * State changed subscriber.
+ *
+ * <p>Handles storage node and primary data source state changes as well as compute node status, label, online and offline events.</p>
+ */
+@SuppressWarnings("UnstableApiUsage")
+public final class StateChangedSubscriber {
+    
+    private final RegistryCenter registryCenter;
+    
+    private final ContextManager contextManager;
+    
+    /**
+     * Create the subscriber and register it on the event bus of the instance context.
+     *
+     * @param registryCenter registry center used to load compute node instances
+     * @param contextManager context manager providing meta data, the instance context and the event bus
+     */
+    public StateChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
+        this.registryCenter = registryCenter;
+        this.contextManager = contextManager;
+        contextManager.getInstanceContext().getEventBusContext().register(this);
+    }
+    
+    /**
+     * Renew disabled data source names.
+     *
+     * <p>Ignored when the database of the event is unknown. When the database has a dynamic data source rule, only that rule is updated.
+     * Otherwise the first static data source rule, if any, is updated and the state is recorded in the data source state manager.</p>
+     *
+     * @param event Storage node changed event
+     */
+    @Subscribe
+    public synchronized void renew(final StorageNodeChangedEvent event) {
+        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+        String databaseName = qualifiedDatabase.getDatabaseName();
+        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(databaseName)) {
+            return;
+        }
+        Optional<ShardingSphereRule> dynamicDataSourceRule = findFirstRule(databaseName, DynamicDataSourceContainedRule.class);
+        if (dynamicDataSourceRule.isPresent()) {
+            ((DynamicDataSourceContainedRule) dynamicDataSourceRule.get()).updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource()));
+            return;
+        }
+        Optional<ShardingSphereRule> staticDataSourceRule = findFirstRule(databaseName, StaticDataSourceContainedRule.class);
+        staticDataSourceRule.ifPresent(optional -> ((StaticDataSourceContainedRule) optional)
+                .updateStatus(new StorageNodeDataSourceChangedEvent(qualifiedDatabase, event.getDataSource())));
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale, otherwise a status containing 'i'
+        // is turned into a dotted capital I under e.g. a Turkish locale and DataSourceState.valueOf fails.
+        DataSourceStateManager.getInstance().updateState(
+                databaseName, qualifiedDatabase.getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().toUpperCase(Locale.ROOT)));
+    }
+    
+    /**
+     * Find the first rule of the database that is an instance of the given type.
+     *
+     * @param databaseName name of a database that exists in the meta data
+     * @param ruleType rule type to look for
+     * @return first matching rule, or empty when the database has none
+     */
+    private Optional<ShardingSphereRule> findFirstRule(final String databaseName, final Class<?> ruleType) {
+        return contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName).getRuleMetaData().getRules().stream().filter(ruleType::isInstance).findFirst();
+    }
+    
+    /**
+     * Renew primary data source names.
+     *
+     * <p>Ignored when the database of the event is unknown. Restarts the heart beat job of every dynamic data source rule of the database.</p>
+     *
+     * @param event primary state changed event
+     */
+    @Subscribe
+    public synchronized void renew(final PrimaryStateChangedEvent event) {
+        QualifiedDatabase qualifiedDatabase = event.getQualifiedDatabase();
+        if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(qualifiedDatabase.getDatabaseName())) {
+            return;
+        }
+        contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDatabase.getDatabaseName()).getRuleMetaData().getRules()
+                .stream()
+                .filter(each -> each instanceof DynamicDataSourceContainedRule)
+                .forEach(each -> ((DynamicDataSourceContainedRule) each)
+                        .restartHeartBeatJob(new PrimaryDataSourceChangedEvent(qualifiedDatabase)));
+    }
+    
+    /**
+     * Renew instance status.
+     *
+     * @param event state event
+     */
+    @Subscribe
+    public synchronized void renew(final StateEvent event) {
+        contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
+    }
+    
+    /**
+     * Renew instance labels.
+     *
+     * @param event label event
+     */
+    @Subscribe
+    public synchronized void renew(final LabelsEvent event) {
+        // TODO labels may be empty
+        contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
+    }
+    
+    /**
+     * Renew instance list.
+     *
+     * <p>Loads the compute node instance for the meta data of the event and adds it to the instance context.</p>
+     *
+     * @param event compute node online event
+     */
+    @Subscribe
+    public synchronized void renew(final InstanceOnlineEvent event) {
+        contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
+    }
+    
+    /**
+     * Renew instance list.
+     *
+     * <p>Deletes the compute node instance built from the meta data of the event from the instance context.</p>
+     *
+     * @param event compute node offline event
+     */
+    @Subscribe
+    public synchronized void renew(final InstanceOfflineEvent event) {
+        contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
+    }
+}
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
similarity index 57%
rename from 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java
rename to 
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
index a87baf3db0b..a9df46a4e70 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ContextManagerCoordinatorTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ConfigurationChangedSubscriberTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
 
 import org.apache.shardingsphere.authority.config.AuthorityRuleConfiguration;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
@@ -27,48 +27,27 @@ import 
org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
 import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
-import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
-import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import 
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.resource.ShardingSphereResourceMetaData;
 import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
 import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
-import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
-import org.apache.shardingsphere.infra.state.StateType;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.datasource.DataSourceChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.props.PropertiesChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.rule.RuleConfigurationsChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.DatabaseVersionChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
-import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
-import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
-import org.apache.shardingsphere.mode.process.ShowProcessListManager;
-import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 import org.apache.shardingsphere.sqltranslator.rule.SQLTranslatorRule;
@@ -78,7 +57,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
-import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
@@ -89,26 +67,20 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class ContextManagerCoordinatorTest {
+public final class ConfigurationChangedSubscriberTest {
     
-    private ContextManagerCoordinator coordinator;
+    private ConfigurationChangedSubscriber subscriber;
     
     private ContextManager contextManager;
     
@@ -126,7 +98,7 @@ public final class ContextManagerCoordinatorTest {
         contextManager = new 
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
         contextManager.renewMetaDataContexts(new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new 
ShardingSphereMetaData(createDatabases(),
                 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new 
ConfigurationProperties(new Properties()))));
-        coordinator = new ContextManagerCoordinator(persistService, new 
RegistryCenter(mock(ClusterPersistRepository.class),
+        subscriber = new ConfigurationChangedSubscriber(persistService, new 
RegistryCenter(mock(ClusterPersistRepository.class),
                 new EventBusContext(), mock(ProxyInstanceMetaData.class), 
null), contextManager);
     }
     
@@ -155,25 +127,14 @@ public final class ContextManagerCoordinatorTest {
     public void assertRenewForRuleConfigurationsChanged() {
         
when(persistService.getMetaDataVersionPersistService().isActiveVersion("db", 
"0")).thenReturn(true);
         
assertThat(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"),
 is(database));
-        coordinator.renew(new RuleConfigurationsChangedEvent("db", "0", 
Collections.emptyList()));
+        subscriber.renew(new RuleConfigurationsChangedEvent("db", "0", 
Collections.emptyList()));
         
assertThat(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"),
 not(database));
     }
     
-    @Test
-    public void assertRenewForDisableStateChanged() {
-        StaticDataSourceContainedRule staticDataSourceRule = 
mock(StaticDataSourceContainedRule.class);
-        
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(staticDataSourceRule));
-        StorageNodeChangedEvent event = new StorageNodeChangedEvent(new 
QualifiedDatabase("db.readwrite_ds.ds_0"), new 
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
-        coordinator.renew(event);
-        verify(staticDataSourceRule).updateStatus(argThat(
-                (ArgumentMatcher<StorageNodeDataSourceChangedEvent>) 
argumentEvent -> Objects.equals(event.getQualifiedDatabase(), 
argumentEvent.getQualifiedDatabase())
-                        && Objects.equals(event.getDataSource(), 
argumentEvent.getDataSource())));
-    }
-    
     @Test
     public void assertRenewForDataSourceChanged() {
         
when(persistService.getMetaDataVersionPersistService().isActiveVersion("db", 
"0")).thenReturn(true);
-        coordinator.renew(new DataSourceChangedEvent("db", "0", 
createChangedDataSourcePropertiesMap()));
+        subscriber.renew(new DataSourceChangedEvent("db", "0", 
createChangedDataSourcePropertiesMap()));
         
assertTrue(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getResourceMetaData().getDataSources().containsKey("ds_2"));
     }
     
@@ -189,7 +150,7 @@ public final class ContextManagerCoordinatorTest {
     @Test
     public void assertRenewForGlobalRuleConfigurationsChanged() {
         GlobalRuleConfigurationsChangedEvent event = new 
GlobalRuleConfigurationsChangedEvent(getChangedGlobalRuleConfigurations());
-        coordinator.renew(event);
+        subscriber.renew(event);
         
assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
 not(globalRuleMetaData));
         
assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules().size(),
 is(3));
         
assertThat(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules().stream().filter(each
 -> each instanceof AuthorityRule).count(), is(1L));
@@ -209,51 +170,12 @@ public final class ContextManagerCoordinatorTest {
         return result;
     }
     
-    @Test
-    public void assertRenewPrimaryDataSourceName() {
-        Collection<ShardingSphereRule> rules = new LinkedList<>();
-        DynamicDataSourceContainedRule dynamicDataSourceRule = 
mock(DynamicDataSourceContainedRule.class);
-        rules.add(dynamicDataSourceRule);
-        ShardingSphereRuleMetaData ruleMetaData = new 
ShardingSphereRuleMetaData(rules);
-        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class);
-        when(database.getRuleMetaData()).thenReturn(ruleMetaData);
-        
contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", 
database);
-        PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new 
PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
-        coordinator.renew(mockPrimaryStateChangedEvent);
-        verify(dynamicDataSourceRule).restartHeartBeatJob(any());
-    }
-    
-    @Test
-    public void assertRenewInstanceStatus() {
-        Collection<String> testStates = new LinkedList<>();
-        testStates.add(StateType.OK.name());
-        StateEvent mockStateEvent = new 
StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(),
 testStates);
-        coordinator.renew(mockStateEvent);
-        
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(),
 is(StateType.OK));
-        testStates.add(StateType.CIRCUIT_BREAK.name());
-        coordinator.renew(mockStateEvent);
-        
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(),
 is(StateType.CIRCUIT_BREAK));
-    }
-    
-    @Test
-    public void assertRenewInstanceLabels() {
-        Collection<String> labels = Collections.singleton("test");
-        coordinator.renew(new 
LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(),
 labels));
-        
assertThat(contextManager.getInstanceContext().getInstance().getLabels(), 
is(labels));
-    }
-    
-    @Test
-    public void assertRenewInstanceOfflineEvent() {
-        coordinator.renew(new 
InstanceOfflineEvent(contextManager.getInstanceContext().getInstance().getMetaData()));
-        assertThat(((ProxyInstanceMetaData) 
contextManager.getInstanceContext().getInstance().getMetaData()).getPort(), 
is(3307));
-    }
-    
     @Test
     public void assertRenewDatabaseVersionChangedEvent() {
         when(persistService.getDataSourceService().load("db", 
"1")).thenReturn(getVersionChangedDataSourcePropertiesMap());
         when(persistService.getDatabaseRulePersistService().load("db", 
"1")).thenReturn(Collections.emptyList());
         Map<String, DataSource> dataSourceMap = initContextManager();
-        coordinator.renew(new DatabaseVersionChangedEvent("db", "1"));
+        subscriber.renew(new DatabaseVersionChangedEvent("db", "1"));
         assertThat(contextManager.getDataSourceMap("db").get("ds_0"), 
is(dataSourceMap.get("ds_0")));
         assertNotNull(contextManager.getDataSourceMap("db").get("ds_1"));
         assertThat(DataSourcePropertiesCreator.create(getChangedDataSource()), 
is(DataSourcePropertiesCreator.create(contextManager.getDataSourceMap("db").get("ds_1"))));
@@ -261,73 +183,14 @@ public final class ContextManagerCoordinatorTest {
         assertThat(DataSourcePropertiesCreator.create(new MockedDataSource()), 
is(DataSourcePropertiesCreator.create(contextManager.getDataSourceMap("db").get("primary_ds"))));
     }
     
-    @Test
-    public void assertRenewInstanceOnlineEvent() {
-        InstanceMetaData instanceMetaData1 = new 
ProxyInstanceMetaData("foo_instance_3307", 3307);
-        InstanceOnlineEvent instanceOnlineEvent1 = new 
InstanceOnlineEvent(instanceMetaData1);
-        coordinator.renew(instanceOnlineEvent1);
-        
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), 
is(1));
-        assertThat(((LinkedList<ComputeNodeInstance>) 
contextManager.getInstanceContext().getAllClusterInstances()).get(0).getMetaData(),
 is(instanceMetaData1));
-        InstanceMetaData instanceMetaData2 = new 
ProxyInstanceMetaData("foo_instance_3308", 3308);
-        InstanceOnlineEvent instanceOnlineEvent2 = new 
InstanceOnlineEvent(instanceMetaData2);
-        coordinator.renew(instanceOnlineEvent2);
-        
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), 
is(2));
-        assertThat(((LinkedList<ComputeNodeInstance>) 
contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(),
 is(instanceMetaData2));
-        coordinator.renew(instanceOnlineEvent1);
-        
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), 
is(2));
-        assertThat(((LinkedList<ComputeNodeInstance>) 
contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(),
 is(instanceMetaData1));
-    }
-    
     @Test
     public void assertRenewProperties() {
         Properties props = new Properties();
         props.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(), 
Boolean.TRUE.toString());
-        coordinator.renew(new PropertiesChangedEvent(props));
+        subscriber.renew(new PropertiesChangedEvent(props));
         
assertThat(contextManager.getMetaDataContexts().getMetaData().getProps().getProps().getProperty(ConfigurationPropertyKey.SQL_SHOW.getKey()),
 is(Boolean.TRUE.toString()));
     }
     
-    @Test
-    public void assertCompleteUnitShowProcessList() {
-        String processListId = "foo_process_id";
-        ShowProcessListSimpleLock lock = new ShowProcessListSimpleLock();
-        ShowProcessListManager.getInstance().getLocks().put(processListId, 
lock);
-        long startTime = System.currentTimeMillis();
-        ExecutorService executorService = Executors.newFixedThreadPool(1);
-        executorService.submit(() -> {
-            try {
-                Thread.sleep(50L);
-            } catch (final InterruptedException ignored) {
-            }
-            coordinator.completeUnitShowProcessList(new 
ShowProcessListUnitCompleteEvent(processListId));
-        });
-        lockAndAwaitDefaultTime(lock);
-        long currentTime = System.currentTimeMillis();
-        assertTrue(currentTime >= startTime + 50L);
-        assertTrue(currentTime <= startTime + 5000L);
-        ShowProcessListManager.getInstance().getLocks().remove(processListId);
-    }
-    
-    @Test
-    public void assertTriggerShowProcessList() throws NoSuchFieldException, 
IllegalAccessException {
-        String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
-        
ShowProcessListManager.getInstance().putProcessContext("foo_execution_id", 
mock(ExecuteProcessContext.class));
-        String processListId = "foo_process_id";
-        coordinator.triggerShowProcessList(new 
ShowProcessListTriggerEvent(instanceId, processListId));
-        ClusterPersistRepository repository = 
ReflectionUtil.getFieldValue(coordinator, "registryCenter", 
RegistryCenter.class).getRepository();
-        verify(repository).persist("/execution_nodes/foo_process_id/" + 
instanceId,
-                "contexts:" + System.lineSeparator() + "- startTimeMillis: 0" 
+ System.lineSeparator());
-        verify(repository).delete("/nodes/compute_nodes/process_trigger/" + 
instanceId + ":foo_process_id");
-    }
-    
-    private void lockAndAwaitDefaultTime(final ShowProcessListSimpleLock lock) 
{
-        lock.lock();
-        try {
-            lock.awaitDefaultTime();
-        } finally {
-            lock.unlock();
-        }
-    }
-    
     private Map<String, DataSource> initContextManager() {
         Map<String, DataSource> result = getDataSourceMap();
         ShardingSphereResourceMetaData resourceMetaData = new 
ShardingSphereResourceMetaData("sharding_db", result);
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
new file mode 100644
index 00000000000..b7cab8eddf6
--- /dev/null
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ProcessListChangedSubscriberTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.infra.executor.sql.process.model.ExecuteProcessContext;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import 
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.process.ShowProcessListManager;
+import org.apache.shardingsphere.mode.process.lock.ShowProcessListSimpleLock;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ProcessListChangedSubscriberTest {
+    
+    private ProcessListChangedSubscriber subscriber;
+    
+    private ContextManager contextManager;
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ShardingSphereDatabase database;
+    
+    @Before
+    public void setUp() throws SQLException {
+        contextManager = new 
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+        contextManager.renewMetaDataContexts(new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new 
ShardingSphereMetaData(createDatabases(),
+                
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new 
ConfigurationProperties(new Properties()))));
+        subscriber = new ProcessListChangedSubscriber(new 
RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), 
mock(ProxyInstanceMetaData.class), null), contextManager);
+    }
+    
+    private ContextManagerBuilderParameter 
createContextManagerBuilderParameter() {
+        ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new 
ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
+        InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3307);
+        return new ContextManagerBuilderParameter(modeConfig, 
Collections.emptyMap(), Collections.emptyList(), new Properties(), 
Collections.emptyList(), instanceMetaData, false);
+    }
+    
+    private Map<String, ShardingSphereDatabase> createDatabases() {
+        when(database.getResourceMetaData().getDataSources()).thenReturn(new 
LinkedHashMap<>());
+        
when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0",
 new MySQLDatabaseType()));
+        
when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", 
new ShardingSphereSchema()));
+        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+        
when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+        when(database.getRuleMetaData().getRules()).thenReturn(new 
LinkedList<>());
+        
when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+        
when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+        Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+        result.put("db", database);
+        return result;
+    }
+    
+    @Test
+    public void assertCompleteUnitShowProcessList() {
+        String processListId = "foo_process_id";
+        ShowProcessListSimpleLock lock = new ShowProcessListSimpleLock();
+        ShowProcessListManager.getInstance().getLocks().put(processListId, 
lock);
+        long startTime = System.currentTimeMillis();
+        ExecutorService executorService = Executors.newFixedThreadPool(1);
+        executorService.submit(() -> {
+            try {
+                Thread.sleep(50L);
+            } catch (final InterruptedException ignored) {
+            }
+            subscriber.completeUnitShowProcessList(new 
ShowProcessListUnitCompleteEvent(processListId));
+        });
+        lockAndAwaitDefaultTime(lock);
+        long currentTime = System.currentTimeMillis();
+        assertTrue(currentTime >= startTime + 50L);
+        assertTrue(currentTime <= startTime + 5000L);
+        ShowProcessListManager.getInstance().getLocks().remove(processListId);
+    }
+    
+    @Test
+    public void assertTriggerShowProcessList() throws NoSuchFieldException, 
IllegalAccessException {
+        String instanceId = 
contextManager.getInstanceContext().getInstance().getMetaData().getId();
+        
ShowProcessListManager.getInstance().putProcessContext("foo_execution_id", 
mock(ExecuteProcessContext.class));
+        String processListId = "foo_process_id";
+        subscriber.triggerShowProcessList(new 
ShowProcessListTriggerEvent(instanceId, processListId));
+        ClusterPersistRepository repository = 
ReflectionUtil.getFieldValue(subscriber, "registryCenter", 
RegistryCenter.class).getRepository();
+        verify(repository).persist("/execution_nodes/foo_process_id/" + 
instanceId,
+                "contexts:" + System.lineSeparator() + "- startTimeMillis: 0" 
+ System.lineSeparator());
+        verify(repository).delete("/nodes/compute_nodes/process_trigger/" + 
instanceId + ":foo_process_id");
+    }
+    
+    private void lockAndAwaitDefaultTime(final ShowProcessListSimpleLock lock) 
{
+        lock.lock();
+        try {
+            lock.awaitDefaultTime();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
new file mode 100644
index 00000000000..43097de6d25
--- /dev/null
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ResourceMetaDataChangedSubscriberTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
+import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import 
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereTable;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereView;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.TableMetaDataChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.schema.ViewMetaDataChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.apache.shardingsphere.test.mock.MockedDataSource;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class ResourceMetaDataChangedSubscriberTest {
+    
+    private ResourceMetaDataChangedSubscriber subscriber;
+    
+    private ContextManager contextManager;
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MetaDataPersistService persistService;
+    
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ShardingSphereDatabase database;
+    
+    @Before
+    public void setUp() throws SQLException {
+        contextManager = new 
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+        contextManager.renewMetaDataContexts(new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new 
ShardingSphereMetaData(createDatabases(),
+                
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new 
ConfigurationProperties(new Properties()))));
+        subscriber = new ResourceMetaDataChangedSubscriber(contextManager);
+    }
+    
+    private ContextManagerBuilderParameter 
createContextManagerBuilderParameter() {
+        ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new 
ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
+        InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3307);
+        return new ContextManagerBuilderParameter(modeConfig, 
Collections.emptyMap(), Collections.emptyList(), new Properties(), 
Collections.emptyList(), instanceMetaData, false);
+    }
+    
+    private Map<String, ShardingSphereDatabase> createDatabases() {
+        when(database.getName()).thenReturn("db");
+        when(database.getResourceMetaData().getDataSources()).thenReturn(new 
LinkedHashMap<>());
+        
when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0",
 new MySQLDatabaseType()));
+        
when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", 
new ShardingSphereSchema()));
+        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+        
when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+        when(database.getRuleMetaData().getRules()).thenReturn(new 
LinkedList<>());
+        
when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+        
when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+        Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+        result.put("db", database);
+        return result;
+    }
+    
+    @Test
+    public void assertRenewForDatabaseAdded() {
+        
when(persistService.getDataSourceService().load("db_added")).thenReturn(createDataSourcePropertiesMap());
+        
when(persistService.getDatabaseRulePersistService().load("db_added")).thenReturn(Collections.emptyList());
+        subscriber.renew(new DatabaseAddedEvent("db_added"));
+        
assertNotNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db_added").getResourceMetaData().getDataSources());
+    }
+    
+    private Map<String, DataSourceProperties> createDataSourcePropertiesMap() {
+        MockedDataSource dataSource = new MockedDataSource();
+        Map<String, DataSourceProperties> result = new LinkedHashMap<>(3, 1);
+        result.put("primary_ds", 
DataSourcePropertiesCreator.create(dataSource));
+        result.put("replica_ds_0", 
DataSourcePropertiesCreator.create(dataSource));
+        result.put("replica_ds_1", 
DataSourcePropertiesCreator.create(dataSource));
+        return result;
+    }
+    
+    @Test
+    public void assertRenewForDatabaseDeleted() {
+        subscriber.renew(new DatabaseDeletedEvent("db"));
+        
assertNull(contextManager.getMetaDataContexts().getMetaData().getDatabase("db"));
+    }
+    
+    @Test
+    public void assertRenewForSchemaAdded() {
+        subscriber.renew(new SchemaAddedEvent("db", "foo_schema"));
+        
verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db")).putSchema(argThat(argument
 -> argument.equals("foo_schema")), any(ShardingSphereSchema.class));
+    }
+    
+    @Test
+    public void assertRenewForSchemaDeleted() {
+        
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").containsSchema("foo_schema")).thenReturn(true);
+        subscriber.renew(new SchemaDeletedEvent("db", "foo_schema"));
+        
verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db")).removeSchema("foo_schema");
+    }
+    
+    @Test
+    public void assertRenewForTableMetaDataChangedChanged() {
+        
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").containsSchema("db")).thenReturn(true);
+        ShardingSphereTable changedTableMetaData = new 
ShardingSphereTable("t_order", Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList());
+        TableMetaDataChangedEvent event = new TableMetaDataChangedEvent("db", 
"db", changedTableMetaData, null);
+        subscriber.renew(event);
+        
verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getSchema("db")).putTable("t_order",
 event.getChangedTableMetaData());
+    }
+    
+    @Test
+    public void assertRenewForViewMetaDataChanged() {
+        
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").containsSchema("db")).thenReturn(true);
+        ShardingSphereView changedViewMetaData = new 
ShardingSphereView("t_order_view", "");
+        ViewMetaDataChangedEvent event = new ViewMetaDataChangedEvent("db", 
"db", changedViewMetaData, null);
+        subscriber.renew(event);
+        
verify(contextManager.getMetaDataContexts().getMetaData().getDatabase("db").getSchema("db")).putView("t_order_view",
 event.getChangedViewMetaData());
+    }
+}
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
new file mode 100644
index 00000000000..ba5f3eecbb8
--- /dev/null
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
+import 
org.apache.shardingsphere.infra.instance.metadata.proxy.ProxyInstanceMetaData;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.decorator.model.ShardingSphereSchema;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.DynamicDataSourceContainedRule;
+import org.apache.shardingsphere.infra.rule.identifier.type.ResourceHeldRule;
+import 
org.apache.shardingsphere.infra.rule.identifier.type.StaticDataSourceContainedRule;
+import org.apache.shardingsphere.infra.state.StateType;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
+import org.apache.shardingsphere.mode.metadata.storage.StorageNodeStatus;
+import 
org.apache.shardingsphere.mode.metadata.storage.event.StorageNodeDataSourceChangedEvent;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class StateChangedSubscriberTest {
+    // Tests for StateChangedSubscriber.renew(...); the subscriber under test is created in setUp().
+    private StateChangedSubscriber subscriber;
+    // Built by ClusterContextManagerBuilder in setUp(); its meta data is then replaced so that "db" maps to the mock below.
+    private ContextManager contextManager;
+    // Deep-stub mock that createDatabases() registers under the name "db".
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ShardingSphereDatabase database;
+    // Builds the context manager, swaps in meta data holding the mocked database (persist service and global rules are kept), then creates the subscriber.
+    @Before
+    public void setUp() throws SQLException {
+        contextManager = new 
ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
+        contextManager.renewMetaDataContexts(new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new 
ShardingSphereMetaData(createDatabases(),
+                
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new 
ConfigurationProperties(new Properties()))));
+        subscriber = new StateChangedSubscriber(new 
RegistryCenter(mock(ClusterPersistRepository.class),
+                new EventBusContext(), mock(ProxyInstanceMetaData.class), 
null), contextManager);
+    }
+    // Builder parameter for mode "Cluster" with repository type "FIXTURE" and a proxy instance "foo_instance_id" on port 3307.
+    private ContextManagerBuilderParameter 
createContextManagerBuilderParameter() {
+        ModeConfiguration modeConfig = new ModeConfiguration("Cluster", new 
ClusterPersistRepositoryConfiguration("FIXTURE", "", "", new Properties()));
+        InstanceMetaData instanceMetaData = new 
ProxyInstanceMetaData("foo_instance_id", 3307);
+        return new ContextManagerBuilderParameter(modeConfig, 
Collections.emptyMap(), Collections.emptyList(), new Properties(), 
Collections.emptyList(), instanceMetaData, false);
+    }
+    // Stubs the mocked database (resources, schemas, protocol type, rules) and returns it in a single-entry map keyed by "db".
+    private Map<String, ShardingSphereDatabase> createDatabases() {
+        when(database.getResourceMetaData().getDataSources()).thenReturn(new 
LinkedHashMap<>());
+        
when(database.getResourceMetaData().getStorageTypes()).thenReturn(Collections.singletonMap("ds_0",
 new MySQLDatabaseType()));
+        
when(database.getSchemas()).thenReturn(Collections.singletonMap("foo_schema", 
new ShardingSphereSchema()));
+        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+        
when(database.getSchema("foo_schema")).thenReturn(mock(ShardingSphereSchema.class));
+        when(database.getRuleMetaData().getRules()).thenReturn(new 
LinkedList<>());
+        
when(database.getRuleMetaData().getConfigurations()).thenReturn(Collections.emptyList());
+        
when(database.getRuleMetaData().findRules(ResourceHeldRule.class)).thenReturn(Collections.emptyList());
+        Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+        result.put("db", database);
+        return result;
+    }
+    // A StorageNodeChangedEvent must reach the database's StaticDataSourceContainedRule via updateStatus(...) with the same qualified database and data source.
+    @Test
+    public void assertRenewForDisableStateChanged() {
+        StaticDataSourceContainedRule staticDataSourceRule = 
mock(StaticDataSourceContainedRule.class);
+        
when(database.getRuleMetaData().getRules()).thenReturn(Collections.singletonList(staticDataSourceRule)); // overrides the empty rule list stubbed in createDatabases()
+        StorageNodeChangedEvent event = new StorageNodeChangedEvent(new 
QualifiedDatabase("db.readwrite_ds.ds_0"), new 
StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED));
+        subscriber.renew(event);
+        verify(staticDataSourceRule).updateStatus(argThat(
+                (ArgumentMatcher<StorageNodeDataSourceChangedEvent>) 
argumentEvent -> Objects.equals(event.getQualifiedDatabase(), 
argumentEvent.getQualifiedDatabase())
+                        && Objects.equals(event.getDataSource(), 
argumentEvent.getDataSource())));
+    }
+    // A PrimaryStateChangedEvent for database "db" must call restartHeartBeatJob(...) on that database's DynamicDataSourceContainedRule.
+    @Test
+    public void assertRenewPrimaryDataSourceName() {
+        Collection<ShardingSphereRule> rules = new LinkedList<>();
+        DynamicDataSourceContainedRule dynamicDataSourceRule = 
mock(DynamicDataSourceContainedRule.class);
+        rules.add(dynamicDataSourceRule);
+        ShardingSphereRuleMetaData ruleMetaData = new 
ShardingSphereRuleMetaData(rules);
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class); // local mock; shadows the class-level database field
+        when(database.getRuleMetaData()).thenReturn(ruleMetaData);
+        
contextManager.getMetaDataContexts().getMetaData().getDatabases().put("db", 
database); // replace the "db" entry with the local mock
+        PrimaryStateChangedEvent mockPrimaryStateChangedEvent = new 
PrimaryStateChangedEvent(new QualifiedDatabase("db.readwrite_ds.test_ds"));
+        subscriber.renew(mockPrimaryStateChangedEvent);
+        verify(dynamicDataSourceRule).restartHeartBeatJob(any());
+    }
+    // A StateEvent for the local instance id must update its current state: OK first, then CIRCUIT_BREAK once that state is added.
+    @Test
+    public void assertRenewInstanceStatus() {
+        Collection<String> testStates = new LinkedList<>();
+        testStates.add(StateType.OK.name());
+        StateEvent mockStateEvent = new 
StateEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(),
 testStates);
+        subscriber.renew(mockStateEvent);
+        
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(),
 is(StateType.OK));
+        testStates.add(StateType.CIRCUIT_BREAK.name()); // NOTE(review): the same event is re-sent below, so this presumably relies on StateEvent keeping a reference to testStates - confirm
+        subscriber.renew(mockStateEvent);
+        
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(),
 is(StateType.CIRCUIT_BREAK));
+    }
+    // A LabelsEvent for the local instance id must set that instance's labels to the labels carried by the event.
+    @Test
+    public void assertRenewInstanceLabels() {
+        Collection<String> labels = Collections.singleton("test");
+        subscriber.renew(new 
LabelsEvent(contextManager.getInstanceContext().getInstance().getMetaData().getId(),
 labels));
+        
assertThat(contextManager.getInstanceContext().getInstance().getLabels(), 
is(labels));
+    }
+    // Sends an InstanceOfflineEvent for the local instance and checks that its proxy port is still 3307. NOTE(review): no offline-specific effect is asserted.
+    @Test
+    public void assertRenewInstanceOfflineEvent() {
+        subscriber.renew(new 
InstanceOfflineEvent(contextManager.getInstanceContext().getInstance().getMetaData()));
+        assertThat(((ProxyInstanceMetaData) 
contextManager.getInstanceContext().getInstance().getMetaData()).getPort(), 
is(3307));
+    }
+    // InstanceOnlineEvents must add instances to getAllClusterInstances(); re-sending the first event keeps the size at 2 and leaves its meta data at index 1.
+    @Test
+    public void assertRenewInstanceOnlineEvent() {
+        InstanceMetaData instanceMetaData1 = new 
ProxyInstanceMetaData("foo_instance_3307", 3307);
+        InstanceOnlineEvent instanceOnlineEvent1 = new 
InstanceOnlineEvent(instanceMetaData1);
+        subscriber.renew(instanceOnlineEvent1);
+        
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), 
is(1));
+        assertThat(((LinkedList<ComputeNodeInstance>) 
contextManager.getInstanceContext().getAllClusterInstances()).get(0).getMetaData(),
 is(instanceMetaData1)); // NOTE(review): the cast assumes getAllClusterInstances() returns a LinkedList - confirm
+        InstanceMetaData instanceMetaData2 = new 
ProxyInstanceMetaData("foo_instance_3308", 3308);
+        InstanceOnlineEvent instanceOnlineEvent2 = new 
InstanceOnlineEvent(instanceMetaData2);
+        subscriber.renew(instanceOnlineEvent2);
+        
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), 
is(2));
+        assertThat(((LinkedList<ComputeNodeInstance>) 
contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(),
 is(instanceMetaData2));
+        subscriber.renew(instanceOnlineEvent1); // the first instance comes online a second time
+        
assertThat(contextManager.getInstanceContext().getAllClusterInstances().size(), 
is(2));
+        assertThat(((LinkedList<ComputeNodeInstance>) 
contextManager.getInstanceContext().getAllClusterInstances()).get(1).getMetaData(),
 is(instanceMetaData1));
+    }
+}

Reply via email to