This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 7550751bb [type:feat] Admin sync data  (#4735)
7550751bb is described below

commit 7550751bb9c7317d14d506900adbc763085bd027
Author: 杨文杰 <[email protected]>
AuthorDate: Wed Jun 21 22:46:45 2023 +0800

    [type:feat] Admin sync data  (#4735)
    
    * admin data handler
    
    * add handler
    
    add handler
    
    * discovery sync
    
    * discovery sync
    
    * discovery sync
    
    * admin data handler
    
    * admin data handler
    
    * admin data handler
    
    * admin data handler
    
    * admin data handler
    
    * discovery sync
    
    * change zk discovery
    
    * merge 代码
    
    * Structure the code
    
    * Structure the code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * change code
    
    * trigger ci
    
    * trigger ci
    
    * trigger ci
    
    * change  org.apache.shenyu.admin.listener.DataChangedEvent
    
    ---------
    
    Co-authored-by: xiaoyu <[email protected]>
    Co-authored-by: moremind <[email protected]>
---
 shenyu-admin/pom.xml                               |   6 +
 .../apache/shenyu/admin/ShenyuAdminBootstrap.java  |   1 +
 .../admin/config/DiscoveryConfiguration.java       |  73 +++++++
 .../admin/discovery/DefaultDiscoveryProcessor.java | 166 +++++++++++++++
 .../DiscoveryDataChangedEventSyncListener.java     | 142 +++++++++++++
 .../shenyu/admin/discovery/DiscoveryMode.java      |  18 +-
 .../shenyu/admin/discovery/DiscoveryProcessor.java | 101 +++++++++
 .../admin/discovery/DiscoveryProcessorHolder.java  |  41 ++--
 .../admin/discovery/LocalDiscoveryProcessor.java   |  77 +++++++
 .../parse/CustomDiscoveryUpstreamParser.java       | 118 +++++++++++
 .../admin/discovery/parse/KeyValueParser.java      |  27 ++-
 .../listener/AbstractListDataChangedListener.java  |  43 ++--
 .../listener/AbstractNodeDataChangedListener.java  |  21 +-
 .../admin/listener/DataChangedEventDispatcher.java |   7 +-
 .../shenyu/admin/listener/DataChangedListener.java |   6 +-
 .../admin/mapper/DiscoveryUpstreamMapper.java      |  18 ++
 .../shenyu/admin/mapper/ProxySelectorMapper.java   |   8 +
 .../admin/model/dto/DiscoveryHandlerDTO.java       | 121 +++++++++++
 .../shenyu/admin/transfer/DiscoveryTransfer.java   |  71 +++++++
 shenyu-admin/src/main/resources/application.yml    |   8 +-
 .../mappers/discovery-upstream-sqlmap.xml          |  23 ++
 .../resources/mappers/proxy-selector-sqlmap.xml    |   7 +
 .../src/main/resources/sql-script/h2/schema.sql    |  29 ++-
 shenyu-bootstrap/pom.xml                           |  12 +-
 .../common/constant/DefaultPathConstants.java      |  15 +-
 .../shenyu/common/dto/DiscoverySyncData.java       |  64 ++++++
 .../shenyu/common/dto/DiscoveryUpstreamData.java   | 232 +++++++++++++++++++++
 .../shenyu/common/dto/ProxySelectorData.java       |  23 --
 .../shenyu/common/enums/ConfigGroupEnum.java       |   5 +-
 .../discovery/api/ShenyuDiscoveryService.java      |  12 ++
 .../api/listener/DataChangedEventListener.java     |   2 +-
 ...edEvent.java => DiscoveryDataChangedEvent.java} |   4 +-
 .../zookeeper/ZookeeperDiscoveryService.java       | 100 +++++----
 .../cache/CommonProxySelectorDataSubscriber.java   |   4 +-
 .../base/handler/ProxySelectorDataHandler.java     |   4 +-
 .../plugin/tcp/handler/TcpUpstreamDataHandler.java |   8 +-
 .../shenyu/protocol/tcp/BootstrapServer.java       |   4 +-
 .../shenyu/protocol/tcp/TcpBootstrapServer.java    |   5 +-
 .../shenyu/protocol/tcp/UpstreamProvider.java      |  27 ++-
 .../tcp/connection/ActivityConnectionObserver.java |   8 +-
 .../DefaultConnectionConfigProvider.java           |  12 +-
 .../sync/data/api/ProxySelectorDataSubscriber.java |   6 +-
 .../shenyu/sync/data/apollo/ApolloDataService.java |  18 +-
 .../sync/data/nacos/handler/NacosCacheHandler.java |  10 +-
 .../data/zookeeper/ZookeeperSyncDataService.java   |  20 +-
 45 files changed, 1507 insertions(+), 220 deletions(-)

diff --git a/shenyu-admin/pom.xml b/shenyu-admin/pom.xml
index ff267bbda..e1def0f43 100644
--- a/shenyu-admin/pom.xml
+++ b/shenyu-admin/pom.xml
@@ -280,6 +280,12 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-discovery-zookeeper</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
index 568e3e4ed..9bd502620 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
@@ -35,4 +35,5 @@ public class ShenyuAdminBootstrap {
     public static void main(final String[] args) {
         SpringApplication.run(ShenyuAdminBootstrap.class, args);
     }
+
 }
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/config/DiscoveryConfiguration.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/config/DiscoveryConfiguration.java
new file mode 100644
index 000000000..a90b0d6e5
--- /dev/null
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/config/DiscoveryConfiguration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.config;
+
+import org.apache.shenyu.admin.discovery.DefaultDiscoveryProcessor;
+import org.apache.shenyu.admin.discovery.DiscoveryProcessor;
+import org.apache.shenyu.admin.discovery.DiscoveryProcessorHolder;
+import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
+import org.apache.shenyu.admin.mapper.ProxySelectorMapper;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * DiscoveryConfiguration.
+ */
+@Configuration
+public class DiscoveryConfiguration {
+
+    /**
+     * discoveryProcessor.
+     *
+     * @param discoveryUpstreamMapper discoveryUpstreamMapper
+     * @param proxySelectorMapper     proxySelectorMapper
+     * @return DiscoveryProcessor
+     */
+    @Bean("DefaultDiscoveryProcessor")
+    public DiscoveryProcessor discoveryDefaultProcessor(final 
DiscoveryUpstreamMapper discoveryUpstreamMapper, final ProxySelectorMapper 
proxySelectorMapper) {
+        return new DefaultDiscoveryProcessor(discoveryUpstreamMapper, 
proxySelectorMapper);
+    }
+
+    /**
+     * discoveryLocalProcessor.
+     *
+     * @param discoveryUpstreamMapper discoveryUpstreamMapper
+     * @param proxySelectorMapper     proxySelectorMapper
+     * @return LocalDiscoveryProcessor
+     */
+    @Bean("LocalDiscoveryProcessor")
+    public DiscoveryProcessor discoveryLocalProcessor(final 
DiscoveryUpstreamMapper discoveryUpstreamMapper, final ProxySelectorMapper 
proxySelectorMapper) {
+        return new DefaultDiscoveryProcessor(discoveryUpstreamMapper, 
proxySelectorMapper);
+    }
+
+    /**
+     * discoveryProcessorHolder.
+     *
+     * @param defaultDiscoveryProcessor defaultDiscoveryProcessor
+     * @param localDiscoveryProcessor   localDiscoveryProcessor
+     * @return DiscoveryProcessorHolder
+     */
+    @Bean
+    public DiscoveryProcessorHolder 
discoveryProcessorHolder(@Qualifier("DefaultDiscoveryProcessor") final 
DiscoveryProcessor defaultDiscoveryProcessor,
+                                                             
@Qualifier("LocalDiscoveryProcessor") final DiscoveryProcessor 
localDiscoveryProcessor
+    ) {
+        return new DiscoveryProcessorHolder(defaultDiscoveryProcessor, 
localDiscoveryProcessor);
+    }
+
+}
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
new file mode 100644
index 000000000..4fbd5d470
--- /dev/null
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.admin.discovery.parse.CustomDiscoveryUpstreamParser;
+import org.apache.shenyu.admin.listener.DataChangedEvent;
+import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
+import org.apache.shenyu.admin.mapper.ProxySelectorMapper;
+import org.apache.shenyu.admin.model.dto.DiscoveryHandlerDTO;
+import org.apache.shenyu.admin.model.dto.DiscoveryUpstreamDTO;
+import org.apache.shenyu.admin.model.dto.ProxySelectorDTO;
+import org.apache.shenyu.admin.model.entity.DiscoveryDO;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.enums.DataEventTypeEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
+import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
+import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
+import org.apache.shenyu.spi.ExtensionLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.ApplicationEventPublisherAware;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * DefaultDiscoveryProcessor.
+ */
+public class DefaultDiscoveryProcessor implements DiscoveryProcessor, 
ApplicationEventPublisherAware {
+
+    private static final String KEY_TEMPLATE = "%s/%s/%s/%s";
+
+    private static final String DEFAULT_LISTENER_NODE = "/shenyu/discovery";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDiscoveryProcessor.class);
+
+    private final Map<String, ShenyuDiscoveryService> discoveryServiceCache;
+
+    private ApplicationEventPublisher eventPublisher;
+
+    private final DiscoveryUpstreamMapper discoveryUpstreamMapper;
+
+    private final ProxySelectorMapper proxySelectorMapper;
+
+    /**
+     * DefaultDiscoveryProcessor.
+     *
+     * @param discoveryUpstreamMapper discoveryUpstreamMapper
+     * @param proxySelectorMapper     proxySelectorMapper
+     */
+    public DefaultDiscoveryProcessor(final DiscoveryUpstreamMapper 
discoveryUpstreamMapper, final ProxySelectorMapper proxySelectorMapper) {
+        this.discoveryUpstreamMapper = discoveryUpstreamMapper;
+        this.proxySelectorMapper = proxySelectorMapper;
+        this.discoveryServiceCache = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public void createDiscovery(final DiscoveryDO discoveryDO) {
+        String type = discoveryDO.getType();
+        ShenyuDiscoveryService discoveryService = 
ExtensionLoader.getExtensionLoader(ShenyuDiscoveryService.class).getJoin(type);
+        String props = discoveryDO.getProps();
+        DiscoveryConfig discoveryConfig = GsonUtils.getGson().fromJson(props, 
DiscoveryConfig.class);
+        discoveryConfig.setServerList(discoveryDO.getServiceList());
+        discoveryService.init(discoveryConfig);
+        discoveryServiceCache.put(discoveryDO.getId(), discoveryService);
+    }
+
+    @Override
+    public void createProxySelector(final DiscoveryHandlerDTO 
discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+        ShenyuDiscoveryService shenyuDiscoveryService = 
discoveryServiceCache.get(discoveryHandlerDTO.getDiscoveryId());
+        if (Objects.isNull(shenyuDiscoveryService)) {
+            LOG.warn("before start ProxySelector you need init 
DiscoveryId={}", discoveryHandlerDTO.getDiscoveryId());
+            return;
+        }
+        String key = buildProxySelectorKey(discoveryHandlerDTO, 
proxySelectorDTO);
+        if (StringUtils.isEmpty(shenyuDiscoveryService.getData(key))) {
+            LOG.info("shenyu discovery {} is empty need register it ", key);
+            shenyuDiscoveryService.register(key, 
GsonUtils.getInstance().toJson(proxySelectorDTO));
+        }
+        shenyuDiscoveryService.watcher(key, 
getDiscoveryDataChangedEventListener(proxySelectorDTO.getType(), 
discoveryHandlerDTO.getProps()));
+        DataChangedEvent dataChangedEvent = new 
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE, 
Collections.singletonList(covert(proxySelectorDTO, null)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+    /**
+     * removeDiscovery by ShenyuDiscoveryService#shutdown .
+     *
+     * @param discoveryDO discoveryDO
+     */
+    @Override
+    public void removeDiscovery(final DiscoveryDO discoveryDO) {
+        ShenyuDiscoveryService shenyuDiscoveryService = 
discoveryServiceCache.get(discoveryDO.getId());
+        shenyuDiscoveryService.shutdown();
+    }
+
+    /**
+     * removeProxySelector.
+     *
+     * @param proxySelectorDTO proxySelectorDTO
+     */
+    @Override
+    public void removeProxySelector(final DiscoveryHandlerDTO 
discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+        ShenyuDiscoveryService shenyuDiscoveryService = 
discoveryServiceCache.get(discoveryHandlerDTO.getDiscoveryId());
+        String key = buildProxySelectorKey(discoveryHandlerDTO, 
proxySelectorDTO);
+        shenyuDiscoveryService.unWatcher(key);
+        DataChangedEvent dataChangedEvent = new 
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.DELETE, 
Collections.singletonList(covert(proxySelectorDTO, null)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+    @Override
+    public void changeUpstream(final DiscoveryHandlerDTO discoveryHandlerDTO, 
final ProxySelectorDTO proxySelectorDTO, final List<DiscoveryUpstreamDTO> 
upstreamDTOS) {
+        throw new NotImplementedException("shenyu discovery local mode do 
nothing in changeUpstream");
+    }
+
+    /**
+     * buildProxySelectorKey.
+     *
+     * @param discoveryHandlerDTO discoveryHandlerDTO
+     * @param proxySelectorDTO    proxySelectorDTO
+     * @return key
+     */
+    private String buildProxySelectorKey(final DiscoveryHandlerDTO 
discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+        String key = 
StringUtils.isBlank(discoveryHandlerDTO.getListenerNode()) ? 
DEFAULT_LISTENER_NODE : discoveryHandlerDTO.getListenerNode();
+        return String.format(KEY_TEMPLATE, key, 
proxySelectorDTO.getPluginName(), proxySelectorDTO.getId(), 
discoveryHandlerDTO.getId());
+    }
+
+    /**
+     * getDiscoveryDataChangedEventListener.
+     *
+     * @param discoveryType discoveryType
+     * @param customProps   customProps
+     * @return DataChangedEventListener
+     */
+    private DataChangedEventListener 
getDiscoveryDataChangedEventListener(final String discoveryType, final String 
customProps) {
+        Map<String, String> customMap = 
GsonUtils.getInstance().toObjectMap(customProps, String.class);
+        return new DiscoveryDataChangedEventSyncListener(eventPublisher, 
discoveryUpstreamMapper,
+                new CustomDiscoveryUpstreamParser(customMap, 
proxySelectorMapper), 
!DiscoveryMode.LOCAL.name().equalsIgnoreCase(discoveryType));
+    }
+
+    @Override
+    public void setApplicationEventPublisher(final ApplicationEventPublisher 
eventPublisher) {
+        this.eventPublisher = eventPublisher;
+    }
+}
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryDataChangedEventSyncListener.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryDataChangedEventSyncListener.java
new file mode 100644
index 000000000..9e5229450
--- /dev/null
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryDataChangedEventSyncListener.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.admin.discovery.parse.KeyValueParser;
+import org.apache.shenyu.admin.listener.DataChangedEvent;
+import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
+import org.apache.shenyu.admin.model.entity.DiscoveryUpstreamDO;
+import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.enums.DataEventTypeEnum;
+import org.apache.shenyu.common.utils.UUIDUtils;
+import org.apache.shenyu.discovery.api.listener.DiscoveryDataChangedEvent;
+import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * DiscoveryHandler.
+ */
+public class DiscoveryDataChangedEventSyncListener implements 
DataChangedEventListener {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DiscoveryDataChangedEventSyncListener.class);
+
+    private final KeyValueParser keyValueParser;
+
+    private final ApplicationEventPublisher eventPublisher;
+
+    private final DiscoveryUpstreamMapper discoveryUpstreamMapper;
+
+    private final Boolean needPersistence;
+
+    public DiscoveryDataChangedEventSyncListener(final 
ApplicationEventPublisher eventPublisher,
+                                                 final DiscoveryUpstreamMapper 
discoveryUpstreamMapper,
+                                                 final KeyValueParser 
keyValueParser,
+                                                 final Boolean 
needPersistence) {
+        this.eventPublisher = eventPublisher;
+        this.keyValueParser = keyValueParser;
+        this.discoveryUpstreamMapper = discoveryUpstreamMapper;
+        this.needPersistence = needPersistence;
+    }
+
+    @Override
+    public void onChange(final DiscoveryDataChangedEvent event) {
+        DiscoveryDataChangedEvent.Event currentEvent = event.getEvent();
+        if (DiscoveryDataChangedEvent.Event.IGNORED.equals(currentEvent)) {
+            return;
+        }
+        DiscoverySyncData discoverySyncData = 
buildProxySelectorData(event.getKey(), event.getValue());
+        DataChangedEvent dataChangedEvent = null;
+        List<DiscoveryUpstreamData> upstreamDataList = 
discoverySyncData.getUpstreamDataList();
+        if (needPersistence) {
+            if (CollectionUtils.isEmpty(upstreamDataList)) {
+                LOGGER.warn("shenyu proxySelectorData#discoveryUpstreamList is 
empty");
+                return;
+            }
+            switch (currentEvent) {
+                case ADDED:
+                    upstreamDataList.forEach(d -> {
+                        d.setId(UUIDUtils.getInstance().generateShortUuid());
+                        d.setDateCreated(new 
Timestamp(System.currentTimeMillis()));
+                        d.setDateUpdated(new 
Timestamp(System.currentTimeMillis()));
+                        
discoveryUpstreamMapper.insert(DiscoveryTransfer.INSTANCE.mapToDo(d));
+                        LOGGER.info("shenyu 
[DiscoveryDataChangedEventSyncListener] ADDED Upstream {}", d.getUrl());
+                    });
+                    fillFullyDiscoverySyncData(discoverySyncData);
+                    dataChangedEvent = new 
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE, 
Collections.singletonList(discoverySyncData));
+                    break;
+                case UPDATED:
+                    upstreamDataList.forEach(d -> {
+                        DiscoveryUpstreamDO discoveryUpstreamDO = 
DiscoveryTransfer.INSTANCE.mapToDo(d);
+                        discoveryUpstreamMapper.update(discoveryUpstreamDO);
+                        LOGGER.info("shenyu 
[DiscoveryDataChangedEventSyncListener] UPDATE Upstream {}", 
discoveryUpstreamDO.getUrl());
+                    });
+                    fillFullyDiscoverySyncData(discoverySyncData);
+                    dataChangedEvent = new 
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.UPDATE, 
Collections.singletonList(discoverySyncData));
+                    break;
+                case DELETED:
+                    if (CollectionUtils.isNotEmpty(upstreamDataList)) {
+                        upstreamDataList.forEach(up -> {
+                            discoveryUpstreamMapper.deleteByUrl(up.getUrl());
+                            LOGGER.info("shenyu 
[DiscoveryDataChangedEventSyncListener] DELETE Upstream {}", up.getUrl());
+                        });
+                    }
+                    fillFullyDiscoverySyncData(discoverySyncData);
+                    dataChangedEvent = new 
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.UPDATE, 
Collections.singletonList(discoverySyncData));
+                    break;
+                default:
+                    throw new IllegalStateException("shenyu 
DiscoveryDataChangedEventSyncListener find IllegalState");
+            }
+        }
+        if (Objects.nonNull(dataChangedEvent)) {
+            eventPublisher.publishEvent(dataChangedEvent);
+        }
+    }
+
+    private void fillFullyDiscoverySyncData(final DiscoverySyncData 
discoverySyncData) {
+        ProxySelectorData proxySelectorData = 
discoverySyncData.getProxySelectorData();
+        List<DiscoveryUpstreamDO> discoveryUpstreamDOS = 
discoveryUpstreamMapper.selectByProxySelectorId(proxySelectorData.getId());
+        List<DiscoveryUpstreamData> collect = 
discoveryUpstreamDOS.stream().map(DiscoveryTransfer.INSTANCE::mapToData).collect(Collectors.toList());
+        discoverySyncData.setUpstreamDataList(collect);
+    }
+
+    private DiscoverySyncData buildProxySelectorData(final String key, final 
String value) {
+        List<DiscoveryUpstreamData> discoveryUpstreamDTOS = 
keyValueParser.parseValue(value);
+        ProxySelectorData proxySelectorData = keyValueParser.parseKey(key);
+        String[] split = key.split("/");
+        String discoveryHandleId = split[split.length - 2];
+        discoveryUpstreamDTOS.forEach(s -> 
s.setDiscoveryHandlerId(discoveryHandleId));
+        DiscoverySyncData data = new DiscoverySyncData();
+        data.setUpstreamDataList(discoveryUpstreamDTOS);
+        data.setProxySelectorData(proxySelectorData);
+        return data;
+    }
+
+}
diff --git 
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
similarity index 74%
copy from 
shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
copy to 
shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
index 33580bb90..8c0175321 100644
--- 
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
@@ -15,17 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.discovery.api.listener;
+package org.apache.shenyu.admin.discovery;
 
-/**
- * Data changed listener.
- */
-public interface DataChangedEventListener {
-    
-    /**
-     * when data changed, fire this event.
-     * 
-     * @param event data changed event
-     */
-    void onChange(DataChangedEvent event);
+public enum DiscoveryMode {
+    LOCAL,
+    ZOOKEEPER,
+    NACOS,
+    EUREKA
 }
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessor.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessor.java
new file mode 100644
index 000000000..a5d86b3f4
--- /dev/null
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.admin.model.dto.DiscoveryHandlerDTO;
+import org.apache.shenyu.admin.model.dto.DiscoveryUpstreamDTO;
+import org.apache.shenyu.admin.model.dto.ProxySelectorDTO;
+import org.apache.shenyu.admin.model.entity.DiscoveryDO;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.springframework.beans.BeanUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * DiscoveryProcessor.
+ */
+public interface DiscoveryProcessor {
+
+    /**
+     * createDiscovery.
+     *
+     * @param discoveryDO discoveryDO
+     */
+    void createDiscovery(DiscoveryDO discoveryDO);
+
+    /**
+     * createProxySelector.
+     *
+     * @param discoveryHandlerDTO discoveryHandlerDTO
+     * @param proxySelectorDTO    proxySelectorDTO
+     */
+    void createProxySelector(DiscoveryHandlerDTO discoveryHandlerDTO, 
ProxySelectorDTO proxySelectorDTO);
+
+    /**
+     * removeDiscovery.
+     *
+     * @param discoveryDO discoveryDO
+     */
+    void removeDiscovery(DiscoveryDO discoveryDO);
+
+    /**
+     * removeProxySelector.
+     *
+     * @param discoveryHandlerDTO discoveryHandlerDTO
+     * @param proxySelectorDTO    proxySelectorDTO
+     */
+    void removeProxySelector(DiscoveryHandlerDTO discoveryHandlerDTO, 
ProxySelectorDTO proxySelectorDTO);
+
+
+    /**
+     * only use in local mode to sync upstreamDTOS.
+     *
+     * @param discoveryHandlerDTO discoveryHandlerDTO
+     * @param proxySelectorDTO    proxySelectorDTO
+     * @param upstreamDTOS        upstreamDTOS
+     */
+    void changeUpstream(DiscoveryHandlerDTO discoveryHandlerDTO, 
ProxySelectorDTO proxySelectorDTO, List<DiscoveryUpstreamDTO> upstreamDTOS);
+
+    /**
+     * covert.
+     *
+     * @param proxySelectorDTO proxySelectorDTO
+     * @param upstreamDTOS     upstreamDTOS
+     * @return DiscoverySyncData
+     */
+    default DiscoverySyncData covert(final ProxySelectorDTO proxySelectorDTO, 
final List<DiscoveryUpstreamDTO> upstreamDTOS) {
+        ProxySelectorData proxySelectorData = new ProxySelectorData();
+        BeanUtils.copyProperties(proxySelectorDTO, proxySelectorData);
+        DiscoverySyncData discoverySyncData = new DiscoverySyncData();
+        discoverySyncData.setProxySelectorData(proxySelectorData);
+        if (CollectionUtils.isNotEmpty(upstreamDTOS)) {
+            List<DiscoveryUpstreamData> collect = upstreamDTOS.stream().map(up 
-> {
+                DiscoveryUpstreamData discoveryUpstreamData = new 
DiscoveryUpstreamData();
+                BeanUtils.copyProperties(up, discoveryUpstreamData);
+                return discoveryUpstreamData;
+            }).collect(Collectors.toList());
+            discoverySyncData.setUpstreamDataList(collect);
+        }
+        return discoverySyncData;
+    }
+
+}
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
similarity index 50%
copy from 
shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
copy to 
shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
index ac2b9d92a..e95eb8752 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
@@ -15,36 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.sync.data.api;
+package org.apache.shenyu.admin.discovery;
 
-import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+public class DiscoveryProcessorHolder {
 
-import java.util.List;
+    private final DiscoveryProcessor defaultDiscoveryProcessor;
 
-/**
- * ProxySelectorDataSubscriber.
- */
-public interface ProxySelectorDataSubscriber {
+    private final DiscoveryProcessor localDiscoveryProcessor;
 
-    /**
-     * On subscribe.
-     *
-     * @param proxySelectorData the proxySelector data
-     * @param upstreamsList upstreamsList
-     */
-    void onSubscribe(ProxySelectorData proxySelectorData, 
List<DiscoveryUpstream> upstreamsList);
+    public DiscoveryProcessorHolder(final DiscoveryProcessor 
defaultDiscoveryProcessor, final DiscoveryProcessor localDiscoveryProcessor) {
+        this.defaultDiscoveryProcessor = defaultDiscoveryProcessor;
+        this.localDiscoveryProcessor = localDiscoveryProcessor;
+    }
 
     /**
-     * Un subscribe.
+     * chooseProcessor.
      *
-     * @param proxySelectorData the proxySelector data
+     * @param mode mode
+     * @return DiscoveryProcessor
      */
-    void unSubscribe(ProxySelectorData proxySelectorData);
-
-    /**
-     * Refresh.
-     */
-    default void refresh() {
+    public DiscoveryProcessor chooseProcessor(final String mode) {
+        if (DiscoveryMode.LOCAL.name().equalsIgnoreCase(mode)) {
+            return localDiscoveryProcessor;
+        } else {
+            return defaultDiscoveryProcessor;
+        }
     }
+
 }
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
new file mode 100644
index 000000000..2d91b4f59
--- /dev/null
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery;
+
+import org.apache.shenyu.admin.listener.DataChangedEvent;
+import org.apache.shenyu.admin.model.dto.DiscoveryHandlerDTO;
+import org.apache.shenyu.admin.model.dto.DiscoveryUpstreamDTO;
+import org.apache.shenyu.admin.model.dto.ProxySelectorDTO;
+import org.apache.shenyu.admin.model.entity.DiscoveryDO;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.enums.DataEventTypeEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.ApplicationEventPublisherAware;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * LocalDiscoveryProcessor.
+ */
+public class LocalDiscoveryProcessor implements DiscoveryProcessor, 
ApplicationEventPublisherAware {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(LocalDiscoveryProcessor.class);
+
+    private ApplicationEventPublisher eventPublisher;
+
+    @Override
+    public void createDiscovery(final DiscoveryDO discoveryDO) {
+        LOG.info("shenyu discovery local mode do nothing in createDiscovery");
+    }
+
+    @Override
+    public void createProxySelector(final DiscoveryHandlerDTO 
discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+        DataChangedEvent dataChangedEvent = new 
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE, 
Collections.singletonList(covert(proxySelectorDTO, null)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+    @Override
+    public void removeDiscovery(final DiscoveryDO discoveryDO) {
+        LOG.info("shenyu discovery local mode do nothing in removeDiscovery");
+    }
+
+    @Override
+    public void removeProxySelector(final DiscoveryHandlerDTO 
discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+        DataChangedEvent dataChangedEvent = new 
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.DELETE, 
Collections.singletonList(covert(proxySelectorDTO, null)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+    @Override
+    public void changeUpstream(final DiscoveryHandlerDTO discoveryHandlerDTO, 
final ProxySelectorDTO proxySelectorDTO, final List<DiscoveryUpstreamDTO> 
upstreamDTOS) {
+        DataChangedEvent dataChangedEvent = new 
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.DELETE, 
Collections.singletonList(covert(proxySelectorDTO, upstreamDTOS)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+    @Override
+    public void setApplicationEventPublisher(final ApplicationEventPublisher 
applicationEventPublisher) {
+        this.eventPublisher = applicationEventPublisher;
+    }
+
+}
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/CustomDiscoveryUpstreamParser.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/CustomDiscoveryUpstreamParser.java
new file mode 100644
index 000000000..e2001d9bd
--- /dev/null
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/CustomDiscoveryUpstreamParser.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery.parse;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.admin.mapper.ProxySelectorMapper;
+import org.apache.shenyu.admin.model.entity.ProxySelectorDO;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
+
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DiscoveryUpstreamParser.
+ *
+ * <p>
+ * You can define a custom map mapper  if your custom upstream doesn't fit
+ * </p>
+ */
+public class CustomDiscoveryUpstreamParser implements 
JsonDeserializer<DiscoveryUpstreamData>, KeyValueParser {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CustomDiscoveryUpstreamParser.class);
+
+    private final Map<String, String> conversion;
+
+    private final ProxySelectorMapper proxySelectorMapper;
+
+    /**
+     * CustomDiscoveryUpstreamParser.
+     *
+     * @param conversion          conversion
+     * @param proxySelectorMapper proxySelectorMapper
+     */
+    public CustomDiscoveryUpstreamParser(final Map<String, String> conversion, 
final ProxySelectorMapper proxySelectorMapper) {
+        this.conversion = conversion;
+        this.proxySelectorMapper = proxySelectorMapper;
+    }
+
+    @Override
+    public DiscoveryUpstreamData deserialize(final JsonElement jsonElement,
+                                             final Type type,
+                                             final JsonDeserializationContext 
jsonDeserializationContext) throws JsonParseException {
+        JsonObject asJsonObject = jsonElement.getAsJsonObject();
+        JsonObject afterJson = new JsonObject();
+        for (Map.Entry<String, JsonElement> elementEntry : 
asJsonObject.entrySet()) {
+            String key = elementEntry.getKey();
+            if (conversion.containsKey(key)) {
+                String transferKey = conversion.get(key);
+                afterJson.add(transferKey, elementEntry.getValue());
+            } else {
+                afterJson.add(key, elementEntry.getValue());
+            }
+        }
+        return GsonUtils.getInstance().fromJson(afterJson, 
DiscoveryUpstreamData.class);
+    }
+
+    @Override
+    public List<DiscoveryUpstreamData> parseValue(final String jsonString) {
+        if (StringUtils.isBlank(jsonString)) {
+            return Collections.emptyList();
+        }
+        GsonBuilder gsonBuilder = new 
GsonBuilder().registerTypeAdapter(DiscoveryUpstreamData.class, this);
+        Gson gson = gsonBuilder.create();
+        return Collections.singletonList(gson.fromJson(jsonString, 
DiscoveryUpstreamData.class));
+    }
+
+    /**
+     * parseKey.
+     *
+     * <p>
+     * /.../{pluginName}/{selectorId}/{discoveryHandlerId}/{upstream_suq}
+     * </p>
+     *
+     * @param key key
+     * @return ProxySelectorData
+     */
+    @Override
+    public ProxySelectorData parseKey(final String key) {
+        String[] subArray = key.split("/");
+        String proxySelectorId = subArray[subArray.length - 2];
+        ProxySelectorData proxySelectorData = new ProxySelectorData();
+        ProxySelectorDO proxySelectorDO = 
proxySelectorMapper.selectById(proxySelectorId);
+        BeanUtils.copyProperties(proxySelectorDO, proxySelectorData);
+        LOG.info("shenyu parseKey 
pluginName={}|proxySelectorName={}|type={}|forwardPort={}", 
proxySelectorData.getPluginName(),
+                proxySelectorData.getName(), proxySelectorData.getType(), 
proxySelectorData.getForwardPort());
+        return proxySelectorData;
+    }
+
+}
diff --git 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/KeyValueParser.java
similarity index 63%
copy from 
shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
copy to 
shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/KeyValueParser.java
index ff009d567..10ca9fa9f 100644
--- 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/KeyValueParser.java
@@ -15,33 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.shenyu.protocol.tcp;
+package org.apache.shenyu.admin.discovery.parse;
 
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
 
 import java.util.List;
 
 /**
- * BootstrapServer.
+ * parse value to ProxySelectorData list.
  */
-public interface BootstrapServer {
+public interface KeyValueParser {
 
     /**
-     * start.
+     * parseValue.
      *
-     * @param tcpServerConfiguration tcpServerConfiguration
+     * @param value value
+     * @return DiscoveryUpstreamData list
      */
-    void start(TcpServerConfiguration tcpServerConfiguration);
+    List<DiscoveryUpstreamData> parseValue(String value);
 
     /**
-     * doOnUpdate.
+     * parseKey.
      *
-     * @param removeList removeList
+     * @param key discovery key
+     * @return ProxySelectorData.
      */
-    void removeCommonUpstream(List<DiscoveryUpstream> removeList);
+    ProxySelectorData parseKey(String key);
 
-    /**
-     * shutdown.
-     */
-    void shutdown();
 }
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractListDataChangedListener.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractListDataChangedListener.java
index f409178a7..b467f70c7 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractListDataChangedListener.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractListDataChangedListener.java
@@ -22,11 +22,12 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.MetaData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
 import org.apache.shenyu.common.enums.DataEventTypeEnum;
 import org.apache.shenyu.common.utils.GsonUtils;
 import org.slf4j.Logger;
@@ -249,11 +250,12 @@ public abstract class AbstractListDataChangedListener 
implements DataChangedList
     }
 
     @Override
-    public void onProxySelectorChanged(final List<ProxySelectorData> changed, 
final DataEventTypeEnum eventType) {
+    public void onProxySelectorChanged(final List<DiscoverySyncData> changed, 
final DataEventTypeEnum eventType) {
         updateProxySelectorMap(getConfig(changeData.getProxySelectorDataId()));
         switch (eventType) {
             case DELETE:
-                changed.forEach(proxySelectorData -> {
+                changed.forEach(discoverySyncData -> {
+                    ProxySelectorData proxySelectorData = 
discoverySyncData.getProxySelectorData();
                     List<ProxySelectorData> ls = PROXY_SELECTOR_MAP
                             .getOrDefault(proxySelectorData.getId(), new 
ArrayList<>())
                             .stream()
@@ -266,25 +268,27 @@ public abstract class AbstractListDataChangedListener 
implements DataChangedList
             case MYSELF:
                 Set<String> selectIdSet = changed
                         .stream()
-                        .map(ProxySelectorData::getId)
+                        .map(discoverySyncData ->
+                                
discoverySyncData.getProxySelectorData().getId()
+                        )
                         .collect(Collectors.toSet());
                 PROXY_SELECTOR_MAP.keySet().removeAll(selectIdSet);
-                changed.forEach(proxySelectorData -> {
-                    List<ProxySelectorData> ls = new 
ArrayList<>(PROXY_SELECTOR_MAP.getOrDefault(proxySelectorData.getId(),
+                changed.forEach(discoverySyncData -> {
+                    List<ProxySelectorData> ls = new 
ArrayList<>(PROXY_SELECTOR_MAP.getOrDefault(discoverySyncData.getProxySelectorData().getId(),
                             new ArrayList<>()));
-                    ls.add(proxySelectorData);
-                    PROXY_SELECTOR_MAP.put(proxySelectorData.getId(), ls);
+                    ls.add(discoverySyncData.getProxySelectorData());
+                    
PROXY_SELECTOR_MAP.put(discoverySyncData.getProxySelectorData().getId(), ls);
                 });
                 break;
             default:
-                changed.forEach(proxySelectorData -> {
+                changed.forEach(discoverySyncData -> {
                     List<ProxySelectorData> ls = PROXY_SELECTOR_MAP
-                            .getOrDefault(proxySelectorData.getId(), new 
ArrayList<>())
+                            
.getOrDefault(discoverySyncData.getProxySelectorData().getId(), new 
ArrayList<>())
                             .stream()
-                            .filter(s -> 
!s.getId().equals(proxySelectorData.getId()))
+                            .filter(s -> 
!s.getId().equals(discoverySyncData.getProxySelectorData().getId()))
                             .collect(Collectors.toList());
-                    ls.add(proxySelectorData);
-                    PROXY_SELECTOR_MAP.put(proxySelectorData.getId(), ls);
+                    ls.add(discoverySyncData.getProxySelectorData());
+                    
PROXY_SELECTOR_MAP.put(discoverySyncData.getProxySelectorData().getId(), ls);
                 });
                 break;
         }
@@ -362,7 +366,7 @@ public abstract class AbstractListDataChangedListener 
implements DataChangedList
      * publishConfig.
      *
      * @param dataId dataId
-     * @param data data
+     * @param data   data
      */
     public abstract void publishConfig(String dataId, Object data);
 
@@ -409,11 +413,11 @@ public abstract class AbstractListDataChangedListener 
implements DataChangedList
         /**
          * ChangeData.
          *
-         * @param pluginDataId pluginDataId
+         * @param pluginDataId   pluginDataId
          * @param selectorDataId selectorDataId
-         * @param ruleDataId ruleDataId
-         * @param authDataId authDataId
-         * @param metaDataId metaDataId
+         * @param ruleDataId     ruleDataId
+         * @param authDataId     authDataId
+         * @param metaDataId     metaDataId
          */
         public ChangeData(final String pluginDataId, final String 
selectorDataId,
                           final String ruleDataId, final String authDataId,
@@ -473,6 +477,7 @@ public abstract class AbstractListDataChangedListener 
implements DataChangedList
 
         /**
          * get proxySelectorDataId.
+         *
          * @return proxySelectorDataId
          */
         public String getProxySelectorDataId() {
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractNodeDataChangedListener.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractNodeDataChangedListener.java
index eb76dc555..07a0fec5e 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractNodeDataChangedListener.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractNodeDataChangedListener.java
@@ -19,10 +19,11 @@ package org.apache.shenyu.admin.listener;
 
 import org.apache.shenyu.common.constant.DefaultPathConstants;
 import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.MetaData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
 import org.apache.shenyu.common.enums.DataEventTypeEnum;
 import org.apache.shenyu.common.exception.ShenyuException;
 import org.slf4j.Logger;
@@ -80,6 +81,22 @@ public abstract class AbstractNodeDataChangedListener 
implements DataChangedList
         }
     }
 
+    @Override
+    public void onProxySelectorChanged(final List<DiscoverySyncData> changed, 
final DataEventTypeEnum eventType) {
+        for (DiscoverySyncData data : changed) {
+            String proxySelectorPath = 
DefaultPathConstants.buildProxySelectorPath(data.getProxySelectorData().getPluginName(),
 data.getProxySelectorData().getName());
+            // delete
+            if (eventType == DataEventTypeEnum.DELETE) {
+                deleteNode(proxySelectorPath);
+                LOG.debug("[DataChangedListener] delete appKey {}", 
proxySelectorPath);
+                continue;
+            }
+            // create or update
+            createOrUpdate(proxySelectorPath, data);
+            LOG.info("[DataChangedListener] change proxySelector 
path={}|data={}", proxySelectorPath, data);
+        }
+    }
+
     @Override
     public void onSelectorChanged(final List<SelectorData> changed, final 
DataEventTypeEnum eventType) {
         if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {
@@ -145,7 +162,7 @@ public abstract class AbstractNodeDataChangedListener 
implements DataChangedList
      * createOrUpdate.
      *
      * @param pluginPath pluginPath
-     * @param data data
+     * @param data       data
      */
     public abstract void createOrUpdate(String pluginPath, Object data);
 
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
index 56c3936f6..3389ece26 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
@@ -19,11 +19,11 @@ package org.apache.shenyu.admin.listener;
 
 import org.apache.shenyu.admin.service.manager.LoadServiceDocEntry;
 import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.MetaData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ApplicationListener;
@@ -70,9 +70,8 @@ public class DataChangedEventDispatcher implements 
ApplicationListener<DataChang
                     listener.onMetaDataChanged((List<MetaData>) 
event.getSource(), event.getEventType());
                     break;
                 case PROXY_SELECTOR:
-                    listener.onProxySelectorChanged((List<ProxySelectorData>) 
event.getSource(), event.getEventType());
+                    listener.onProxySelectorChanged((List<DiscoverySyncData>) 
event.getSource(), event.getEventType());
                     break;
-
                 default:
                     throw new IllegalStateException("Unexpected value: " + 
event.getGroupKey());
             }
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedListener.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedListener.java
index da3ae8a30..9e23d3da8 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedListener.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedListener.java
@@ -20,7 +20,7 @@ package org.apache.shenyu.admin.listener;
 import org.apache.shenyu.common.dto.AppAuthData;
 import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
 import org.apache.shenyu.common.enums.DataEventTypeEnum;
@@ -80,12 +80,12 @@ public interface DataChangedListener {
     }
 
     /**
-     * invoke this method when proxy selector was received.
+     * invoke this method when ProxySelector was changed.
      *
      * @param changed   the changed
      * @param eventType the event type
      */
-    default void onProxySelectorChanged(List<ProxySelectorData> changed, 
DataEventTypeEnum eventType) {
+    default void onProxySelectorChanged(List<DiscoverySyncData> changed, 
DataEventTypeEnum eventType) {
     }
 
 }
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
index f2ed75ea7..ae4cbcfe8 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
@@ -45,6 +45,15 @@ public interface DiscoveryUpstreamMapper extends 
ExistProvider {
      */
     List<DiscoveryUpstreamDO> selectByIds(@Param("ids") List<String> ids);
 
+
+    /**
+     * selectByProxySelectorId.
+     *
+     * @param proxySelectorId proxySelectorId
+     * @return DiscoveryUpstreamDO list
+     */
+    List<DiscoveryUpstreamDO> 
selectByProxySelectorId(@Param("proxySelectorId") String proxySelectorId);
+
     /**
      * insert.
      *
@@ -68,4 +77,13 @@ public interface DiscoveryUpstreamMapper extends 
ExistProvider {
      * @return rows int
      */
     int deleteByIds(@Param("ids") List<String> ids);
+
+    /**
+     * deleteByUrl.
+     *
+     * @param url url
+     * @return rows int
+     */
+    int deleteByUrl(@Param("url") String url);
+
 }
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/ProxySelectorMapper.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/ProxySelectorMapper.java
index 933819e40..9d4479706 100644
--- 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/ProxySelectorMapper.java
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/ProxySelectorMapper.java
@@ -70,6 +70,14 @@ public interface ProxySelectorMapper extends ExistProvider {
      */
     int update(ProxySelectorDO proxySelectorDO);
 
+    /**
+     * selectById.
+     *
+     * @param id id
+     * @return ProxySelectorDO
+     */
+    ProxySelectorDO selectById(@Param("id") String id);
+
     /**
      * selectByIds.
      *
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/DiscoveryHandlerDTO.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/DiscoveryHandlerDTO.java
new file mode 100644
index 000000000..47cf3d8ec
--- /dev/null
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/DiscoveryHandlerDTO.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.model.dto;
+
+public class DiscoveryHandlerDTO {
+
+    private String id;
+
+    private String discoveryId;
+
+    private String handler;
+
+    private String listenerNode;
+
+    private String props;
+
+    /**
+     * getId.
+     *
+     * @return id
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * setId.
+     *
+     * @param id id
+     */
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    /**
+     * getDiscoveryId.
+     *
+     * @return discoveryId
+     */
+    public String getDiscoveryId() {
+        return discoveryId;
+    }
+
+    /**
+     * setDiscoveryId.
+     *
+     * @param discoveryId discoveryId
+     */
+    public void setDiscoveryId(final String discoveryId) {
+        this.discoveryId = discoveryId;
+    }
+
+    /**
+     * getHandler.
+     *
+     * @return handler
+     */
+    public String getHandler() {
+        return handler;
+    }
+
+    /**
+     * setHandler.
+     *
+     * @param handler handler
+     */
+    public void setHandler(final String handler) {
+        this.handler = handler;
+    }
+
+    /**
+     * getListenerNode.
+     *
+     * @return listenerNode
+     */
+    public String getListenerNode() {
+        return listenerNode;
+    }
+
+    /**
+     * setListenerNode.
+     *
+     * @param listenerNode listenerNode
+     */
+    public void setListenerNode(final String listenerNode) {
+        this.listenerNode = listenerNode;
+    }
+
+    /**
+     * getProps.
+     *
+     * @return props
+     */
+    public String getProps() {
+        return props;
+    }
+
+    /**
+     * setProps.
+     *
+     * @param props props
+     */
+    public void setProps(final String props) {
+        this.props = props;
+    }
+}
diff --git 
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
new file mode 100644
index 000000000..10d9f7818
--- /dev/null
+++ 
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.transfer;
+
+import org.apache.shenyu.admin.model.entity.DiscoveryUpstreamDO;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+
+/**
+ * DiscoveryTransfer.
+ */
+public enum DiscoveryTransfer {
+    /**
+     * The constant INSTANCE.
+     */
+    INSTANCE;
+
+    /**
+     * mapToDo.
+     *
+     * @param discoveryUpstreamData discoveryUpstreamData
+     * @return DiscoveryUpstreamDO
+     */
+    public DiscoveryUpstreamDO mapToDo(DiscoveryUpstreamData 
discoveryUpstreamData) {
+        return DiscoveryUpstreamDO.builder()
+                
.discoveryHandlerId(discoveryUpstreamData.getDiscoveryHandlerId())
+                .id(discoveryUpstreamData.getId())
+                .protocol(discoveryUpstreamData.getProtocol())
+                .status(discoveryUpstreamData.getStatus())
+                .weight(discoveryUpstreamData.getWeight())
+                .props(discoveryUpstreamData.getProps())
+                .url(discoveryUpstreamData.getUrl())
+                .dateUpdated(discoveryUpstreamData.getDateUpdated())
+                .dateCreated(discoveryUpstreamData.getDateCreated()).build();
+    }
+
+    /**
+     * mapToData.
+     *
+     * @param discoveryUpstreamDO discoveryUpstreamDO
+     * @return DiscoveryUpstreamData
+     */
+    public DiscoveryUpstreamData mapToData(DiscoveryUpstreamDO 
discoveryUpstreamDO) {
+        DiscoveryUpstreamData discoveryUpstreamData = new 
DiscoveryUpstreamData();
+        discoveryUpstreamData.setId(discoveryUpstreamDO.getId());
+        discoveryUpstreamData.setProtocol(discoveryUpstreamDO.getProtocol());
+        discoveryUpstreamData.setUrl(discoveryUpstreamDO.getUrl());
+        discoveryUpstreamData.setStatus(discoveryUpstreamDO.getStatus());
+        
discoveryUpstreamData.setDiscoveryHandlerId(discoveryUpstreamDO.getDiscoveryHandlerId());
+        discoveryUpstreamData.setWeight(discoveryUpstreamDO.getWeight());
+        discoveryUpstreamData.setProps(discoveryUpstreamDO.getProps());
+        
discoveryUpstreamData.setDateUpdated(discoveryUpstreamDO.getDateUpdated());
+        
discoveryUpstreamData.setDateCreated(discoveryUpstreamDO.getDateCreated());
+        return discoveryUpstreamData;
+    }
+
+}
diff --git a/shenyu-admin/src/main/resources/application.yml 
b/shenyu-admin/src/main/resources/application.yml
index 78f41bee6..d6c949fb6 100755
--- a/shenyu-admin/src/main/resources/application.yml
+++ b/shenyu-admin/src/main/resources/application.yml
@@ -70,10 +70,10 @@ shenyu:
 #      clusterName: test
 #      namespace: application
 #      token: 0fff5645fc74ee5e0d63a6389433c8c8afc0beea31eed0279ecc1c8961d12da9
-#      zookeeper:
-#        url: localhost:2181
-#        sessionTimeout: 5000
-#        connectionTimeout: 2000
+#    zookeeper:
+#      url: localhost:2181
+#      sessionTimeout: 5000
+#      connectionTimeout: 2000
 #      http:
 #        enabled: true
 #      nacos:
diff --git 
a/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml 
b/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
index 982a34a9c..59c4d90a1 100644
--- a/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
+++ b/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
@@ -93,10 +93,33 @@
         </foreach>
     </select>
 
+    <select id="selectByProxySelectorId" resultMap="BaseResultMap">
+        SELECT
+            du.id,
+            du.date_created,
+            du.date_updated,
+            du.discovery_handler_id,
+            du.protocol,
+            du.url,
+            du.status,
+            du.weight,
+            du.props
+        FROM discovery_upstream du
+        INNER JOIN discovery_rel dr ON du.discovery_handler_id = 
dr.discovery_handler_id
+        <where>
+            dr.proxy_selector_id = #{proxySelectorId , jdbcType=VARCHAR}
+        </where>
+    </select>
+
     <delete id="deleteByIds" parameterType="java.util.List">
         DELETE FROM discovery_upstream WHERE id IN
         <foreach item="id" collection="ids" open="(" separator="," close=")">
             #{id, jdbcType=VARCHAR}
         </foreach>
     </delete>
+
+    <delete id="deleteByUrl">
+        DELETE FROM discovery_upstream WHERE url = #{url}
+    </delete>
+
 </mapper>
diff --git a/shenyu-admin/src/main/resources/mappers/proxy-selector-sqlmap.xml 
b/shenyu-admin/src/main/resources/mappers/proxy-selector-sqlmap.xml
index 5df080149..33feef495 100644
--- a/shenyu-admin/src/main/resources/mappers/proxy-selector-sqlmap.xml
+++ b/shenyu-admin/src/main/resources/mappers/proxy-selector-sqlmap.xml
@@ -109,6 +109,13 @@
         </foreach>
     </select>
 
+    <select id="selectById" resultMap="BaseResultMap">
+        SELECT
+        <include refid="Base_Column_List"/>
+        FROM proxy_selector
+        WHERE id  = #{id, jdbcType=VARCHAR}
+    </select>
+
     <delete id="deleteByIds" parameterType="java.util.List">
         DELETE FROM proxy_selector WHERE id IN
         <foreach item="id" collection="ids" open="(" separator="," close=")">
diff --git a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql 
b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
index c96fce3ed..3e842ff90 100755
--- a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
+++ b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
@@ -1036,21 +1036,32 @@ CREATE TABLE `proxy_selector`
     PRIMARY KEY (`id`)
 );
 
+-- ----------------------------
+-- Table structure for discovery_handler
+-- ----------------------------
+CREATE TABLE `discovery_handler`
+(
+    `id`           varchar(128)  NOT NULL COMMENT 'primary key id',
+    `discovery_id` varchar(128)  NOT NULL COMMENT 'the discovery id',
+    `handler`         varchar(255)  NOT NULL COMMENT 'the handler',
+    `listener_node` varchar(255)  COMMENT 'register server listener to node',
+    `props`     text COMMENT 'the discovery pops (json) ',
+    `date_created` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 
'create time',
+    `date_updated` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON 
UPDATE CURRENT_TIMESTAMP(3) COMMENT 'update time',
+    PRIMARY KEY (`id`)
+) ;
+
 -- ----------------------------
 -- Table structure for discovery_rel
 -- ----------------------------
 CREATE TABLE `discovery_rel`
 (
-    `id`           varchar(128) NOT NULL COMMENT 'primary key id',
-    `level`        varchar(64)  NOT NULL COMMENT 'plugin or selector',
-    `discovery_id` varchar(128) NOT NULL COMMENT 'the discovery id',
-    `service_id` varchar(128)   NOT NULL COMMENT 'the selector id or proxy 
selector id',
+    `id`           varchar(128)  NOT NULL COMMENT 'primary key id',
+    `level`        varchar(64)   NOT NULL COMMENT 'plugin or selector',
+    `discovery_handler_id` varchar(128)  NOT NULL COMMENT 'the discovery 
handler id',
+    `selector_id` varchar(128)  COMMENT 'the selector id ',
+    `proxy_selector_id` varchar(128)  COMMENT 'the proxy selector id',
     `date_created` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 
'create time',
     `date_updated` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON 
UPDATE CURRENT_TIMESTAMP(3) COMMENT 'update time',
     PRIMARY KEY (`id`)
 );
-
-
-
-
-
diff --git a/shenyu-bootstrap/pom.xml b/shenyu-bootstrap/pom.xml
index 8195fa1cf..149e5de05 100644
--- a/shenyu-bootstrap/pom.xml
+++ b/shenyu-bootstrap/pom.xml
@@ -232,12 +232,12 @@
         <!--shenyu tars plugin end-->
 
         <!--Tcp Plugin Start-->
-<!--        <dependency>-->
-<!--            <groupId>org.apache.shenyu</groupId>-->
-<!--            
<artifactId>shenyu-spring-boot-starter-plugin-tcp</artifactId>-->
-<!--            <version>${project.version}</version>-->
-<!--        </dependency>-->
-        <!--Tcp Plugin end-->
+        <dependency>
+            <groupId>org.apache.shenyu</groupId>
+            <artifactId>shenyu-spring-boot-starter-plugin-tcp</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+<!--        Tcp Plugin end-->
 
         <!--shenyu sofa plugin start-->
         <dependency>
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/DefaultPathConstants.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/DefaultPathConstants.java
index d39d91c70..a5e48653b 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/DefaultPathConstants.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/DefaultPathConstants.java
@@ -52,9 +52,9 @@ public final class DefaultPathConstants implements Constants {
     public static final String META_DATA = PRE_FIX + "/metaData";
 
     /**
-     * The constant PROXY_SELECTOR_DATA.
+     * The constant PROXY_SELECTOR.
      */
-    public static final String PROXY_SELECTOR_DATA = PRE_FIX + 
"/proxySelectorData";
+    public static final String PROXY_SELECTOR = PRE_FIX + "/proxySelectorData";
 
     /**
      * acquire app_auth_path.
@@ -138,4 +138,15 @@ public final class DefaultPathConstants implements 
Constants {
         return String.join(PATH_SEPARATOR, buildRuleParentPath(pluginName), 
String.join(SELECTOR_JOIN_RULE, selectorId, ruleId));
     }
 
+    /**
+     * buildProxySelector.
+     *
+     * @param pluginName        pluginName
+     * @param proxySelectorName selectorId
+     * @return /shenyu/proxySelectorData/pluginName/proxySelectorName
+     */
+    public static String buildProxySelectorPath(final String pluginName, final 
String proxySelectorName) {
+        return String.join(PATH_SEPARATOR, PROXY_SELECTOR, pluginName, 
proxySelectorName);
+    }
+
 }
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoverySyncData.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoverySyncData.java
new file mode 100644
index 000000000..ca3108319
--- /dev/null
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoverySyncData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.dto;
+
+import java.util.List;
+
+public class DiscoverySyncData {
+
+    private ProxySelectorData proxySelectorData;
+
+    private List<DiscoveryUpstreamData> upstreamDataList;
+
+    /**
+     * getProxySelectorData.
+     *
+     * @return proxySelectorData
+     */
+    public ProxySelectorData getProxySelectorData() {
+        return proxySelectorData;
+    }
+
+    /**
+     * setProxySelectorData.
+     *
+     * @param proxySelectorData proxySelectorData
+     */
+    public void setProxySelectorData(final ProxySelectorData 
proxySelectorData) {
+        this.proxySelectorData = proxySelectorData;
+    }
+
+    /**
+     * getUpstreamDataList.
+     *
+     * @return upstreamDataList
+     */
+    public List<DiscoveryUpstreamData> getUpstreamDataList() {
+        return upstreamDataList;
+    }
+
+    /**
+     * setUpstreamDataList.
+     *
+     * @param upstreamDataList upstreamDataList
+     */
+    public void setUpstreamDataList(final List<DiscoveryUpstreamData> 
upstreamDataList) {
+        this.upstreamDataList = upstreamDataList;
+    }
+
+}
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoveryUpstreamData.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoveryUpstreamData.java
new file mode 100644
index 000000000..234c35583
--- /dev/null
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoveryUpstreamData.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.dto;
+
+import java.sql.Timestamp;
+
+public class DiscoveryUpstreamData {
+
+    /**
+     * primary key.
+     */
+    private String id;
+
+    /**
+     * created time.
+     */
+    private Timestamp dateCreated;
+
+    /**
+     * updated time.
+     */
+    private Timestamp dateUpdated;
+
+
+    /**
+     * discoveryHandlerId.
+     */
+    private String discoveryHandlerId;
+
+    /**
+     * protocol.
+     */
+    private String protocol;
+
+    /**
+     * url.
+     */
+    private String url;
+
+    /**
+     * status.
+     */
+    private int status;
+
+    /**
+     * weight.
+     */
+    private int weight;
+
+    /**
+     * props.
+     */
+    private String props;
+
+
+    /**
+     * getDiscoveryHandlerId.
+     *
+     * @return discoveryHandlerId
+     */
+    public String getDiscoveryHandlerId() {
+        return discoveryHandlerId;
+    }
+
+    /**
+     * setDiscoveryHandlerId.
+     *
+     * @param discoveryHandlerId discoveryHandlerId
+     */
+    public void setDiscoveryHandlerId(final String discoveryHandlerId) {
+        this.discoveryHandlerId = discoveryHandlerId;
+    }
+
+    /**
+     * getProtocol.
+     *
+     * @return protocol
+     */
+    public String getProtocol() {
+        return protocol;
+    }
+
+    /**
+     * setProtocol.
+     *
+     * @param protocol protocol
+     */
+    public void setProtocol(final String protocol) {
+        this.protocol = protocol;
+    }
+
+    /**
+     * getUrl.
+     *
+     * @return url
+     */
+    public String getUrl() {
+        return url;
+    }
+
+    /**
+     * setUrl.
+     *
+     * @param url url
+     */
+    public void setUrl(final String url) {
+        this.url = url;
+    }
+
+    /**
+     * getStatus.
+     *
+     * @return status
+     */
+    public int getStatus() {
+        return status;
+    }
+
+    /**
+     * setStatus.
+     *
+     * @param status status
+     */
+    public void setStatus(final int status) {
+        this.status = status;
+    }
+
+    /**
+     * getWeight.
+     *
+     * @return weight
+     */
+    public int getWeight() {
+        return weight;
+    }
+
+    /**
+     * setWeight.
+     *
+     * @param weight weight
+     */
+    public void setWeight(final int weight) {
+        this.weight = weight;
+    }
+
+    /**
+     * getProps.
+     *
+     * @return props
+     */
+    public String getProps() {
+        return props;
+    }
+
+    /**
+     * setProps.
+     *
+     * @param props props
+     */
+    public void setProps(final String props) {
+        this.props = props;
+    }
+
+    /**
+     * Gets the value of id.
+     *
+     * @return the value of id
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Sets the id.
+     *
+     * @param id id
+     */
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    /**
+     * Gets the value of dateCreated.
+     *
+     * @return the value of dateCreated
+     */
+    public Timestamp getDateCreated() {
+        return dateCreated;
+    }
+
+    /**
+     * Sets the dateCreated.
+     *
+     * @param dateCreated dateCreated
+     */
+    public void setDateCreated(final Timestamp dateCreated) {
+        this.dateCreated = dateCreated;
+    }
+
+    /**
+     * Gets the value of dateUpdated.
+     *
+     * @return the value of dateUpdated
+     */
+    public Timestamp getDateUpdated() {
+        return dateUpdated;
+    }
+
+    /**
+     * setDateUpdated.
+     *
+     * @param dateUpdated dateUpdated
+     */
+    public void setDateUpdated(final Timestamp dateUpdated) {
+        this.dateUpdated = dateUpdated;
+    }
+}
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
index 9ff547243..0f28eedbb 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
@@ -17,9 +17,6 @@
 
 package org.apache.shenyu.common.dto;
 
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
-
-import java.util.List;
 import java.util.Properties;
 
 /**
@@ -39,8 +36,6 @@ public class ProxySelectorData {
 
     private Properties props = new Properties();
 
-    private List<DiscoveryUpstream> discoveryUpstreamList;
-
     /**
      * getId.
      *
@@ -149,22 +144,4 @@ public class ProxySelectorData {
         this.props = props;
     }
 
-    /**
-     * getDiscoveryUpstreamList.
-     *
-     * @return discoveryUpstreamList
-     */
-    public List<DiscoveryUpstream> getDiscoveryUpstreamList() {
-        return discoveryUpstreamList;
-    }
-
-    /**
-     * setDiscoveryUpstreamList.
-     *
-     * @param discoveryUpstreamList discoveryUpstreamList
-     */
-    public void setDiscoveryUpstreamList(final List<DiscoveryUpstream> 
discoveryUpstreamList) {
-        this.discoveryUpstreamList = discoveryUpstreamList;
-    }
-
 }
diff --git 
a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/ConfigGroupEnum.java
 
b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/ConfigGroupEnum.java
index 49a2d4f47..70bb892a4 100644
--- 
a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/ConfigGroupEnum.java
+++ 
b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/ConfigGroupEnum.java
@@ -24,7 +24,6 @@ import java.util.Objects;
 
 /**
  * configuration group.
- *
  */
 public enum ConfigGroupEnum {
 
@@ -53,13 +52,11 @@ public enum ConfigGroupEnum {
      */
     META_DATA,
 
-
     /**
-     * proxy selector enum.
+     * ProxySelector data group enum.
      */
     PROXY_SELECTOR;
 
-
     /**
      * Acquire by name config group enum.
      *
diff --git 
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
 
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
index f59b13f64..dece0d7d2 100644
--- 
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++ 
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
@@ -42,6 +42,13 @@ public interface ShenyuDiscoveryService {
      */
     void watcher(String key, DataChangedEventListener listener);
 
+    /**
+     * unWatcher path.
+     *
+     * @param key key
+     */
+    void unWatcher(String key);
+
     /**
      * Register data.
      *
@@ -58,4 +65,9 @@ public interface ShenyuDiscoveryService {
      */
     String getData(String key);
 
+    /**
+     * shutdown.
+     */
+    void shutdown();
+
 }
diff --git 
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
 
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
index 33580bb90..9e7c97e72 100644
--- 
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
+++ 
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
@@ -27,5 +27,5 @@ public interface DataChangedEventListener {
      * 
      * @param event data changed event
      */
-    void onChange(DataChangedEvent event);
+    void onChange(DiscoveryDataChangedEvent event);
 }
diff --git 
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEvent.java
 
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DiscoveryDataChangedEvent.java
similarity index 93%
rename from 
shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEvent.java
rename to 
shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DiscoveryDataChangedEvent.java
index 5ebe41874..414034590 100644
--- 
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEvent.java
+++ 
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DiscoveryDataChangedEvent.java
@@ -20,7 +20,7 @@ package org.apache.shenyu.discovery.api.listener;
 /**
  * Data changed event.
  */
-public final class DataChangedEvent {
+public final class DiscoveryDataChangedEvent {
     
     private final String key;
     
@@ -35,7 +35,7 @@ public final class DataChangedEvent {
      * @param value the value
      * @param event the event
      */
-    public DataChangedEvent(final String key, final String value, final Event 
event) {
+    public DiscoveryDataChangedEvent(final String key, final String value, 
final Event event) {
         this.key = key;
         this.value = value;
         this.event = event;
diff --git 
a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
 
b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
index 63e676f4d..8483a4a37 100644
--- 
a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++ 
b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
@@ -20,17 +20,19 @@ package org.apache.shenyu.discovery.zookeeper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.shenyu.common.exception.ShenyuException;
 import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
 import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEvent;
+import org.apache.shenyu.discovery.api.listener.DiscoveryDataChangedEvent;
 import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
 import org.apache.shenyu.spi.Join;
 import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +53,8 @@ public class ZookeeperDiscoveryService implements 
ShenyuDiscoveryService {
 
     private final Map<String, String> nodeDataMap = new HashMap<>();
 
+    private final Map<String, TreeCache> cacheMap = new HashMap<>();
+
     @Override
     public void init(final DiscoveryConfig config) {
         String baseSleepTimeMilliseconds = 
config.getProps().getProperty("baseSleepTimeMilliseconds", "1000");
@@ -94,7 +98,7 @@ public class ZookeeperDiscoveryService implements 
ShenyuDiscoveryService {
 
     private boolean isExist(final String key) {
         try {
-            return Objects.nonNull(client.checkExists().forPath(key));
+            return null != client.checkExists().forPath(key);
         } catch (Exception e) {
             throw new ShenyuException(e);
         }
@@ -112,59 +116,81 @@ public class ZookeeperDiscoveryService implements 
ShenyuDiscoveryService {
     @Override
     public void watcher(final String key, final DataChangedEventListener 
listener) {
         try {
-            this.client.getData().usingWatcher(new CuratorWatcher() {
-                @Override
-                public void process(final WatchedEvent watchedEvent) throws 
Exception {
-                    if (Objects.nonNull(listener)) {
-                        String path = Objects.isNull(watchedEvent.getPath()) ? 
"" : watchedEvent.getPath();
-                        if (StringUtils.isNoneBlank(path)) {
-                            client.getData().usingWatcher(this).forPath(path);
-                            byte[] ret = client.getData().forPath(key);
-                            String data = Objects.isNull(ret) ? null : new 
String(ret, StandardCharsets.UTF_8);
-                            LOGGER.info("shenyu ZookeeperDiscoveryService 
onChange key={}", path);
-                            listener.onChange(buildDataChangedEvent(path, 
data, watchedEvent));
-                        }
+            TreeCache treeCache = new TreeCache(client, key);
+            TreeCacheListener treeCacheListener = (curatorFramework, event) -> 
{
+                ChildData data = event.getData();
+                DiscoveryDataChangedEvent dataChangedEvent;
+                if (Objects.nonNull(data) && Objects.nonNull(data.getData())) {
+                    String currentPath = data.getPath();
+                    Stat stat = data.getStat();
+                    boolean isEphemeral = Objects.nonNull(stat) && 
stat.getEphemeralOwner() > 0;
+                    if (!isEphemeral) {
+                        LOGGER.info("shenyu Ignore non-ephemeral node 
changes");
+                        return;
+                    }
+                    switch (event.getType()) {
+                        case NODE_ADDED:
+                            dataChangedEvent = new 
DiscoveryDataChangedEvent(currentPath, new String(data.getData(), 
StandardCharsets.UTF_8), DiscoveryDataChangedEvent.Event.ADDED);
+                            break;
+                        case NODE_UPDATED:
+                            dataChangedEvent = new 
DiscoveryDataChangedEvent(currentPath, new String(data.getData(), 
StandardCharsets.UTF_8), DiscoveryDataChangedEvent.Event.UPDATED);
+                            break;
+                        case NODE_REMOVED:
+                            dataChangedEvent = new 
DiscoveryDataChangedEvent(currentPath, new String(data.getData(), 
StandardCharsets.UTF_8), DiscoveryDataChangedEvent.Event.DELETED);
+                            break;
+                        default:
+                            dataChangedEvent = new 
DiscoveryDataChangedEvent(currentPath, new String(data.getData(), 
StandardCharsets.UTF_8), DiscoveryDataChangedEvent.Event.IGNORED);
+                            break;
                     }
+                    listener.onChange(dataChangedEvent);
                 }
-            }).forPath(key);
+            };
+            treeCache.getListenable().addListener(treeCacheListener);
+            treeCache.start();
+            cacheMap.put(key, treeCache);
         } catch (Exception e) {
             throw new ShenyuException(e);
         }
     }
 
+    @Override
+    public void unWatcher(final String key) {
+        if (cacheMap.containsKey(key)) {
+            cacheMap.remove(key).close();
+        }
+    }
+
     @Override
     public void register(final String key, final String value) {
-        this.createOrUpdate(key, value, CreateMode.EPHEMERAL);
+        this.createOrUpdate(key, value, CreateMode.PERSISTENT);
     }
 
     @Override
     public String getData(final String key) {
         try {
-            byte[] ret = client.getData().forPath(key);
+            TreeCache treeCache = cacheMap.get(key);
+            if (Objects.isNull(treeCache)) {
+                return null;
+            }
+            ChildData currentData = treeCache.getCurrentData(key);
+            byte[] ret = currentData.getData();
             return Objects.isNull(ret) ? null : new String(ret, 
StandardCharsets.UTF_8);
         } catch (Exception e) {
             throw new ShenyuException(e);
         }
     }
 
-    private DataChangedEvent buildDataChangedEvent(final String key, final 
String value, final WatchedEvent watchedEvent) {
-        DataChangedEvent.Event event = null;
-        switch (watchedEvent.getType()) {
-            case NodeCreated:
-                event = DataChangedEvent.Event.ADDED;
-                break;
-            case NodeDeleted:
-                event = DataChangedEvent.Event.DELETED;
-                break;
-            case NodeDataChanged:
-                event = DataChangedEvent.Event.UPDATED;
-                break;
-            case NodeChildrenChanged:
-            case DataWatchRemoved:
-            case ChildWatchRemoved:
-            default:
-                event = DataChangedEvent.Event.IGNORED;
+    @Override
+    public void shutdown() {
+        try {
+            //close treeCache
+            for (String key : cacheMap.keySet()) {
+                cacheMap.get(key).close();
+            }
+            client.close();
+        } catch (Exception e) {
+            throw new ShenyuException(e);
         }
-        return new DataChangedEvent(key, value, event);
+
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/cache/CommonProxySelectorDataSubscriber.java
 
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/cache/CommonProxySelectorDataSubscriber.java
index 955d579d2..8315de259 100644
--- 
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/cache/CommonProxySelectorDataSubscriber.java
+++ 
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/cache/CommonProxySelectorDataSubscriber.java
@@ -17,8 +17,8 @@
 
 package org.apache.shenyu.plugin.base.cache;
 
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
 import org.apache.shenyu.plugin.base.handler.ProxySelectorDataHandler;
 import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
 
@@ -39,7 +39,7 @@ public class CommonProxySelectorDataSubscriber implements 
ProxySelectorDataSubsc
     }
 
     @Override
-    public void onSubscribe(final ProxySelectorData proxySelectorData, final 
List<DiscoveryUpstream> upstreamsList) {
+    public void onSubscribe(final ProxySelectorData proxySelectorData, final 
List<DiscoveryUpstreamData> upstreamsList) {
         Optional.ofNullable(handlerMap.get(proxySelectorData.getPluginName()))
                 .ifPresent(handler -> 
handler.handlerProxySelector(proxySelectorData, upstreamsList));
     }
diff --git 
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
index baf2b34fb..21567392f 100644
--- 
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
@@ -17,8 +17,8 @@
 
 package org.apache.shenyu.plugin.base.handler;
 
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
 
 import java.util.List;
 
@@ -33,7 +33,7 @@ public interface ProxySelectorDataHandler {
      * @param selectorData  selectorData
      * @param upstreamsList upstreamsList
      */
-    void handlerProxySelector(ProxySelectorData selectorData, 
List<DiscoveryUpstream> upstreamsList);
+    void handlerProxySelector(ProxySelectorData selectorData, 
List<DiscoveryUpstreamData> upstreamsList);
 
 
     /**
diff --git 
a/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
 
b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
index d9c6cdab8..454355ef0 100644
--- 
a/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
+++ 
b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
@@ -17,8 +17,8 @@
 
 package org.apache.shenyu.plugin.tcp.handler;
 
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.plugin.base.handler.ProxySelectorDataHandler;
 import org.apache.shenyu.protocol.tcp.BootstrapServer;
@@ -41,7 +41,7 @@ public class TcpUpstreamDataHandler implements 
ProxySelectorDataHandler {
     private final Map<String, BootstrapServer> cache = new 
ConcurrentHashMap<>();
 
     @Override
-    public synchronized void handlerProxySelector(final ProxySelectorData 
proxySelectorData, final List<DiscoveryUpstream> upstreamsList) {
+    public synchronized void handlerProxySelector(final ProxySelectorData 
proxySelectorData, final List<DiscoveryUpstreamData> upstreamsList) {
         String name = proxySelectorData.getName();
         if (!cache.containsKey(name)) {
             Integer forwardPort = proxySelectorData.getForwardPort();
@@ -54,10 +54,10 @@ public class TcpUpstreamDataHandler implements 
ProxySelectorDataHandler {
             cache.put(name, bootstrapServer);
             LOG.info("shenyu create TcpBootstrapServer success port is {}", 
forwardPort);
         } else {
-            List<DiscoveryUpstream> removed = 
UpstreamProvider.getSingleton().refreshCache(name, upstreamsList);
+            List<DiscoveryUpstreamData> removed = 
UpstreamProvider.getSingleton().refreshCache(name, upstreamsList);
             BootstrapServer bootstrapServer = cache.get(name);
             bootstrapServer.removeCommonUpstream(removed);
-            LOG.info("shenyu update TcpBootstrapServer success remove is {}", 
removed);
+            LOG.info("shenyu update TcpBootstrapServer success upstream is 
{}", upstreamsList);
         }
     }
 
diff --git 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
index ff009d567..59f1f35b0 100644
--- 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
+++ 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
@@ -17,7 +17,7 @@
 
 package org.apache.shenyu.protocol.tcp;
 
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 
 import java.util.List;
 
@@ -38,7 +38,7 @@ public interface BootstrapServer {
      *
      * @param removeList removeList
      */
-    void removeCommonUpstream(List<DiscoveryUpstream> removeList);
+    void removeCommonUpstream(List<DiscoveryUpstreamData> removeList);
 
     /**
      * shutdown.
diff --git 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
index 0a25555c6..3d162d3c1 100644
--- 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
+++ 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
@@ -21,7 +21,7 @@ import com.google.common.eventbus.EventBus;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.protocol.tcp.connection.Bridge;
 import org.apache.shenyu.protocol.tcp.connection.ConnectionContext;
 import 
org.apache.shenyu.protocol.tcp.connection.DefaultConnectionConfigProvider;
@@ -66,7 +66,6 @@ public class TcpBootstrapServer implements BootstrapServer {
         connectionContext.init(tcpServerConfiguration.getProps());
         LoopResources loopResources = 
LoopResources.create("shenyu-tcp-bootstrap-server", 
tcpServerConfiguration.getBossGroupThreadCount(),
                 tcpServerConfiguration.getWorkerGroupThreadCount(), true);
-
         TcpServer tcpServer = TcpServer.create()
                 .doOnChannelInit((connObserver, channel, remoteAddress) -> 
channel.pipeline().addFirst(new LoggingHandler(LogLevel.INFO)))
                 .wiretap(true)
@@ -101,7 +100,7 @@ public class TcpBootstrapServer implements BootstrapServer {
      * @param removeList removeList
      */
     @Override
-    public void removeCommonUpstream(final List<DiscoveryUpstream> removeList) 
{
+    public void removeCommonUpstream(final List<DiscoveryUpstreamData> 
removeList) {
         eventBus.post(removeList);
     }
 
diff --git 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
index 116b1baed..acf3a7b06 100644
--- 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
+++ 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
@@ -17,11 +17,15 @@
 
 package org.apache.shenyu.protocol.tcp;
 
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /**
  * UpstreamProvider.
@@ -30,7 +34,7 @@ public final class UpstreamProvider {
 
     private static final UpstreamProvider SINGLETON = new UpstreamProvider();
 
-    private final Map<String, List<DiscoveryUpstream>> cache = new 
ConcurrentHashMap<>();
+    private final Map<String, List<DiscoveryUpstreamData>> cache = new 
ConcurrentHashMap<>();
 
     private UpstreamProvider() {
     }
@@ -50,8 +54,8 @@ public final class UpstreamProvider {
      * @param pluginSelectorName pluginSelectorName
      * @return UpstreamList
      */
-    public List<DiscoveryUpstream> provide(final String pluginSelectorName) {
-        return cache.get(pluginSelectorName);
+    public List<DiscoveryUpstreamData> provide(final String 
pluginSelectorName) {
+        return cache.getOrDefault(pluginSelectorName, new ArrayList<>());
     }
 
     /**
@@ -60,8 +64,9 @@ public final class UpstreamProvider {
      * @param pluginSelectorName pluginSelectorName
      * @param upstreams          upstreams
      */
-    public void createUpstreams(final String pluginSelectorName, final 
List<DiscoveryUpstream> upstreams) {
-        cache.put(pluginSelectorName, upstreams);
+    public void createUpstreams(final String pluginSelectorName, final 
List<DiscoveryUpstreamData> upstreams) {
+        List<DiscoveryUpstreamData> discoveryUpstreamDataList = 
Optional.ofNullable(upstreams).orElseGet(ArrayList::new);
+        cache.put(pluginSelectorName, discoveryUpstreamDataList);
     }
 
     /**
@@ -71,9 +76,11 @@ public final class UpstreamProvider {
      * @param upstreams          upstreams
      * @return removeList
      */
-    public List<DiscoveryUpstream> refreshCache(final String 
pluginSelectorName, final List<DiscoveryUpstream> upstreams) {
-        List<DiscoveryUpstream> remove = cache.remove(pluginSelectorName);
-        cache.put(pluginSelectorName, upstreams);
-        return remove;
+    public List<DiscoveryUpstreamData> refreshCache(final String 
pluginSelectorName, final List<DiscoveryUpstreamData> upstreams) {
+        List<DiscoveryUpstreamData> remove = cache.remove(pluginSelectorName);
+        List<DiscoveryUpstreamData> discoveryUpstreamDataList = 
Optional.ofNullable(upstreams).orElse(new ArrayList<>());
+        cache.put(pluginSelectorName, discoveryUpstreamDataList);
+        Set<String> urlSet = 
discoveryUpstreamDataList.stream().map(DiscoveryUpstreamData::getUrl).collect(Collectors.toSet());
+        return remove.stream().filter(r -> 
!urlSet.contains(r.getUrl())).collect(Collectors.toList());
     }
 }
diff --git 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
index 0cbdf5403..9c8b1c580 100644
--- 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
+++ 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
@@ -19,7 +19,7 @@ package org.apache.shenyu.protocol.tcp.connection;
 
 import com.google.common.eventbus.Subscribe;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.netty.Connection;
@@ -67,7 +67,7 @@ public class ActivityConnectionObserver implements 
ConnectionObserver {
      * @param remove removeList
      */
     @Subscribe
-    public void onRemove(final List<DiscoveryUpstream> remove) {
+    public void onRemove(final List<DiscoveryUpstreamData> remove) {
         LOG.info("shenyu {} ConnectionObserver  do on remove upstreams", name);
         for (Connection connection : cache.keySet()) {
             SocketAddress socketAddress = connection.channel().remoteAddress();
@@ -85,10 +85,10 @@ public class ActivityConnectionObserver implements 
ConnectionObserver {
      * @param cacheSocketAddress cacheSocketAddress
      * @return boolean
      */
-    private boolean in(final List<DiscoveryUpstream> removeList, final 
SocketAddress cacheSocketAddress) {
+    private boolean in(final List<DiscoveryUpstreamData> removeList, final 
SocketAddress cacheSocketAddress) {
         return removeList.stream().anyMatch(u -> {
             String cacheUrl = cacheSocketAddress.toString().substring(1);
-            String removedUrl = u.getUpstreamUrl();
+            String removedUrl = u.getUrl();
             LOG.info("compare {} , {}", cacheUrl, removedUrl);
             return StringUtils.equals(cacheUrl, removedUrl);
         });
diff --git 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
index 2a0e7b270..3f4240993 100644
--- 
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
+++ 
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.shenyu.protocol.tcp.connection;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.shenyu.common.exception.ShenyuException;
 import org.apache.shenyu.loadbalancer.entity.Upstream;
 import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 
@@ -48,10 +50,12 @@ public class DefaultConnectionConfigProvider implements 
ClientConnectionConfigPr
 
     @Override
     public URI getProxiedService(final String ip) {
-        List<Upstream> upstreamList = 
UpstreamProvider.getSingleton().provide(this.pluginSelectorName)
-                .stream()
-                .map(dp -> 
Upstream.builder().url(dp.getUpstreamUrl()).status(dp.isStatus()).weight(dp.getWeight()).protocol(dp.getProtocol()).build())
-                .collect(Collectors.toList());
+        List<Upstream> upstreamList = 
UpstreamProvider.getSingleton().provide(this.pluginSelectorName).stream().map(dp
 -> {
+            return 
Upstream.builder().url(dp.getUrl()).status(Objects.equals(dp.getStatus(), 
1)).weight(dp.getWeight()).protocol(dp.getProtocol()).build();
+        }).collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(upstreamList)) {
+            throw new ShenyuException("shenyu TcpProxy don't have any 
upstream");
+        }
         Upstream upstream = LoadBalancerFactory.selector(upstreamList, 
loadBalanceAlgorithm, ip);
         return cover(upstream);
     }
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
 
b/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
index ac2b9d92a..32185a657 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
@@ -17,8 +17,8 @@
 
 package org.apache.shenyu.sync.data.api;
 
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
 
 import java.util.List;
 
@@ -31,9 +31,9 @@ public interface ProxySelectorDataSubscriber {
      * On subscribe.
      *
      * @param proxySelectorData the proxySelector data
-     * @param upstreamsList upstreamsList
+     * @param upstreamsList     upstreamsList
      */
-    void onSubscribe(ProxySelectorData proxySelectorData, 
List<DiscoveryUpstream> upstreamsList);
+    void onSubscribe(ProxySelectorData proxySelectorData, 
List<DiscoveryUpstreamData> upstreamsList);
 
     /**
      * Un subscribe.
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-apollo/src/main/java/org/apache/shenyu/sync/data/apollo/ApolloDataService.java
 
b/shenyu-sync-data-center/shenyu-sync-data-apollo/src/main/java/org/apache/shenyu/sync/data/apollo/ApolloDataService.java
index ffa77ed6a..4b5b9950d 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-apollo/src/main/java/org/apache/shenyu/sync/data/apollo/ApolloDataService.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-apollo/src/main/java/org/apache/shenyu/sync/data/apollo/ApolloDataService.java
@@ -25,7 +25,7 @@ import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
 import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
 import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
@@ -169,12 +169,12 @@ public class ApolloDataService implements SyncDataService 
{
      * subscriber proxy selector data.
      * @return proxy selector data list
      */
-    public List<ProxySelectorData> subscriberProxySelectorData() {
-        List<ProxySelectorData> proxySelectorDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configService.getProperty(ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
 "{}"),
-                ProxySelectorData.class).values());
-        proxySelectorDataList.forEach(proxySelectorData -> 
proxySelectorDataSubscribers.forEach(subscriber -> {
-            subscriber.unSubscribe(proxySelectorData);
-            subscriber.onSubscribe(proxySelectorData, 
proxySelectorData.getDiscoveryUpstreamList());
+    public List<DiscoverySyncData> subscriberProxySelectorData() {
+        List<DiscoverySyncData> proxySelectorDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configService.getProperty(ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
 "{}"),
+                DiscoverySyncData.class).values());
+        proxySelectorDataList.forEach(discoverySyncData -> 
proxySelectorDataSubscribers.forEach(subscriber -> {
+            subscriber.unSubscribe(discoverySyncData.getProxySelectorData());
+            subscriber.onSubscribe(discoverySyncData.getProxySelectorData(), 
discoverySyncData.getUpstreamDataList());
         }));
         return proxySelectorDataList;
     }
@@ -206,8 +206,8 @@ public class ApolloDataService implements SyncDataService {
                     LOG.info("apollo listener authData: {}", appAuthDataList);
                     break;
                 case ApolloPathConstants.PROXY_SELECTOR_DATA_ID:
-                    List<ProxySelectorData> proxySelectorDataList = 
subscriberProxySelectorData();
-                    LOG.info("apollo listener ProxySelectorData: {}", 
proxySelectorDataList);
+                    List<DiscoverySyncData> discoverySyncData = 
subscriberProxySelectorData();
+                    LOG.info("apollo listener ProxySelectorData: {}", 
discoverySyncData);
                     break;
                 default:
                     break;
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
 
b/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
index 66c16e0bf..6dd635a6f 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
@@ -29,6 +29,8 @@ import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.ProxySelectorData;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
 import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.common.utils.MapUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
@@ -154,10 +156,12 @@ public class NacosCacheHandler {
 
     protected void updateProxySelectorMap(final String configInfo) {
         try {
-            List<ProxySelectorData> proxySelectorDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
ProxySelectorData.class).values());
-            proxySelectorDataList.forEach(proxySelectorData -> 
proxySelectorDataSubscribers.forEach(subscriber -> {
+            List<DiscoverySyncData> discoverySyncDataList = new 
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, 
DiscoverySyncData.class).values());
+            discoverySyncDataList.forEach(discoverySyncData -> 
proxySelectorDataSubscribers.forEach(subscriber -> {
+                ProxySelectorData proxySelectorData = 
discoverySyncData.getProxySelectorData();
+                List<DiscoveryUpstreamData> upstreamDataList = 
discoverySyncData.getUpstreamDataList();
                 subscriber.unSubscribe(proxySelectorData);
-                subscriber.onSubscribe(proxySelectorData, 
proxySelectorData.getDiscoveryUpstreamList());
+                subscriber.onSubscribe(proxySelectorData, upstreamDataList);
             }));
         } catch (JsonParseException e) {
             LOG.error("sync proxy selector data have error", e);
diff --git 
a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
 
b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
index 81c3cb4b4..c9caea41b 100644
--- 
a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
+++ 
b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
@@ -25,12 +25,14 @@ import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
 import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.shenyu.common.constant.DefaultPathConstants;
+import org.apache.shenyu.common.dto.AppAuthData;
 import org.apache.shenyu.common.dto.PluginData;
 import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.MetaData;
 import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
 import org.apache.shenyu.common.exception.ShenyuException;
 import org.apache.shenyu.common.utils.GsonUtils;
 import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
@@ -61,7 +63,6 @@ public class ZookeeperSyncDataService implements 
SyncDataService {
 
     private final List<ProxySelectorDataSubscriber> 
proxySelectorDataSubscribers;
 
-
     /**
      * Instantiates a new Zookeeper cache manager.
      *
@@ -89,7 +90,7 @@ public class ZookeeperSyncDataService implements 
SyncDataService {
         zkClient.addCache(DefaultPathConstants.PLUGIN_PARENT, new 
PluginCacheListener());
         zkClient.addCache(DefaultPathConstants.SELECTOR_PARENT, new 
SelectorCacheListener());
         zkClient.addCache(DefaultPathConstants.RULE_PARENT, new 
RuleCacheListener());
-        zkClient.addCache(DefaultPathConstants.PROXY_SELECTOR_DATA, new 
ProxySelectorCacheListener());
+        zkClient.addCache(DefaultPathConstants.PROXY_SELECTOR, new 
ProxySelectorCacheListener());
     }
 
     private void watchAppAuth() {
@@ -162,9 +163,9 @@ public class ZookeeperSyncDataService implements 
SyncDataService {
                 .ifPresent(data -> metaDataSubscribers.forEach(e -> 
e.onSubscribe(metaData)));
     }
 
-    private void cacheProxySelectorData(final ProxySelectorData 
proxySelectorData) {
+    private void cacheProxySelectorData(final ProxySelectorData 
proxySelectorData, final List<DiscoveryUpstreamData> discoveryUpstreamDataList) 
{
         Optional.ofNullable(proxySelectorData)
-                .ifPresent(data -> proxySelectorDataSubscribers.forEach(e -> 
e.onSubscribe(proxySelectorData, 
proxySelectorData.getDiscoveryUpstreamList())));
+                .ifPresent(data -> proxySelectorDataSubscribers.forEach(e -> 
e.onSubscribe(proxySelectorData, discoveryUpstreamDataList)));
     }
 
     private void unCacheMetaData(final MetaData metaData) {
@@ -330,7 +331,7 @@ public class ZookeeperSyncDataService implements 
SyncDataService {
         @Override
         protected void event(final TreeCacheEvent.Type type, final String 
path, final ChildData data) {
             // if not uri register path, return.
-            if (!path.contains(DefaultPathConstants.PROXY_SELECTOR_DATA)) {
+            if (!path.contains(DefaultPathConstants.PROXY_SELECTOR)) {
                 return;
             }
             String[] pathInfoArray = path.split("/");
@@ -346,12 +347,13 @@ public class ZookeeperSyncDataService implements 
SyncDataService {
                 unCacheProxySelectorData(proxySelectorData);
                 return;
             }
-            ProxySelectorData proxySelectorData = 
GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), ProxySelectorData.class);
+            DiscoverySyncData discoverySyncData = 
GsonUtils.getInstance().fromJson(new String(data.getData(), 
StandardCharsets.UTF_8), DiscoverySyncData.class);
+            ProxySelectorData proxySelectorData = 
discoverySyncData.getProxySelectorData();
             proxySelectorData.setName(proxySelectorName);
             proxySelectorData.setPluginName(pluginName);
             // create or update
             Optional.ofNullable(data)
-                    .ifPresent(e -> cacheProxySelectorData(proxySelectorData));
+                    .ifPresent(e -> cacheProxySelectorData(proxySelectorData, 
discoverySyncData.getUpstreamDataList()));
 
         }
     }

Reply via email to