This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch fix-33341
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git

commit c1aaaa92426336fb37fe0e6b56a095292b46dc8c
Author: zhaojinchao <[email protected]>
AuthorDate: Thu Aug 8 19:05:56 2024 +0800

    Add meta data listener assisted node path to dynamic watch create or drop 
database
---
 .../metadata/data/loader/MySQLMetaDataLoader.java  |  5 +-
 .../persist/MetaDataBasedPersistService.java       |  8 +++
 .../metadata/persist/MetaDataPersistService.java   |  4 ++
 .../metadata/persist/pojo/ListenerAssisted.java    | 38 ++++++++++++
 .../persist/pojo/ListenerAssistedType.java         | 27 +++++++++
 .../database/ListenerAssistedPersistService.java   | 51 ++++++++++++++++
 .../CreateDatabaseListenerAssistedEvent.java       | 32 ++++++++++
 .../DropDatabaseListenerAssistedEvent.java         | 32 ++++++++++
 .../mode/path/ListenerAssistedNodePath.java        | 65 ++++++++++++++++++++
 .../shardingsphere/mode/spi/PersistRepository.java |  9 +++
 .../mode/path/ListenerAssistedNodePathTest.java    | 47 +++++++++++++++
 .../cluster/ClusterContextManagerBuilder.java      | 22 ++++++-
 .../manager/cluster/ClusterModeContextManager.java |  7 +++
 .../cluster/coordinator/RegistryCenter.java        | 19 +-----
 .../DataChangedEventListenerManager.java}          | 47 +++++++--------
 .../watch/ListenerAssistedChangedWatcher.java}     | 43 +++++++-------
 .../coordinator/registry/GovernanceWatcher.java    |  3 +-
 .../registry/GovernanceWatcherFactory.java         |  4 +-
 .../config/watcher/GlobalRuleChangedWatcher.java   |  2 +-
 .../config/watcher/PropertiesChangedWatcher.java   |  2 +-
 .../data/ShardingSphereDataChangedWatcher.java     |  2 +-
 ...edWatcher.java => MetaDataChangedListener.java} | 25 ++++----
 .../watcher/ClusterStateChangedWatcher.java        |  2 +-
 .../watcher/ComputeNodeStateChangedWatcher.java    |  2 +-
 .../watcher/StorageNodeStateChangedWatcher.java    |  2 +-
 .../subscriber/ContextManagerSubscriberFacade.java |  1 +
 .../subscriber/ListenerAssistedSubscriber.java     | 69 ++++++++++++++++++++++
 ....cluster.coordinator.registry.GovernanceWatcher |  2 +-
 .../fixture/ClusterPersistRepositoryFixture.java   |  4 ++
 ...ProcessListClusterPersistRepositoryFixture.java |  4 ++
 .../ProcessListChangedSubscriberTest.java          |  2 +-
 .../subscriber/StateChangedSubscriberTest.java     |  2 +-
 .../cluster/ClusterPersistRepository.java          |  7 +++
 .../repository/cluster/etcd/EtcdRepository.java    |  4 ++
 .../cluster/zookeeper/ZookeeperRepository.java     | 16 +++++
 .../core/kernel/KernelDistSQLStatementVisitor.java |  3 +-
 .../converter/DataSourceSegmentsConverter.java     |  1 +
 .../fixture/ClusterPersistRepositoryFixture.java   |  5 ++
 38 files changed, 520 insertions(+), 100 deletions(-)

diff --git 
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/data/loader/MySQLMetaDataLoader.java
 
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/data/loader/MySQLMetaDataLoader.java
index 748f04d32cf..6c2007c11cb 100644
--- 
a/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/data/loader/MySQLMetaDataLoader.java
+++ 
b/infra/database/type/mysql/src/main/java/org/apache/shardingsphere/infra/database/mysql/metadata/data/loader/MySQLMetaDataLoader.java
@@ -69,7 +69,10 @@ public final class MySQLMetaDataLoader implements 
DialectMetaDataLoader {
     public Collection<SchemaMetaData> load(final MetaDataLoaderMaterial 
material) throws SQLException {
         Collection<TableMetaData> tableMetaDataList = new LinkedList<>();
         Map<String, Collection<ColumnMetaData>> columnMetaDataMap = 
loadColumnMetaDataMap(material.getDataSource(), material.getActualTableNames());
-        Collection<String> viewNames = columnMetaDataMap.isEmpty() ? 
Collections.emptySet() : loadViewNames(material.getDataSource(), 
columnMetaDataMap.keySet());        Map<String, Collection<IndexMetaData>> 
indexMetaDataMap = columnMetaDataMap.isEmpty() ? Collections.emptyMap() : 
loadIndexMetaData(material.getDataSource(), columnMetaDataMap.keySet());
+        Collection<String> viewNames = columnMetaDataMap.isEmpty() ? 
Collections.emptySet()
+                : loadViewNames(material.getDataSource(), 
columnMetaDataMap.keySet());
+        Map<String, Collection<IndexMetaData>> indexMetaDataMap = 
columnMetaDataMap.isEmpty() ? Collections.emptyMap()
+                : loadIndexMetaData(material.getDataSource(), 
columnMetaDataMap.keySet());
         Map<String, Collection<ConstraintMetaData>> constraintMetaDataMap =
                 columnMetaDataMap.isEmpty() ? Collections.emptyMap() : 
loadConstraintMetaDataMap(material.getDataSource(), columnMetaDataMap.keySet());
         for (Entry<String, Collection<ColumnMetaData>> entry : 
columnMetaDataMap.entrySet()) {
diff --git 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataBasedPersistService.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataBasedPersistService.java
index 58ea367569f..9b0a1fefd6c 100644
--- 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataBasedPersistService.java
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataBasedPersistService.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataBasedPe
 import 
org.apache.shardingsphere.metadata.persist.service.config.database.DatabaseBasedPersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.config.global.GlobalPersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.database.DatabaseMetaDataBasedPersistService;
+import 
org.apache.shardingsphere.metadata.persist.service.database.ListenerAssistedPersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.version.MetaDataVersionBasedPersistService;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 
@@ -120,4 +121,11 @@ public interface MetaDataBasedPersistService {
      * @return effective data sources
      */
     Map<String, DataSourceConfiguration> loadDataSourceConfigurations(String 
databaseName);
+    
+    /**
+     * Get listener assisted persist service.
+     *
+     * @return listener assisted persist service
+     */
+    ListenerAssistedPersistService getListenerAssistedPersistService();
 }
diff --git 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataPersistService.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataPersistService.java
index 6553944e7d4..96b8fa2f26b 100644
--- 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataPersistService.java
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/MetaDataPersistService.java
@@ -33,6 +33,7 @@ import 
org.apache.shardingsphere.metadata.persist.service.config.database.rule.D
 import 
org.apache.shardingsphere.metadata.persist.service.config.global.GlobalRulePersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.config.global.PropertiesPersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.database.DatabaseMetaDataPersistService;
+import 
org.apache.shardingsphere.metadata.persist.service.database.ListenerAssistedPersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.version.MetaDataVersionPersistService;
 import org.apache.shardingsphere.mode.spi.PersistRepository;
 
@@ -70,6 +71,8 @@ public final class MetaDataPersistService implements 
MetaDataBasedPersistService
     
     private final ShardingSphereDataPersistService 
shardingSphereDataPersistService;
     
+    private final ListenerAssistedPersistService 
listenerAssistedPersistService;
+    
     public MetaDataPersistService(final PersistRepository repository) {
         this.repository = repository;
         metaDataVersionPersistService = new 
MetaDataVersionPersistService(repository);
@@ -80,6 +83,7 @@ public final class MetaDataPersistService implements 
MetaDataBasedPersistService
         globalRuleService = new GlobalRulePersistService(repository);
         propsService = new PropertiesPersistService(repository);
         shardingSphereDataPersistService = new 
ShardingSphereDataPersistService(repository);
+        listenerAssistedPersistService = new 
ListenerAssistedPersistService(repository);
     }
     
     /**
diff --git 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssisted.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssisted.java
new file mode 100644
index 00000000000..c92e4387560
--- /dev/null
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssisted.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.metadata.persist.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+
+/**
+ * Listener assisted.
+ */
+@AllArgsConstructor
+@NoArgsConstructor
+@Getter
+@Setter
+public final class ListenerAssisted implements YamlConfiguration {
+    
+    private String databaseName;
+    
+    private ListenerAssistedType listenerAssistedType;
+}
diff --git 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssistedType.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssistedType.java
new file mode 100644
index 00000000000..efa42e900e7
--- /dev/null
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/pojo/ListenerAssistedType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.metadata.persist.pojo;
+
+/**
+ * Listener assisted type.
+ */
+public enum ListenerAssistedType {
+    CREATE_DATABASE,
+    
+    DROP_DATABASE
+}
diff --git 
a/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/database/ListenerAssistedPersistService.java
 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/database/ListenerAssistedPersistService.java
new file mode 100644
index 00000000000..9b24ccff437
--- /dev/null
+++ 
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/service/database/ListenerAssistedPersistService.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.metadata.persist.service.database;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssisted;
+import org.apache.shardingsphere.mode.path.ListenerAssistedNodePath;
+import org.apache.shardingsphere.mode.spi.PersistRepository;
+
+/**
+ * Listener assisted persist service.
+ */
+@RequiredArgsConstructor
+public final class ListenerAssistedPersistService {
+    
+    private final PersistRepository repository;
+    
+    /**
+     * Persist database name listener assisted.
+     *
+     * @param listenerAssisted listener assisted pojo
+     */
+    public void persistDatabaseNameListenerAssisted(final ListenerAssisted 
listenerAssisted) {
+        
repository.persistEphemeral(ListenerAssistedNodePath.getDatabaseNameNodePath(listenerAssisted.getDatabaseName()),
 YamlEngine.marshal(listenerAssisted));
+    }
+    
+    /**
+     * Delete database name listener assisted.
+     *
+     * @param databaseName database name
+     */
+    public void deleteDatabaseNameListenerAssisted(final String databaseName) {
+        
repository.delete(ListenerAssistedNodePath.getDatabaseNameNodePath(databaseName));
+    }
+}
diff --git 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/CreateDatabaseListenerAssistedEvent.java
 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/CreateDatabaseListenerAssistedEvent.java
new file mode 100644
index 00000000000..9a955d6b4f9
--- /dev/null
+++ 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/CreateDatabaseListenerAssistedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.assisted;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+
+/**
+ * Create database listener assisted event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class CreateDatabaseListenerAssistedEvent implements 
GovernanceEvent {
+    
+    private final String databaseName;
+}
diff --git 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/DropDatabaseListenerAssistedEvent.java
 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/DropDatabaseListenerAssistedEvent.java
new file mode 100644
index 00000000000..662d9b2a8ae
--- /dev/null
+++ 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/event/assisted/DropDatabaseListenerAssistedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.event.assisted;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+
+/**
+ * Drop database listener assisted event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class DropDatabaseListenerAssistedEvent implements 
GovernanceEvent {
+    
+    private final String databaseName;
+}
diff --git 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
new file mode 100644
index 00000000000..b32bdf7af90
--- /dev/null
+++ 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePath.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.path;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Listener assisted node path.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ListenerAssistedNodePath {
+    
+    private static final String ROOT_NODE = "listener_assisted";
+    
+    /**
+     * Get root node path.
+     *
+     * @return root node path
+     */
+    public static String getRootNodePath() {
+        return String.join("/", "", ROOT_NODE);
+    }
+    
+    /**
+     * Get database name by node path.
+     *
+     * @param nodePath node path
+     * @return database name
+     */
+    public static Optional<String> getDatabaseName(final String nodePath) {
+        Pattern pattern = Pattern.compile(getRootNodePath() + "/(\\w+)$", 
Pattern.CASE_INSENSITIVE);
+        Matcher matcher = pattern.matcher(nodePath);
+        return matcher.find() ? Optional.of(matcher.group(1)) : 
Optional.empty();
+    }
+    
+    /**
+     * Get database name node path.
+     *
+     * @param databaseName database name
+     * @return database name node path
+     */
+    public static String getDatabaseNameNodePath(final String databaseName) {
+        return String.join("/", "", ROOT_NODE, databaseName);
+    }
+}
diff --git 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/spi/PersistRepository.java
 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/spi/PersistRepository.java
index d77b3f6f118..b6d1c0cb1e0 100644
--- 
a/mode/api/src/main/java/org/apache/shardingsphere/mode/spi/PersistRepository.java
+++ 
b/mode/api/src/main/java/org/apache/shardingsphere/mode/spi/PersistRepository.java
@@ -63,6 +63,15 @@ public interface PersistRepository extends TypedSPI {
      */
     void persist(String key, String value);
     
+    /**
+     * Persist ephemeral data.
+     *
+     * @param key key of data
+     * @param value value of data
+     */
+    default void persistEphemeral(String key, String value) {
+    }
+    
     /**
      * Update data.
      *
diff --git 
a/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
 
b/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
new file mode 100644
index 00000000000..e21cc563544
--- /dev/null
+++ 
b/mode/api/src/test/java/org/apache/shardingsphere/mode/path/ListenerAssistedNodePathTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.path;
+
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ListenerAssistedNodePathTest {
+    
+    @Test
+    void assertRooPath() {
+        assertThat(ListenerAssistedNodePath.getRootNodePath(), 
is("/listener_assisted"));
+    }
+    
+    @Test
+    void assertGetDatabaseName() {
+        Optional<String> actual = 
ListenerAssistedNodePath.getDatabaseName("/listener_assisted/foo_db");
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), Matchers.is("foo_db"));
+    }
+    
+    @Test
+    void assertGetDatabaseNameNodePath() {
+        assertThat(ListenerAssistedNodePath.getDatabaseNameNodePath("foo_db"), 
is("/listener_assisted/foo_db"));
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 16966cf98fc..3dffcdfbfc6 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -21,15 +21,19 @@ import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
 import org.apache.shardingsphere.infra.instance.InstanceContextAware;
+import 
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
 import org.apache.shardingsphere.mode.lock.GlobalLockContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerAware;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.DataChangedEventListenerManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedListener;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberFacade;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -39,6 +43,7 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber;
 
 import java.sql.SQLException;
+import java.util.Collection;
 
 /**
  * Cluster context manager builder.
@@ -48,7 +53,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
     @Override
     public ContextManager build(final ContextManagerBuilderParameter param) 
throws SQLException {
         ClusterPersistRepository repository = 
getClusterPersistRepository((ClusterPersistRepositoryConfiguration) 
param.getModeConfiguration().getRepository());
-        RegistryCenter registryCenter = new RegistryCenter(repository, new 
EventBusContext(), param.getInstanceMetaData(), param.getDatabaseConfigs());
+        RegistryCenter registryCenter = new RegistryCenter(repository, new 
EventBusContext());
         InstanceContext instanceContext = buildInstanceContext(registryCenter, 
param);
         if (registryCenter.getRepository() instanceof InstanceContextAware) {
             ((InstanceContextAware) 
registryCenter.getRepository()).setInstanceContext(instanceContext);
@@ -57,7 +62,7 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         MetaDataContexts metaDataContexts = 
MetaDataContextsFactory.create(persistService, param, instanceContext, 
registryCenter.getStorageNodeStatusService().loadStorageNodes());
         ContextManager result = new ContextManager(metaDataContexts, 
instanceContext);
         setContextManagerAware(result);
-        registerOnline(registryCenter, param, result);
+        registerOnline(registryCenter, param, persistService, result);
         return result;
     }
     
@@ -77,8 +82,9 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         ((ContextManagerAware) 
contextManager.getInstanceContext().getModeContextManager()).setContextManagerAware(contextManager);
     }
     
-    private void registerOnline(final RegistryCenter registryCenter, final 
ContextManagerBuilderParameter param, final ContextManager contextManager) {
+    private void registerOnline(final RegistryCenter registryCenter, final 
ContextManagerBuilderParameter param, final MetaDataPersistService 
persistService, final ContextManager contextManager) {
         
registryCenter.onlineInstance(contextManager.getInstanceContext().getInstance());
+        watchDatabaseMetaDataListener(param, persistService, 
contextManager.getInstanceContext().getEventBusContext());
         loadClusterStatus(registryCenter, contextManager);
         
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
         
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
@@ -86,6 +92,16 @@ public final class ClusterContextManagerBuilder implements 
ContextManagerBuilder
         new ContextManagerSubscriberFacade(registryCenter, contextManager);
     }
     
+    private void watchDatabaseMetaDataListener(final 
ContextManagerBuilderParameter param, final MetaDataPersistService 
metaDataPersistService, final EventBusContext eventBusContext) {
+        getDatabaseNames(param, metaDataPersistService).forEach(each ->
+                new DataChangedEventListenerManager((ClusterPersistRepository) 
metaDataPersistService.getRepository())
+                        
.addListener(DatabaseMetaDataNode.getDatabaseNamePath(each), new 
MetaDataChangedListener(eventBusContext)));
+    }
+    
+    private Collection<String> getDatabaseNames(final 
ContextManagerBuilderParameter param, final MetaDataPersistService 
metaDataPersistService) {
+        return param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? 
param.getDatabaseConfigs().keySet() : 
metaDataPersistService.getDatabaseMetaDataService().loadAllDatabaseNames();
+    }
+    
     private void loadClusterStatus(final RegistryCenter registryCenter, final 
ContextManager contextManager) {
         registryCenter.persistClusterState(contextManager);
         
contextManager.updateClusterState(registryCenter.getClusterStatusService().loadClusterStatus());
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterModeContextManager.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterModeContextManager.java
index 05d76eee5a0..e338944c2c5 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterModeContextManager.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterModeContextManager.java
@@ -26,6 +26,8 @@ import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSp
 import 
org.apache.shardingsphere.infra.metadata.database.schema.pojo.AlterSchemaMetaDataPOJO;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.pojo.AlterSchemaPOJO;
 import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
+import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssisted;
+import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssistedType;
 import 
org.apache.shardingsphere.metadata.persist.service.config.database.DatabaseBasedPersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.config.global.GlobalPersistService;
 import 
org.apache.shardingsphere.metadata.persist.service.database.DatabaseMetaDataBasedPersistService;
@@ -50,10 +52,15 @@ public final class ClusterModeContextManager implements 
ModeContextManager, Cont
     @Override
     public void createDatabase(final String databaseName) {
         
contextManager.getMetaDataContexts().getPersistService().getDatabaseMetaDataService().addDatabase(databaseName);
+        
contextManager.getMetaDataContexts().getPersistService().getListenerAssistedPersistService()
+                .persistDatabaseNameListenerAssisted(new 
ListenerAssisted(databaseName, ListenerAssistedType.CREATE_DATABASE));
     }
     
     @Override
     public void dropDatabase(final String databaseName) {
+        String droppedDatabaseName = 
contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName).getName();
+        
contextManager.getMetaDataContexts().getPersistService().getListenerAssistedPersistService()
+                .persistDatabaseNameListenerAssisted(new 
ListenerAssisted(droppedDatabaseName, ListenerAssistedType.DROP_DATABASE));
         
contextManager.getMetaDataContexts().getPersistService().getDatabaseMetaDataService().dropDatabase(databaseName);
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
index c77239724a2..be7530f6375 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java
@@ -19,10 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator;
 
 import com.google.common.base.Strings;
 import lombok.Getter;
-import org.apache.shardingsphere.infra.config.database.DatabaseConfiguration;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
-import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
-import 
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -40,7 +37,6 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
 import 
org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
 
-import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -66,23 +62,16 @@ public final class RegistryCenter {
     @Getter
     private final EventBusContext eventBusContext;
     
-    private final InstanceMetaData instanceMetaData;
-    
-    private final Map<String, DatabaseConfiguration> databaseConfigs;
-    
     private final GovernanceWatcherFactory listenerFactory;
     
-    public RegistryCenter(final ClusterPersistRepository repository, final 
EventBusContext eventBusContext,
-                          final InstanceMetaData instanceMetaData, final 
Map<String, DatabaseConfiguration> databaseConfigs) {
+    public RegistryCenter(final ClusterPersistRepository repository, final 
EventBusContext eventBusContext) {
         this.repository = repository;
         this.eventBusContext = eventBusContext;
-        this.instanceMetaData = instanceMetaData;
-        this.databaseConfigs = databaseConfigs;
         storageNodeStatusService = new StorageNodeStatusService(repository);
         clusterStatusService = new ClusterStatusService(repository);
         computeNodeStatusService = new ComputeNodeStatusService(repository);
         globalLockPersistService = new 
GlobalLockPersistService(initDistributedLockHolder(repository));
-        listenerFactory = new GovernanceWatcherFactory(repository, 
eventBusContext, getJDBCDatabaseName());
+        listenerFactory = new GovernanceWatcherFactory(repository, 
eventBusContext);
         createSubscribers(repository);
     }
     
@@ -91,10 +80,6 @@ public final class RegistryCenter {
         return null == distributedLockHolder ? new 
DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new 
Properties())) : distributedLockHolder;
     }
     
-    private String getJDBCDatabaseName() {
-        return instanceMetaData instanceof JDBCInstanceMetaData ? 
databaseConfigs.keySet().stream().findFirst().orElse(null) : null;
-    }
-    
     private void createSubscribers(final ClusterPersistRepository repository) {
         new ComputeNodeStatusSubscriber(this, repository);
         new ClusterStatusSubscriber(repository, eventBusContext);
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/DataChangedEventListenerManager.java
similarity index 51%
copy from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
copy to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/DataChangedEventListenerManager.java
index bde52b668ab..1b33cd6bd4f 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/DataChangedEventListenerManager.java
@@ -15,43 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.listener;
 
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
-
-import java.util.Collection;
-import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
 /**
- * Governance watcher.
- * 
- * @param <T> type of event
+ * Data changed event listener manager.
  */
-@SingletonSPI
-public interface GovernanceWatcher<T> {
+@RequiredArgsConstructor
+public final class DataChangedEventListenerManager {
     
-    /**
-     * Get watching keys.
-     *
-     * @param databaseName database name
-     * @return watching keys
-     */
-    Collection<String> getWatchingKeys(String databaseName);
+    private final ClusterPersistRepository repository;
     
     /**
-     * Get watching types.
+     * Add listener.
      *
-     * @return watching types
+     * @param listenerKey listener key
+     * @param dataChangedEventListener data changed event listener
      */
-    Collection<Type> getWatchingTypes();
+    public void addListener(final String listenerKey, final 
DataChangedEventListener dataChangedEventListener) {
+        repository.watch(listenerKey, dataChangedEventListener);
+    }
     
     /**
-     * Create governance event.
-     * 
-     * @param event registry center data changed event
-     * @return governance event
+     * Remove listener.
+     *
+     * @param listenerKey listener key
      */
-    Optional<T> createGovernanceEvent(DataChangedEvent event);
+    public void removeListener(final String listenerKey) {
+        repository.removeDataListener(listenerKey);
+    }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
similarity index 53%
copy from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
copy to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
index 7de3c72c5cf..ffa6d2fd414 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/watch/ListenerAssistedChangedWatcher.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher;
+package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.watch;
 
-import com.google.common.base.Strings;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
-import org.apache.shardingsphere.mode.storage.node.StorageNode;
-import org.apache.shardingsphere.mode.storage.yaml.YamlStorageNodeDataSource;
-import 
org.apache.shardingsphere.mode.storage.yaml.YamlStorageNodeDataSourceSwapper;
-import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSource;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssisted;
+import org.apache.shardingsphere.metadata.persist.pojo.ListenerAssistedType;
 import org.apache.shardingsphere.mode.event.DataChangedEvent;
 import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
+import 
org.apache.shardingsphere.mode.event.assisted.CreateDatabaseListenerAssistedEvent;
+import 
org.apache.shardingsphere.mode.event.assisted.DropDatabaseListenerAssistedEvent;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
+import org.apache.shardingsphere.mode.path.ListenerAssistedNodePath;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -36,31 +34,32 @@ import java.util.Collections;
 import java.util.Optional;
 
 /**
- * Storage node state changed watcher.
+ * Listener assisted changed watcher.
  */
-public final class StorageNodeStateChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
+public final class ListenerAssistedChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final String databaseName) {
-        return Collections.singletonList(StorageNode.getRootPath());
+    public Collection<String> getWatchingKeys() {
+        return 
Collections.singleton(ListenerAssistedNodePath.getRootNodePath());
     }
     
     @Override
-    public Collection<Type> getWatchingTypes() {
+    public Collection<DataChangedEvent.Type> getWatchingTypes() {
         return Arrays.asList(Type.ADDED, Type.UPDATED);
     }
     
     @Override
     public Optional<GovernanceEvent> createGovernanceEvent(final 
DataChangedEvent event) {
-        if (Strings.isNullOrEmpty(event.getValue())) {
+        Optional<String> databaseName = 
ListenerAssistedNodePath.getDatabaseName(event.getKey());
+        if (!databaseName.isPresent()) {
             return Optional.empty();
         }
-        Optional<QualifiedDatabase> qualifiedDatabase = 
StorageNode.extractQualifiedDatabase(event.getKey());
-        if (qualifiedDatabase.isPresent()) {
-            QualifiedDatabase database = qualifiedDatabase.get();
-            StorageNodeDataSource storageNodeDataSource = new 
YamlStorageNodeDataSourceSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(),
 YamlStorageNodeDataSource.class));
-            return Optional.of(new StorageNodeChangedEvent(database, 
storageNodeDataSource));
+        ListenerAssisted data = YamlEngine.unmarshal(event.getValue(), 
ListenerAssisted.class);
+        if (ListenerAssistedType.CREATE_DATABASE == 
data.getListenerAssistedType()) {
+            return Optional.of(new 
CreateDatabaseListenerAssistedEvent(databaseName.get()));
         }
-        return Optional.empty();
+        return ListenerAssistedType.DROP_DATABASE == 
data.getListenerAssistedType()
+                ? Optional.of(new 
DropDatabaseListenerAssistedEvent(databaseName.get()))
+                : Optional.empty();
     }
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
index bde52b668ab..efb25e49d86 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcher.java
@@ -35,10 +35,9 @@ public interface GovernanceWatcher<T> {
     /**
      * Get watching keys.
      *
-     * @param databaseName database name
      * @return watching keys
      */
-    Collection<String> getWatchingKeys(String databaseName);
+    Collection<String> getWatchingKeys();
     
     /**
      * Get watching types.
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
index 6b235b3a6c1..9e84cfa7b74 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java
@@ -32,8 +32,6 @@ public final class GovernanceWatcherFactory {
     
     private final EventBusContext eventBusContext;
     
-    private final String databaseName;
-    
     /**
      * Watch listeners.
      */
@@ -44,7 +42,7 @@ public final class GovernanceWatcherFactory {
     }
     
     private void watch(final GovernanceWatcher<?> listener) {
-        for (String each : listener.getWatchingKeys(databaseName)) {
+        for (String each : listener.getWatchingKeys()) {
             watch(each, listener);
         }
     }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/GlobalRuleChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/GlobalRuleChangedWatcher.java
index 7ddf0605718..b6b4c0e4931 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/GlobalRuleChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/GlobalRuleChangedWatcher.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 public final class GlobalRuleChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final String databaseName) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(GlobalNode.getGlobalRuleRootNode());
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/PropertiesChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/PropertiesChangedWatcher.java
index 0bef3501fdf..a0926f82599 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/PropertiesChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/config/watcher/PropertiesChangedWatcher.java
@@ -35,7 +35,7 @@ import java.util.Optional;
 public final class PropertiesChangedWatcher implements 
GovernanceWatcher<AlterPropertiesEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final String databaseName) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(GlobalNode.getPropsRootNode());
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
index 04d6a87f9eb..69566a166fe 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/data/ShardingSphereDataChangedWatcher.java
@@ -45,7 +45,7 @@ import java.util.Optional;
 public final class ShardingSphereDataChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final String databaseName) {
+    public Collection<String> getWatchingKeys() {
         return 
Collections.singleton(ShardingSphereDataNode.getShardingSphereDataNodePath());
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedListener.java
similarity index 92%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
rename to 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedListener.java
index 0901b698fc2..0e0899faeae 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedListener.java
@@ -18,7 +18,9 @@
 package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher;
 
 import com.google.common.base.Preconditions;
+import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
+import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
 import 
org.apache.shardingsphere.metadata.persist.node.metadata.DataSourceMetaDataNode;
 import 
org.apache.shardingsphere.metadata.persist.node.metadata.TableMetaDataNode;
@@ -35,38 +37,31 @@ import 
org.apache.shardingsphere.mode.event.schema.table.AlterTableEvent;
 import org.apache.shardingsphere.mode.event.schema.table.DropTableEvent;
 import org.apache.shardingsphere.mode.event.schema.view.AlterViewEvent;
 import org.apache.shardingsphere.mode.event.schema.view.DropViewEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
 import 
org.apache.shardingsphere.mode.metadata.builder.RuleConfigurationEventBuilder;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Optional;
 
 /**
  * Meta data changed watcher.
  */
-public final class MetaDataChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
+@RequiredArgsConstructor
+public final class MetaDataChangedListener implements DataChangedEventListener 
{
     
-    private final RuleConfigurationEventBuilder ruleConfigurationEventBuilder 
= new RuleConfigurationEventBuilder();
+    private final EventBusContext eventBusContext;
     
-    @Override
-    public Collection<String> getWatchingKeys(final String databaseName) {
-        return null == databaseName ? 
Collections.singleton(DatabaseMetaDataNode.getMetaDataNode())
-                : 
Collections.singleton(DatabaseMetaDataNode.getDatabaseNamePath(databaseName));
-    }
+    private final RuleConfigurationEventBuilder ruleConfigurationEventBuilder 
= new RuleConfigurationEventBuilder();
     
     @Override
-    public Collection<Type> getWatchingTypes() {
-        return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
+    public void onChange(final DataChangedEvent event) {
+        createGovernanceEvent(event).ifPresent(eventBusContext::post);
     }
     
-    @Override
-    public Optional<GovernanceEvent> createGovernanceEvent(final 
DataChangedEvent event) {
+    private Optional<GovernanceEvent> createGovernanceEvent(final 
DataChangedEvent event) {
         String key = event.getKey();
         Optional<String> databaseName = 
DatabaseMetaDataNode.getDatabaseName(key);
         if (databaseName.isPresent()) {
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
index 5d60abb8f4b..13ad1caaf0f 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/watcher/ClusterStateChangedWatcher.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 public final class ClusterStateChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final String databaseName) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(ComputeNode.getClusterStatusNodePath());
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
index 57b3e7c7471..4e0413263e0 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/watcher/ComputeNodeStateChangedWatcher.java
@@ -52,7 +52,7 @@ import java.util.regex.Pattern;
 public final class ComputeNodeStateChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final String databaseName) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singleton(ComputeNode.getComputeNodePath());
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
index 7de3c72c5cf..c3c25e1552c 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/watcher/StorageNodeStateChangedWatcher.java
@@ -41,7 +41,7 @@ import java.util.Optional;
 public final class StorageNodeStateChangedWatcher implements 
GovernanceWatcher<GovernanceEvent> {
     
     @Override
-    public Collection<String> getWatchingKeys(final String databaseName) {
+    public Collection<String> getWatchingKeys() {
         return Collections.singletonList(StorageNode.getRootPath());
     }
     
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
index 53b06867127..a5cbebbf42b 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberFacade.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.proce
 public final class ContextManagerSubscriberFacade {
     
     public ContextManagerSubscriberFacade(final RegistryCenter registryCenter, 
final ContextManager contextManager) {
+        new ListenerAssistedSubscriber(contextManager);
         new ConfigurationChangedSubscriber(contextManager);
         new ResourceMetaDataChangedSubscriber(contextManager);
         new DatabaseChangedSubscriber(contextManager);
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedSubscriber.java
new file mode 100644
index 00000000000..20d4ec5aae4
--- /dev/null
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ListenerAssistedSubscriber.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
+import 
org.apache.shardingsphere.mode.event.assisted.CreateDatabaseListenerAssistedEvent;
+import 
org.apache.shardingsphere.mode.event.assisted.DropDatabaseListenerAssistedEvent;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.DataChangedEventListenerManager;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedListener;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+
+/**
+ * Listener assisted subscriber.
+ */
+@RequiredArgsConstructor
+public final class ListenerAssistedSubscriber {
+    
+    private final ContextManager contextManager;
+    
+    private final DataChangedEventListenerManager listenerManager;
+    
+    public ListenerAssistedSubscriber(final ContextManager contextManager) {
+        this.contextManager = contextManager;
+        
contextManager.getInstanceContext().getEventBusContext().register(this);
+        listenerManager = new 
DataChangedEventListenerManager((ClusterPersistRepository) 
contextManager.getMetaDataContexts().getPersistService().getRepository());
+    }
+    
+    /**
+     * Renew to add meta data listener for created database.
+     *
+     * @param event create database listener assisted event
+     */
+    @Subscribe
+    public synchronized void renew(final CreateDatabaseListenerAssistedEvent 
event) {
+        
listenerManager.addListener(DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName()),
+                new 
MetaDataChangedListener(contextManager.getInstanceContext().getEventBusContext()));
+        
contextManager.getMetaDataContexts().getPersistService().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+    }
+    
+    /**
+     * Renew to remove meta data listener and drop database.
+     *
+     * @param event drop database listener assisted event
+     */
+    @Subscribe
+    public synchronized void renew(final DropDatabaseListenerAssistedEvent 
event) {
+        
listenerManager.removeListener(DatabaseMetaDataNode.getDatabaseNamePath(event.getDatabaseName()));
+        
contextManager.getMetaDataContexts().getMetaData().dropDatabase(event.getDatabaseName());
+        
contextManager.getMetaDataContexts().getPersistService().getListenerAssistedPersistService().deleteDatabaseNameListenerAssisted(event.getDatabaseName());
+    }
+}
diff --git 
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
 
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
index 9c73806ae28..817b28927a9 100644
--- 
a/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
+++ 
b/mode/type/cluster/core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher
@@ -19,6 +19,6 @@ 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.stora
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.watcher.ComputeNodeStateChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.watcher.ClusterStateChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.ShardingSphereDataChangedWatcher
-org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.watcher.MetaDataChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.GlobalRuleChangedWatcher
 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.watcher.PropertiesChangedWatcher
+org.apache.shardingsphere.mode.manager.cluster.coordinator.listener.watch.ListenerAssistedChangedWatcher
\ No newline at end of file
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index 149a82771a5..839cae9a53a 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -75,6 +75,10 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
     
+    @Override
+    public void removeDataListener(final String key) {
+    }
+    
     @Override
     public void close() {
     }
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index e2514ab760d..2e3be58780c 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -82,6 +82,10 @@ public final class 
ProcessListClusterPersistRepositoryFixture implements Cluster
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
     
+    @Override
+    public void removeDataListener(final String key) {
+    }
+    
     @Override
     public void close() {
         REGISTRY_DATA.clear();
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
index 2687874fcd0..bcea8ea4a06 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ProcessListChangedSubscriberTest.java
@@ -84,7 +84,7 @@ class ProcessListChangedSubscriberTest {
         contextManager.renewMetaDataContexts(new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new 
ShardingSphereMetaData(createDatabases(),
                 
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
                 new ConfigurationProperties(new Properties()))));
-        registryCenter = new 
RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), 
mock(ProxyInstanceMetaData.class), null);
+        registryCenter = new 
RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext());
         subscriber = new ProcessListChangedSubscriber(registryCenter, 
contextManager);
     }
     
diff --git 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
index 2c58feddbc7..7d21016d49b 100644
--- 
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
+++ 
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/StateChangedSubscriberTest.java
@@ -93,7 +93,7 @@ class StateChangedSubscriberTest {
         contextManager.renewMetaDataContexts(new 
MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new 
ShardingSphereMetaData(createDatabases(),
                 
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
                 new ConfigurationProperties(new Properties()))));
-        subscriber = new StateChangedSubscriber(new 
RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), 
mock(ProxyInstanceMetaData.class), null), contextManager);
+        subscriber = new StateChangedSubscriber(new 
RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext()), 
contextManager);
     }
     
     private ContextManagerBuilderParameter 
createContextManagerBuilderParameter() {
diff --git 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index cb921c52448..c69988d7844 100644
--- 
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++ 
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -63,4 +63,11 @@ public interface ClusterPersistRepository extends 
PersistRepository {
      * @param listener data changed event listener
      */
     void watch(String key, DataChangedEventListener listener);
+    
+    /**
+     * Remove listener by key.
+     *
+     * @param key key to be removed
+     */
+    void removeDataListener(String key);
 }
diff --git 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 18bf3d24746..d713ae5d255 100644
--- 
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++ 
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -166,6 +166,10 @@ public final class EtcdRepository implements 
ClusterPersistRepository {
                 
WatchOption.newBuilder().withRange(OptionsUtil.prefixEndOf(prefix)).build(), 
listener);
     }
     
+    @Override
+    public void removeDataListener(final String key) {
+    }
+    
     private Type getEventChangedType(final WatchEvent event) {
         if (1 == event.getKeyValue().getVersion()) {
             return Type.ADDED;
diff --git 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index 44845d3f89a..5f4d8b6f1e7 100644
--- 
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++ 
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -52,6 +52,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
@@ -62,6 +63,8 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository, Inst
     
     private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
     
+    private final Map<String, CuratorCacheListener> dataListeners = new 
ConcurrentHashMap<>();
+    
     private final Builder builder = CuratorFrameworkFactory.builder();
     
     private CuratorFramework client;
@@ -233,6 +236,9 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository, Inst
     
     @Override
     public void watch(final String key, final DataChangedEventListener 
listener) {
+        if (null != dataListeners.get(key)) {
+            return;
+        }
         CuratorCache cache = caches.get(key);
         if (null == cache) {
             cache = CuratorCache.build(client, key);
@@ -249,6 +255,16 @@ public final class ZookeeperRepository implements 
ClusterPersistRepository, Inst
                 }).build();
         cache.listenable().addListener(curatorCacheListener);
         cache.start();
+        dataListeners.computeIfAbsent(key, curator -> curatorCacheListener);
+    }
+    
+    @Override
+    public void removeDataListener(final String key) {
+        CuratorCacheListener cacheListener = dataListeners.remove(key);
+        if (null == cacheListener) {
+            return;
+        }
+        Optional.ofNullable(caches.remove(key)).ifPresent(optional -> 
optional.listenable().removeListener(cacheListener));
     }
     
     private Type getChangedType(final TreeCacheEvent.Type type) {
diff --git 
a/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java
 
b/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java
index 908523ca816..0b81452b968 100644
--- 
a/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java
+++ 
b/parser/distsql/engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/kernel/KernelDistSQLStatementVisitor.java
@@ -152,7 +152,8 @@ public final class KernelDistSQLStatementVisitor extends 
KernelDistSQLStatementB
         String password = null == connectionCtx.password() ? "" : 
getPassword(connectionCtx.password());
         return null == connectionCtx.urlSource()
                 ? new 
HostnameAndPortBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()),
-                        
getIdentifierValue(connectionCtx.simpleSource().hostname()), 
connectionCtx.simpleSource().port().getText(), 
getIdentifierValue(connectionCtx.simpleSource().dbName()), user, password, 
props)
+                        
getIdentifierValue(connectionCtx.simpleSource().hostname()), 
connectionCtx.simpleSource().port().getText(),
+                getIdentifierValue(connectionCtx.simpleSource().dbName()), 
user, password, props)
                 : new 
URLBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()), 
getIdentifierValue(connectionCtx.urlSource().url()), user, password, props);
     }
     
diff --git 
a/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/converter/DataSourceSegmentsConverter.java
 
b/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/converter/DataSourceSegmentsConverter.java
index b7e03665063..23469dfaade 100644
--- 
a/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/converter/DataSourceSegmentsConverter.java
+++ 
b/parser/distsql/statement/src/main/java/org/apache/shardingsphere/distsql/segment/converter/DataSourceSegmentsConverter.java
@@ -58,6 +58,7 @@ public final class DataSourceSegmentsConverter {
         }
         return result;
     }
+    
     /**
      * Convert data source segments to data source properties map.
      *
diff --git 
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
 
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index 57971f2ee54..80e63be2398 100644
--- 
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++ 
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -81,6 +81,11 @@ public final class ClusterPersistRepositoryFixture 
implements ClusterPersistRepo
     public void watch(final String key, final DataChangedEventListener 
listener) {
     }
     
+    @Override
+    public void removeDataListener(final String key) {
+    
+    }
+    
     @Override
     public void close() {
         REGISTRY_DATA.clear();

Reply via email to