This is an automated email from the ASF dual-hosted git repository. jianglongtao pushed a commit to branch fix-33341 in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
commit c1aaaa92426336fb37fe0e6b56a095292b46dc8c Author: zhaojinchao <[email protected]> AuthorDate: Thu Aug 8 19:05:56 2024 +0800 Add meta data listener assisted node path to dynamic watch create or drop database --- .../metadata/data/loader/MySQLMetaDataLoader.java | 5 +- .../persist/MetaDataBasedPersistService.java | 8 +++ .../metadata/persist/MetaDataPersistService.java | 4 ++ .../metadata/persist/pojo/ListenerAssisted.java | 38 ++++++++++++ .../persist/pojo/ListenerAssistedType.java | 27 +++++++++ .../database/ListenerAssistedPersistService.java | 51 ++++++++++++++++ .../CreateDatabaseListenerAssistedEvent.java | 32 ++++++++++ .../DropDatabaseListenerAssistedEvent.java | 32 ++++++++++ .../mode/path/ListenerAssistedNodePath.java | 65 ++++++++++++++++++++ .../shardingsphere/mode/spi/PersistRepository.java | 9 +++ .../mode/path/ListenerAssistedNodePathTest.java | 47 +++++++++++++++ .../cluster/ClusterContextManagerBuilder.java | 22 ++++++- .../manager/cluster/ClusterModeContextManager.java | 7 +++ .../cluster/coordinator/RegistryCenter.java | 19 +----- .../DataChangedEventListenerManager.java} | 47 +++++++-------- .../watch/ListenerAssistedChangedWatcher.java} | 43 +++++++------- .../coordinator/registry/GovernanceWatcher.java | 3 +- .../registry/GovernanceWatcherFactory.java | 4 +- .../config/watcher/GlobalRuleChangedWatcher.java | 2 +- .../config/watcher/PropertiesChangedWatcher.java | 2 +- .../data/ShardingSphereDataChangedWatcher.java | 2 +- ...edWatcher.java => MetaDataChangedListener.java} | 25 ++++---- .../watcher/ClusterStateChangedWatcher.java | 2 +- .../watcher/ComputeNodeStateChangedWatcher.java | 2 +- .../watcher/StorageNodeStateChangedWatcher.java | 2 +- .../subscriber/ContextManagerSubscriberFacade.java | 1 + .../subscriber/ListenerAssistedSubscriber.java | 69 ++++++++++++++++++++++ ....cluster.coordinator.registry.GovernanceWatcher | 2 +- .../fixture/ClusterPersistRepositoryFixture.java | 4 ++ ...ProcessListClusterPersistRepositoryFixture.java | 4 ++ .../ProcessListChangedSubscriberTest.java | 2 +- .../subscriber/StateChangedSubscriberTest.java | 2 +- .../cluster/ClusterPersistRepository.java | 7 +++ .../repository/cluster/etcd/EtcdRepository.java | 4 ++ .../cluster/zookeeper/ZookeeperRepository.java | 16 +++++ .../core/kernel/KernelDistSQLStatementVisitor.java | 3 +- .../converter/DataSourceSegmentsConverter.java | 1 + .../fixture/ClusterPersistRepositoryFixture.java | 5 ++ 38 files changed, 520 insertions(+), 100 deletions(-) diff --git a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/data/loader/MySQLMetaDataLoader.java b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/data/loader/MySQLMetaDataLoader.java index 748f04d32cf..6c2007c11cb 100644 --- a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/data/loader/MySQLMetaDataLoader.java +++ b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/data/loader/MySQLMetaDataLoader.java @@ -69,7 +69,10 @@ public final class MySQLMetaDataLoader implements DialectMetaDataLoader { public Collection<SchemaMetaData> load(final MetaDataLoaderMaterial material) throws SQLException { Collection<TableMetaData> tableMetaDataList = new LinkedList<>(); Map<String, Collection<ColumnMetaData>> columnMetaDataMap = loadColumnMetaDataMap(material.getDataSource(), material.getActualTableNames()); - Collection<String> viewNames = columnMetaDataMap.isEmpty() ? Collections.emptySet() : loadViewNames(material.getDataSource(), columnMetaDataMap.keySet()); Map<String, Collection<IndexMetaData>> indexMetaDataMap = columnMetaDataMap.isEmpty() ? Collections.emptyMap() : loadIndexMetaData(material.getDataSource(), columnMetaDataMap.keySet()); + Collection<String> viewNames = columnMetaDataMap.isEmpty() ? Collections.emptySet() + : loadViewNames(material.getDataSource(), columnMetaDataMap.keySet()); + Map<String, Collection<IndexMetaData>> indexMetaDataMap = columnMetaDataMap.isEmpty() ? Collections.emptyMap() + : loadIndexMetaData(material.getDataSource(), columnMetaDataMap.keySet()); Map<String, Collection<ConstraintMetaData>> constraintMetaDataMap = columnMetaDataMap.isEmpty() ? Collections.emptyMap() : loadConstraintMetaDataMap(material.getDataSource(), columnMetaDataMap.keySet()); for (Entry<String, Collection<ColumnMetaData>> entry : columnMetaDataMap.entrySet()) { diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataBasedPersistService.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataBasedPersistService.java index 58ea367569f..9b0a1fefd6c 100644 --- a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataBasedPersistService.java +++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataBasedPersistService.java @@ -26,6 +26,7 @@ import org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataBasedPe import org.apache.shardingsphere.metadata.persist.service.config.database.DatabaseBasedPersistService; import org.apache.shardingsphere.metadata.persist.service.config.global.GlobalPersistService; import org.apache.shardingsphere.metadata.persist.service.database.DatabaseMetaDataBasedPersistService; +import org.apache.shardingsphere.metadata.persist.service.database.ListenerAssistedPersistService; import org.apache.shardingsphere.metadata.persist.service.version.MetaDataVersionBasedPersistService; import org.apache.shardingsphere.mode.spi.PersistRepository; @@ -120,4 +121,11 @@ public interface MetaDataBasedPersistService { * @return effective data sources */ Map<String, DataSourceConfiguration> loadDataSourceConfigurations(String databaseName); + + /** + * Get listener assisted persist service. + * + * @return listener assisted persist service + */ + ListenerAssistedPersistService getListenerAssistedPersistService(); } diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataPersistService.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataPersistService.java index 6553944e7d4..96b8fa2f26b 100644 --- a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataPersistService.java +++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataPersistService.java @@ -33,6 +33,7 @@ import org.apache.shardingsphere.metadata.persist.service.config.database.rule.D import org.apache.shardingsphere.metadata.persist.service.config.global.GlobalRulePersistService; import org.apache.shardingsphere.metadata.persist.service.config.global.PropertiesPersistService; import org.apache.shardingsphere.metadata.persist.service.database.DatabaseMetaDataPersistService; +import org.apache.shardingsphere.metadata.persist.service.database.ListenerAssistedPersistService; import org.apache.shardingsphere.metadata.persist.service.version.MetaDataVersionPersistService; import org.apache.shardingsphere.mode.spi.PersistRepository; @@ -70,6 +71,8 @@ public final class MetaDataPersistService implements MetaDataBasedPersistService private final ShardingSphereDataPersistService shardingSphereDataPersistService; + private final ListenerAssistedPersistService listenerAssistedPersistService; + public MetaDataPersistService(final PersistRepository repository) { this.repository = repository; metaDataVersionPersistService = new MetaDataVersionPersistService(repository); @@ -80,6 +83,7 @@ public final class MetaDataPersistService implements MetaDataBasedPersistService globalRuleService = new GlobalRulePersistService(repository); propsService = new PropertiesPersistService(repository); shardingSphereDataPersistService = new ShardingSphereDataPersistService(repository); + listenerAssistedPersistService = new ListenerAssistedPersistService(repository); } /** diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssisted.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssisted.java new file mode 100644 index 00000000000..c92e4387560 --- /dev/null +++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssisted.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.metadata.persist.pojo; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration; + +/** + * Listener assisted. + */ +@AllArgsConstructor +@NoArgsConstructor +@Getter +@Setter +public final class ListenerAssisted implements YamlConfiguration { + + private String databaseName; + + private ListenerAssistedType listenerAssistedType; +} diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssistedType.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssistedType.java new file mode 100644 index 00000000000..efa42e900e7 --- /dev/null +++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssistedType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.metadata.persist.pojo; + +/** + * Listener assisted type. + */ +public enum ListenerAssistedType { + CREATE_DATABASE, + + DROP_DATABASE +} diff --git a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/database/ListenerAssistedPersistService.java b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/database/ListenerAssistedPersistService.java new file mode 100644 index 00000000000..9b24ccff437 --- /dev/null +++ b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/database/ListenerAssistedPersistService.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.metadata.persist.service.database; + +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.infra.util.yaml.YamlEngine; +import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssisted; +import org.apache.shardingsphere.mode.path.ListenerAssistedNodePath; +import org.apache.shardingsphere.mode.spi.PersistRepository; + +/** + * Listener assisted persist service. + */ +@RequiredArgsConstructor +public final class ListenerAssistedPersistService { + + private final PersistRepository repository; + + /** + * Persist database name listener assisted. + * + * @param listenerAssisted listener assisted pojo + */ + public void persistDatabaseNameListenerAssisted(final ListenerAssisted listenerAssisted) { + repository.persistEphemeral(ListenerAssistedNodePath.getDatabaseNameNodePath(listenerAssisted.getDatabaseName()), YamlEngine.marshal(listenerAssisted)); + } + + /** + * Delete database name listener assisted. + * + * @param databaseName database name + */ + public void deleteDatabaseNameListenerAssisted(final String databaseName) { + repository.delete(ListenerAssistedNodePath.getDatabaseNameNodePath(databaseName)); + } +} diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/CreateDatabaseListenerAssistedEvent.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/CreateDatabaseListenerAssistedEvent.java new file mode 100644 index 00000000000..9a955d6b4f9 --- /dev/null +++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/CreateDatabaseListenerAssistedEvent.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.event.assisted; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.infra.rule.event.GovernanceEvent; + +/** + * Create database listener assisted event. + */ +@RequiredArgsConstructor +@Getter +public final class CreateDatabaseListenerAssistedEvent implements GovernanceEvent { + + private final String databaseName; +} diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/DropDatabaseListenerAssistedEvent.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/DropDatabaseListenerAssistedEvent.java new file mode 100644 index 00000000000..662d9b2a8ae --- /dev/null +++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/DropDatabaseListenerAssistedEvent.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.event.assisted; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.infra.rule.event.GovernanceEvent; + +/** + * Drop database listener assisted event. + */ +@RequiredArgsConstructor +@Getter +public final class DropDatabaseListenerAssistedEvent implements GovernanceEvent { + + private final String databaseName; +} diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java new file mode 100644 index 00000000000..b32bdf7af90 --- /dev/null +++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.path; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Listener assisted node path. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class ListenerAssistedNodePath { + + private static final String ROOT_NODE = "listener_assisted"; + + /** + * Get root node path. + * + * @return root node path + */ + public static String getRootNodePath() { + return String.join("/", "", ROOT_NODE); + } + + /** + * Get database name by node path. + * + * @param nodePath node path + * @return database name + */ + public static Optional<String> getDatabaseName(final String nodePath) { + Pattern pattern = Pattern.compile(getRootNodePath() + "/(\\w+)$", Pattern.CASE_INSENSITIVE); + Matcher matcher = pattern.matcher(nodePath); + return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty(); + } + + /** + * Get database base name node path. + * + * @param databaseName database name + * @return database name node path + */ + public static String getDatabaseNameNodePath(final String databaseName) { + return String.join("/", "", ROOT_NODE, databaseName); + } +} diff --git a/mode/api/src/main/java/org/apache/shardingsphere/mode/spi/PersistRepository.java b/mode/api/src/main/java/org/apache/shardingsphere/mode/spi/PersistRepository.java index d77b3f6f118..b6d1c0cb1e0 100644 --- a/mode/api/src/main/java/org/apache/shardingsphere/mode/spi/PersistRepository.java +++ b/mode/api/src/main/java/org/apache/shardingsphere/mode/spi/PersistRepository.java @@ -63,6 +63,15 @@ public interface PersistRepository extends TypedSPI { */ void persist(String key, String value); + /** + * Persist ephemeral data. + * + * @param key key of data + * @param value value of data + */ + default void persistEphemeral(String key, String value) { + } + /** * Update data. * diff --git a/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java b/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java new file mode 100644 index 00000000000..e21cc563544 --- /dev/null +++ b/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.path; + +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ListenerAssistedNodePathTest { + + @Test + void assertRooPath() { + assertThat(ListenerAssistedNodePath.getRootNodePath(), is("/listener_assisted")); + } + + @Test + void assertGetDatabaseName() { + Optional<String> actual = ListenerAssistedNodePath.getDatabaseName("/listener_assisted/foo_db"); + assertTrue(actual.isPresent()); + assertThat(actual.get(), Matchers.is("foo_db")); + } + + @Test + void assertGetDatabaseNameNodePath() { + assertThat(ListenerAssistedNodePath.getDatabaseNameNodePath("foo_db"), is("/listener_assisted/foo_db")); + } +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index 16966cf98fc..3dffcdfbfc6 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -21,15 +21,19 @@ import com.google.common.base.Preconditions; import org.apache.shardingsphere.infra.instance.ComputeNodeInstance; import org.apache.shardingsphere.infra.instance.InstanceContext; import org.apache.shardingsphere.infra.instance.InstanceContextAware; +import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData; import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; import org.apache.shardingsphere.metadata.persist.MetaDataPersistService; +import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode; import org.apache.shardingsphere.mode.lock.GlobalLockContext; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.ContextManagerAware; import org.apache.shardingsphere.mode.manager.ContextManagerBuilder; import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.DataChangedEventListenerManager; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedListener; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator; import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberFacade; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; @@ -39,6 +43,7 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber; import java.sql.SQLException; +import java.util.Collection; /** * Cluster context manager builder. @@ -48,7 +53,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder @Override public ContextManager build(final ContextManagerBuilderParameter param) throws SQLException { ClusterPersistRepository repository = getClusterPersistRepository((ClusterPersistRepositoryConfiguration) param.getModeConfiguration().getRepository()); - RegistryCenter registryCenter = new RegistryCenter(repository, new EventBusContext(), param.getInstanceMetaData(), param.getDatabaseConfigs()); + RegistryCenter registryCenter = new RegistryCenter(repository, new EventBusContext()); InstanceContext instanceContext = buildInstanceContext(registryCenter, param); if (registryCenter.getRepository() instanceof InstanceContextAware) { ((InstanceContextAware) registryCenter.getRepository()).setInstanceContext(instanceContext); @@ -57,7 +62,7 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(persistService, param, instanceContext, registryCenter.getStorageNodeStatusService().loadStorageNodes()); ContextManager result = new ContextManager(metaDataContexts, instanceContext); setContextManagerAware(result); - registerOnline(registryCenter, param, result); + registerOnline(registryCenter, param, persistService, result); return result; } @@ -77,8 +82,9 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder ((ContextManagerAware) contextManager.getInstanceContext().getModeContextManager()).setContextManagerAware(contextManager); } - private void registerOnline(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final ContextManager contextManager) { + private void registerOnline(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final MetaDataPersistService persistService, final ContextManager contextManager) { registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance()); + watchDatabaseMetaDataListener(param, persistService, contextManager.getInstanceContext().getEventBusContext()); loadClusterStatus(registryCenter, contextManager); contextManager.getInstanceContext().getInstance().setLabels(param.getLabels()); contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances()); @@ -86,6 +92,16 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder new ContextManagerSubscriberFacade(registryCenter, contextManager); } + private void watchDatabaseMetaDataListener(final ContextManagerBuilderParameter param, final MetaDataPersistService metaDataPersistService, final EventBusContext eventBusContext) { + getDatabaseNames(param, metaDataPersistService).forEach(each -> + new DataChangedEventListenerManager((ClusterPersistRepository) metaDataPersistService.getRepository()) + .addListener(DatabaseMetaDataNode.getDatabaseNamePath(each), new MetaDataChangedListener(eventBusContext))); + } + + private Collection<String> getDatabaseNames(final ContextManagerBuilderParameter param, final MetaDataPersistService metaDataPersistService) { + return param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : metaDataPersistService.getDatabaseMetaDataService().loadAllDatabaseNames(); + } + private void loadClusterStatus(final RegistryCenter registryCenter, final ContextManager contextManager) { registryCenter.persistClusterState(contextManager); contextManager.updateClusterState(registryCenter.getClusterStatusService().loadClusterStatus()); diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterModeContextManager.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterModeContextManager.java index 05d76eee5a0..e338944c2c5 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterModeContextManager.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterModeContextManager.java @@ -26,6 +26,8 @@ import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSp import org.apache.shardingsphere.infra.metadata.database.schema.pojo.AlterSchemaMetaDataPOJO; import org.apache.shardingsphere.infra.metadata.database.schema.pojo.AlterSchemaPOJO; import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion; +import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssisted; +import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssistedType; import org.apache.shardingsphere.metadata.persist.service.config.database.DatabaseBasedPersistService; import org.apache.shardingsphere.metadata.persist.service.config.global.GlobalPersistService; import org.apache.shardingsphere.metadata.persist.service.database.DatabaseMetaDataBasedPersistService; @@ -50,10 +52,15 @@ public final class ClusterModeContextManager implements ModeContextManager, Cont @Override public void createDatabase(final String databaseName) { contextManager.getMetaDataContexts().getPersistService().getDatabaseMetaDataService().addDatabase(databaseName); + contextManager.getMetaDataContexts().getPersistService().getListenerAssistedPersistService() + .persistDatabaseNameListenerAssisted(new ListenerAssisted(databaseName, ListenerAssistedType.CREATE_DATABASE)); } @Override public void dropDatabase(final String databaseName) { + String droppedDatabaseName = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName).getName(); + contextManager.getMetaDataContexts().getPersistService().getListenerAssistedPersistService() + .persistDatabaseNameListenerAssisted(new ListenerAssisted(droppedDatabaseName, ListenerAssistedType.DROP_DATABASE)); contextManager.getMetaDataContexts().getPersistService().getDatabaseMetaDataService().dropDatabase(databaseName); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java index c77239724a2..be7530f6375 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java @@ -19,10 +19,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator; import com.google.common.base.Strings; import lombok.Getter; -import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration; import org.apache.shardingsphere.infra.instance.ComputeNodeInstance; -import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData; -import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData; import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.metadata.persist.node.ComputeNode; import org.apache.shardingsphere.mode.manager.ContextManager; @@ -40,7 +37,6 @@ import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties; -import java.util.Map; import java.util.Properties; /** @@ -66,23 +62,16 @@ public final class RegistryCenter { @Getter private final EventBusContext eventBusContext; - private final InstanceMetaData instanceMetaData; - - private final Map<String, DatabaseConfiguration> databaseConfigs; - private final GovernanceWatcherFactory listenerFactory; - public RegistryCenter(final ClusterPersistRepository repository, final EventBusContext eventBusContext, - final InstanceMetaData instanceMetaData, final Map<String, DatabaseConfiguration> databaseConfigs) { + public RegistryCenter(final ClusterPersistRepository repository, final EventBusContext eventBusContext) { this.repository = repository; this.eventBusContext = eventBusContext; - this.instanceMetaData = instanceMetaData; - this.databaseConfigs = databaseConfigs; storageNodeStatusService = new StorageNodeStatusService(repository); clusterStatusService = new ClusterStatusService(repository); computeNodeStatusService = new ComputeNodeStatusService(repository); globalLockPersistService = new GlobalLockPersistService(initDistributedLockHolder(repository)); - listenerFactory = new GovernanceWatcherFactory(repository, eventBusContext, getJDBCDatabaseName()); + listenerFactory = new GovernanceWatcherFactory(repository, eventBusContext); createSubscribers(repository); } @@ -91,10 +80,6 @@ public final class RegistryCenter { return null == distributedLockHolder ? new DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new Properties())) : distributedLockHolder; } - private String getJDBCDatabaseName() { - return instanceMetaData instanceof JDBCInstanceMetaData ? databaseConfigs.keySet().stream().findFirst().orElse(null) : null; - } - private void createSubscribers(final ClusterPersistRepository repository) { new ComputeNodeStatusSubscriber(this, repository); new ClusterStatusSubscriber(repository, eventBusContext); diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/DataChangedEventListenerManager.java similarity index 51% copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/DataChangedEventListenerManager.java index bde52b668ab..1b33cd6bd4f 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/DataChangedEventListenerManager.java @@ -15,43 +15,36 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry; +package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener; -import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI; -import org.apache.shardingsphere.mode.event.DataChangedEvent; -import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; - -import java.util.Collection; -import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; +import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; /** - * Governance watcher. - * - * @param <T> type of event + * Data changed event listener manager. */ -@SingletonSPI -public interface GovernanceWatcher<T> { +@RequiredArgsConstructor +public final class DataChangedEventListenerManager { - /** - * Get watching keys. - * - * @param databaseName database name - * @return watching keys - */ - Collection<String> getWatchingKeys(String databaseName); + private final ClusterPersistRepository repository; /** - * Get watching types. + * Add listener. * - * @return watching types + * @param listenerKey listener key + * @param dataChangedEventListener data changed event listener */ - Collection<Type> getWatchingTypes(); + public void addListener(final String listenerKey, final DataChangedEventListener dataChangedEventListener) { + repository.watch(listenerKey, dataChangedEventListener); + } /** - * Create governance event. - * - * @param event registry center data changed event - * @return governance event + * Remove listener. + * + * @param listenerKey listener key */ - Optional<T> createGovernanceEvent(DataChangedEvent event); + public void removeListener(final String listenerKey) { + repository.removeDataListener(listenerKey); + } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java similarity index 53% copy from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java copy to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java index 7de3c72c5cf..ffa6d2fd414 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java @@ -15,20 +15,18 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher; +package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.watch; -import com.google.common.base.Strings; -import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase; -import org.apache.shardingsphere.infra.util.yaml.YamlEngine; import org.apache.shardingsphere.infra.rule.event.GovernanceEvent; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent; -import org.apache.shardingsphere.mode.storage.node.StorageNode; -import org.apache.shardingsphere.mode.storage.yaml.YamlStorageNodeDataSource; -import org.apache.shardingsphere.mode.storage.yaml.YamlStorageNodeDataSourceSwapper; -import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSource; +import org.apache.shardingsphere.infra.util.yaml.YamlEngine; +import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssisted; +import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssistedType; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; +import org.apache.shardingsphere.mode.event.assisted.CreateDatabaseListenerAssistedEvent; +import org.apache.shardingsphere.mode.event.assisted.DropDatabaseListenerAssistedEvent; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher; +import org.apache.shardingsphere.mode.path.ListenerAssistedNodePath; import java.util.Arrays; import java.util.Collection; @@ -36,31 +34,32 @@ import java.util.Collections; import java.util.Optional; /** - * Storage node state changed watcher. + * Listener assisted changed watcher. */ -public final class StorageNodeStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> { +public final class ListenerAssistedChangedWatcher implements GovernanceWatcher<GovernanceEvent> { @Override - public Collection<String> getWatchingKeys(final String databaseName) { - return Collections.singletonList(StorageNode.getRootPath()); + public Collection<String> getWatchingKeys() { + return Collections.singleton(ListenerAssistedNodePath.getRootNodePath()); } @Override - public Collection<Type> getWatchingTypes() { + public Collection<DataChangedEvent.Type> getWatchingTypes() { return Arrays.asList(Type.ADDED, Type.UPDATED); } @Override public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) { - if (Strings.isNullOrEmpty(event.getValue())) { + Optional<String> databaseName = ListenerAssistedNodePath.getDatabaseName(event.getKey()); + if (!databaseName.isPresent()) { return Optional.empty(); } - Optional<QualifiedDatabase> qualifiedDatabase = StorageNode.extractQualifiedDatabase(event.getKey()); - if (qualifiedDatabase.isPresent()) { - QualifiedDatabase database = qualifiedDatabase.get(); - StorageNodeDataSource storageNodeDataSource = new YamlStorageNodeDataSourceSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(), YamlStorageNodeDataSource.class)); - return Optional.of(new StorageNodeChangedEvent(database, storageNodeDataSource)); + ListenerAssisted data = YamlEngine.unmarshal(event.getValue(), ListenerAssisted.class); + if (ListenerAssistedType.CREATE_DATABASE == data.getListenerAssistedType()) { + return Optional.of(new CreateDatabaseListenerAssistedEvent(databaseName.get())); } - return Optional.empty(); + return ListenerAssistedType.DROP_DATABASE == data.getListenerAssistedType() + ? Optional.of(new DropDatabaseListenerAssistedEvent(databaseName.get())) + : Optional.empty(); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java index bde52b668ab..efb25e49d86 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java @@ -35,10 +35,9 @@ public interface GovernanceWatcher<T> { /** * Get watching keys. * - * @param databaseName database name * @return watching keys */ - Collection<String> getWatchingKeys(String databaseName); + Collection<String> getWatchingKeys(); /** * Get watching types. diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java index 6b235b3a6c1..9e84cfa7b74 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java @@ -32,8 +32,6 @@ public final class GovernanceWatcherFactory { private final EventBusContext eventBusContext; - private final String databaseName; - /** * Watch listeners. */ @@ -44,7 +42,7 @@ public final class GovernanceWatcherFactory { } private void watch(final GovernanceWatcher<?> listener) { - for (String each : listener.getWatchingKeys(databaseName)) { + for (String each : listener.getWatchingKeys()) { watch(each, listener); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/GlobalRuleChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/GlobalRuleChangedWatcher.java index 7ddf0605718..b6b4c0e4931 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/GlobalRuleChangedWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/GlobalRuleChangedWatcher.java @@ -36,7 +36,7 @@ import java.util.Optional; public final class GlobalRuleChangedWatcher implements GovernanceWatcher<GovernanceEvent> { @Override - public Collection<String> getWatchingKeys(final String databaseName) { + public Collection<String> getWatchingKeys() { return Collections.singleton(GlobalNode.getGlobalRuleRootNode()); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/PropertiesChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/PropertiesChangedWatcher.java index 0bef3501fdf..a0926f82599 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/PropertiesChangedWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/PropertiesChangedWatcher.java @@ -35,7 +35,7 @@ import java.util.Optional; public final class PropertiesChangedWatcher implements GovernanceWatcher<AlterPropertiesEvent> { @Override - public Collection<String> getWatchingKeys(final String databaseName) { + public Collection<String> getWatchingKeys() { return Collections.singleton(GlobalNode.getPropsRootNode()); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java index 04d6a87f9eb..69566a166fe 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java @@ -45,7 +45,7 @@ import java.util.Optional; public final class ShardingSphereDataChangedWatcher implements GovernanceWatcher<GovernanceEvent> { @Override - public Collection<String> getWatchingKeys(final String databaseName) { + public Collection<String> getWatchingKeys() { return Collections.singleton(ShardingSphereDataNode.getShardingSphereDataNodePath()); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedListener.java similarity index 92% rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedListener.java index 0901b698fc2..0e0899faeae 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedListener.java @@ -18,7 +18,9 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher; import com.google.common.base.Preconditions; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.rule.event.GovernanceEvent; +import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode; import org.apache.shardingsphere.metadata.persist.node.metadata.DataSourceMetaDataNode; import org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode; @@ -35,38 +37,31 @@ import org.apache.shardingsphere.mode.event.schema.table.AlterTableEvent; import org.apache.shardingsphere.mode.event.schema.table.DropTableEvent; import org.apache.shardingsphere.mode.event.schema.view.AlterViewEvent; import org.apache.shardingsphere.mode.event.schema.view.DropViewEvent; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent; import org.apache.shardingsphere.mode.metadata.builder.RuleConfigurationEventBuilder; +import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.Optional; /** * Meta data changed watcher. */ -public final class MetaDataChangedWatcher implements GovernanceWatcher<GovernanceEvent> { +@RequiredArgsConstructor +public final class MetaDataChangedListener implements DataChangedEventListener { - private final RuleConfigurationEventBuilder ruleConfigurationEventBuilder = new RuleConfigurationEventBuilder(); + private final EventBusContext eventBusContext; - @Override - public Collection<String> getWatchingKeys(final String databaseName) { - return null == databaseName ? Collections.singleton(DatabaseMetaDataNode.getMetaDataNode()) - : Collections.singleton(DatabaseMetaDataNode.getDatabaseNamePath(databaseName)); - } + private final RuleConfigurationEventBuilder ruleConfigurationEventBuilder = new RuleConfigurationEventBuilder(); @Override - public Collection<Type> getWatchingTypes() { - return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED); + public void onChange(final DataChangedEvent event) { + createGovernanceEvent(event).ifPresent(eventBusContext::post); } - @Override - public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) { + private Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) { String key = event.getKey(); Optional<String> databaseName = DatabaseMetaDataNode.getDatabaseName(key); if (databaseName.isPresent()) { diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java index 5d60abb8f4b..13ad1caaf0f 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java @@ -36,7 +36,7 @@ import java.util.Optional; public final class ClusterStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> { @Override - public Collection<String> getWatchingKeys(final String databaseName) { + public Collection<String> getWatchingKeys() { return Collections.singleton(ComputeNode.getClusterStatusNodePath()); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java index 57b3e7c7471..4e0413263e0 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java @@ -52,7 +52,7 @@ import java.util.regex.Pattern; public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> { @Override - public Collection<String> getWatchingKeys(final String databaseName) { + public Collection<String> getWatchingKeys() { return Collections.singleton(ComputeNode.getComputeNodePath()); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java index 7de3c72c5cf..c3c25e1552c 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java @@ -41,7 +41,7 @@ import java.util.Optional; public final class StorageNodeStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> { @Override - public Collection<String> getWatchingKeys(final String databaseName) { + public Collection<String> getWatchingKeys() { return Collections.singletonList(StorageNode.getRootPath()); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java index 53b06867127..a5cbebbf42b 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proce public final class ContextManagerSubscriberFacade { public ContextManagerSubscriberFacade(final RegistryCenter registryCenter, final ContextManager contextManager) { + new ListenerAssistedSubscriber(contextManager); new ConfigurationChangedSubscriber(contextManager); new ResourceMetaDataChangedSubscriber(contextManager); new DatabaseChangedSubscriber(contextManager); diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedSubscriber.java new file mode 100644 index 00000000000..20d4ec5aae4 --- /dev/null +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedSubscriber.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber; + +import com.google.common.eventbus.Subscribe; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode; +import org.apache.shardingsphere.mode.event.assisted.CreateDatabaseListenerAssistedEvent; +import org.apache.shardingsphere.mode.event.assisted.DropDatabaseListenerAssistedEvent; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.DataChangedEventListenerManager; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedListener; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; + +/** + * Listener assisted subscriber. + */ +@RequiredArgsConstructor +public final class ListenerAssistedSubscriber { + + private final ContextManager contextManager; + + private final DataChangedEventListenerManager listenerManager; + + public ListenerAssistedSubscriber(final ContextManager contextManager) { + this.contextManager = contextManager; + contextManager.getInstanceContext().getEventBusContext().register(this); + listenerManager = new DataChangedEventListenerManager((ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository()); + } + + /** + * Renew to persist meta data. + * + * @param event database added event + */ + @Subscribe + public synchronized void renew(final CreateDatabaseListenerAssistedEvent event) { + listenerManager.addListener(DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName()), + new MetaDataChangedListener(contextManager.getInstanceContext().getEventBusContext())); + contextManager.getMetaDataContexts().getPersistService().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName()); + } + + /** + * Renew to delete database. + * + * @param event database delete event + */ + @Subscribe + public synchronized void renew(final DropDatabaseListenerAssistedEvent event) { + listenerManager.removeListener(DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName())); + contextManager.getMetaDataContexts().getMetaData().dropDatabase(event.getDatabaseName()); + contextManager.getMetaDataContexts().getPersistService().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName()); + } +} diff --git a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher index 9c73806ae28..817b28927a9 100644 --- a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher +++ b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher @@ -19,6 +19,6 @@ org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.stora org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher.ClusterStateChangedWatcher org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher -org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedWatcher org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.GlobalRuleChangedWatcher org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.PropertiesChangedWatcher +org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.watch.ListenerAssistedChangedWatcher \ No newline at end of file diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java index 149a82771a5..839cae9a53a 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java @@ -75,6 +75,10 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo public void watch(final String key, final DataChangedEventListener listener) { } + @Override + public void removeDataListener(final String key) { + } + @Override public void close() { } diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java index e2514ab760d..2e3be58780c 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java @@ -82,6 +82,10 @@ public final class ProcessListClusterPersistRepositoryFixture implements Cluster public void watch(final String key, final DataChangedEventListener listener) { } + @Override + public void removeDataListener(final String key) { + } + @Override public void close() { REGISTRY_DATA.clear(); diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java index 2687874fcd0..bcea8ea4a06 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java @@ -84,7 +84,7 @@ class ProcessListChangedSubscriberTest { contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(), contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties())))); - registryCenter = new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null); + registryCenter = new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext()); subscriber = new ProcessListChangedSubscriber(registryCenter, contextManager); } diff --git a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java index 2c58feddbc7..7d21016d49b 100644 --- a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java +++ b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java @@ -93,7 +93,7 @@ class StateChangedSubscriberTest { contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(), contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(), new ConfigurationProperties(new Properties())))); - subscriber = new StateChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager); + subscriber = new StateChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext()), contextManager); } private ContextManagerBuilderParameter createContextManagerBuilderParameter() { diff --git a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java index cb921c52448..c69988d7844 100644 --- a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java +++ b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java @@ -63,4 +63,11 @@ public interface ClusterPersistRepository extends PersistRepository { * @param listener data changed event listener */ void watch(String key, DataChangedEventListener listener); + + /** + * Remove listener by key. + * + * @param key key to be removed + */ + void removeDataListener(String key); } diff --git a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java index 18bf3d24746..d713ae5d255 100644 --- a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java +++ b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java @@ -166,6 +166,10 @@ public final class EtcdRepository implements ClusterPersistRepository { WatchOption.newBuilder().withRange(OptionsUtil.prefixEndOf(prefix)).build(), listener); } + @Override + public void removeDataListener(final String key) { + } + private Type getEventChangedType(final WatchEvent event) { if (1 == event.getKeyValue().getVersion()) { return Type.ADDED; diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java index 44845d3f89a..5f4d8b6f1e7 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java @@ -52,6 +52,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -62,6 +63,8 @@ public final class ZookeeperRepository implements ClusterPersistRepository, Inst private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>(); + private final Map<String, CuratorCacheListener> dataListeners = new ConcurrentHashMap<>(); + private final Builder builder = CuratorFrameworkFactory.builder(); private CuratorFramework client; @@ -233,6 +236,9 @@ public final class ZookeeperRepository implements ClusterPersistRepository, Inst @Override public void watch(final String key, final DataChangedEventListener listener) { + if (null != dataListeners.get(key)) { + return; + } CuratorCache cache = caches.get(key); if (null == cache) { cache = CuratorCache.build(client, key); @@ -249,6 +255,16 @@ public final class ZookeeperRepository implements ClusterPersistRepository, Inst }).build(); cache.listenable().addListener(curatorCacheListener); cache.start(); + dataListeners.computeIfAbsent(key, curator -> curatorCacheListener); + } + + @Override + public void removeDataListener(final String key) { + CuratorCacheListener cacheListener = dataListeners.remove(key); + if (null == cacheListener) { + return; + } + Optional.ofNullable(caches.remove(key)).ifPresent(optional -> optional.listenable().removeListener(cacheListener)); } private Type getChangedType(final TreeCacheEvent.Type type) { diff --git a/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java b/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java index 908523ca816..0b81452b968 100644 --- a/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java +++ b/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java @@ -152,7 +152,8 @@ public final class KernelDistSQLStatementVisitor extends KernelDistSQLStatementB String password = null == connectionCtx.password() ? "" : getPassword(connectionCtx.password()); return null == connectionCtx.urlSource() ? new HostnameAndPortBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()), - getIdentifierValue(connectionCtx.simpleSource().hostname()), connectionCtx.simpleSource().port().getText(), getIdentifierValue(connectionCtx.simpleSource().dbName()), user, password, props) + getIdentifierValue(connectionCtx.simpleSource().hostname()), connectionCtx.simpleSource().port().getText(), + getIdentifierValue(connectionCtx.simpleSource().dbName()), user, password, props) : new URLBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()), getIdentifierValue(connectionCtx.urlSource().url()), user, password, props); } diff --git a/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/converter/DataSourceSegmentsConverter.java b/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/converter/DataSourceSegmentsConverter.java index b7e03665063..23469dfaade 100644 --- a/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/converter/DataSourceSegmentsConverter.java +++ b/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/converter/DataSourceSegmentsConverter.java @@ -58,6 +58,7 @@ public final class DataSourceSegmentsConverter { } return result; } + /** * Convert data source segments to data source properties map. * diff --git a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java index 57971f2ee54..80e63be2398 100644 --- a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java +++ b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java @@ -81,6 +81,11 @@ public final class ClusterPersistRepositoryFixture implements ClusterPersistRepo public void watch(final String key, final DataChangedEventListener listener) { } + @Override + public void removeDataListener(final String key) { + + } + @Override public void close() { REGISTRY_DATA.clear();
