This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e73cb20  Split ContextManagerBuilder for build one time only and put 
renew into coordinator (#12276)
e73cb20 is described below

commit e73cb20c8a97bdf8c7bd7b46eb40d30034898a90
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Sep 7 22:18:12 2021 +0800

    Split ContextManagerBuilder for build one time only and put renew into 
coordinator (#12276)
---
 .../cluster/ClusterContextManagerBuilder.java      | 381 +--------------------
 .../ClusterContextManagerCoordinator.java}         | 197 +----------
 .../cluster/{ => coordinator}/RegistryCenter.java  |   2 +-
 .../ClusterContextManagerCoordinatorTest.java}     |  38 +-
 4 files changed, 31 insertions(+), 587 deletions(-)

diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 039ecdc..670b7d8 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -18,42 +18,17 @@
 package org.apache.shardingsphere.mode.manager.cluster;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.authority.rule.AuthorityRule;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
 import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import 
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.InnerLockReleasedEvent;
-import org.apache.shardingsphere.infra.lock.LockNameUtil;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
 import 
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
-import 
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
-import 
org.apache.shardingsphere.infra.optimize.core.metadata.FederateSchemaMetadata;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
 import 
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
-import 
org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceDeletedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.props.PropertiesChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationsChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.schema.SchemaChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaAddedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaDeletedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.DisabledStateChangedEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.PrimaryStateChangedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterContextManagerCoordinator;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.schema.GovernanceSchema;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
@@ -71,10 +46,8 @@ import 
org.apache.shardingsphere.transaction.rule.TransactionRule;
 import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
@@ -114,7 +87,6 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     private void beforeBuildContextManager(final ModeConfiguration modeConfig, 
final Map<String, Map<String, DataSource>> dataSourcesMap,
                                            final Map<String, 
Collection<RuleConfiguration>> schemaRuleConfigs, final 
Collection<RuleConfiguration> globalRuleConfigs,
                                            final Properties props, final 
boolean isOverwrite) throws SQLException {
-        ShardingSphereEventBus.getInstance().register(this);
         ClusterPersistRepository repository = 
createClusterPersistRepository((ClusterPersistRepositoryConfiguration) 
modeConfig.getRepository());
         registryCenter = new RegistryCenter(repository);
         persistService = new PersistService(repository);
@@ -126,6 +98,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     }
     
     private void afterBuildContextManager() {
+        new ClusterContextManagerCoordinator(persistService, contextManager);
         disableDataSources();
         persistMetaData();
         registryCenter.onlineInstance();
@@ -233,247 +206,6 @@ public final class ClusterContextManagerBuilder 
implements ContextManagerBuilder
         return new TransactionContexts(engines);
     }
     
-    /**
-     * Renew to persist meta data.
-     *
-     * @param event schema added event
-     * @throws SQLException SQL exception
-     */
-    @Subscribe
-    public synchronized void renew(final SchemaAddedEvent event) throws 
SQLException {
-        persistSchema(event.getSchemaName());
-        ShardingSphereMetaData metaData = buildMetaData(event.getSchemaName());
-        
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
 new FederateSchemaMetadata(event.getSchemaName(), 
-                metaData.getSchema().getTables()));
-        
contextManager.getMetaDataContexts().getMetaDataMap().put(event.getSchemaName(),
 metaData);
-        
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(contextManager.getMetaDataContexts().getMetaDataMap()));
-        ShardingSphereEventBus.getInstance().post(new 
DataSourceChangeCompletedEvent(event.getSchemaName(),
-                
contextManager.getMetaDataContexts().getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
 metaData.getResource().getDataSources()));
-    }
-    
-    /**
-     * Renew to delete schema.
-     *
-     * @param event schema delete event
-     */
-    @Subscribe
-    public synchronized void renew(final SchemaDeletedEvent event) {
-        String schemaName = event.getSchemaName();
-        closeDataSources(schemaName);
-        Map<String, ShardingSphereMetaData> schemaMetaData = new 
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap());
-        schemaMetaData.remove(schemaName);
-        
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().remove(schemaName);
-        
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
-        ShardingSphereEventBus.getInstance().post(new 
DataSourceDeletedEvent(schemaName));
-    }
-    
-    /**
-     * Renew properties.
-     *
-     * @param event properties changed event
-     */
-    @Subscribe
-    public synchronized void renew(final PropertiesChangedEvent event) {
-        ConfigurationProperties props = new 
ConfigurationProperties(event.getProps());
-        contextManager.renewMetaDataContexts(rebuildMetaDataContexts(props));
-    }
-    
-    /**
-     * Renew authority.
-     *
-     * @param event authority changed event
-     */
-    @Subscribe
-    public synchronized void renew(final AuthorityChangedEvent event) {
-        Optional<AuthorityRule> rule = 
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules().stream()
-                .filter(each -> each instanceof 
AuthorityRule).findAny().map(each -> (AuthorityRule) each);
-        rule.ifPresent(optional -> 
optional.refresh(contextManager.getMetaDataContexts().getMetaDataMap(), 
event.getUsers()));
-    }
-    
-    /**
-     * Renew meta data of the schema.
-     *
-     * @param event meta data changed event
-     */
-    @Subscribe
-    public synchronized void renew(final SchemaChangedEvent event) {
-        try {
-            Map<String, ShardingSphereMetaData> schemaMetaData = new 
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap().size(), 1);
-            for (Entry<String, ShardingSphereMetaData> entry : 
contextManager.getMetaDataContexts().getMetaDataMap().entrySet()) {
-                String schemaName = entry.getKey();
-                ShardingSphereMetaData originalMetaData = entry.getValue();
-                ShardingSphereMetaData metaData = 
event.getSchemaName().equals(schemaName) ? getChangedMetaData(originalMetaData, 
event.getSchema(), schemaName) : originalMetaData;
-                schemaMetaData.put(schemaName, metaData);
-                
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
-                        new FederateSchemaMetadata(event.getSchemaName(), 
metaData.getSchema().getTables()));
-            }
-            
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
-        } finally {
-            ShardingSphereEventBus.getInstance().post(new 
InnerLockReleasedEvent(LockNameUtil.getMetadataRefreshLockName()));
-        }
-    }
-    
-    /**
-     * Renew rule configurations.
-     *
-     * @param event rule configurations changed event
-     * @throws SQLException SQL exception
-     */
-    @Subscribe
-    public synchronized void renew(final RuleConfigurationsChangedEvent event) 
throws SQLException {
-        String schemaName = event.getSchemaName();
-        ShardingSphereMetaData metaData = 
getChangedMetaData(contextManager.getMetaDataContexts().getMetaDataMap().get(schemaName),
 event.getRuleConfigurations());
-        Map<String, ShardingSphereMetaData> schemaMetaData = 
rebuildSchemaMetaData(schemaName, metaData);
-        
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
-        persistService.getSchemaMetaDataService().persist(schemaName, 
schemaMetaData.get(schemaName).getSchema());
-    }
-    
-    /**
-     * Renew data source configuration.
-     *
-     * @param event data source changed event.
-     * @throws SQLException SQL exception
-     */
-    @Subscribe
-    public synchronized void renew(final DataSourceChangedEvent event) throws 
SQLException {
-        String schemaName = event.getSchemaName();
-        Collection<DataSource> pendingClosedDataSources = 
getPendingClosedDataSources(schemaName, event.getDataSourceConfigurations());
-        ShardingSphereMetaData metaData = 
rebuildMetaData(contextManager.getMetaDataContexts().getMetaDataMap().get(schemaName),
 event.getDataSourceConfigurations());
-        Map<String, ShardingSphereMetaData> schemaMetaData = 
rebuildSchemaMetaData(schemaName, metaData);
-        
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
-        ShardingSphereEventBus.getInstance().post(new 
DataSourceChangeCompletedEvent(event.getSchemaName(),
-                
contextManager.getMetaDataContexts().getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
 
-                
schemaMetaData.get(event.getSchemaName()).getResource().getDataSources()));
-        closeDataSources(schemaName, pendingClosedDataSources);
-    }
-    
-    /**
-     * Renew disabled data source names.
-     *
-     * @param event disabled state changed event
-     */
-    @Subscribe
-    public synchronized void renew(final DisabledStateChangedEvent event) {
-        GovernanceSchema governanceSchema = event.getGovernanceSchema();
-        Collection<ShardingSphereRule> rules = 
contextManager.getMetaDataContexts().getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
-        for (ShardingSphereRule each : rules) {
-            if (each instanceof StatusContainedRule) {
-                ((StatusContainedRule) each).updateStatus(new 
DataSourceNameDisabledEvent(governanceSchema.getDataSourceName(), 
event.isDisabled()));
-            }
-        }
-    }
-    
-    /**
-     * Renew primary data source names.
-     *
-     * @param event primary state changed event
-     */
-    @Subscribe
-    public synchronized void renew(final PrimaryStateChangedEvent event) {
-        GovernanceSchema governanceSchema = event.getGovernanceSchema();
-        Collection<ShardingSphereRule> rules = 
contextManager.getMetaDataContexts().getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
-        for (ShardingSphereRule each : rules) {
-            if (each instanceof StatusContainedRule) {
-                ((StatusContainedRule) each).updateStatus(new 
PrimaryDataSourceChangedEvent(governanceSchema.getSchemaName(), 
governanceSchema.getDataSourceName(), event.getPrimaryDataSourceName()));
-            }
-        }
-    }
-    
-    /**
-     * Renew global rule configurations.
-     *
-     * @param event global rule configurations changed event
-     */
-    @Subscribe
-    public synchronized void renew(final GlobalRuleConfigurationsChangedEvent 
event) {
-        Collection<RuleConfiguration> newGlobalConfigs = 
event.getRuleConfigurations();
-        if (!newGlobalConfigs.isEmpty()) {
-            ShardingSphereRuleMetaData newGlobalRuleMetaData = new 
ShardingSphereRuleMetaData(newGlobalConfigs,
-                    GlobalRulesBuilder.buildRules(newGlobalConfigs, 
contextManager.getMetaDataContexts().getMetaDataMap()));
-            
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(newGlobalRuleMetaData));
-        }
-    }
-    
-    private MetaDataContexts rebuildMetaDataContexts(final Map<String, 
ShardingSphereMetaData> schemaMetaData) {
-        
Preconditions.checkState(contextManager.getMetaDataContexts().getPersistService().isPresent());
-        return new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
-                schemaMetaData, 
contextManager.getMetaDataContexts().getGlobalRuleMetaData(), 
contextManager.getMetaDataContexts().getExecutorEngine(),
-                contextManager.getMetaDataContexts().getProps(), 
contextManager.getMetaDataContexts().getOptimizeContextFactory());
-    }
-    
-    private MetaDataContexts rebuildMetaDataContexts(final 
ConfigurationProperties props) {
-        
Preconditions.checkState(contextManager.getMetaDataContexts().getPersistService().isPresent());
-        return new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
-                contextManager.getMetaDataContexts().getMetaDataMap(), 
contextManager.getMetaDataContexts().getGlobalRuleMetaData(), 
contextManager.getMetaDataContexts().getExecutorEngine(),
-                props, 
contextManager.getMetaDataContexts().getOptimizeContextFactory());
-    }
-    
-    private MetaDataContexts rebuildMetaDataContexts(final 
ShardingSphereRuleMetaData globalRuleMetaData) {
-        
Preconditions.checkState(contextManager.getMetaDataContexts().getPersistService().isPresent());
-        return new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
-                contextManager.getMetaDataContexts().getMetaDataMap(), 
globalRuleMetaData, contextManager.getMetaDataContexts().getExecutorEngine(),
-                contextManager.getMetaDataContexts().getProps(), 
contextManager.getMetaDataContexts().getOptimizeContextFactory());
-    }
-    
-    private Map<String, ShardingSphereMetaData> rebuildSchemaMetaData(final 
String schemaName, final ShardingSphereMetaData metaData) {
-        Map<String, ShardingSphereMetaData> result = new 
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap());
-        result.put(schemaName, metaData);
-        
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(schemaName,
 new FederateSchemaMetadata(schemaName, metaData.getSchema().getTables()));
-        return result;
-    }
-    
-    private void persistSchema(final String schemaName) {
-        if (!persistService.getDataSourceService().isExisted(schemaName)) {
-            persistService.getDataSourceService().persist(schemaName, new 
LinkedHashMap<>());
-        }
-        if (!persistService.getSchemaRuleService().isExisted(schemaName)) {
-            persistService.getSchemaRuleService().persist(schemaName, new 
LinkedList<>());
-        }
-    }
-    
-    private ShardingSphereMetaData buildMetaData(final String schemaName) 
throws SQLException {
-        Map<String, Map<String, DataSource>> dataSourcesMap = 
createDataSourcesMap(Collections.singletonMap(schemaName, 
persistService.getDataSourceService().load(schemaName)));
-        return new MetaDataContextsBuilder(dataSourcesMap,
-                Collections.singletonMap(schemaName, 
persistService.getSchemaRuleService().load(schemaName)),
-                persistService.getGlobalRuleService().load(),
-                
contextManager.getMetaDataContexts().getProps().getProps()).build(persistService).getMetaData(schemaName);
-    }
-    
-    private ShardingSphereMetaData getChangedMetaData(final 
ShardingSphereMetaData originalMetaData, final ShardingSphereSchema schema, 
final String schemaName) {
-        // TODO refresh table addressing mapper
-        return new ShardingSphereMetaData(schemaName, 
originalMetaData.getResource(), originalMetaData.getRuleMetaData(), schema);
-    }
-    
-    private ShardingSphereMetaData getChangedMetaData(final 
ShardingSphereMetaData originalMetaData, final Collection<RuleConfiguration> 
ruleConfigs) throws SQLException {
-        MetaDataContextsBuilder builder = new 
MetaDataContextsBuilder(Collections.singletonMap(originalMetaData.getName(), 
originalMetaData.getResource().getDataSources()),
-                Collections.singletonMap(originalMetaData.getName(), 
ruleConfigs), persistService.getGlobalRuleService().load(), 
contextManager.getMetaDataContexts().getProps().getProps());
-        return 
builder.build(persistService).getMetaDataMap().values().iterator().next();
-    }
-    
-    private ShardingSphereMetaData rebuildMetaData(final 
ShardingSphereMetaData originalMetaData, final Map<String, 
DataSourceConfiguration> newDataSourceConfigs) throws SQLException {
-        Collection<String> deletedDataSources = 
getDeletedDataSources(originalMetaData, newDataSourceConfigs).keySet();
-        Map<String, DataSource> changedDataSources = 
buildChangedDataSources(originalMetaData, newDataSourceConfigs);
-        Map<String, Map<String, DataSource>> dataSourcesMap = 
Collections.singletonMap(originalMetaData.getName(),
-                
getNewDataSources(originalMetaData.getResource().getDataSources(), 
getAddedDataSources(originalMetaData, newDataSourceConfigs), 
changedDataSources, deletedDataSources));
-        return new MetaDataContextsBuilder(dataSourcesMap, 
Collections.singletonMap(originalMetaData.getName(),
-                originalMetaData.getRuleMetaData().getConfigurations()), 
persistService.getGlobalRuleService().load(),
-                
contextManager.getMetaDataContexts().getProps().getProps()).build(persistService).getMetaData(originalMetaData.getName());
-    }
-    
-    private Map<String, DataSource> getNewDataSources(final Map<String, 
DataSource> originalDataSources,
-                                                      final Map<String, 
DataSource> addedDataSources, final Map<String, DataSource> changedDataSources, 
final Collection<String> deletedDataSources) {
-        Map<String, DataSource> result = new 
LinkedHashMap<>(originalDataSources);
-        result.keySet().removeAll(deletedDataSources);
-        result.putAll(changedDataSources);
-        result.putAll(addedDataSources);
-        return result;
-    }
-    
-    private Map<String, DataSource> getDeletedDataSources(final 
ShardingSphereMetaData originalMetaData, final Map<String, 
DataSourceConfiguration> newDataSourceConfigs) {
-        return 
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
 -> !newDataSourceConfigs.containsKey(entry.getKey()))
-                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-    }
-    
     private Map<String, Map<String, DataSource>> getChangedDataSources(final 
Map<String, Map<String, DataSourceConfiguration>> 
changedDataSourceConfigurations) {
         Map<String, Map<String, DataSource>> result = new 
LinkedHashMap<>(changedDataSourceConfigurations.size(), 1);
         for (Entry<String, Map<String, DataSourceConfiguration>> entry : 
changedDataSourceConfigurations.entrySet()) {
@@ -482,119 +214,12 @@ public final class ClusterContextManagerBuilder 
implements ContextManagerBuilder
         return result;
     }
     
-    private Map<String, DataSource> getChangedDataSources(final 
ShardingSphereMetaData originalMetaData, final Map<String, 
DataSourceConfiguration> newDataSourceConfigs) {
-        Collection<String> changedDataSourceNames = 
getChangedDataSourceConfiguration(originalMetaData, 
newDataSourceConfigs).keySet();
-        return 
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
 -> changedDataSourceNames.contains(entry.getKey()))
-                .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-    }
-    
-    private Map<String, DataSource> getAddedDataSources(final 
ShardingSphereMetaData originalMetaData, final Map<String, 
DataSourceConfiguration> newDataSourceConfigs) {
-        return 
DataSourceConverter.getDataSourceMap(Maps.filterKeys(newDataSourceConfigs, each 
-> !originalMetaData.getResource().getDataSources().containsKey(each)));
-    }
-    
-    private Map<String, DataSourceConfiguration> 
getChangedDataSourceConfiguration(final ShardingSphereMetaData originalMetaData,
-                                                                               
    final Map<String, DataSourceConfiguration> dataSourceConfigurations) {
-        return dataSourceConfigurations.entrySet().stream()
-                .filter(entry -> 
isModifiedDataSource(originalMetaData.getResource().getDataSources(), 
entry.getKey(), entry.getValue()))
-                .collect(Collectors.toMap(Entry::getKey, Entry::getValue, 
(oldValue, currentValue) -> oldValue, LinkedHashMap::new));
-    }
-    
-    private Map<String, DataSource> buildChangedDataSources(final 
ShardingSphereMetaData originalMetaData, final Map<String, 
DataSourceConfiguration> newDataSourceConfigs) {
-        return 
DataSourceConverter.getDataSourceMap(getChangedDataSourceConfiguration(originalMetaData,
 newDataSourceConfigs));
-    }
-    
-    private boolean isModifiedDataSource(final Map<String, DataSource> 
originalDataSources, final String dataSourceName, final DataSourceConfiguration 
dataSourceConfiguration) {
-        DataSourceConfiguration dataSourceConfig = 
DataSourceConverter.getDataSourceConfigurationMap(originalDataSources).get(dataSourceName);
-        return null != dataSourceConfig && 
!dataSourceConfiguration.equals(dataSourceConfig);
-    }
-    
-    private Map<String, Map<String, DataSource>> createDataSourcesMap(final 
Map<String, Map<String, DataSourceConfiguration>> dataSourcesConfigs) {
-        return 
dataSourcesConfigs.entrySet().stream().collect(Collectors.toMap(Entry::getKey, 
entry -> DataSourceConverter.getDataSourceMap(entry.getValue())));
-    }
-    
-    private Collection<DataSource> getPendingClosedDataSources(final String 
schemaName, final Map<String, DataSourceConfiguration> 
dataSourceConfigurations) {
-        Collection<DataSource> result = new LinkedList<>();
-        
result.addAll(getDeletedDataSources(contextManager.getMetaDataContexts().getMetaData(schemaName),
 dataSourceConfigurations).values());
-        
result.addAll(getChangedDataSources(contextManager.getMetaDataContexts().getMetaData(schemaName),
 dataSourceConfigurations).values());
-        return result;
-    }
-    
-    private void closeDataSources(final String schemaName) {
-        if (null != 
contextManager.getMetaDataContexts().getMetaData(schemaName) 
-                && null != 
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource()) {
-            closeDataSources(schemaName, 
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource().getDataSources().values());
-        }
-    }
-    
-    private void closeDataSources(final String schemaName, final 
Collection<DataSource> dataSources) {
-        ShardingSphereResource resource = 
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource();
-        dataSources.forEach(each -> closeDataSource(resource, each));
-    }
-    
-    private void closeDataSource(final ShardingSphereResource resource, final 
DataSource dataSource) {
-        try {
-            resource.close(dataSource);
-            // CHECKSTYLE:OFF
-        } catch (final Exception ignore) {
-            // CHECKSTYLE:ON
-        }
-    }
-    
-    /**
-     * Renew transaction manager engine contexts.
-     *
-     * @param event data source change completed event
-     * @throws Exception exception
-     */
-    @Subscribe
-    public synchronized void renewTransactionContext(final 
DataSourceChangeCompletedEvent event) throws Exception {
-        closeStaleEngine(event.getSchemaName());
-        Map<String, ShardingSphereTransactionManagerEngine> existedEngines = 
contextManager.getTransactionContexts().getEngines();
-        existedEngines.put(event.getSchemaName(), 
createNewEngine(event.getDatabaseType(), event.getDataSources()));
-        renewContexts(existedEngines);
-    }
-    
-    /**
-     * Renew transaction manager engine context.
-     *
-     * @param event data source deleted event.
-     * @throws Exception exception
-     */
-    @Subscribe
-    public synchronized void renewTransactionContext(final 
DataSourceDeletedEvent event) throws Exception {
-        closeStaleEngine(event.getSchemaName());
-        renewContexts(contextManager.getTransactionContexts().getEngines());
-    }
-    
-    private void closeStaleEngine(final String schemaName) throws Exception {
-        ShardingSphereTransactionManagerEngine staleEngine = 
contextManager.getTransactionContexts().getEngines().remove(schemaName);
-        if (null != staleEngine) {
-            staleEngine.close();
-        }
-    }
-    
-    private ShardingSphereTransactionManagerEngine createNewEngine(final 
DatabaseType databaseType, final Map<String, DataSource> dataSources) {
-        ShardingSphereTransactionManagerEngine result = new 
ShardingSphereTransactionManagerEngine();
-        result.init(databaseType, dataSources, getTransactionRule());
-        return result;
-    }
-    
-    private TransactionRule getTransactionRule() {
-        Optional<TransactionRule> transactionRule = 
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules().stream()
-            .filter(each -> each instanceof TransactionRule).map(each -> 
(TransactionRule) each).findFirst();
-        return transactionRule.orElseGet(() -> new TransactionRule(new 
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
-    }
-    
     private TransactionRule getTransactionRule(final MetaDataContexts 
metaDataContexts) {
         Optional<TransactionRule> transactionRule = 
metaDataContexts.getGlobalRuleMetaData().getRules().stream().filter(
             each -> each instanceof TransactionRule).map(each -> 
(TransactionRule) each).findFirst();
         return transactionRule.orElseGet(() -> new TransactionRule(new 
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
     }
     
-    private void renewContexts(final Map<String, 
ShardingSphereTransactionManagerEngine> engines) {
-        contextManager.renewTransactionContexts(new 
TransactionContexts(engines));
-    }
-    
     private void disableDataSources() {
         metaDataContexts.getMetaDataMap().forEach((key, value)
             -> value.getRuleMetaData().getRules().stream().filter(each -> each 
instanceof StatusContainedRule).forEach(each -> disableDataSources(key, 
(StatusContainedRule) each)));
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
similarity index 68%
copy from 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
copy to 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 039ecdc..49cea81 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import 
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
 import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import 
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
@@ -41,7 +40,6 @@ import 
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEve
 import 
org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
@@ -58,10 +56,6 @@ import 
org.apache.shardingsphere.mode.manager.cluster.governance.schema.Governan
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
 import org.apache.shardingsphere.mode.persist.PersistService;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
 import 
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
 import 
org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
 import org.apache.shardingsphere.transaction.context.TransactionContexts;
@@ -78,159 +72,21 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.Properties;
 import java.util.stream.Collectors;
 
 /**
- * Cluster context manager builder.
+ * Cluster context manager coordinator.
  */
-public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder {
+public final class ClusterContextManagerCoordinator {
     
-    static {
-        ShardingSphereServiceLoader.register(ClusterPersistRepository.class);
-    }
-    
-    private RegistryCenter registryCenter;
-    
-    private PersistService persistService;
-    
-    private MetaDataContexts metaDataContexts;
+    private final PersistService persistService;
     
-    private TransactionContexts transactionContexts;
+    private final ContextManager contextManager;
     
-    private ContextManager contextManager;
-    
-    @Override
-    public ContextManager build(final ModeConfiguration modeConfig, final 
Map<String, Map<String, DataSource>> dataSourcesMap,
-                                final Map<String, 
Collection<RuleConfiguration>> schemaRuleConfigs, final 
Collection<RuleConfiguration> globalRuleConfigs,
-                                final Properties props, final boolean 
isOverwrite) throws SQLException {
-        beforeBuildContextManager(modeConfig, dataSourcesMap, 
schemaRuleConfigs, globalRuleConfigs, props, isOverwrite);
-        contextManager = new ContextManager();
-        contextManager.init(metaDataContexts, transactionContexts);
-        afterBuildContextManager();
-        return contextManager;
-    }
-    
-    private void beforeBuildContextManager(final ModeConfiguration modeConfig, 
final Map<String, Map<String, DataSource>> dataSourcesMap,
-                                           final Map<String, 
Collection<RuleConfiguration>> schemaRuleConfigs, final 
Collection<RuleConfiguration> globalRuleConfigs,
-                                           final Properties props, final 
boolean isOverwrite) throws SQLException {
+    public ClusterContextManagerCoordinator(final PersistService 
persistService, final ContextManager contextManager) {
+        this.persistService = persistService;
+        this.contextManager = contextManager;
         ShardingSphereEventBus.getInstance().register(this);
-        ClusterPersistRepository repository = 
createClusterPersistRepository((ClusterPersistRepositoryConfiguration) 
modeConfig.getRepository());
-        registryCenter = new RegistryCenter(repository);
-        persistService = new PersistService(repository);
-        persistConfigurations(persistService, dataSourcesMap, 
schemaRuleConfigs, globalRuleConfigs, props, isOverwrite);
-        Collection<String> schemaNames = 
persistService.getSchemaMetaDataService().loadAllNames();
-        metaDataContexts = new 
MetaDataContextsBuilder(loadDataSourcesMap(persistService, dataSourcesMap, 
schemaNames),
-                loadSchemaRules(persistService, schemaNames), 
persistService.getGlobalRuleService().load(), 
persistService.getPropsService().load()).build(persistService);
-        transactionContexts = createTransactionContexts(metaDataContexts);
-    }
-    
-    private void afterBuildContextManager() {
-        disableDataSources();
-        persistMetaData();
-        registryCenter.onlineInstance();
-    }
-    
-    private ClusterPersistRepository createClusterPersistRepository(final 
ClusterPersistRepositoryConfiguration config) {
-        Preconditions.checkNotNull(config, "Cluster persist repository 
configuration cannot be null.");
-        ClusterPersistRepository result = 
TypedSPIRegistry.getRegisteredService(ClusterPersistRepository.class, 
config.getType(), config.getProps());
-        result.init(config);
-        return result;
-    }
-    
-    private void persistConfigurations(final PersistService persistService, 
final Map<String, Map<String, DataSource>> dataSourcesMap,
-                                       final Map<String, 
Collection<RuleConfiguration>> schemaRuleConfigs, final 
Collection<RuleConfiguration> globalRuleConfigs,
-                                       final Properties props, final boolean 
overwrite) {
-        if (!isEmptyLocalConfiguration(dataSourcesMap, schemaRuleConfigs, 
globalRuleConfigs, props)) {
-            
persistService.persistConfigurations(getDataSourceConfigurations(dataSourcesMap),
 schemaRuleConfigs, globalRuleConfigs, props, overwrite);
-        }
-    }
-    
-    private boolean isEmptyLocalConfiguration(final Map<String, Map<String, 
DataSource>> dataSourcesMap,
-                                              final Map<String, 
Collection<RuleConfiguration>> schemaRuleConfigs, final 
Collection<RuleConfiguration> globalRuleConfigs, final Properties props) {
-        return isEmptyLocalDataSourcesMap(dataSourcesMap) && 
isEmptyLocalSchemaRuleConfigurations(schemaRuleConfigs) && 
globalRuleConfigs.isEmpty() && props.isEmpty();
-    }
-    
-    private boolean isEmptyLocalDataSourcesMap(final Map<String, Map<String, 
DataSource>> dataSourcesMap) {
-        return dataSourcesMap.entrySet().stream().allMatch(entry -> 
entry.getValue().isEmpty());
-    }
-    
-    private boolean isEmptyLocalSchemaRuleConfigurations(final Map<String, 
Collection<RuleConfiguration>> schemaRuleConfigs) {
-        return schemaRuleConfigs.entrySet().stream().allMatch(entry -> 
entry.getValue().isEmpty());
-    }
-    
-    private Map<String, Map<String, DataSourceConfiguration>> 
getDataSourceConfigurations(final Map<String, Map<String, DataSource>> 
dataSourcesMap) {
-        Map<String, Map<String, DataSourceConfiguration>> result = new 
LinkedHashMap<>(dataSourcesMap.size(), 1);
-        for (Entry<String, Map<String, DataSource>> entry : 
dataSourcesMap.entrySet()) {
-            result.put(entry.getKey(), 
DataSourceConverter.getDataSourceConfigurationMap(entry.getValue()));
-        }
-        return result;
-    }
-    
-    private Map<String, Map<String, DataSource>> loadDataSourcesMap(final 
PersistService persistService, final Map<String, Map<String, DataSource>> 
dataSourcesMap,
-                                                                    final 
Collection<String> schemaNames) {
-        Map<String, Map<String, DataSourceConfiguration>> 
loadedDataSourceConfigs = loadDataSourceConfigurations(persistService, 
schemaNames);
-        Map<String, Map<String, DataSourceConfiguration>> 
changedDataSourceConfigs = getChangedDataSourceConfigurations(dataSourcesMap, 
loadedDataSourceConfigs);
-        Map<String, Map<String, DataSource>> result = new 
LinkedHashMap<>(dataSourcesMap);
-        getChangedDataSources(changedDataSourceConfigs).forEach((key, value) 
-> {
-            if (result.containsKey(key)) {
-                result.get(key).putAll(value);
-            } else {
-                result.put(key, value);
-            }
-        });
-        return result;
-    }
-    
-    private Map<String, Map<String, DataSourceConfiguration>> 
loadDataSourceConfigurations(final PersistService persistService, final 
Collection<String> schemaNames) {
-        Map<String, Map<String, DataSourceConfiguration>> result = new 
LinkedHashMap<>();
-        for (String each : schemaNames) {
-            result.put(each, persistService.getDataSourceService().load(each));
-        }
-        return result;
-    }
-    
-    private Map<String, Map<String, DataSourceConfiguration>> 
getChangedDataSourceConfigurations(final Map<String, Map<String, DataSource>> 
configuredDataSourcesMap, 
-                                                                               
                  final Map<String, Map<String, DataSourceConfiguration>> 
loadedDataSourceConfigs) {
-        if (isEmptyLocalDataSourcesMap(configuredDataSourcesMap)) {
-            return loadedDataSourceConfigs;
-        }
-        Map<String, Map<String, DataSourceConfiguration>> result = new 
HashMap<>(loadedDataSourceConfigs.size(), 1);
-        for (Entry<String, Map<String, DataSourceConfiguration>> entry : 
loadedDataSourceConfigs.entrySet()) {
-            if (configuredDataSourcesMap.containsKey(entry.getKey())) {
-                Map<String, DataSourceConfiguration> changedDataSources = 
getChangedDataSourcesConfigurations(configuredDataSourcesMap.get(entry.getKey()),
 entry.getValue());
-                if (!changedDataSources.isEmpty()) {
-                    result.put(entry.getKey(), changedDataSources);
-                }
-            } else {
-                result.put(entry.getKey(), entry.getValue());
-            }
-        }
-        return result;
-    }
-    
-    private Map<String, DataSourceConfiguration> 
getChangedDataSourcesConfigurations(final Map<String, DataSource> 
dataSourceMap, 
-                                                                               
      final Map<String, DataSourceConfiguration> 
loadedDataSourceConfigurationMap) {
-        Map<String, DataSourceConfiguration> dataSourceConfigurationMap = 
DataSourceConverter.getDataSourceConfigurationMap(dataSourceMap);
-        return 
loadedDataSourceConfigurationMap.entrySet().stream().filter(entry -> 
!dataSourceConfigurationMap.containsKey(entry.getKey()) 
-                || 
!dataSourceConfigurationMap.get(entry.getKey()).equals(entry.getValue())).collect(Collectors.toMap(Entry::getKey,
 Entry::getValue));
-    }
-    
-    private Map<String, Collection<RuleConfiguration>> loadSchemaRules(final 
PersistService persistService, final Collection<String> schemaNames) {
-        return schemaNames.stream().collect(Collectors.toMap(
-            each -> each, each -> 
persistService.getSchemaRuleService().load(each), (oldValue, currentValue) -> 
oldValue, LinkedHashMap::new));
-    }
-    
-    private TransactionContexts createTransactionContexts(final 
MetaDataContexts metaDataContexts) {
-        Map<String, ShardingSphereTransactionManagerEngine> engines = new 
HashMap<>(metaDataContexts.getAllSchemaNames().size(), 1);
-        TransactionRule transactionRule = getTransactionRule(metaDataContexts);
-        for (String each : metaDataContexts.getAllSchemaNames()) {
-            ShardingSphereTransactionManagerEngine engine = new 
ShardingSphereTransactionManagerEngine();
-            ShardingSphereResource resource = 
metaDataContexts.getMetaData(each).getResource();
-            engine.init(resource.getDatabaseType(), resource.getDataSources(), 
transactionRule);
-            engines.put(each, engine);
-        }
-        return new TransactionContexts(engines);
     }
     
     /**
@@ -474,14 +330,6 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
                 .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
     }
     
-    private Map<String, Map<String, DataSource>> getChangedDataSources(final 
Map<String, Map<String, DataSourceConfiguration>> 
changedDataSourceConfigurations) {
-        Map<String, Map<String, DataSource>> result = new 
LinkedHashMap<>(changedDataSourceConfigurations.size(), 1);
-        for (Entry<String, Map<String, DataSourceConfiguration>> entry : 
changedDataSourceConfigurations.entrySet()) {
-            result.put(entry.getKey(), 
DataSourceConverter.getDataSourceMap(entry.getValue()));
-        }
-        return result;
-    }
-    
     private Map<String, DataSource> getChangedDataSources(final 
ShardingSphereMetaData originalMetaData, final Map<String, 
DataSourceConfiguration> newDataSourceConfigs) {
         Collection<String> changedDataSourceNames = 
getChangedDataSourceConfiguration(originalMetaData, 
newDataSourceConfigs).keySet();
         return 
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
 -> changedDataSourceNames.contains(entry.getKey()))
@@ -585,36 +433,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         return transactionRule.orElseGet(() -> new TransactionRule(new 
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
     }
     
-    private TransactionRule getTransactionRule(final MetaDataContexts 
metaDataContexts) {
-        Optional<TransactionRule> transactionRule = 
metaDataContexts.getGlobalRuleMetaData().getRules().stream().filter(
-            each -> each instanceof TransactionRule).map(each -> 
(TransactionRule) each).findFirst();
-        return transactionRule.orElseGet(() -> new TransactionRule(new 
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
-    }
-    
     private void renewContexts(final Map<String, 
ShardingSphereTransactionManagerEngine> engines) {
         contextManager.renewTransactionContexts(new 
TransactionContexts(engines));
     }
-    
-    private void disableDataSources() {
-        metaDataContexts.getMetaDataMap().forEach((key, value)
-            -> value.getRuleMetaData().getRules().stream().filter(each -> each 
instanceof StatusContainedRule).forEach(each -> disableDataSources(key, 
(StatusContainedRule) each)));
-    }
-    
-    private void disableDataSources(final String schemaName, final 
StatusContainedRule rule) {
-        Collection<String> disabledDataSources = 
registryCenter.getDataSourceStatusService().loadDisabledDataSources(schemaName);
-        disabledDataSources.stream().map(this::getDataSourceName).forEach(each 
-> rule.updateStatus(new DataSourceNameDisabledEvent(each, true)));
-    }
-    
-    private String getDataSourceName(final String disabledDataSource) {
-        return new GovernanceSchema(disabledDataSource).getDataSourceName();
-    }
-    
-    private void persistMetaData() {
-        metaDataContexts.getMetaDataMap().forEach((key, value) -> 
persistService.getSchemaMetaDataService().persist(key, value.getSchema()));
-    }
-    
-    @Override
-    public String getType() {
-        return "Cluster";
-    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/RegistryCenter.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
similarity index 98%
rename from 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/RegistryCenter.java
rename to 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index ce8488f..cea725c 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/RegistryCenter.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator;
 
 import lombok.Getter;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.GovernanceInstance;
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
similarity index 93%
rename from 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
rename to 
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 31e1429..f313652 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator;
 
 import lombok.SneakyThrows;
 import org.apache.shardingsphere.authority.config.AuthorityRuleConfiguration;
@@ -39,6 +39,7 @@ import 
org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
 import 
org.apache.shardingsphere.infra.optimize.core.metadata.FederateSchemaMetadatas;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
@@ -62,7 +63,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
 import org.mockito.Mock;
-import org.mockito.internal.util.reflection.FieldSetter;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.sql.SQLException;
@@ -90,9 +90,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public final class ClusterContextManagerBuilderTest {
+public final class ClusterContextManagerCoordinatorTest {
     
-    private ClusterContextManagerBuilder builder;
+    private ClusterContextManagerCoordinator coordinator;
     
     private ContextManager contextManager;
     
@@ -116,12 +116,12 @@ public final class ClusterContextManagerBuilderTest {
     public void setUp() {
         PersistRepositoryConfiguration persistRepositoryConfiguration = new 
ClusterPersistRepositoryConfiguration("TEST", "", "", new Properties());
         ModeConfiguration configuration = new ModeConfiguration("Cluster", 
persistRepositoryConfiguration, false);
-        builder = new ClusterContextManagerBuilder();
+        ClusterContextManagerBuilder builder = new 
ClusterContextManagerBuilder();
         contextManager = builder.build(configuration, new HashMap<>(), new 
HashMap<>(), new LinkedList<>(), new Properties(), false);
-        FieldSetter.setField(builder, 
builder.getClass().getDeclaredField("persistService"), persistService);
-        contextManager.renewMetaDataContexts(new 
MetaDataContexts(mock(PersistService.class), createMetaDataMap(), 
globalRuleMetaData, mock(ExecutorEngine.class), 
+        contextManager.renewMetaDataContexts(new 
MetaDataContexts(mock(PersistService.class), createMetaDataMap(), 
globalRuleMetaData, mock(ExecutorEngine.class),
                 new ConfigurationProperties(new Properties()), 
mockOptimizeContextFactory()));
         
contextManager.renewTransactionContexts(mock(TransactionContexts.class, 
RETURNS_DEEP_STUBS));
+        coordinator = new ClusterContextManagerCoordinator(persistService, 
contextManager);
     }
     
     @Test
@@ -129,7 +129,7 @@ public final class ClusterContextManagerBuilderTest {
         SchemaAddedEvent event = new SchemaAddedEvent("schema_add");
         
when(persistService.getDataSourceService().load("schema_add")).thenReturn(getDataSourceConfigurations());
         
when(persistService.getSchemaRuleService().load("schema_add")).thenReturn(Collections.emptyList());
-        builder.renew(event);
+        coordinator.renew(event);
         
assertNotNull(contextManager.getMetaDataContexts().getMetaData("schema_add"));
         
assertNotNull(contextManager.getMetaDataContexts().getMetaData("schema_add").getResource().getDataSources());
     }
@@ -146,7 +146,7 @@ public final class ClusterContextManagerBuilderTest {
     @Test
     public void assertSchemaDelete() {
         SchemaDeletedEvent event = new SchemaDeletedEvent("schema");
-        builder.renew(event);
+        coordinator.renew(event);
         assertNull(contextManager.getMetaDataContexts().getMetaData("schema"));
     }
     
@@ -155,14 +155,14 @@ public final class ClusterContextManagerBuilderTest {
         Properties properties = new Properties();
         properties.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(), 
"true");
         PropertiesChangedEvent event = new PropertiesChangedEvent(properties);
-        builder.renew(event);
+        coordinator.renew(event);
         
assertThat(contextManager.getMetaDataContexts().getProps().getProps().getProperty(ConfigurationPropertyKey.SQL_SHOW.getKey()),
 is("true"));
     }
     
     @Test
     public void assertSchemaChanged() {
         SchemaChangedEvent event = new SchemaChangedEvent("schema_changed", 
mock(ShardingSphereSchema.class));
-        builder.renew(event);
+        coordinator.renew(event);
         
assertTrue(contextManager.getMetaDataContexts().getAllSchemaNames().contains("schema"));
         
assertFalse(contextManager.getMetaDataContexts().getAllSchemaNames().contains("schema_changed"));
     }
@@ -170,7 +170,7 @@ public final class ClusterContextManagerBuilderTest {
     @Test
     public void assertSchemaChangedWithExistSchema() {
         SchemaChangedEvent event = new SchemaChangedEvent("schema", 
mock(ShardingSphereSchema.class));
-        builder.renew(event);
+        coordinator.renew(event);
         assertThat(contextManager.getMetaDataContexts().getMetaData("schema"), 
not(metaData));
     }
     
@@ -178,20 +178,20 @@ public final class ClusterContextManagerBuilderTest {
     public void assertRuleConfigurationsChanged() throws SQLException {
         assertThat(contextManager.getMetaDataContexts().getMetaData("schema"), 
is(metaData));
         RuleConfigurationsChangedEvent event = new 
RuleConfigurationsChangedEvent("schema", new LinkedList<>());
-        builder.renew(event);
+        coordinator.renew(event);
         assertThat(contextManager.getMetaDataContexts().getMetaData("schema"), 
not(metaData));
     }
     
     @Test
     public void assertDisableStateChanged() {
         DisabledStateChangedEvent event = new DisabledStateChangedEvent(new 
GovernanceSchema("schema.ds_0"), true);
-        builder.renew(event);
+        coordinator.renew(event);
     }
     
     @Test
     public void assertDataSourceChanged() throws SQLException {
         DataSourceChangedEvent event = new DataSourceChangedEvent("schema", 
getChangedDataSourceConfigurations());
-        builder.renew(event);
+        coordinator.renew(event);
         
assertTrue(contextManager.getMetaDataContexts().getMetaData("schema").getResource().getDataSources().containsKey("ds_2"));
     }
     
@@ -207,7 +207,7 @@ public final class ClusterContextManagerBuilderTest {
     @Test
     public void assertGlobalRuleConfigurationsChanged() {
         GlobalRuleConfigurationsChangedEvent event = new 
GlobalRuleConfigurationsChangedEvent(getChangedGlobalRuleConfigurations());
-        builder.renew(event);
+        coordinator.renew(event);
         
assertThat(contextManager.getMetaDataContexts().getGlobalRuleMetaData(), 
not(globalRuleMetaData));
     }
     
@@ -227,7 +227,7 @@ public final class ClusterContextManagerBuilderTest {
     public void assertAuthorityChanged() {
         
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()).thenReturn(createAuthorityRule());
         AuthorityChangedEvent event = new 
AuthorityChangedEvent(getShardingSphereUsers());
-        builder.renew(event);
+        coordinator.renew(event);
         Optional<AuthorityRule> authorityRule = 
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()
                 .stream().filter(each -> each instanceof 
AuthorityRule).findAny().map(each -> (AuthorityRule) each);
         assertTrue(authorityRule.isPresent());
@@ -245,7 +245,7 @@ public final class ClusterContextManagerBuilderTest {
         DataSourceChangeCompletedEvent event = new 
DataSourceChangeCompletedEvent("name", mock(DatabaseType.class), 
Collections.emptyMap());
         
when(contextManager.getTransactionContexts().getEngines()).thenReturn(engines);
         when(engines.remove("name")).thenReturn(engine);
-        builder.renewTransactionContext(event);
+        coordinator.renewTransactionContext(event);
         verify(engine).close();
         verify(engines).put(eq("name"), 
any(ShardingSphereTransactionManagerEngine.class));
     }
@@ -255,7 +255,7 @@ public final class ClusterContextManagerBuilderTest {
         DataSourceDeletedEvent event = new DataSourceDeletedEvent("name");
         
when(contextManager.getTransactionContexts().getEngines()).thenReturn(engines);
         when(engines.remove("name")).thenReturn(engine);
-        builder.renewTransactionContext(event);
+        coordinator.renewTransactionContext(event);
         verify(engine).close();
     }
     

Reply via email to