This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e73cb20 Split ContextManagerBuilder for build one time only and put
renew into coordinator (#12276)
e73cb20 is described below
commit e73cb20c8a97bdf8c7bd7b46eb40d30034898a90
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Sep 7 22:18:12 2021 +0800
Split ContextManagerBuilder for build one time only and put renew into
coordinator (#12276)
---
.../cluster/ClusterContextManagerBuilder.java | 381 +--------------------
.../ClusterContextManagerCoordinator.java} | 197 +----------
.../cluster/{ => coordinator}/RegistryCenter.java | 2 +-
.../ClusterContextManagerCoordinatorTest.java} | 38 +-
4 files changed, 31 insertions(+), 587 deletions(-)
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 039ecdc..670b7d8 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -18,42 +18,17 @@
package org.apache.shardingsphere.mode.manager.cluster;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
-import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
-import org.apache.shardingsphere.infra.lock.InnerLockReleasedEvent;
-import org.apache.shardingsphere.infra.lock.LockNameUtil;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
-import
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.optimize.core.metadata.FederateSchemaMetadata;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
import
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
-import
org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.props.PropertiesChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.GlobalRuleConfigurationsChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.rule.RuleConfigurationsChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.schema.SchemaChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaAddedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.metadata.event.SchemaDeletedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.DisabledStateChangedEvent;
-import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.state.event.PrimaryStateChangedEvent;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.ClusterContextManagerCoordinator;
+import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import
org.apache.shardingsphere.mode.manager.cluster.governance.schema.GovernanceSchema;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
@@ -71,10 +46,8 @@ import
org.apache.shardingsphere.transaction.rule.TransactionRule;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
@@ -114,7 +87,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
private void beforeBuildContextManager(final ModeConfiguration modeConfig,
final Map<String, Map<String, DataSource>> dataSourcesMap,
final Map<String,
Collection<RuleConfiguration>> schemaRuleConfigs, final
Collection<RuleConfiguration> globalRuleConfigs,
final Properties props, final
boolean isOverwrite) throws SQLException {
- ShardingSphereEventBus.getInstance().register(this);
ClusterPersistRepository repository =
createClusterPersistRepository((ClusterPersistRepositoryConfiguration)
modeConfig.getRepository());
registryCenter = new RegistryCenter(repository);
persistService = new PersistService(repository);
@@ -126,6 +98,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
}
private void afterBuildContextManager() {
+ new ClusterContextManagerCoordinator(persistService, contextManager);
disableDataSources();
persistMetaData();
registryCenter.onlineInstance();
@@ -233,247 +206,6 @@ public final class ClusterContextManagerBuilder
implements ContextManagerBuilder
return new TransactionContexts(engines);
}
- /**
- * Renew to persist meta data.
- *
- * @param event schema added event
- * @throws SQLException SQL exception
- */
- @Subscribe
- public synchronized void renew(final SchemaAddedEvent event) throws
SQLException {
- persistSchema(event.getSchemaName());
- ShardingSphereMetaData metaData = buildMetaData(event.getSchemaName());
-
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
new FederateSchemaMetadata(event.getSchemaName(),
- metaData.getSchema().getTables()));
-
contextManager.getMetaDataContexts().getMetaDataMap().put(event.getSchemaName(),
metaData);
-
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(contextManager.getMetaDataContexts().getMetaDataMap()));
- ShardingSphereEventBus.getInstance().post(new
DataSourceChangeCompletedEvent(event.getSchemaName(),
-
contextManager.getMetaDataContexts().getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
metaData.getResource().getDataSources()));
- }
-
- /**
- * Renew to delete schema.
- *
- * @param event schema delete event
- */
- @Subscribe
- public synchronized void renew(final SchemaDeletedEvent event) {
- String schemaName = event.getSchemaName();
- closeDataSources(schemaName);
- Map<String, ShardingSphereMetaData> schemaMetaData = new
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap());
- schemaMetaData.remove(schemaName);
-
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().remove(schemaName);
-
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
- ShardingSphereEventBus.getInstance().post(new
DataSourceDeletedEvent(schemaName));
- }
-
- /**
- * Renew properties.
- *
- * @param event properties changed event
- */
- @Subscribe
- public synchronized void renew(final PropertiesChangedEvent event) {
- ConfigurationProperties props = new
ConfigurationProperties(event.getProps());
- contextManager.renewMetaDataContexts(rebuildMetaDataContexts(props));
- }
-
- /**
- * Renew authority.
- *
- * @param event authority changed event
- */
- @Subscribe
- public synchronized void renew(final AuthorityChangedEvent event) {
- Optional<AuthorityRule> rule =
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules().stream()
- .filter(each -> each instanceof
AuthorityRule).findAny().map(each -> (AuthorityRule) each);
- rule.ifPresent(optional ->
optional.refresh(contextManager.getMetaDataContexts().getMetaDataMap(),
event.getUsers()));
- }
-
- /**
- * Renew meta data of the schema.
- *
- * @param event meta data changed event
- */
- @Subscribe
- public synchronized void renew(final SchemaChangedEvent event) {
- try {
- Map<String, ShardingSphereMetaData> schemaMetaData = new
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap().size(), 1);
- for (Entry<String, ShardingSphereMetaData> entry :
contextManager.getMetaDataContexts().getMetaDataMap().entrySet()) {
- String schemaName = entry.getKey();
- ShardingSphereMetaData originalMetaData = entry.getValue();
- ShardingSphereMetaData metaData =
event.getSchemaName().equals(schemaName) ? getChangedMetaData(originalMetaData,
event.getSchema(), schemaName) : originalMetaData;
- schemaMetaData.put(schemaName, metaData);
-
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
- new FederateSchemaMetadata(event.getSchemaName(),
metaData.getSchema().getTables()));
- }
-
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
- } finally {
- ShardingSphereEventBus.getInstance().post(new
InnerLockReleasedEvent(LockNameUtil.getMetadataRefreshLockName()));
- }
- }
-
- /**
- * Renew rule configurations.
- *
- * @param event rule configurations changed event
- * @throws SQLException SQL exception
- */
- @Subscribe
- public synchronized void renew(final RuleConfigurationsChangedEvent event)
throws SQLException {
- String schemaName = event.getSchemaName();
- ShardingSphereMetaData metaData =
getChangedMetaData(contextManager.getMetaDataContexts().getMetaDataMap().get(schemaName),
event.getRuleConfigurations());
- Map<String, ShardingSphereMetaData> schemaMetaData =
rebuildSchemaMetaData(schemaName, metaData);
-
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
- persistService.getSchemaMetaDataService().persist(schemaName,
schemaMetaData.get(schemaName).getSchema());
- }
-
- /**
- * Renew data source configuration.
- *
- * @param event data source changed event.
- * @throws SQLException SQL exception
- */
- @Subscribe
- public synchronized void renew(final DataSourceChangedEvent event) throws
SQLException {
- String schemaName = event.getSchemaName();
- Collection<DataSource> pendingClosedDataSources =
getPendingClosedDataSources(schemaName, event.getDataSourceConfigurations());
- ShardingSphereMetaData metaData =
rebuildMetaData(contextManager.getMetaDataContexts().getMetaDataMap().get(schemaName),
event.getDataSourceConfigurations());
- Map<String, ShardingSphereMetaData> schemaMetaData =
rebuildSchemaMetaData(schemaName, metaData);
-
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
- ShardingSphereEventBus.getInstance().post(new
DataSourceChangeCompletedEvent(event.getSchemaName(),
-
contextManager.getMetaDataContexts().getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
-
schemaMetaData.get(event.getSchemaName()).getResource().getDataSources()));
- closeDataSources(schemaName, pendingClosedDataSources);
- }
-
- /**
- * Renew disabled data source names.
- *
- * @param event disabled state changed event
- */
- @Subscribe
- public synchronized void renew(final DisabledStateChangedEvent event) {
- GovernanceSchema governanceSchema = event.getGovernanceSchema();
- Collection<ShardingSphereRule> rules =
contextManager.getMetaDataContexts().getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
- for (ShardingSphereRule each : rules) {
- if (each instanceof StatusContainedRule) {
- ((StatusContainedRule) each).updateStatus(new
DataSourceNameDisabledEvent(governanceSchema.getDataSourceName(),
event.isDisabled()));
- }
- }
- }
-
- /**
- * Renew primary data source names.
- *
- * @param event primary state changed event
- */
- @Subscribe
- public synchronized void renew(final PrimaryStateChangedEvent event) {
- GovernanceSchema governanceSchema = event.getGovernanceSchema();
- Collection<ShardingSphereRule> rules =
contextManager.getMetaDataContexts().getMetaDataMap().get(governanceSchema.getSchemaName()).getRuleMetaData().getRules();
- for (ShardingSphereRule each : rules) {
- if (each instanceof StatusContainedRule) {
- ((StatusContainedRule) each).updateStatus(new
PrimaryDataSourceChangedEvent(governanceSchema.getSchemaName(),
governanceSchema.getDataSourceName(), event.getPrimaryDataSourceName()));
- }
- }
- }
-
- /**
- * Renew global rule configurations.
- *
- * @param event global rule configurations changed event
- */
- @Subscribe
- public synchronized void renew(final GlobalRuleConfigurationsChangedEvent
event) {
- Collection<RuleConfiguration> newGlobalConfigs =
event.getRuleConfigurations();
- if (!newGlobalConfigs.isEmpty()) {
- ShardingSphereRuleMetaData newGlobalRuleMetaData = new
ShardingSphereRuleMetaData(newGlobalConfigs,
- GlobalRulesBuilder.buildRules(newGlobalConfigs,
contextManager.getMetaDataContexts().getMetaDataMap()));
-
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(newGlobalRuleMetaData));
- }
- }
-
- private MetaDataContexts rebuildMetaDataContexts(final Map<String,
ShardingSphereMetaData> schemaMetaData) {
-
Preconditions.checkState(contextManager.getMetaDataContexts().getPersistService().isPresent());
- return new
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
- schemaMetaData,
contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
contextManager.getMetaDataContexts().getExecutorEngine(),
- contextManager.getMetaDataContexts().getProps(),
contextManager.getMetaDataContexts().getOptimizeContextFactory());
- }
-
- private MetaDataContexts rebuildMetaDataContexts(final
ConfigurationProperties props) {
-
Preconditions.checkState(contextManager.getMetaDataContexts().getPersistService().isPresent());
- return new
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
- contextManager.getMetaDataContexts().getMetaDataMap(),
contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
contextManager.getMetaDataContexts().getExecutorEngine(),
- props,
contextManager.getMetaDataContexts().getOptimizeContextFactory());
- }
-
- private MetaDataContexts rebuildMetaDataContexts(final
ShardingSphereRuleMetaData globalRuleMetaData) {
-
Preconditions.checkState(contextManager.getMetaDataContexts().getPersistService().isPresent());
- return new
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
- contextManager.getMetaDataContexts().getMetaDataMap(),
globalRuleMetaData, contextManager.getMetaDataContexts().getExecutorEngine(),
- contextManager.getMetaDataContexts().getProps(),
contextManager.getMetaDataContexts().getOptimizeContextFactory());
- }
-
- private Map<String, ShardingSphereMetaData> rebuildSchemaMetaData(final
String schemaName, final ShardingSphereMetaData metaData) {
- Map<String, ShardingSphereMetaData> result = new
HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap());
- result.put(schemaName, metaData);
-
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(schemaName,
new FederateSchemaMetadata(schemaName, metaData.getSchema().getTables()));
- return result;
- }
-
- private void persistSchema(final String schemaName) {
- if (!persistService.getDataSourceService().isExisted(schemaName)) {
- persistService.getDataSourceService().persist(schemaName, new
LinkedHashMap<>());
- }
- if (!persistService.getSchemaRuleService().isExisted(schemaName)) {
- persistService.getSchemaRuleService().persist(schemaName, new
LinkedList<>());
- }
- }
-
- private ShardingSphereMetaData buildMetaData(final String schemaName)
throws SQLException {
- Map<String, Map<String, DataSource>> dataSourcesMap =
createDataSourcesMap(Collections.singletonMap(schemaName,
persistService.getDataSourceService().load(schemaName)));
- return new MetaDataContextsBuilder(dataSourcesMap,
- Collections.singletonMap(schemaName,
persistService.getSchemaRuleService().load(schemaName)),
- persistService.getGlobalRuleService().load(),
-
contextManager.getMetaDataContexts().getProps().getProps()).build(persistService).getMetaData(schemaName);
- }
-
- private ShardingSphereMetaData getChangedMetaData(final
ShardingSphereMetaData originalMetaData, final ShardingSphereSchema schema,
final String schemaName) {
- // TODO refresh table addressing mapper
- return new ShardingSphereMetaData(schemaName,
originalMetaData.getResource(), originalMetaData.getRuleMetaData(), schema);
- }
-
- private ShardingSphereMetaData getChangedMetaData(final
ShardingSphereMetaData originalMetaData, final Collection<RuleConfiguration>
ruleConfigs) throws SQLException {
- MetaDataContextsBuilder builder = new
MetaDataContextsBuilder(Collections.singletonMap(originalMetaData.getName(),
originalMetaData.getResource().getDataSources()),
- Collections.singletonMap(originalMetaData.getName(),
ruleConfigs), persistService.getGlobalRuleService().load(),
contextManager.getMetaDataContexts().getProps().getProps());
- return
builder.build(persistService).getMetaDataMap().values().iterator().next();
- }
-
- private ShardingSphereMetaData rebuildMetaData(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) throws SQLException {
- Collection<String> deletedDataSources =
getDeletedDataSources(originalMetaData, newDataSourceConfigs).keySet();
- Map<String, DataSource> changedDataSources =
buildChangedDataSources(originalMetaData, newDataSourceConfigs);
- Map<String, Map<String, DataSource>> dataSourcesMap =
Collections.singletonMap(originalMetaData.getName(),
-
getNewDataSources(originalMetaData.getResource().getDataSources(),
getAddedDataSources(originalMetaData, newDataSourceConfigs),
changedDataSources, deletedDataSources));
- return new MetaDataContextsBuilder(dataSourcesMap,
Collections.singletonMap(originalMetaData.getName(),
- originalMetaData.getRuleMetaData().getConfigurations()),
persistService.getGlobalRuleService().load(),
-
contextManager.getMetaDataContexts().getProps().getProps()).build(persistService).getMetaData(originalMetaData.getName());
- }
-
- private Map<String, DataSource> getNewDataSources(final Map<String,
DataSource> originalDataSources,
- final Map<String,
DataSource> addedDataSources, final Map<String, DataSource> changedDataSources,
final Collection<String> deletedDataSources) {
- Map<String, DataSource> result = new
LinkedHashMap<>(originalDataSources);
- result.keySet().removeAll(deletedDataSources);
- result.putAll(changedDataSources);
- result.putAll(addedDataSources);
- return result;
- }
-
- private Map<String, DataSource> getDeletedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
- return
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
-> !newDataSourceConfigs.containsKey(entry.getKey()))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
- }
-
private Map<String, Map<String, DataSource>> getChangedDataSources(final
Map<String, Map<String, DataSourceConfiguration>>
changedDataSourceConfigurations) {
Map<String, Map<String, DataSource>> result = new
LinkedHashMap<>(changedDataSourceConfigurations.size(), 1);
for (Entry<String, Map<String, DataSourceConfiguration>> entry :
changedDataSourceConfigurations.entrySet()) {
@@ -482,119 +214,12 @@ public final class ClusterContextManagerBuilder
implements ContextManagerBuilder
return result;
}
- private Map<String, DataSource> getChangedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
- Collection<String> changedDataSourceNames =
getChangedDataSourceConfiguration(originalMetaData,
newDataSourceConfigs).keySet();
- return
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
-> changedDataSourceNames.contains(entry.getKey()))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
- }
-
- private Map<String, DataSource> getAddedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
- return
DataSourceConverter.getDataSourceMap(Maps.filterKeys(newDataSourceConfigs, each
-> !originalMetaData.getResource().getDataSources().containsKey(each)));
- }
-
- private Map<String, DataSourceConfiguration>
getChangedDataSourceConfiguration(final ShardingSphereMetaData originalMetaData,
-
final Map<String, DataSourceConfiguration> dataSourceConfigurations) {
- return dataSourceConfigurations.entrySet().stream()
- .filter(entry ->
isModifiedDataSource(originalMetaData.getResource().getDataSources(),
entry.getKey(), entry.getValue()))
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue,
(oldValue, currentValue) -> oldValue, LinkedHashMap::new));
- }
-
- private Map<String, DataSource> buildChangedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
- return
DataSourceConverter.getDataSourceMap(getChangedDataSourceConfiguration(originalMetaData,
newDataSourceConfigs));
- }
-
- private boolean isModifiedDataSource(final Map<String, DataSource>
originalDataSources, final String dataSourceName, final DataSourceConfiguration
dataSourceConfiguration) {
- DataSourceConfiguration dataSourceConfig =
DataSourceConverter.getDataSourceConfigurationMap(originalDataSources).get(dataSourceName);
- return null != dataSourceConfig &&
!dataSourceConfiguration.equals(dataSourceConfig);
- }
-
- private Map<String, Map<String, DataSource>> createDataSourcesMap(final
Map<String, Map<String, DataSourceConfiguration>> dataSourcesConfigs) {
- return
dataSourcesConfigs.entrySet().stream().collect(Collectors.toMap(Entry::getKey,
entry -> DataSourceConverter.getDataSourceMap(entry.getValue())));
- }
-
- private Collection<DataSource> getPendingClosedDataSources(final String
schemaName, final Map<String, DataSourceConfiguration>
dataSourceConfigurations) {
- Collection<DataSource> result = new LinkedList<>();
-
result.addAll(getDeletedDataSources(contextManager.getMetaDataContexts().getMetaData(schemaName),
dataSourceConfigurations).values());
-
result.addAll(getChangedDataSources(contextManager.getMetaDataContexts().getMetaData(schemaName),
dataSourceConfigurations).values());
- return result;
- }
-
- private void closeDataSources(final String schemaName) {
- if (null !=
contextManager.getMetaDataContexts().getMetaData(schemaName)
- && null !=
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource()) {
- closeDataSources(schemaName,
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource().getDataSources().values());
- }
- }
-
- private void closeDataSources(final String schemaName, final
Collection<DataSource> dataSources) {
- ShardingSphereResource resource =
contextManager.getMetaDataContexts().getMetaData(schemaName).getResource();
- dataSources.forEach(each -> closeDataSource(resource, each));
- }
-
- private void closeDataSource(final ShardingSphereResource resource, final
DataSource dataSource) {
- try {
- resource.close(dataSource);
- // CHECKSTYLE:OFF
- } catch (final Exception ignore) {
- // CHECKSTYLE:ON
- }
- }
-
- /**
- * Renew transaction manager engine contexts.
- *
- * @param event data source change completed event
- * @throws Exception exception
- */
- @Subscribe
- public synchronized void renewTransactionContext(final
DataSourceChangeCompletedEvent event) throws Exception {
- closeStaleEngine(event.getSchemaName());
- Map<String, ShardingSphereTransactionManagerEngine> existedEngines =
contextManager.getTransactionContexts().getEngines();
- existedEngines.put(event.getSchemaName(),
createNewEngine(event.getDatabaseType(), event.getDataSources()));
- renewContexts(existedEngines);
- }
-
- /**
- * Renew transaction manager engine context.
- *
- * @param event data source deleted event.
- * @throws Exception exception
- */
- @Subscribe
- public synchronized void renewTransactionContext(final
DataSourceDeletedEvent event) throws Exception {
- closeStaleEngine(event.getSchemaName());
- renewContexts(contextManager.getTransactionContexts().getEngines());
- }
-
- private void closeStaleEngine(final String schemaName) throws Exception {
- ShardingSphereTransactionManagerEngine staleEngine =
contextManager.getTransactionContexts().getEngines().remove(schemaName);
- if (null != staleEngine) {
- staleEngine.close();
- }
- }
-
- private ShardingSphereTransactionManagerEngine createNewEngine(final
DatabaseType databaseType, final Map<String, DataSource> dataSources) {
- ShardingSphereTransactionManagerEngine result = new
ShardingSphereTransactionManagerEngine();
- result.init(databaseType, dataSources, getTransactionRule());
- return result;
- }
-
- private TransactionRule getTransactionRule() {
- Optional<TransactionRule> transactionRule =
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules().stream()
- .filter(each -> each instanceof TransactionRule).map(each ->
(TransactionRule) each).findFirst();
- return transactionRule.orElseGet(() -> new TransactionRule(new
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
- }
-
private TransactionRule getTransactionRule(final MetaDataContexts
metaDataContexts) {
Optional<TransactionRule> transactionRule =
metaDataContexts.getGlobalRuleMetaData().getRules().stream().filter(
each -> each instanceof TransactionRule).map(each ->
(TransactionRule) each).findFirst();
return transactionRule.orElseGet(() -> new TransactionRule(new
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
}
- private void renewContexts(final Map<String,
ShardingSphereTransactionManagerEngine> engines) {
- contextManager.renewTransactionContexts(new
TransactionContexts(engines));
- }
-
private void disableDataSources() {
metaDataContexts.getMetaDataMap().forEach((key, value)
-> value.getRuleMetaData().getRules().stream().filter(each -> each
instanceof StatusContainedRule).forEach(each -> disableDataSources(key,
(StatusContainedRule) each)));
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
similarity index 68%
copy from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
copy to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index 039ecdc..49cea81 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.infra.config.RuleConfiguration;
import
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration;
import org.apache.shardingsphere.infra.config.datasource.DataSourceConverter;
-import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationProperties;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
@@ -41,7 +40,6 @@ import
org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEve
import
org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
import
org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
@@ -58,10 +56,6 @@ import
org.apache.shardingsphere.mode.manager.cluster.governance.schema.Governan
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsBuilder;
import org.apache.shardingsphere.mode.persist.PersistService;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
-import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
import
org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
import
org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
import org.apache.shardingsphere.transaction.context.TransactionContexts;
@@ -78,159 +72,21 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Properties;
import java.util.stream.Collectors;
/**
- * Cluster context manager builder.
+ * Cluster context manager coordinator.
*/
-public final class ClusterContextManagerBuilder implements
ContextManagerBuilder {
+public final class ClusterContextManagerCoordinator {
- static {
- ShardingSphereServiceLoader.register(ClusterPersistRepository.class);
- }
-
- private RegistryCenter registryCenter;
-
- private PersistService persistService;
-
- private MetaDataContexts metaDataContexts;
+ private final PersistService persistService;
- private TransactionContexts transactionContexts;
+ private final ContextManager contextManager;
- private ContextManager contextManager;
-
- @Override
- public ContextManager build(final ModeConfiguration modeConfig, final
Map<String, Map<String, DataSource>> dataSourcesMap,
- final Map<String,
Collection<RuleConfiguration>> schemaRuleConfigs, final
Collection<RuleConfiguration> globalRuleConfigs,
- final Properties props, final boolean
isOverwrite) throws SQLException {
- beforeBuildContextManager(modeConfig, dataSourcesMap,
schemaRuleConfigs, globalRuleConfigs, props, isOverwrite);
- contextManager = new ContextManager();
- contextManager.init(metaDataContexts, transactionContexts);
- afterBuildContextManager();
- return contextManager;
- }
-
- private void beforeBuildContextManager(final ModeConfiguration modeConfig,
final Map<String, Map<String, DataSource>> dataSourcesMap,
- final Map<String,
Collection<RuleConfiguration>> schemaRuleConfigs, final
Collection<RuleConfiguration> globalRuleConfigs,
- final Properties props, final
boolean isOverwrite) throws SQLException {
+ public ClusterContextManagerCoordinator(final PersistService
persistService, final ContextManager contextManager) {
+ this.persistService = persistService;
+ this.contextManager = contextManager;
ShardingSphereEventBus.getInstance().register(this);
- ClusterPersistRepository repository =
createClusterPersistRepository((ClusterPersistRepositoryConfiguration)
modeConfig.getRepository());
- registryCenter = new RegistryCenter(repository);
- persistService = new PersistService(repository);
- persistConfigurations(persistService, dataSourcesMap,
schemaRuleConfigs, globalRuleConfigs, props, isOverwrite);
- Collection<String> schemaNames =
persistService.getSchemaMetaDataService().loadAllNames();
- metaDataContexts = new
MetaDataContextsBuilder(loadDataSourcesMap(persistService, dataSourcesMap,
schemaNames),
- loadSchemaRules(persistService, schemaNames),
persistService.getGlobalRuleService().load(),
persistService.getPropsService().load()).build(persistService);
- transactionContexts = createTransactionContexts(metaDataContexts);
- }
-
- private void afterBuildContextManager() {
- disableDataSources();
- persistMetaData();
- registryCenter.onlineInstance();
- }
-
- private ClusterPersistRepository createClusterPersistRepository(final
ClusterPersistRepositoryConfiguration config) {
- Preconditions.checkNotNull(config, "Cluster persist repository
configuration cannot be null.");
- ClusterPersistRepository result =
TypedSPIRegistry.getRegisteredService(ClusterPersistRepository.class,
config.getType(), config.getProps());
- result.init(config);
- return result;
- }
-
- private void persistConfigurations(final PersistService persistService,
final Map<String, Map<String, DataSource>> dataSourcesMap,
- final Map<String,
Collection<RuleConfiguration>> schemaRuleConfigs, final
Collection<RuleConfiguration> globalRuleConfigs,
- final Properties props, final boolean
overwrite) {
- if (!isEmptyLocalConfiguration(dataSourcesMap, schemaRuleConfigs,
globalRuleConfigs, props)) {
-
persistService.persistConfigurations(getDataSourceConfigurations(dataSourcesMap),
schemaRuleConfigs, globalRuleConfigs, props, overwrite);
- }
- }
-
- private boolean isEmptyLocalConfiguration(final Map<String, Map<String,
DataSource>> dataSourcesMap,
- final Map<String,
Collection<RuleConfiguration>> schemaRuleConfigs, final
Collection<RuleConfiguration> globalRuleConfigs, final Properties props) {
- return isEmptyLocalDataSourcesMap(dataSourcesMap) &&
isEmptyLocalSchemaRuleConfigurations(schemaRuleConfigs) &&
globalRuleConfigs.isEmpty() && props.isEmpty();
- }
-
- private boolean isEmptyLocalDataSourcesMap(final Map<String, Map<String,
DataSource>> dataSourcesMap) {
- return dataSourcesMap.entrySet().stream().allMatch(entry ->
entry.getValue().isEmpty());
- }
-
- private boolean isEmptyLocalSchemaRuleConfigurations(final Map<String,
Collection<RuleConfiguration>> schemaRuleConfigs) {
- return schemaRuleConfigs.entrySet().stream().allMatch(entry ->
entry.getValue().isEmpty());
- }
-
- private Map<String, Map<String, DataSourceConfiguration>>
getDataSourceConfigurations(final Map<String, Map<String, DataSource>>
dataSourcesMap) {
- Map<String, Map<String, DataSourceConfiguration>> result = new
LinkedHashMap<>(dataSourcesMap.size(), 1);
- for (Entry<String, Map<String, DataSource>> entry :
dataSourcesMap.entrySet()) {
- result.put(entry.getKey(),
DataSourceConverter.getDataSourceConfigurationMap(entry.getValue()));
- }
- return result;
- }
-
- private Map<String, Map<String, DataSource>> loadDataSourcesMap(final
PersistService persistService, final Map<String, Map<String, DataSource>>
dataSourcesMap,
- final
Collection<String> schemaNames) {
- Map<String, Map<String, DataSourceConfiguration>>
loadedDataSourceConfigs = loadDataSourceConfigurations(persistService,
schemaNames);
- Map<String, Map<String, DataSourceConfiguration>>
changedDataSourceConfigs = getChangedDataSourceConfigurations(dataSourcesMap,
loadedDataSourceConfigs);
- Map<String, Map<String, DataSource>> result = new
LinkedHashMap<>(dataSourcesMap);
- getChangedDataSources(changedDataSourceConfigs).forEach((key, value)
-> {
- if (result.containsKey(key)) {
- result.get(key).putAll(value);
- } else {
- result.put(key, value);
- }
- });
- return result;
- }
-
- private Map<String, Map<String, DataSourceConfiguration>>
loadDataSourceConfigurations(final PersistService persistService, final
Collection<String> schemaNames) {
- Map<String, Map<String, DataSourceConfiguration>> result = new
LinkedHashMap<>();
- for (String each : schemaNames) {
- result.put(each, persistService.getDataSourceService().load(each));
- }
- return result;
- }
-
- private Map<String, Map<String, DataSourceConfiguration>>
getChangedDataSourceConfigurations(final Map<String, Map<String, DataSource>>
configuredDataSourcesMap,
-
final Map<String, Map<String, DataSourceConfiguration>>
loadedDataSourceConfigs) {
- if (isEmptyLocalDataSourcesMap(configuredDataSourcesMap)) {
- return loadedDataSourceConfigs;
- }
- Map<String, Map<String, DataSourceConfiguration>> result = new
HashMap<>(loadedDataSourceConfigs.size(), 1);
- for (Entry<String, Map<String, DataSourceConfiguration>> entry :
loadedDataSourceConfigs.entrySet()) {
- if (configuredDataSourcesMap.containsKey(entry.getKey())) {
- Map<String, DataSourceConfiguration> changedDataSources =
getChangedDataSourcesConfigurations(configuredDataSourcesMap.get(entry.getKey()),
entry.getValue());
- if (!changedDataSources.isEmpty()) {
- result.put(entry.getKey(), changedDataSources);
- }
- } else {
- result.put(entry.getKey(), entry.getValue());
- }
- }
- return result;
- }
-
- private Map<String, DataSourceConfiguration>
getChangedDataSourcesConfigurations(final Map<String, DataSource>
dataSourceMap,
-
final Map<String, DataSourceConfiguration>
loadedDataSourceConfigurationMap) {
- Map<String, DataSourceConfiguration> dataSourceConfigurationMap =
DataSourceConverter.getDataSourceConfigurationMap(dataSourceMap);
- return
loadedDataSourceConfigurationMap.entrySet().stream().filter(entry ->
!dataSourceConfigurationMap.containsKey(entry.getKey())
- ||
!dataSourceConfigurationMap.get(entry.getKey()).equals(entry.getValue())).collect(Collectors.toMap(Entry::getKey,
Entry::getValue));
- }
-
- private Map<String, Collection<RuleConfiguration>> loadSchemaRules(final
PersistService persistService, final Collection<String> schemaNames) {
- return schemaNames.stream().collect(Collectors.toMap(
- each -> each, each ->
persistService.getSchemaRuleService().load(each), (oldValue, currentValue) ->
oldValue, LinkedHashMap::new));
- }
-
- private TransactionContexts createTransactionContexts(final
MetaDataContexts metaDataContexts) {
- Map<String, ShardingSphereTransactionManagerEngine> engines = new
HashMap<>(metaDataContexts.getAllSchemaNames().size(), 1);
- TransactionRule transactionRule = getTransactionRule(metaDataContexts);
- for (String each : metaDataContexts.getAllSchemaNames()) {
- ShardingSphereTransactionManagerEngine engine = new
ShardingSphereTransactionManagerEngine();
- ShardingSphereResource resource =
metaDataContexts.getMetaData(each).getResource();
- engine.init(resource.getDatabaseType(), resource.getDataSources(),
transactionRule);
- engines.put(each, engine);
- }
- return new TransactionContexts(engines);
}
/**
@@ -474,14 +330,6 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
}
- private Map<String, Map<String, DataSource>> getChangedDataSources(final
Map<String, Map<String, DataSourceConfiguration>>
changedDataSourceConfigurations) {
- Map<String, Map<String, DataSource>> result = new
LinkedHashMap<>(changedDataSourceConfigurations.size(), 1);
- for (Entry<String, Map<String, DataSourceConfiguration>> entry :
changedDataSourceConfigurations.entrySet()) {
- result.put(entry.getKey(),
DataSourceConverter.getDataSourceMap(entry.getValue()));
- }
- return result;
- }
-
private Map<String, DataSource> getChangedDataSources(final
ShardingSphereMetaData originalMetaData, final Map<String,
DataSourceConfiguration> newDataSourceConfigs) {
Collection<String> changedDataSourceNames =
getChangedDataSourceConfiguration(originalMetaData,
newDataSourceConfigs).keySet();
return
originalMetaData.getResource().getDataSources().entrySet().stream().filter(entry
-> changedDataSourceNames.contains(entry.getKey()))
@@ -585,36 +433,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
return transactionRule.orElseGet(() -> new TransactionRule(new
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
}
- private TransactionRule getTransactionRule(final MetaDataContexts
metaDataContexts) {
- Optional<TransactionRule> transactionRule =
metaDataContexts.getGlobalRuleMetaData().getRules().stream().filter(
- each -> each instanceof TransactionRule).map(each ->
(TransactionRule) each).findFirst();
- return transactionRule.orElseGet(() -> new TransactionRule(new
TransactionRuleConfiguration(TransactionType.LOCAL.name(), null)));
- }
-
private void renewContexts(final Map<String,
ShardingSphereTransactionManagerEngine> engines) {
contextManager.renewTransactionContexts(new
TransactionContexts(engines));
}
-
- private void disableDataSources() {
- metaDataContexts.getMetaDataMap().forEach((key, value)
- -> value.getRuleMetaData().getRules().stream().filter(each -> each
instanceof StatusContainedRule).forEach(each -> disableDataSources(key,
(StatusContainedRule) each)));
- }
-
- private void disableDataSources(final String schemaName, final
StatusContainedRule rule) {
- Collection<String> disabledDataSources =
registryCenter.getDataSourceStatusService().loadDisabledDataSources(schemaName);
- disabledDataSources.stream().map(this::getDataSourceName).forEach(each
-> rule.updateStatus(new DataSourceNameDisabledEvent(each, true)));
- }
-
- private String getDataSourceName(final String disabledDataSource) {
- return new GovernanceSchema(disabledDataSource).getDataSourceName();
- }
-
- private void persistMetaData() {
- metaDataContexts.getMetaDataMap().forEach((key, value) ->
persistService.getSchemaMetaDataService().persist(key, value.getSchema()));
- }
-
- @Override
- public String getType() {
- return "Cluster";
- }
}
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/RegistryCenter.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
similarity index 98%
rename from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/RegistryCenter.java
rename to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index ce8488f..cea725c 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/RegistryCenter.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator;
import lombok.Getter;
import
org.apache.shardingsphere.mode.manager.cluster.governance.GovernanceInstance;
diff --git
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
similarity index 93%
rename from
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
rename to
shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 31e1429..f313652 100644
---
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilderTest.java
+++
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator;
import lombok.SneakyThrows;
import org.apache.shardingsphere.authority.config.AuthorityRuleConfiguration;
@@ -39,6 +39,7 @@ import
org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
import
org.apache.shardingsphere.infra.optimize.core.metadata.FederateSchemaMetadatas;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
+import
org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.authority.event.AuthorityChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangeCompletedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.governance.registry.config.event.datasource.DataSourceChangedEvent;
@@ -62,7 +63,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
-import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
import java.sql.SQLException;
@@ -90,9 +90,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public final class ClusterContextManagerBuilderTest {
+public final class ClusterContextManagerCoordinatorTest {
- private ClusterContextManagerBuilder builder;
+ private ClusterContextManagerCoordinator coordinator;
private ContextManager contextManager;
@@ -116,12 +116,12 @@ public final class ClusterContextManagerBuilderTest {
public void setUp() {
PersistRepositoryConfiguration persistRepositoryConfiguration = new
ClusterPersistRepositoryConfiguration("TEST", "", "", new Properties());
ModeConfiguration configuration = new ModeConfiguration("Cluster",
persistRepositoryConfiguration, false);
- builder = new ClusterContextManagerBuilder();
+ ClusterContextManagerBuilder builder = new
ClusterContextManagerBuilder();
contextManager = builder.build(configuration, new HashMap<>(), new
HashMap<>(), new LinkedList<>(), new Properties(), false);
- FieldSetter.setField(builder,
builder.getClass().getDeclaredField("persistService"), persistService);
- contextManager.renewMetaDataContexts(new
MetaDataContexts(mock(PersistService.class), createMetaDataMap(),
globalRuleMetaData, mock(ExecutorEngine.class),
+ contextManager.renewMetaDataContexts(new
MetaDataContexts(mock(PersistService.class), createMetaDataMap(),
globalRuleMetaData, mock(ExecutorEngine.class),
new ConfigurationProperties(new Properties()),
mockOptimizeContextFactory()));
contextManager.renewTransactionContexts(mock(TransactionContexts.class,
RETURNS_DEEP_STUBS));
+ coordinator = new ClusterContextManagerCoordinator(persistService,
contextManager);
}
@Test
@@ -129,7 +129,7 @@ public final class ClusterContextManagerBuilderTest {
SchemaAddedEvent event = new SchemaAddedEvent("schema_add");
when(persistService.getDataSourceService().load("schema_add")).thenReturn(getDataSourceConfigurations());
when(persistService.getSchemaRuleService().load("schema_add")).thenReturn(Collections.emptyList());
- builder.renew(event);
+ coordinator.renew(event);
assertNotNull(contextManager.getMetaDataContexts().getMetaData("schema_add"));
assertNotNull(contextManager.getMetaDataContexts().getMetaData("schema_add").getResource().getDataSources());
}
@@ -146,7 +146,7 @@ public final class ClusterContextManagerBuilderTest {
@Test
public void assertSchemaDelete() {
SchemaDeletedEvent event = new SchemaDeletedEvent("schema");
- builder.renew(event);
+ coordinator.renew(event);
assertNull(contextManager.getMetaDataContexts().getMetaData("schema"));
}
@@ -155,14 +155,14 @@ public final class ClusterContextManagerBuilderTest {
Properties properties = new Properties();
properties.setProperty(ConfigurationPropertyKey.SQL_SHOW.getKey(),
"true");
PropertiesChangedEvent event = new PropertiesChangedEvent(properties);
- builder.renew(event);
+ coordinator.renew(event);
assertThat(contextManager.getMetaDataContexts().getProps().getProps().getProperty(ConfigurationPropertyKey.SQL_SHOW.getKey()),
is("true"));
}
@Test
public void assertSchemaChanged() {
SchemaChangedEvent event = new SchemaChangedEvent("schema_changed",
mock(ShardingSphereSchema.class));
- builder.renew(event);
+ coordinator.renew(event);
assertTrue(contextManager.getMetaDataContexts().getAllSchemaNames().contains("schema"));
assertFalse(contextManager.getMetaDataContexts().getAllSchemaNames().contains("schema_changed"));
}
@@ -170,7 +170,7 @@ public final class ClusterContextManagerBuilderTest {
@Test
public void assertSchemaChangedWithExistSchema() {
SchemaChangedEvent event = new SchemaChangedEvent("schema",
mock(ShardingSphereSchema.class));
- builder.renew(event);
+ coordinator.renew(event);
assertThat(contextManager.getMetaDataContexts().getMetaData("schema"),
not(metaData));
}
@@ -178,20 +178,20 @@ public final class ClusterContextManagerBuilderTest {
public void assertRuleConfigurationsChanged() throws SQLException {
assertThat(contextManager.getMetaDataContexts().getMetaData("schema"),
is(metaData));
RuleConfigurationsChangedEvent event = new
RuleConfigurationsChangedEvent("schema", new LinkedList<>());
- builder.renew(event);
+ coordinator.renew(event);
assertThat(contextManager.getMetaDataContexts().getMetaData("schema"),
not(metaData));
}
@Test
public void assertDisableStateChanged() {
DisabledStateChangedEvent event = new DisabledStateChangedEvent(new
GovernanceSchema("schema.ds_0"), true);
- builder.renew(event);
+ coordinator.renew(event);
}
@Test
public void assertDataSourceChanged() throws SQLException {
DataSourceChangedEvent event = new DataSourceChangedEvent("schema",
getChangedDataSourceConfigurations());
- builder.renew(event);
+ coordinator.renew(event);
assertTrue(contextManager.getMetaDataContexts().getMetaData("schema").getResource().getDataSources().containsKey("ds_2"));
}
@@ -207,7 +207,7 @@ public final class ClusterContextManagerBuilderTest {
@Test
public void assertGlobalRuleConfigurationsChanged() {
GlobalRuleConfigurationsChangedEvent event = new
GlobalRuleConfigurationsChangedEvent(getChangedGlobalRuleConfigurations());
- builder.renew(event);
+ coordinator.renew(event);
assertThat(contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
not(globalRuleMetaData));
}
@@ -227,7 +227,7 @@ public final class ClusterContextManagerBuilderTest {
public void assertAuthorityChanged() {
when(contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()).thenReturn(createAuthorityRule());
AuthorityChangedEvent event = new
AuthorityChangedEvent(getShardingSphereUsers());
- builder.renew(event);
+ coordinator.renew(event);
Optional<AuthorityRule> authorityRule =
contextManager.getMetaDataContexts().getGlobalRuleMetaData().getRules()
.stream().filter(each -> each instanceof
AuthorityRule).findAny().map(each -> (AuthorityRule) each);
assertTrue(authorityRule.isPresent());
@@ -245,7 +245,7 @@ public final class ClusterContextManagerBuilderTest {
DataSourceChangeCompletedEvent event = new
DataSourceChangeCompletedEvent("name", mock(DatabaseType.class),
Collections.emptyMap());
when(contextManager.getTransactionContexts().getEngines()).thenReturn(engines);
when(engines.remove("name")).thenReturn(engine);
- builder.renewTransactionContext(event);
+ coordinator.renewTransactionContext(event);
verify(engine).close();
verify(engines).put(eq("name"),
any(ShardingSphereTransactionManagerEngine.class));
}
@@ -255,7 +255,7 @@ public final class ClusterContextManagerBuilderTest {
DataSourceDeletedEvent event = new DataSourceDeletedEvent("name");
when(contextManager.getTransactionContexts().getEngines()).thenReturn(engines);
when(engines.remove("name")).thenReturn(engine);
- builder.renewTransactionContext(event);
+ coordinator.renewTransactionContext(event);
verify(engine).close();
}