This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 7550751bb [type:feat] Admin sync data (#4735)
7550751bb is described below
commit 7550751bb9c7317d14d506900adbc763085bd027
Author: 杨文杰 <[email protected]>
AuthorDate: Wed Jun 21 22:46:45 2023 +0800
[type:feat] Admin sync data (#4735)
* admin data handler
* add handler
add handler
* discovery sync
* discovery sync
* discovery sync
* admin data handler
* admin data handler
* admin data handler
* admin data handler
* admin data handler
* discovery sync
* change zk discovery
* merge 代码
* Structure the code
* Structure the code
* change code
* change code
* change code
* change code
* change code
* change code
* change code
* change code
* change code
* change code
* change code
* change code
* change code
* change code
* trigger ci
* trigger ci
* trigger ci
* change org.apache.shenyu.admin.listener.DataChangedEvent
---------
Co-authored-by: xiaoyu <[email protected]>
Co-authored-by: moremind <[email protected]>
---
shenyu-admin/pom.xml | 6 +
.../apache/shenyu/admin/ShenyuAdminBootstrap.java | 1 +
.../admin/config/DiscoveryConfiguration.java | 73 +++++++
.../admin/discovery/DefaultDiscoveryProcessor.java | 166 +++++++++++++++
.../DiscoveryDataChangedEventSyncListener.java | 142 +++++++++++++
.../shenyu/admin/discovery/DiscoveryMode.java | 18 +-
.../shenyu/admin/discovery/DiscoveryProcessor.java | 101 +++++++++
.../admin/discovery/DiscoveryProcessorHolder.java | 41 ++--
.../admin/discovery/LocalDiscoveryProcessor.java | 77 +++++++
.../parse/CustomDiscoveryUpstreamParser.java | 118 +++++++++++
.../admin/discovery/parse/KeyValueParser.java | 27 ++-
.../listener/AbstractListDataChangedListener.java | 43 ++--
.../listener/AbstractNodeDataChangedListener.java | 21 +-
.../admin/listener/DataChangedEventDispatcher.java | 7 +-
.../shenyu/admin/listener/DataChangedListener.java | 6 +-
.../admin/mapper/DiscoveryUpstreamMapper.java | 18 ++
.../shenyu/admin/mapper/ProxySelectorMapper.java | 8 +
.../admin/model/dto/DiscoveryHandlerDTO.java | 121 +++++++++++
.../shenyu/admin/transfer/DiscoveryTransfer.java | 71 +++++++
shenyu-admin/src/main/resources/application.yml | 8 +-
.../mappers/discovery-upstream-sqlmap.xml | 23 ++
.../resources/mappers/proxy-selector-sqlmap.xml | 7 +
.../src/main/resources/sql-script/h2/schema.sql | 29 ++-
shenyu-bootstrap/pom.xml | 12 +-
.../common/constant/DefaultPathConstants.java | 15 +-
.../shenyu/common/dto/DiscoverySyncData.java | 64 ++++++
.../shenyu/common/dto/DiscoveryUpstreamData.java | 232 +++++++++++++++++++++
.../shenyu/common/dto/ProxySelectorData.java | 23 --
.../shenyu/common/enums/ConfigGroupEnum.java | 5 +-
.../discovery/api/ShenyuDiscoveryService.java | 12 ++
.../api/listener/DataChangedEventListener.java | 2 +-
...edEvent.java => DiscoveryDataChangedEvent.java} | 4 +-
.../zookeeper/ZookeeperDiscoveryService.java | 100 +++++----
.../cache/CommonProxySelectorDataSubscriber.java | 4 +-
.../base/handler/ProxySelectorDataHandler.java | 4 +-
.../plugin/tcp/handler/TcpUpstreamDataHandler.java | 8 +-
.../shenyu/protocol/tcp/BootstrapServer.java | 4 +-
.../shenyu/protocol/tcp/TcpBootstrapServer.java | 5 +-
.../shenyu/protocol/tcp/UpstreamProvider.java | 27 ++-
.../tcp/connection/ActivityConnectionObserver.java | 8 +-
.../DefaultConnectionConfigProvider.java | 12 +-
.../sync/data/api/ProxySelectorDataSubscriber.java | 6 +-
.../shenyu/sync/data/apollo/ApolloDataService.java | 18 +-
.../sync/data/nacos/handler/NacosCacheHandler.java | 10 +-
.../data/zookeeper/ZookeeperSyncDataService.java | 20 +-
45 files changed, 1507 insertions(+), 220 deletions(-)
diff --git a/shenyu-admin/pom.xml b/shenyu-admin/pom.xml
index ff267bbda..e1def0f43 100644
--- a/shenyu-admin/pom.xml
+++ b/shenyu-admin/pom.xml
@@ -280,6 +280,12 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-discovery-zookeeper</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
index 568e3e4ed..9bd502620 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/ShenyuAdminBootstrap.java
@@ -35,4 +35,5 @@ public class ShenyuAdminBootstrap {
public static void main(final String[] args) {
SpringApplication.run(ShenyuAdminBootstrap.class, args);
}
+
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/config/DiscoveryConfiguration.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/config/DiscoveryConfiguration.java
new file mode 100644
index 000000000..a90b0d6e5
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/config/DiscoveryConfiguration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.config;
+
+import org.apache.shenyu.admin.discovery.DefaultDiscoveryProcessor;
+import org.apache.shenyu.admin.discovery.DiscoveryProcessor;
+import org.apache.shenyu.admin.discovery.DiscoveryProcessorHolder;
+import org.apache.shenyu.admin.discovery.LocalDiscoveryProcessor;
+import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
+import org.apache.shenyu.admin.mapper.ProxySelectorMapper;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * DiscoveryConfiguration.
+ */
+@Configuration
+public class DiscoveryConfiguration {
+
+ /**
+ * discoveryProcessor.
+ *
+ * @param discoveryUpstreamMapper discoveryUpstreamMapper
+ * @param proxySelectorMapper proxySelectorMapper
+ * @return DiscoveryProcessor
+ */
+ @Bean("DefaultDiscoveryProcessor")
+ public DiscoveryProcessor discoveryDefaultProcessor(final
DiscoveryUpstreamMapper discoveryUpstreamMapper, final ProxySelectorMapper
proxySelectorMapper) {
+ return new DefaultDiscoveryProcessor(discoveryUpstreamMapper,
proxySelectorMapper);
+ }
+
+    /**
+     * Creates the processor that serves discovery in local mode.
+     *
+     * <p>The mapper parameters are retained so the bean method signature stays
+     * unchanged; they are not needed to construct the local processor.
+     *
+     * @param discoveryUpstreamMapper discoveryUpstreamMapper
+     * @param proxySelectorMapper proxySelectorMapper
+     * @return LocalDiscoveryProcessor
+     */
+    @Bean("LocalDiscoveryProcessor")
+    public DiscoveryProcessor discoveryLocalProcessor(final DiscoveryUpstreamMapper discoveryUpstreamMapper, final ProxySelectorMapper proxySelectorMapper) {
+        // This bean used to build a second DefaultDiscoveryProcessor, so the holder's
+        // LOCAL branch never reached LocalDiscoveryProcessor.
+        // NOTE(review): assumes LocalDiscoveryProcessor keeps its implicit no-arg constructor - confirm.
+        return new LocalDiscoveryProcessor();
+    }
+
+ /**
+ * discoveryProcessorHolder.
+ *
+ * @param defaultDiscoveryProcessor defaultDiscoveryProcessor
+ * @param localDiscoveryProcessor localDiscoveryProcessor
+ * @return DiscoveryProcessorHolder
+ */
+ @Bean
+ public DiscoveryProcessorHolder
discoveryProcessorHolder(@Qualifier("DefaultDiscoveryProcessor") final
DiscoveryProcessor defaultDiscoveryProcessor,
+
@Qualifier("LocalDiscoveryProcessor") final DiscoveryProcessor
localDiscoveryProcessor
+ ) {
+ return new DiscoveryProcessorHolder(defaultDiscoveryProcessor,
localDiscoveryProcessor);
+ }
+
+}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
new file mode 100644
index 000000000..4fbd5d470
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DefaultDiscoveryProcessor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.admin.discovery.parse.CustomDiscoveryUpstreamParser;
+import org.apache.shenyu.admin.listener.DataChangedEvent;
+import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
+import org.apache.shenyu.admin.mapper.ProxySelectorMapper;
+import org.apache.shenyu.admin.model.dto.DiscoveryHandlerDTO;
+import org.apache.shenyu.admin.model.dto.DiscoveryUpstreamDTO;
+import org.apache.shenyu.admin.model.dto.ProxySelectorDTO;
+import org.apache.shenyu.admin.model.entity.DiscoveryDO;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.enums.DataEventTypeEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
+import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
+import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
+import org.apache.shenyu.spi.ExtensionLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.ApplicationEventPublisherAware;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * DefaultDiscoveryProcessor.
+ */
+public class DefaultDiscoveryProcessor implements DiscoveryProcessor,
ApplicationEventPublisherAware {
+
+ private static final String KEY_TEMPLATE = "%s/%s/%s/%s";
+
+ private static final String DEFAULT_LISTENER_NODE = "/shenyu/discovery";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultDiscoveryProcessor.class);
+
+ private final Map<String, ShenyuDiscoveryService> discoveryServiceCache;
+
+ private ApplicationEventPublisher eventPublisher;
+
+ private final DiscoveryUpstreamMapper discoveryUpstreamMapper;
+
+ private final ProxySelectorMapper proxySelectorMapper;
+
+ /**
+ * DefaultDiscoveryProcessor.
+ *
+ * @param discoveryUpstreamMapper discoveryUpstreamMapper
+ * @param proxySelectorMapper proxySelectorMapper
+ */
+ public DefaultDiscoveryProcessor(final DiscoveryUpstreamMapper
discoveryUpstreamMapper, final ProxySelectorMapper proxySelectorMapper) {
+ this.discoveryUpstreamMapper = discoveryUpstreamMapper;
+ this.proxySelectorMapper = proxySelectorMapper;
+ this.discoveryServiceCache = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void createDiscovery(final DiscoveryDO discoveryDO) {
+ String type = discoveryDO.getType();
+ ShenyuDiscoveryService discoveryService =
ExtensionLoader.getExtensionLoader(ShenyuDiscoveryService.class).getJoin(type);
+ String props = discoveryDO.getProps();
+ DiscoveryConfig discoveryConfig = GsonUtils.getGson().fromJson(props,
DiscoveryConfig.class);
+ discoveryConfig.setServerList(discoveryDO.getServiceList());
+ discoveryService.init(discoveryConfig);
+ discoveryServiceCache.put(discoveryDO.getId(), discoveryService);
+ }
+
+ @Override
+ public void createProxySelector(final DiscoveryHandlerDTO
discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+ ShenyuDiscoveryService shenyuDiscoveryService =
discoveryServiceCache.get(discoveryHandlerDTO.getDiscoveryId());
+ if (Objects.isNull(shenyuDiscoveryService)) {
+ LOG.warn("before start ProxySelector you need init
DiscoveryId={}", discoveryHandlerDTO.getDiscoveryId());
+ return;
+ }
+ String key = buildProxySelectorKey(discoveryHandlerDTO,
proxySelectorDTO);
+ if (StringUtils.isEmpty(shenyuDiscoveryService.getData(key))) {
+ LOG.info("shenyu discovery {} is empty need register it ", key);
+ shenyuDiscoveryService.register(key,
GsonUtils.getInstance().toJson(proxySelectorDTO));
+ }
+ shenyuDiscoveryService.watcher(key,
getDiscoveryDataChangedEventListener(proxySelectorDTO.getType(),
discoveryHandlerDTO.getProps()));
+ DataChangedEvent dataChangedEvent = new
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE,
Collections.singletonList(covert(proxySelectorDTO, null)));
+ eventPublisher.publishEvent(dataChangedEvent);
+ }
+
+    /**
+     * Shuts down the discovery service cached for the given discovery and
+     * drops it from the cache.
+     *
+     * <p>If no service is cached for the id, a warning is logged and nothing else happens.
+     *
+     * @param discoveryDO discoveryDO
+     */
+    @Override
+    public void removeDiscovery(final DiscoveryDO discoveryDO) {
+        // remove() instead of get(): a service that has been shut down must not stay reachable through the cache.
+        ShenyuDiscoveryService shenyuDiscoveryService = discoveryServiceCache.remove(discoveryDO.getId());
+        if (Objects.isNull(shenyuDiscoveryService)) {
+            LOG.warn("shenyu discovery DiscoveryId={} is not initialized, skip shutdown", discoveryDO.getId());
+            return;
+        }
+        shenyuDiscoveryService.shutdown();
+    }
+
+    /**
+     * Stops watching the proxy selector's discovery key and publishes a
+     * PROXY_SELECTOR DELETE event.
+     *
+     * <p>When no discovery service is cached for the handler's discovery id there is
+     * no watcher to remove; the DELETE event is still published.
+     *
+     * @param discoveryHandlerDTO discoveryHandlerDTO
+     * @param proxySelectorDTO proxySelectorDTO
+     */
+    @Override
+    public void removeProxySelector(final DiscoveryHandlerDTO discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+        ShenyuDiscoveryService shenyuDiscoveryService = discoveryServiceCache.get(discoveryHandlerDTO.getDiscoveryId());
+        if (Objects.nonNull(shenyuDiscoveryService)) {
+            String key = buildProxySelectorKey(discoveryHandlerDTO, proxySelectorDTO);
+            shenyuDiscoveryService.unWatcher(key);
+        } else {
+            // Same lookup that createProxySelector guards; without this check a missing service was an NPE.
+            LOG.warn("shenyu discovery DiscoveryId={} is not initialized, skip unWatcher", discoveryHandlerDTO.getDiscoveryId());
+        }
+        DataChangedEvent dataChangedEvent = new DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.DELETE,
+                Collections.singletonList(covert(proxySelectorDTO, null)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+ @Override
+ public void changeUpstream(final DiscoveryHandlerDTO discoveryHandlerDTO,
final ProxySelectorDTO proxySelectorDTO, final List<DiscoveryUpstreamDTO>
upstreamDTOS) {
+ throw new NotImplementedException("shenyu discovery local mode do
nothing in changeUpstream");
+ }
+
+ /**
+ * buildProxySelectorKey.
+ *
+ * @param discoveryHandlerDTO discoveryHandlerDTO
+ * @param proxySelectorDTO proxySelectorDTO
+ * @return key
+ */
+ private String buildProxySelectorKey(final DiscoveryHandlerDTO
discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+ String key =
StringUtils.isBlank(discoveryHandlerDTO.getListenerNode()) ?
DEFAULT_LISTENER_NODE : discoveryHandlerDTO.getListenerNode();
+ return String.format(KEY_TEMPLATE, key,
proxySelectorDTO.getPluginName(), proxySelectorDTO.getId(),
discoveryHandlerDTO.getId());
+ }
+
+ /**
+ * getDiscoveryDataChangedEventListener.
+ *
+ * @param discoveryType discoveryType
+ * @param customProps customProps
+ * @return DataChangedEventListener
+ */
+ private DataChangedEventListener
getDiscoveryDataChangedEventListener(final String discoveryType, final String
customProps) {
+ Map<String, String> customMap =
GsonUtils.getInstance().toObjectMap(customProps, String.class);
+ return new DiscoveryDataChangedEventSyncListener(eventPublisher,
discoveryUpstreamMapper,
+ new CustomDiscoveryUpstreamParser(customMap,
proxySelectorMapper),
!DiscoveryMode.LOCAL.name().equalsIgnoreCase(discoveryType));
+ }
+
+ @Override
+ public void setApplicationEventPublisher(final ApplicationEventPublisher
eventPublisher) {
+ this.eventPublisher = eventPublisher;
+ }
+}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryDataChangedEventSyncListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryDataChangedEventSyncListener.java
new file mode 100644
index 000000000..9e5229450
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryDataChangedEventSyncListener.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.admin.discovery.parse.KeyValueParser;
+import org.apache.shenyu.admin.listener.DataChangedEvent;
+import org.apache.shenyu.admin.mapper.DiscoveryUpstreamMapper;
+import org.apache.shenyu.admin.model.entity.DiscoveryUpstreamDO;
+import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.enums.DataEventTypeEnum;
+import org.apache.shenyu.common.utils.UUIDUtils;
+import org.apache.shenyu.discovery.api.listener.DiscoveryDataChangedEvent;
+import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * DiscoveryDataChangedEventSyncListener.
+ */
+public class DiscoveryDataChangedEventSyncListener implements
DataChangedEventListener {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DiscoveryDataChangedEventSyncListener.class);
+
+ private final KeyValueParser keyValueParser;
+
+ private final ApplicationEventPublisher eventPublisher;
+
+ private final DiscoveryUpstreamMapper discoveryUpstreamMapper;
+
+ private final Boolean needPersistence;
+
+ public DiscoveryDataChangedEventSyncListener(final
ApplicationEventPublisher eventPublisher,
+ final DiscoveryUpstreamMapper
discoveryUpstreamMapper,
+ final KeyValueParser
keyValueParser,
+ final Boolean
needPersistence) {
+ this.eventPublisher = eventPublisher;
+ this.keyValueParser = keyValueParser;
+ this.discoveryUpstreamMapper = discoveryUpstreamMapper;
+ this.needPersistence = needPersistence;
+ }
+
+ @Override
+ public void onChange(final DiscoveryDataChangedEvent event) {
+ DiscoveryDataChangedEvent.Event currentEvent = event.getEvent();
+ if (DiscoveryDataChangedEvent.Event.IGNORED.equals(currentEvent)) {
+ return;
+ }
+ DiscoverySyncData discoverySyncData =
buildProxySelectorData(event.getKey(), event.getValue());
+ DataChangedEvent dataChangedEvent = null;
+ List<DiscoveryUpstreamData> upstreamDataList =
discoverySyncData.getUpstreamDataList();
+ if (needPersistence) {
+ if (CollectionUtils.isEmpty(upstreamDataList)) {
+ LOGGER.warn("shenyu proxySelectorData#discoveryUpstreamList is
empty");
+ return;
+ }
+ switch (currentEvent) {
+ case ADDED:
+ upstreamDataList.forEach(d -> {
+ d.setId(UUIDUtils.getInstance().generateShortUuid());
+ d.setDateCreated(new
Timestamp(System.currentTimeMillis()));
+ d.setDateUpdated(new
Timestamp(System.currentTimeMillis()));
+
discoveryUpstreamMapper.insert(DiscoveryTransfer.INSTANCE.mapToDo(d));
+ LOGGER.info("shenyu
[DiscoveryDataChangedEventSyncListener] ADDED Upstream {}", d.getUrl());
+ });
+ fillFullyDiscoverySyncData(discoverySyncData);
+ dataChangedEvent = new
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE,
Collections.singletonList(discoverySyncData));
+ break;
+ case UPDATED:
+ upstreamDataList.forEach(d -> {
+ DiscoveryUpstreamDO discoveryUpstreamDO =
DiscoveryTransfer.INSTANCE.mapToDo(d);
+ discoveryUpstreamMapper.update(discoveryUpstreamDO);
+ LOGGER.info("shenyu
[DiscoveryDataChangedEventSyncListener] UPDATE Upstream {}",
discoveryUpstreamDO.getUrl());
+ });
+ fillFullyDiscoverySyncData(discoverySyncData);
+ dataChangedEvent = new
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(discoverySyncData));
+ break;
+ case DELETED:
+ if (CollectionUtils.isNotEmpty(upstreamDataList)) {
+ upstreamDataList.forEach(up -> {
+ discoveryUpstreamMapper.deleteByUrl(up.getUrl());
+ LOGGER.info("shenyu
[DiscoveryDataChangedEventSyncListener] DELETE Upstream {}", up.getUrl());
+ });
+ }
+ fillFullyDiscoverySyncData(discoverySyncData);
+ dataChangedEvent = new
DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(discoverySyncData));
+ break;
+ default:
+ throw new IllegalStateException("shenyu
DiscoveryDataChangedEventSyncListener find IllegalState");
+ }
+ }
+ if (Objects.nonNull(dataChangedEvent)) {
+ eventPublisher.publishEvent(dataChangedEvent);
+ }
+ }
+
+ private void fillFullyDiscoverySyncData(final DiscoverySyncData
discoverySyncData) {
+ ProxySelectorData proxySelectorData =
discoverySyncData.getProxySelectorData();
+ List<DiscoveryUpstreamDO> discoveryUpstreamDOS =
discoveryUpstreamMapper.selectByProxySelectorId(proxySelectorData.getId());
+ List<DiscoveryUpstreamData> collect =
discoveryUpstreamDOS.stream().map(DiscoveryTransfer.INSTANCE::mapToData).collect(Collectors.toList());
+ discoverySyncData.setUpstreamDataList(collect);
+ }
+
+ private DiscoverySyncData buildProxySelectorData(final String key, final
String value) {
+ List<DiscoveryUpstreamData> discoveryUpstreamDTOS =
keyValueParser.parseValue(value);
+ ProxySelectorData proxySelectorData = keyValueParser.parseKey(key);
+ String[] split = key.split("/");
+ String discoveryHandleId = split[split.length - 2];
+ discoveryUpstreamDTOS.forEach(s ->
s.setDiscoveryHandlerId(discoveryHandleId));
+ DiscoverySyncData data = new DiscoverySyncData();
+ data.setUpstreamDataList(discoveryUpstreamDTOS);
+ data.setProxySelectorData(proxySelectorData);
+ return data;
+ }
+
+}
diff --git
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
similarity index 74%
copy from
shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
copy to
shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
index 33580bb90..8c0175321 100644
---
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryMode.java
@@ -15,17 +15,11 @@
* limitations under the License.
*/
-package org.apache.shenyu.discovery.api.listener;
+package org.apache.shenyu.admin.discovery;
-/**
- * Data changed listener.
- */
-public interface DataChangedEventListener {
-
- /**
- * when data changed, fire this event.
- *
- * @param event data changed event
- */
- void onChange(DataChangedEvent event);
+public enum DiscoveryMode {
+ LOCAL,
+ ZOOKEEPER,
+ NACOS,
+ EUREKA
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessor.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessor.java
new file mode 100644
index 000000000..a5d86b3f4
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.shenyu.admin.model.dto.DiscoveryHandlerDTO;
+import org.apache.shenyu.admin.model.dto.DiscoveryUpstreamDTO;
+import org.apache.shenyu.admin.model.dto.ProxySelectorDTO;
+import org.apache.shenyu.admin.model.entity.DiscoveryDO;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.springframework.beans.BeanUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * DiscoveryProcessor.
+ */
+public interface DiscoveryProcessor {
+
+ /**
+ * createDiscovery.
+ *
+ * @param discoveryDO discoveryDO
+ */
+ void createDiscovery(DiscoveryDO discoveryDO);
+
+ /**
+ * createProxySelector.
+ *
+ * @param discoveryHandlerDTO discoveryHandlerDTO
+ * @param proxySelectorDTO proxySelectorDTO
+ */
+ void createProxySelector(DiscoveryHandlerDTO discoveryHandlerDTO,
ProxySelectorDTO proxySelectorDTO);
+
+ /**
+ * removeDiscovery.
+ *
+ * @param discoveryDO discoveryDO
+ */
+ void removeDiscovery(DiscoveryDO discoveryDO);
+
+ /**
+ * removeProxySelector.
+ *
+ * @param discoveryHandlerDTO discoveryHandlerDTO
+ * @param proxySelectorDTO proxySelectorDTO
+ */
+ void removeProxySelector(DiscoveryHandlerDTO discoveryHandlerDTO,
ProxySelectorDTO proxySelectorDTO);
+
+
+ /**
+ * only use in local mode to sync upstreamDTOS.
+ *
+ * @param discoveryHandlerDTO discoveryHandlerDTO
+ * @param proxySelectorDTO proxySelectorDTO
+ * @param upstreamDTOS upstreamDTOS
+ */
+ void changeUpstream(DiscoveryHandlerDTO discoveryHandlerDTO,
ProxySelectorDTO proxySelectorDTO, List<DiscoveryUpstreamDTO> upstreamDTOS);
+
+ /**
+ * covert.
+ *
+ * @param proxySelectorDTO proxySelectorDTO
+ * @param upstreamDTOS upstreamDTOS
+ * @return DiscoverySyncData
+ */
+ default DiscoverySyncData covert(final ProxySelectorDTO proxySelectorDTO,
final List<DiscoveryUpstreamDTO> upstreamDTOS) {
+ ProxySelectorData proxySelectorData = new ProxySelectorData();
+ BeanUtils.copyProperties(proxySelectorDTO, proxySelectorData);
+ DiscoverySyncData discoverySyncData = new DiscoverySyncData();
+ discoverySyncData.setProxySelectorData(proxySelectorData);
+ if (CollectionUtils.isNotEmpty(upstreamDTOS)) {
+ List<DiscoveryUpstreamData> collect = upstreamDTOS.stream().map(up
-> {
+ DiscoveryUpstreamData discoveryUpstreamData = new
DiscoveryUpstreamData();
+ BeanUtils.copyProperties(up, discoveryUpstreamData);
+ return discoveryUpstreamData;
+ }).collect(Collectors.toList());
+ discoverySyncData.setUpstreamDataList(collect);
+ }
+ return discoverySyncData;
+ }
+
+}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
similarity index 50%
copy from
shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
copy to
shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
index ac2b9d92a..e95eb8752 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/DiscoveryProcessorHolder.java
@@ -15,36 +15,31 @@
* limitations under the License.
*/
-package org.apache.shenyu.sync.data.api;
+package org.apache.shenyu.admin.discovery;
-import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+public class DiscoveryProcessorHolder {
-import java.util.List;
+ private final DiscoveryProcessor defaultDiscoveryProcessor;
-/**
- * ProxySelectorDataSubscriber.
- */
-public interface ProxySelectorDataSubscriber {
+ private final DiscoveryProcessor localDiscoveryProcessor;
- /**
- * On subscribe.
- *
- * @param proxySelectorData the proxySelector data
- * @param upstreamsList upstreamsList
- */
- void onSubscribe(ProxySelectorData proxySelectorData,
List<DiscoveryUpstream> upstreamsList);
+ public DiscoveryProcessorHolder(final DiscoveryProcessor
defaultDiscoveryProcessor, final DiscoveryProcessor localDiscoveryProcessor) {
+ this.defaultDiscoveryProcessor = defaultDiscoveryProcessor;
+ this.localDiscoveryProcessor = localDiscoveryProcessor;
+ }
/**
- * Un subscribe.
+ * chooseProcessor.
*
- * @param proxySelectorData the proxySelector data
+ * @param mode mode
+ * @return DiscoveryProcessor
*/
- void unSubscribe(ProxySelectorData proxySelectorData);
-
- /**
- * Refresh.
- */
- default void refresh() {
+ public DiscoveryProcessor chooseProcessor(final String mode) {
+ if (DiscoveryMode.LOCAL.name().equalsIgnoreCase(mode)) {
+ return localDiscoveryProcessor;
+ } else {
+ return defaultDiscoveryProcessor;
+ }
}
+
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
new file mode 100644
index 000000000..2d91b4f59
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery;
+
+import org.apache.shenyu.admin.listener.DataChangedEvent;
+import org.apache.shenyu.admin.model.dto.DiscoveryHandlerDTO;
+import org.apache.shenyu.admin.model.dto.DiscoveryUpstreamDTO;
+import org.apache.shenyu.admin.model.dto.ProxySelectorDTO;
+import org.apache.shenyu.admin.model.entity.DiscoveryDO;
+import org.apache.shenyu.common.enums.ConfigGroupEnum;
+import org.apache.shenyu.common.enums.DataEventTypeEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.ApplicationEventPublisherAware;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * LocalDiscoveryProcessor.
+ *
+ * <p>
+ * Discovery-level operations only write a log line in this processor, while proxy selector
+ * and upstream changes are published as {@code PROXY_SELECTOR} {@link DataChangedEvent}s
+ * through the injected Spring event publisher.
+ * </p>
+ */
+public class LocalDiscoveryProcessor implements DiscoveryProcessor, ApplicationEventPublisherAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalDiscoveryProcessor.class);
+
+    // injected by Spring through setApplicationEventPublisher
+    private ApplicationEventPublisher eventPublisher;
+
+    @Override
+    public void createDiscovery(final DiscoveryDO discoveryDO) {
+        LOG.info("shenyu discovery local mode do nothing in createDiscovery");
+    }
+
+    @Override
+    public void createProxySelector(final DiscoveryHandlerDTO discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+        // no upstream list is attached when the selector is created
+        DataChangedEvent dataChangedEvent = new DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.CREATE,
+                Collections.singletonList(covert(proxySelectorDTO, null)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+    @Override
+    public void removeDiscovery(final DiscoveryDO discoveryDO) {
+        LOG.info("shenyu discovery local mode do nothing in removeDiscovery");
+    }
+
+    @Override
+    public void removeProxySelector(final DiscoveryHandlerDTO discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO) {
+        DataChangedEvent dataChangedEvent = new DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.DELETE,
+                Collections.singletonList(covert(proxySelectorDTO, null)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+    @Override
+    public void changeUpstream(final DiscoveryHandlerDTO discoveryHandlerDTO, final ProxySelectorDTO proxySelectorDTO, final List<DiscoveryUpstreamDTO> upstreamDTOS) {
+        // An upstream change must be published as UPDATE: the data changed listeners treat
+        // DELETE as "remove this proxy selector", which would drop the selector instead of
+        // refreshing its upstream list.
+        DataChangedEvent dataChangedEvent = new DataChangedEvent(ConfigGroupEnum.PROXY_SELECTOR, DataEventTypeEnum.UPDATE,
+                Collections.singletonList(covert(proxySelectorDTO, upstreamDTOS)));
+        eventPublisher.publishEvent(dataChangedEvent);
+    }
+
+    @Override
+    public void setApplicationEventPublisher(final ApplicationEventPublisher applicationEventPublisher) {
+        this.eventPublisher = applicationEventPublisher;
+    }
+
+}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/CustomDiscoveryUpstreamParser.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/CustomDiscoveryUpstreamParser.java
new file mode 100644
index 000000000..e2001d9bd
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/CustomDiscoveryUpstreamParser.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.discovery.parse;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shenyu.admin.mapper.ProxySelectorMapper;
+import org.apache.shenyu.admin.model.entity.ProxySelectorDO;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.utils.GsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeanUtils;
+
+import java.lang.reflect.Type;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DiscoveryUpstreamParser.
+ *
+ * <p>
+ * Parses discovery values into {@link DiscoveryUpstreamData} and discovery keys into
+ * {@link ProxySelectorData}. Field names of the incoming JSON are renamed through the
+ * {@code conversion} map before binding, so you can define a custom mapping if your
+ * custom upstream doesn't fit.
+ * </p>
+ */
+public class CustomDiscoveryUpstreamParser implements JsonDeserializer<DiscoveryUpstreamData>, KeyValueParser {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CustomDiscoveryUpstreamParser.class);
+
+    // source field name -> DiscoveryUpstreamData field name
+    private final Map<String, String> conversion;
+
+    private final ProxySelectorMapper proxySelectorMapper;
+
+    /**
+     * CustomDiscoveryUpstreamParser.
+     *
+     * @param conversion          conversion
+     * @param proxySelectorMapper proxySelectorMapper
+     */
+    public CustomDiscoveryUpstreamParser(final Map<String, String> conversion, final ProxySelectorMapper proxySelectorMapper) {
+        this.conversion = conversion;
+        this.proxySelectorMapper = proxySelectorMapper;
+    }
+
+    /**
+     * Copies every member of the incoming JSON object into a new object, renaming the
+     * members listed in {@code conversion}, then binds the result to DiscoveryUpstreamData.
+     *
+     * @param jsonElement                the raw upstream json, must be a json object
+     * @param type                       the target type
+     * @param jsonDeserializationContext the gson context
+     * @return DiscoveryUpstreamData
+     * @throws JsonParseException if the json can not be bound
+     */
+    @Override
+    public DiscoveryUpstreamData deserialize(final JsonElement jsonElement,
+                                             final Type type,
+                                             final JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
+        JsonObject asJsonObject = jsonElement.getAsJsonObject();
+        JsonObject afterJson = new JsonObject();
+        for (Map.Entry<String, JsonElement> elementEntry : asJsonObject.entrySet()) {
+            String key = elementEntry.getKey();
+            // keys without a conversion entry are kept as they are
+            afterJson.add(conversion.getOrDefault(key, key), elementEntry.getValue());
+        }
+        // bind with the shared GsonUtils instance so this deserializer is not re-entered
+        return GsonUtils.getInstance().fromJson(afterJson, DiscoveryUpstreamData.class);
+    }
+
+    @Override
+    public List<DiscoveryUpstreamData> parseValue(final String jsonString) {
+        if (StringUtils.isBlank(jsonString)) {
+            return Collections.emptyList();
+        }
+        // a Gson with this parser registered, so deserialize() applies the key conversion
+        GsonBuilder gsonBuilder = new GsonBuilder().registerTypeAdapter(DiscoveryUpstreamData.class, this);
+        Gson gson = gsonBuilder.create();
+        return Collections.singletonList(gson.fromJson(jsonString, DiscoveryUpstreamData.class));
+    }
+
+    /**
+     * parseKey.
+     *
+     * <p>
+     * /.../{pluginName}/{selectorId}/{discoveryHandlerId}/{upstream_suq}
+     * </p>
+     *
+     * <p>
+     * NOTE(review): the second-to-last path segment is used as the proxy selector id, but
+     * with the layout documented above that segment would be {discoveryHandlerId} - confirm
+     * the real key layout against the code that builds these keys.
+     * </p>
+     *
+     * @param key key
+     * @return ProxySelectorData
+     * @throws IllegalArgumentException if no proxy selector exists for the id taken from the key
+     */
+    @Override
+    public ProxySelectorData parseKey(final String key) {
+        String[] subArray = key.split("/");
+        String proxySelectorId = subArray[subArray.length - 2];
+        ProxySelectorDO proxySelectorDO = proxySelectorMapper.selectById(proxySelectorId);
+        if (proxySelectorDO == null) {
+            // BeanUtils.copyProperties rejects a null source with an opaque "Source must not be null";
+            // fail with the same exception type but say which key and id could not be resolved.
+            throw new IllegalArgumentException("shenyu parseKey can not find proxySelector, key=" + key + "|proxySelectorId=" + proxySelectorId);
+        }
+        ProxySelectorData proxySelectorData = new ProxySelectorData();
+        BeanUtils.copyProperties(proxySelectorDO, proxySelectorData);
+        LOG.info("shenyu parseKey pluginName={}|proxySelectorName={}|type={}|forwardPort={}", proxySelectorData.getPluginName(),
+                proxySelectorData.getName(), proxySelectorData.getType(), proxySelectorData.getForwardPort());
+        return proxySelectorData;
+    }
+
+}
diff --git
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/KeyValueParser.java
similarity index 63%
copy from
shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
copy to
shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/KeyValueParser.java
index ff009d567..10ca9fa9f 100644
---
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/parse/KeyValueParser.java
@@ -15,33 +15,32 @@
* limitations under the License.
*/
-package org.apache.shenyu.protocol.tcp;
+package org.apache.shenyu.admin.discovery.parse;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
import java.util.List;
/**
- * BootstrapServer.
+ * Parses a discovery key into ProxySelectorData and a discovery value into a DiscoveryUpstreamData list.
*/
-public interface BootstrapServer {
+public interface KeyValueParser {
/**
- * start.
+ * parseValue.
*
- * @param tcpServerConfiguration tcpServerConfiguration
+ * @param value value
+ * @return DiscoveryUpstreamData list
*/
- void start(TcpServerConfiguration tcpServerConfiguration);
+ List<DiscoveryUpstreamData> parseValue(String value);
/**
- * doOnUpdate.
+ * parseKey.
*
- * @param removeList removeList
+ * @param key discovery key
+ * @return ProxySelectorData.
*/
- void removeCommonUpstream(List<DiscoveryUpstream> removeList);
+ ProxySelectorData parseKey(String key);
- /**
- * shutdown.
- */
- void shutdown();
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractListDataChangedListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractListDataChangedListener.java
index f409178a7..b467f70c7 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractListDataChangedListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractListDataChangedListener.java
@@ -22,11 +22,12 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.MetaData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
import org.slf4j.Logger;
@@ -249,11 +250,12 @@ public abstract class AbstractListDataChangedListener
implements DataChangedList
}
@Override
- public void onProxySelectorChanged(final List<ProxySelectorData> changed,
final DataEventTypeEnum eventType) {
+ public void onProxySelectorChanged(final List<DiscoverySyncData> changed,
final DataEventTypeEnum eventType) {
updateProxySelectorMap(getConfig(changeData.getProxySelectorDataId()));
switch (eventType) {
case DELETE:
- changed.forEach(proxySelectorData -> {
+ changed.forEach(discoverySyncData -> {
+ ProxySelectorData proxySelectorData =
discoverySyncData.getProxySelectorData();
List<ProxySelectorData> ls = PROXY_SELECTOR_MAP
.getOrDefault(proxySelectorData.getId(), new
ArrayList<>())
.stream()
@@ -266,25 +268,27 @@ public abstract class AbstractListDataChangedListener
implements DataChangedList
case MYSELF:
Set<String> selectIdSet = changed
.stream()
- .map(ProxySelectorData::getId)
+ .map(discoverySyncData ->
+
discoverySyncData.getProxySelectorData().getId()
+ )
.collect(Collectors.toSet());
PROXY_SELECTOR_MAP.keySet().removeAll(selectIdSet);
- changed.forEach(proxySelectorData -> {
- List<ProxySelectorData> ls = new
ArrayList<>(PROXY_SELECTOR_MAP.getOrDefault(proxySelectorData.getId(),
+ changed.forEach(discoverySyncData -> {
+ List<ProxySelectorData> ls = new
ArrayList<>(PROXY_SELECTOR_MAP.getOrDefault(discoverySyncData.getProxySelectorData().getId(),
new ArrayList<>()));
- ls.add(proxySelectorData);
- PROXY_SELECTOR_MAP.put(proxySelectorData.getId(), ls);
+ ls.add(discoverySyncData.getProxySelectorData());
+
PROXY_SELECTOR_MAP.put(discoverySyncData.getProxySelectorData().getId(), ls);
});
break;
default:
- changed.forEach(proxySelectorData -> {
+ changed.forEach(discoverySyncData -> {
List<ProxySelectorData> ls = PROXY_SELECTOR_MAP
- .getOrDefault(proxySelectorData.getId(), new
ArrayList<>())
+
.getOrDefault(discoverySyncData.getProxySelectorData().getId(), new
ArrayList<>())
.stream()
- .filter(s ->
!s.getId().equals(proxySelectorData.getId()))
+ .filter(s ->
!s.getId().equals(discoverySyncData.getProxySelectorData().getId()))
.collect(Collectors.toList());
- ls.add(proxySelectorData);
- PROXY_SELECTOR_MAP.put(proxySelectorData.getId(), ls);
+ ls.add(discoverySyncData.getProxySelectorData());
+
PROXY_SELECTOR_MAP.put(discoverySyncData.getProxySelectorData().getId(), ls);
});
break;
}
@@ -362,7 +366,7 @@ public abstract class AbstractListDataChangedListener
implements DataChangedList
* publishConfig.
*
* @param dataId dataId
- * @param data data
+ * @param data data
*/
public abstract void publishConfig(String dataId, Object data);
@@ -409,11 +413,11 @@ public abstract class AbstractListDataChangedListener
implements DataChangedList
/**
* ChangeData.
*
- * @param pluginDataId pluginDataId
+ * @param pluginDataId pluginDataId
* @param selectorDataId selectorDataId
- * @param ruleDataId ruleDataId
- * @param authDataId authDataId
- * @param metaDataId metaDataId
+ * @param ruleDataId ruleDataId
+ * @param authDataId authDataId
+ * @param metaDataId metaDataId
*/
public ChangeData(final String pluginDataId, final String
selectorDataId,
final String ruleDataId, final String authDataId,
@@ -473,6 +477,7 @@ public abstract class AbstractListDataChangedListener
implements DataChangedList
/**
* get proxySelectorDataId.
+ *
* @return proxySelectorDataId
*/
public String getProxySelectorDataId() {
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractNodeDataChangedListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractNodeDataChangedListener.java
index eb76dc555..07a0fec5e 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractNodeDataChangedListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractNodeDataChangedListener.java
@@ -19,10 +19,11 @@ package org.apache.shenyu.admin.listener;
import org.apache.shenyu.common.constant.DefaultPathConstants;
import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.MetaData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.exception.ShenyuException;
import org.slf4j.Logger;
@@ -80,6 +81,22 @@ public abstract class AbstractNodeDataChangedListener
implements DataChangedList
}
}
+ /**
+ * Mirrors proxy selector changes into the node store: a DELETE event removes the
+ * selector's node, every other event type writes the sync data to that node.
+ *
+ * @param changed the changed discovery sync data
+ * @param eventType the event type
+ */
+ @Override
+ public void onProxySelectorChanged(final List<DiscoverySyncData> changed, final DataEventTypeEnum eventType) {
+     for (DiscoverySyncData data : changed) {
+         // node path is built from the selector's plugin name and selector name
+         String proxySelectorPath = DefaultPathConstants.buildProxySelectorPath(data.getProxySelectorData().getPluginName(),
+                 data.getProxySelectorData().getName());
+         // delete
+         if (eventType == DataEventTypeEnum.DELETE) {
+             deleteNode(proxySelectorPath);
+             LOG.debug("[DataChangedListener] delete proxySelector {}", proxySelectorPath);
+             continue;
+         }
+         // create or update
+         createOrUpdate(proxySelectorPath, data);
+         LOG.info("[DataChangedListener] change proxySelector path={}|data={}", proxySelectorPath, data);
+     }
+ }
+
@Override
public void onSelectorChanged(final List<SelectorData> changed, final
DataEventTypeEnum eventType) {
if (eventType == DataEventTypeEnum.REFRESH && !changed.isEmpty()) {
@@ -145,7 +162,7 @@ public abstract class AbstractNodeDataChangedListener
implements DataChangedList
* createOrUpdate.
*
* @param pluginPath pluginPath
- * @param data data
+ * @param data data
*/
public abstract void createOrUpdate(String pluginPath, Object data);
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
index 56c3936f6..3389ece26 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
@@ -19,11 +19,11 @@ package org.apache.shenyu.admin.listener;
import org.apache.shenyu.admin.service.manager.LoadServiceDocEntry;
import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.MetaData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
@@ -70,9 +70,8 @@ public class DataChangedEventDispatcher implements
ApplicationListener<DataChang
listener.onMetaDataChanged((List<MetaData>)
event.getSource(), event.getEventType());
break;
case PROXY_SELECTOR:
- listener.onProxySelectorChanged((List<ProxySelectorData>)
event.getSource(), event.getEventType());
+ listener.onProxySelectorChanged((List<DiscoverySyncData>)
event.getSource(), event.getEventType());
break;
-
default:
throw new IllegalStateException("Unexpected value: " +
event.getGroupKey());
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedListener.java
index da3ae8a30..9e23d3da8 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedListener.java
@@ -20,7 +20,7 @@ package org.apache.shenyu.admin.listener;
import org.apache.shenyu.common.dto.AppAuthData;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
@@ -80,12 +80,12 @@ public interface DataChangedListener {
}
/**
- * invoke this method when proxy selector was received.
+ * invoke this method when ProxySelector was changed.
*
* @param changed the changed
* @param eventType the event type
*/
- default void onProxySelectorChanged(List<ProxySelectorData> changed,
DataEventTypeEnum eventType) {
+ default void onProxySelectorChanged(List<DiscoverySyncData> changed,
DataEventTypeEnum eventType) {
}
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
index f2ed75ea7..ae4cbcfe8 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
@@ -45,6 +45,15 @@ public interface DiscoveryUpstreamMapper extends
ExistProvider {
*/
List<DiscoveryUpstreamDO> selectByIds(@Param("ids") List<String> ids);
+
+ /**
+ * selectByProxySelectorId.
+ *
+ * @param proxySelectorId proxySelectorId
+ * @return DiscoveryUpstreamDO list
+ */
+ List<DiscoveryUpstreamDO>
selectByProxySelectorId(@Param("proxySelectorId") String proxySelectorId);
+
/**
* insert.
*
@@ -68,4 +77,13 @@ public interface DiscoveryUpstreamMapper extends
ExistProvider {
* @return rows int
*/
int deleteByIds(@Param("ids") List<String> ids);
+
+ /**
+ * deleteByUrl.
+ *
+ * @param url url
+ * @return rows int
+ */
+ int deleteByUrl(@Param("url") String url);
+
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/ProxySelectorMapper.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/ProxySelectorMapper.java
index 933819e40..9d4479706 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/ProxySelectorMapper.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/ProxySelectorMapper.java
@@ -70,6 +70,14 @@ public interface ProxySelectorMapper extends ExistProvider {
*/
int update(ProxySelectorDO proxySelectorDO);
+ /**
+ * selectById.
+ *
+ * @param id id
+ * @return ProxySelectorDO
+ */
+ ProxySelectorDO selectById(@Param("id") String id);
+
/**
* selectByIds.
*
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/DiscoveryHandlerDTO.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/DiscoveryHandlerDTO.java
new file mode 100644
index 000000000..47cf3d8ec
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/dto/DiscoveryHandlerDTO.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.model.dto;
+
+/**
+ * DiscoveryHandlerDTO.
+ *
+ * <p>
+ * Plain data carrier for a discovery handler: its id, the discovery id, the handler,
+ * the listener node and the props string. It holds no logic beyond getters and setters.
+ * NOTE(review): the fields presumably mirror the columns of the discovery_handler table -
+ * confirm against the mapper.
+ * </p>
+ */
+public class DiscoveryHandlerDTO {
+
+ private String id;
+
+ private String discoveryId;
+
+ private String handler;
+
+ private String listenerNode;
+
+ private String props;
+
+ /**
+ * getId.
+ *
+ * @return id
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * setId.
+ *
+ * @param id id
+ */
+ public void setId(final String id) {
+ this.id = id;
+ }
+
+ /**
+ * getDiscoveryId.
+ *
+ * @return discoveryId
+ */
+ public String getDiscoveryId() {
+ return discoveryId;
+ }
+
+ /**
+ * setDiscoveryId.
+ *
+ * @param discoveryId discoveryId
+ */
+ public void setDiscoveryId(final String discoveryId) {
+ this.discoveryId = discoveryId;
+ }
+
+ /**
+ * getHandler.
+ *
+ * @return handler
+ */
+ public String getHandler() {
+ return handler;
+ }
+
+ /**
+ * setHandler.
+ *
+ * @param handler handler
+ */
+ public void setHandler(final String handler) {
+ this.handler = handler;
+ }
+
+ /**
+ * getListenerNode.
+ *
+ * @return listenerNode
+ */
+ public String getListenerNode() {
+ return listenerNode;
+ }
+
+ /**
+ * setListenerNode.
+ *
+ * @param listenerNode listenerNode
+ */
+ public void setListenerNode(final String listenerNode) {
+ this.listenerNode = listenerNode;
+ }
+
+ /**
+ * getProps.
+ *
+ * @return props
+ */
+ public String getProps() {
+ return props;
+ }
+
+ /**
+ * setProps.
+ *
+ * @param props props
+ */
+ public void setProps(final String props) {
+ this.props = props;
+ }
+}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
new file mode 100644
index 000000000..10d9f7818
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.transfer;
+
+import org.apache.shenyu.admin.model.entity.DiscoveryUpstreamDO;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+
+/**
+ * DiscoveryTransfer.
+ *
+ * <p>
+ * Field-by-field converter between {@link DiscoveryUpstreamDO} and {@link DiscoveryUpstreamData}.
+ * </p>
+ */
+public enum DiscoveryTransfer {
+    /**
+     * The constant INSTANCE.
+     */
+    INSTANCE;
+
+    /**
+     * mapToDo.
+     *
+     * <p>
+     * Builds a DiscoveryUpstreamDO carrying the same nine field values as the given data.
+     * </p>
+     *
+     * @param discoveryUpstreamData discoveryUpstreamData
+     * @return DiscoveryUpstreamDO
+     */
+    public DiscoveryUpstreamDO mapToDo(DiscoveryUpstreamData discoveryUpstreamData) {
+        return DiscoveryUpstreamDO.builder()
+                .discoveryHandlerId(discoveryUpstreamData.getDiscoveryHandlerId())
+                .id(discoveryUpstreamData.getId())
+                .protocol(discoveryUpstreamData.getProtocol())
+                .status(discoveryUpstreamData.getStatus())
+                .weight(discoveryUpstreamData.getWeight())
+                .props(discoveryUpstreamData.getProps())
+                .url(discoveryUpstreamData.getUrl())
+                .dateUpdated(discoveryUpstreamData.getDateUpdated())
+                .dateCreated(discoveryUpstreamData.getDateCreated())
+                .build();
+    }
+
+    /**
+     * mapToData.
+     *
+     * <p>
+     * Creates a DiscoveryUpstreamData and copies the same nine field values from the entity.
+     * </p>
+     *
+     * @param discoveryUpstreamDO discoveryUpstreamDO
+     * @return DiscoveryUpstreamData
+     */
+    public DiscoveryUpstreamData mapToData(DiscoveryUpstreamDO discoveryUpstreamDO) {
+        DiscoveryUpstreamData target = new DiscoveryUpstreamData();
+        target.setId(discoveryUpstreamDO.getId());
+        target.setProtocol(discoveryUpstreamDO.getProtocol());
+        target.setUrl(discoveryUpstreamDO.getUrl());
+        target.setStatus(discoveryUpstreamDO.getStatus());
+        target.setDiscoveryHandlerId(discoveryUpstreamDO.getDiscoveryHandlerId());
+        target.setWeight(discoveryUpstreamDO.getWeight());
+        target.setProps(discoveryUpstreamDO.getProps());
+        target.setDateUpdated(discoveryUpstreamDO.getDateUpdated());
+        target.setDateCreated(discoveryUpstreamDO.getDateCreated());
+        return target;
+    }
+
+}
diff --git a/shenyu-admin/src/main/resources/application.yml
b/shenyu-admin/src/main/resources/application.yml
index 78f41bee6..d6c949fb6 100755
--- a/shenyu-admin/src/main/resources/application.yml
+++ b/shenyu-admin/src/main/resources/application.yml
@@ -70,10 +70,10 @@ shenyu:
# clusterName: test
# namespace: application
# token: 0fff5645fc74ee5e0d63a6389433c8c8afc0beea31eed0279ecc1c8961d12da9
-# zookeeper:
-# url: localhost:2181
-# sessionTimeout: 5000
-# connectionTimeout: 2000
+# zookeeper:
+# url: localhost:2181
+# sessionTimeout: 5000
+# connectionTimeout: 2000
# http:
# enabled: true
# nacos:
diff --git
a/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
b/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
index 982a34a9c..59c4d90a1 100644
--- a/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
+++ b/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
@@ -93,10 +93,33 @@
</foreach>
</select>
+ <select id="selectByProxySelectorId" resultMap="BaseResultMap">
+ SELECT
+ du.id,
+ du.date_created,
+ du.date_updated,
+ du.discovery_handler_id,
+ du.protocol,
+ du.url,
+ du.status,
+ du.weight,
+ du.props
+ FROM discovery_upstream du
+ INNER JOIN discovery_rel dr ON du.discovery_handler_id =
dr.discovery_handler_id
+ <where>
+ dr.proxy_selector_id = #{proxySelectorId , jdbcType=VARCHAR}
+ </where>
+ </select>
+
<delete id="deleteByIds" parameterType="java.util.List">
DELETE FROM discovery_upstream WHERE id IN
<foreach item="id" collection="ids" open="(" separator="," close=")">
#{id, jdbcType=VARCHAR}
</foreach>
</delete>
+
+ <delete id="deleteByUrl">
+ DELETE FROM discovery_upstream WHERE url = #{url}
+ </delete>
+
</mapper>
diff --git a/shenyu-admin/src/main/resources/mappers/proxy-selector-sqlmap.xml
b/shenyu-admin/src/main/resources/mappers/proxy-selector-sqlmap.xml
index 5df080149..33feef495 100644
--- a/shenyu-admin/src/main/resources/mappers/proxy-selector-sqlmap.xml
+++ b/shenyu-admin/src/main/resources/mappers/proxy-selector-sqlmap.xml
@@ -109,6 +109,13 @@
</foreach>
</select>
+ <select id="selectById" resultMap="BaseResultMap">
+ SELECT
+ <include refid="Base_Column_List"/>
+ FROM proxy_selector
+ WHERE id = #{id, jdbcType=VARCHAR}
+ </select>
+
<delete id="deleteByIds" parameterType="java.util.List">
DELETE FROM proxy_selector WHERE id IN
<foreach item="id" collection="ids" open="(" separator="," close=")">
diff --git a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
index c96fce3ed..3e842ff90 100755
--- a/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
+++ b/shenyu-admin/src/main/resources/sql-script/h2/schema.sql
@@ -1036,21 +1036,32 @@ CREATE TABLE `proxy_selector`
PRIMARY KEY (`id`)
);
+-- ----------------------------
+-- Table structure for discovery_handler
+-- ----------------------------
+CREATE TABLE `discovery_handler`
+(
+ `id` varchar(128) NOT NULL COMMENT 'primary key id',
+ `discovery_id` varchar(128) NOT NULL COMMENT 'the discovery id',
+ `handler` varchar(255) NOT NULL COMMENT 'the handler',
+ `listener_node` varchar(255) COMMENT 'register server listener to node',
+ `props` text COMMENT 'the discovery props (json) ',
+ `date_created` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT
'create time',
+ `date_updated` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON
UPDATE CURRENT_TIMESTAMP(3) COMMENT 'update time',
+ PRIMARY KEY (`id`)
+) ;
+
-- ----------------------------
-- Table structure for discovery_rel
-- ----------------------------
CREATE TABLE `discovery_rel`
(
- `id` varchar(128) NOT NULL COMMENT 'primary key id',
- `level` varchar(64) NOT NULL COMMENT 'plugin or selector',
- `discovery_id` varchar(128) NOT NULL COMMENT 'the discovery id',
- `service_id` varchar(128) NOT NULL COMMENT 'the selector id or proxy
selector id',
+ `id` varchar(128) NOT NULL COMMENT 'primary key id',
+ `level` varchar(64) NOT NULL COMMENT 'plugin or selector',
+ `discovery_handler_id` varchar(128) NOT NULL COMMENT 'the discovery
handler id',
+ `selector_id` varchar(128) COMMENT 'the selector id ',
+ `proxy_selector_id` varchar(128) COMMENT 'the proxy selector id',
`date_created` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT
'create time',
`date_updated` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON
UPDATE CURRENT_TIMESTAMP(3) COMMENT 'update time',
PRIMARY KEY (`id`)
);
-
-
-
-
-
diff --git a/shenyu-bootstrap/pom.xml b/shenyu-bootstrap/pom.xml
index 8195fa1cf..149e5de05 100644
--- a/shenyu-bootstrap/pom.xml
+++ b/shenyu-bootstrap/pom.xml
@@ -232,12 +232,12 @@
<!--shenyu tars plugin end-->
<!--Tcp Plugin Start-->
-<!-- <dependency>-->
-<!-- <groupId>org.apache.shenyu</groupId>-->
-<!--
<artifactId>shenyu-spring-boot-starter-plugin-tcp</artifactId>-->
-<!-- <version>${project.version}</version>-->
-<!-- </dependency>-->
- <!--Tcp Plugin end-->
+ <dependency>
+ <groupId>org.apache.shenyu</groupId>
+ <artifactId>shenyu-spring-boot-starter-plugin-tcp</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+<!-- Tcp Plugin end-->
<!--shenyu sofa plugin start-->
<dependency>
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/DefaultPathConstants.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/DefaultPathConstants.java
index d39d91c70..a5e48653b 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/DefaultPathConstants.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/DefaultPathConstants.java
@@ -52,9 +52,9 @@ public final class DefaultPathConstants implements Constants {
public static final String META_DATA = PRE_FIX + "/metaData";
/**
- * The constant PROXY_SELECTOR_DATA.
+ * The constant PROXY_SELECTOR.
*/
- public static final String PROXY_SELECTOR_DATA = PRE_FIX +
"/proxySelectorData";
+ public static final String PROXY_SELECTOR = PRE_FIX + "/proxySelectorData";
/**
* acquire app_auth_path.
@@ -138,4 +138,15 @@ public final class DefaultPathConstants implements
Constants {
return String.join(PATH_SEPARATOR, buildRuleParentPath(pluginName),
String.join(SELECTOR_JOIN_RULE, selectorId, ruleId));
}
+ /**
+ * Build the proxy selector path.
+ *
+ * @param pluginName pluginName
+ * @param proxySelectorName proxySelectorName
+ * @return /shenyu/proxySelectorData/pluginName/proxySelectorName
+ */
+ public static String buildProxySelectorPath(final String pluginName, final
String proxySelectorName) {
+ return String.join(PATH_SEPARATOR, PROXY_SELECTOR, pluginName,
proxySelectorName);
+ }
+
}
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoverySyncData.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoverySyncData.java
new file mode 100644
index 000000000..ca3108319
--- /dev/null
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoverySyncData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.dto;
+
+import java.util.List;
+
+public class DiscoverySyncData {
+
+ private ProxySelectorData proxySelectorData;
+
+ private List<DiscoveryUpstreamData> upstreamDataList;
+
+ /**
+ * getProxySelectorData.
+ *
+ * @return proxySelectorData
+ */
+ public ProxySelectorData getProxySelectorData() {
+ return proxySelectorData;
+ }
+
+ /**
+ * setProxySelectorData.
+ *
+ * @param proxySelectorData proxySelectorData
+ */
+ public void setProxySelectorData(final ProxySelectorData
proxySelectorData) {
+ this.proxySelectorData = proxySelectorData;
+ }
+
+ /**
+ * getUpstreamDataList.
+ *
+ * @return upstreamDataList
+ */
+ public List<DiscoveryUpstreamData> getUpstreamDataList() {
+ return upstreamDataList;
+ }
+
+ /**
+ * setUpstreamDataList.
+ *
+ * @param upstreamDataList upstreamDataList
+ */
+ public void setUpstreamDataList(final List<DiscoveryUpstreamData>
upstreamDataList) {
+ this.upstreamDataList = upstreamDataList;
+ }
+
+}
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoveryUpstreamData.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoveryUpstreamData.java
new file mode 100644
index 000000000..234c35583
--- /dev/null
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/DiscoveryUpstreamData.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.common.dto;
+
+import java.sql.Timestamp;
+
+public class DiscoveryUpstreamData {
+
+ /**
+ * primary key.
+ */
+ private String id;
+
+ /**
+ * created time.
+ */
+ private Timestamp dateCreated;
+
+ /**
+ * updated time.
+ */
+ private Timestamp dateUpdated;
+
+
+ /**
+ * discoveryHandlerId.
+ */
+ private String discoveryHandlerId;
+
+ /**
+ * protocol.
+ */
+ private String protocol;
+
+ /**
+ * url.
+ */
+ private String url;
+
+ /**
+ * status.
+ */
+ private int status;
+
+ /**
+ * weight.
+ */
+ private int weight;
+
+ /**
+ * props.
+ */
+ private String props;
+
+
+ /**
+ * getDiscoveryHandlerId.
+ *
+ * @return discoveryHandlerId
+ */
+ public String getDiscoveryHandlerId() {
+ return discoveryHandlerId;
+ }
+
+ /**
+ * setDiscoveryHandlerId.
+ *
+ * @param discoveryHandlerId discoveryHandlerId
+ */
+ public void setDiscoveryHandlerId(final String discoveryHandlerId) {
+ this.discoveryHandlerId = discoveryHandlerId;
+ }
+
+ /**
+ * getProtocol.
+ *
+ * @return protocol
+ */
+ public String getProtocol() {
+ return protocol;
+ }
+
+ /**
+ * setProtocol.
+ *
+ * @param protocol protocol
+ */
+ public void setProtocol(final String protocol) {
+ this.protocol = protocol;
+ }
+
+ /**
+ * getUrl.
+ *
+ * @return url
+ */
+ public String getUrl() {
+ return url;
+ }
+
+ /**
+ * setUrl.
+ *
+ * @param url url
+ */
+ public void setUrl(final String url) {
+ this.url = url;
+ }
+
+ /**
+ * getStatus.
+ *
+ * @return status
+ */
+ public int getStatus() {
+ return status;
+ }
+
+ /**
+ * setStatus.
+ *
+ * @param status status
+ */
+ public void setStatus(final int status) {
+ this.status = status;
+ }
+
+ /**
+ * getWeight.
+ *
+ * @return weight
+ */
+ public int getWeight() {
+ return weight;
+ }
+
+ /**
+ * setWeight.
+ *
+ * @param weight weight
+ */
+ public void setWeight(final int weight) {
+ this.weight = weight;
+ }
+
+ /**
+ * getProps.
+ *
+ * @return props
+ */
+ public String getProps() {
+ return props;
+ }
+
+ /**
+ * setProps.
+ *
+ * @param props props
+ */
+ public void setProps(final String props) {
+ this.props = props;
+ }
+
+ /**
+ * Gets the value of id.
+ *
+ * @return the value of id
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the id.
+ *
+ * @param id id
+ */
+ public void setId(final String id) {
+ this.id = id;
+ }
+
+ /**
+ * Gets the value of dateCreated.
+ *
+ * @return the value of dateCreated
+ */
+ public Timestamp getDateCreated() {
+ return dateCreated;
+ }
+
+ /**
+ * Sets the dateCreated.
+ *
+ * @param dateCreated dateCreated
+ */
+ public void setDateCreated(final Timestamp dateCreated) {
+ this.dateCreated = dateCreated;
+ }
+
+ /**
+ * Gets the value of dateUpdated.
+ *
+ * @return the value of dateUpdated
+ */
+ public Timestamp getDateUpdated() {
+ return dateUpdated;
+ }
+
+ /**
+ * setDateUpdated.
+ *
+ * @param dateUpdated dateUpdated
+ */
+ public void setDateUpdated(final Timestamp dateUpdated) {
+ this.dateUpdated = dateUpdated;
+ }
+}
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
index 9ff547243..0f28eedbb 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/ProxySelectorData.java
@@ -17,9 +17,6 @@
package org.apache.shenyu.common.dto;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
-
-import java.util.List;
import java.util.Properties;
/**
@@ -39,8 +36,6 @@ public class ProxySelectorData {
private Properties props = new Properties();
- private List<DiscoveryUpstream> discoveryUpstreamList;
-
/**
* getId.
*
@@ -149,22 +144,4 @@ public class ProxySelectorData {
this.props = props;
}
- /**
- * getDiscoveryUpstreamList.
- *
- * @return discoveryUpstreamList
- */
- public List<DiscoveryUpstream> getDiscoveryUpstreamList() {
- return discoveryUpstreamList;
- }
-
- /**
- * setDiscoveryUpstreamList.
- *
- * @param discoveryUpstreamList discoveryUpstreamList
- */
- public void setDiscoveryUpstreamList(final List<DiscoveryUpstream>
discoveryUpstreamList) {
- this.discoveryUpstreamList = discoveryUpstreamList;
- }
-
}
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/ConfigGroupEnum.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/ConfigGroupEnum.java
index 49a2d4f47..70bb892a4 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/enums/ConfigGroupEnum.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/enums/ConfigGroupEnum.java
@@ -24,7 +24,6 @@ import java.util.Objects;
/**
* configuration group.
- *
*/
public enum ConfigGroupEnum {
@@ -53,13 +52,11 @@ public enum ConfigGroupEnum {
*/
META_DATA,
-
/**
- * proxy selector enum.
+ * ProxySelector data group enum.
*/
PROXY_SELECTOR;
-
/**
* Acquire by name config group enum.
*
diff --git
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
index f59b13f64..dece0d7d2 100644
---
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
+++
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/ShenyuDiscoveryService.java
@@ -42,6 +42,13 @@ public interface ShenyuDiscoveryService {
*/
void watcher(String key, DataChangedEventListener listener);
+ /**
+ * Cancel the watcher registered on the given key.
+ *
+ * @param key the key to stop watching
+ */
+ void unWatcher(String key);
+
/**
* Register data.
*
@@ -58,4 +65,9 @@ public interface ShenyuDiscoveryService {
*/
String getData(String key);
+ /**
+ * shutdown.
+ */
+ void shutdown();
+
}
diff --git
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
index 33580bb90..9e7c97e72 100644
---
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
+++
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEventListener.java
@@ -27,5 +27,5 @@ public interface DataChangedEventListener {
*
* @param event data changed event
*/
- void onChange(DataChangedEvent event);
+ void onChange(DiscoveryDataChangedEvent event);
}
diff --git
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEvent.java
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DiscoveryDataChangedEvent.java
similarity index 93%
rename from
shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEvent.java
rename to
shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DiscoveryDataChangedEvent.java
index 5ebe41874..414034590 100644
---
a/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DataChangedEvent.java
+++
b/shenyu-discovery/shenyu-discovery-api/src/main/java/org/apache/shenyu/discovery/api/listener/DiscoveryDataChangedEvent.java
@@ -20,7 +20,7 @@ package org.apache.shenyu.discovery.api.listener;
/**
* Data changed event.
*/
-public final class DataChangedEvent {
+public final class DiscoveryDataChangedEvent {
private final String key;
@@ -35,7 +35,7 @@ public final class DataChangedEvent {
* @param value the value
* @param event the event
*/
- public DataChangedEvent(final String key, final String value, final Event
event) {
+ public DiscoveryDataChangedEvent(final String key, final String value,
final Event event) {
this.key = key;
this.value = value;
this.event = event;
diff --git
a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
index 63e676f4d..8483a4a37 100644
---
a/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
+++
b/shenyu-discovery/shenyu-discovery-zookeeper/src/main/java/org/apache/shenyu/discovery/zookeeper/ZookeeperDiscoveryService.java
@@ -20,17 +20,19 @@ package org.apache.shenyu.discovery.zookeeper;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.CuratorWatcher;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.discovery.api.ShenyuDiscoveryService;
import org.apache.shenyu.discovery.api.config.DiscoveryConfig;
-import org.apache.shenyu.discovery.api.listener.DataChangedEvent;
+import org.apache.shenyu.discovery.api.listener.DiscoveryDataChangedEvent;
import org.apache.shenyu.discovery.api.listener.DataChangedEventListener;
import org.apache.shenyu.spi.Join;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +53,8 @@ public class ZookeeperDiscoveryService implements
ShenyuDiscoveryService {
private final Map<String, String> nodeDataMap = new HashMap<>();
+ private final Map<String, TreeCache> cacheMap = new HashMap<>();
+
@Override
public void init(final DiscoveryConfig config) {
String baseSleepTimeMilliseconds =
config.getProps().getProperty("baseSleepTimeMilliseconds", "1000");
@@ -94,7 +98,7 @@ public class ZookeeperDiscoveryService implements
ShenyuDiscoveryService {
private boolean isExist(final String key) {
try {
- return Objects.nonNull(client.checkExists().forPath(key));
+ return null != client.checkExists().forPath(key);
} catch (Exception e) {
throw new ShenyuException(e);
}
@@ -112,59 +116,81 @@ public class ZookeeperDiscoveryService implements
ShenyuDiscoveryService {
@Override
public void watcher(final String key, final DataChangedEventListener
listener) {
try {
- this.client.getData().usingWatcher(new CuratorWatcher() {
- @Override
- public void process(final WatchedEvent watchedEvent) throws
Exception {
- if (Objects.nonNull(listener)) {
- String path = Objects.isNull(watchedEvent.getPath()) ?
"" : watchedEvent.getPath();
- if (StringUtils.isNoneBlank(path)) {
- client.getData().usingWatcher(this).forPath(path);
- byte[] ret = client.getData().forPath(key);
- String data = Objects.isNull(ret) ? null : new
String(ret, StandardCharsets.UTF_8);
- LOGGER.info("shenyu ZookeeperDiscoveryService
onChange key={}", path);
- listener.onChange(buildDataChangedEvent(path,
data, watchedEvent));
- }
+ TreeCache treeCache = new TreeCache(client, key);
+ TreeCacheListener treeCacheListener = (curatorFramework, event) ->
{
+ ChildData data = event.getData();
+ DiscoveryDataChangedEvent dataChangedEvent;
+ if (Objects.nonNull(data) && Objects.nonNull(data.getData())) {
+ String currentPath = data.getPath();
+ Stat stat = data.getStat();
+ boolean isEphemeral = Objects.nonNull(stat) &&
stat.getEphemeralOwner() > 0;
+ if (!isEphemeral) {
+ LOGGER.info("shenyu Ignore non-ephemeral node
changes");
+ return;
+ }
+ switch (event.getType()) {
+ case NODE_ADDED:
+ dataChangedEvent = new
DiscoveryDataChangedEvent(currentPath, new String(data.getData(),
StandardCharsets.UTF_8), DiscoveryDataChangedEvent.Event.ADDED);
+ break;
+ case NODE_UPDATED:
+ dataChangedEvent = new
DiscoveryDataChangedEvent(currentPath, new String(data.getData(),
StandardCharsets.UTF_8), DiscoveryDataChangedEvent.Event.UPDATED);
+ break;
+ case NODE_REMOVED:
+ dataChangedEvent = new
DiscoveryDataChangedEvent(currentPath, new String(data.getData(),
StandardCharsets.UTF_8), DiscoveryDataChangedEvent.Event.DELETED);
+ break;
+ default:
+ dataChangedEvent = new
DiscoveryDataChangedEvent(currentPath, new String(data.getData(),
StandardCharsets.UTF_8), DiscoveryDataChangedEvent.Event.IGNORED);
+ break;
}
+ listener.onChange(dataChangedEvent);
}
- }).forPath(key);
+ };
+ treeCache.getListenable().addListener(treeCacheListener);
+ treeCache.start();
+ cacheMap.put(key, treeCache);
} catch (Exception e) {
throw new ShenyuException(e);
}
}
+ @Override
+ public void unWatcher(final String key) {
+ if (cacheMap.containsKey(key)) {
+ cacheMap.remove(key).close();
+ }
+ }
+
@Override
public void register(final String key, final String value) {
- this.createOrUpdate(key, value, CreateMode.EPHEMERAL);
+ this.createOrUpdate(key, value, CreateMode.PERSISTENT);
}
@Override
public String getData(final String key) {
try {
- byte[] ret = client.getData().forPath(key);
+ TreeCache treeCache = cacheMap.get(key);
+ if (Objects.isNull(treeCache)) {
+ return null;
+ }
+ ChildData currentData = treeCache.getCurrentData(key);
+ byte[] ret = currentData.getData();
return Objects.isNull(ret) ? null : new String(ret,
StandardCharsets.UTF_8);
} catch (Exception e) {
throw new ShenyuException(e);
}
}
- private DataChangedEvent buildDataChangedEvent(final String key, final
String value, final WatchedEvent watchedEvent) {
- DataChangedEvent.Event event = null;
- switch (watchedEvent.getType()) {
- case NodeCreated:
- event = DataChangedEvent.Event.ADDED;
- break;
- case NodeDeleted:
- event = DataChangedEvent.Event.DELETED;
- break;
- case NodeDataChanged:
- event = DataChangedEvent.Event.UPDATED;
- break;
- case NodeChildrenChanged:
- case DataWatchRemoved:
- case ChildWatchRemoved:
- default:
- event = DataChangedEvent.Event.IGNORED;
+ @Override
+ public void shutdown() {
+ try {
+ //close treeCache
+ for (String key : cacheMap.keySet()) {
+ cacheMap.get(key).close();
+ }
+ client.close();
+ } catch (Exception e) {
+ throw new ShenyuException(e);
}
- return new DataChangedEvent(key, value, event);
+
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/cache/CommonProxySelectorDataSubscriber.java
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/cache/CommonProxySelectorDataSubscriber.java
index 955d579d2..8315de259 100644
---
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/cache/CommonProxySelectorDataSubscriber.java
+++
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/cache/CommonProxySelectorDataSubscriber.java
@@ -17,8 +17,8 @@
package org.apache.shenyu.plugin.base.cache;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
import org.apache.shenyu.plugin.base.handler.ProxySelectorDataHandler;
import org.apache.shenyu.sync.data.api.ProxySelectorDataSubscriber;
@@ -39,7 +39,7 @@ public class CommonProxySelectorDataSubscriber implements
ProxySelectorDataSubsc
}
@Override
- public void onSubscribe(final ProxySelectorData proxySelectorData, final
List<DiscoveryUpstream> upstreamsList) {
+ public void onSubscribe(final ProxySelectorData proxySelectorData, final
List<DiscoveryUpstreamData> upstreamsList) {
Optional.ofNullable(handlerMap.get(proxySelectorData.getPluginName()))
.ifPresent(handler ->
handler.handlerProxySelector(proxySelectorData, upstreamsList));
}
diff --git
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
index baf2b34fb..21567392f 100644
---
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/handler/ProxySelectorDataHandler.java
@@ -17,8 +17,8 @@
package org.apache.shenyu.plugin.base.handler;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
import java.util.List;
@@ -33,7 +33,7 @@ public interface ProxySelectorDataHandler {
* @param selectorData selectorData
* @param upstreamsList upstreamsList
*/
- void handlerProxySelector(ProxySelectorData selectorData,
List<DiscoveryUpstream> upstreamsList);
+ void handlerProxySelector(ProxySelectorData selectorData,
List<DiscoveryUpstreamData> upstreamsList);
/**
diff --git
a/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
index d9c6cdab8..454355ef0 100644
---
a/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
+++
b/shenyu-plugin/shenyu-plugin-tcp/src/main/java/org/apache/shenyu/plugin/tcp/handler/TcpUpstreamDataHandler.java
@@ -17,8 +17,8 @@
package org.apache.shenyu.plugin.tcp.handler;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.plugin.base.handler.ProxySelectorDataHandler;
import org.apache.shenyu.protocol.tcp.BootstrapServer;
@@ -41,7 +41,7 @@ public class TcpUpstreamDataHandler implements
ProxySelectorDataHandler {
private final Map<String, BootstrapServer> cache = new
ConcurrentHashMap<>();
@Override
- public synchronized void handlerProxySelector(final ProxySelectorData
proxySelectorData, final List<DiscoveryUpstream> upstreamsList) {
+ public synchronized void handlerProxySelector(final ProxySelectorData
proxySelectorData, final List<DiscoveryUpstreamData> upstreamsList) {
String name = proxySelectorData.getName();
if (!cache.containsKey(name)) {
Integer forwardPort = proxySelectorData.getForwardPort();
@@ -54,10 +54,10 @@ public class TcpUpstreamDataHandler implements
ProxySelectorDataHandler {
cache.put(name, bootstrapServer);
LOG.info("shenyu create TcpBootstrapServer success port is {}",
forwardPort);
} else {
- List<DiscoveryUpstream> removed =
UpstreamProvider.getSingleton().refreshCache(name, upstreamsList);
+ List<DiscoveryUpstreamData> removed =
UpstreamProvider.getSingleton().refreshCache(name, upstreamsList);
BootstrapServer bootstrapServer = cache.get(name);
bootstrapServer.removeCommonUpstream(removed);
- LOG.info("shenyu update TcpBootstrapServer success remove is {}",
removed);
+ LOG.info("shenyu update TcpBootstrapServer success upstream is
{}", upstreamsList);
}
}
diff --git
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
index ff009d567..59f1f35b0 100644
---
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
+++
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/BootstrapServer.java
@@ -17,7 +17,7 @@
package org.apache.shenyu.protocol.tcp;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import java.util.List;
@@ -38,7 +38,7 @@ public interface BootstrapServer {
*
* @param removeList removeList
*/
- void removeCommonUpstream(List<DiscoveryUpstream> removeList);
+ void removeCommonUpstream(List<DiscoveryUpstreamData> removeList);
/**
* shutdown.
diff --git
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
index 0a25555c6..3d162d3c1 100644
---
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
+++
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/TcpBootstrapServer.java
@@ -21,7 +21,7 @@ import com.google.common.eventbus.EventBus;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.protocol.tcp.connection.Bridge;
import org.apache.shenyu.protocol.tcp.connection.ConnectionContext;
import
org.apache.shenyu.protocol.tcp.connection.DefaultConnectionConfigProvider;
@@ -66,7 +66,6 @@ public class TcpBootstrapServer implements BootstrapServer {
connectionContext.init(tcpServerConfiguration.getProps());
LoopResources loopResources =
LoopResources.create("shenyu-tcp-bootstrap-server",
tcpServerConfiguration.getBossGroupThreadCount(),
tcpServerConfiguration.getWorkerGroupThreadCount(), true);
-
TcpServer tcpServer = TcpServer.create()
.doOnChannelInit((connObserver, channel, remoteAddress) ->
channel.pipeline().addFirst(new LoggingHandler(LogLevel.INFO)))
.wiretap(true)
@@ -101,7 +100,7 @@ public class TcpBootstrapServer implements BootstrapServer {
* @param removeList removeList
*/
@Override
- public void removeCommonUpstream(final List<DiscoveryUpstream> removeList)
{
+ public void removeCommonUpstream(final List<DiscoveryUpstreamData>
removeList) {
eventBus.post(removeList);
}
diff --git
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
index 116b1baed..acf3a7b06 100644
---
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
+++
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/UpstreamProvider.java
@@ -17,11 +17,15 @@
package org.apache.shenyu.protocol.tcp;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
/**
* UpstreamProvider.
@@ -30,7 +34,7 @@ public final class UpstreamProvider {
private static final UpstreamProvider SINGLETON = new UpstreamProvider();
- private final Map<String, List<DiscoveryUpstream>> cache = new
ConcurrentHashMap<>();
+ private final Map<String, List<DiscoveryUpstreamData>> cache = new
ConcurrentHashMap<>();
private UpstreamProvider() {
}
@@ -50,8 +54,8 @@ public final class UpstreamProvider {
* @param pluginSelectorName pluginSelectorName
* @return UpstreamList
*/
- public List<DiscoveryUpstream> provide(final String pluginSelectorName) {
- return cache.get(pluginSelectorName);
+ public List<DiscoveryUpstreamData> provide(final String
pluginSelectorName) {
+ return cache.getOrDefault(pluginSelectorName, new ArrayList<>());
}
/**
@@ -60,8 +64,9 @@ public final class UpstreamProvider {
* @param pluginSelectorName pluginSelectorName
* @param upstreams upstreams
*/
- public void createUpstreams(final String pluginSelectorName, final
List<DiscoveryUpstream> upstreams) {
- cache.put(pluginSelectorName, upstreams);
+ public void createUpstreams(final String pluginSelectorName, final
List<DiscoveryUpstreamData> upstreams) {
+ List<DiscoveryUpstreamData> discoveryUpstreamDataList =
Optional.ofNullable(upstreams).orElseGet(ArrayList::new);
+ cache.put(pluginSelectorName, discoveryUpstreamDataList);
}
/**
@@ -71,9 +76,11 @@ public final class UpstreamProvider {
* @param upstreams upstreams
* @return removeList
*/
- public List<DiscoveryUpstream> refreshCache(final String
pluginSelectorName, final List<DiscoveryUpstream> upstreams) {
- List<DiscoveryUpstream> remove = cache.remove(pluginSelectorName);
- cache.put(pluginSelectorName, upstreams);
- return remove;
+ public List<DiscoveryUpstreamData> refreshCache(final String
pluginSelectorName, final List<DiscoveryUpstreamData> upstreams) {
+ List<DiscoveryUpstreamData> remove = cache.remove(pluginSelectorName);
+ List<DiscoveryUpstreamData> discoveryUpstreamDataList =
Optional.ofNullable(upstreams).orElse(new ArrayList<>());
+ cache.put(pluginSelectorName, discoveryUpstreamDataList);
+ Set<String> urlSet =
discoveryUpstreamDataList.stream().map(DiscoveryUpstreamData::getUrl).collect(Collectors.toSet());
+ return remove.stream().filter(r ->
!urlSet.contains(r.getUrl())).collect(Collectors.toList());
}
}
diff --git
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
index 0cbdf5403..9c8b1c580 100644
---
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
+++
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/ActivityConnectionObserver.java
@@ -19,7 +19,7 @@ package org.apache.shenyu.protocol.tcp.connection;
import com.google.common.eventbus.Subscribe;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
@@ -67,7 +67,7 @@ public class ActivityConnectionObserver implements
ConnectionObserver {
* @param remove removeList
*/
@Subscribe
- public void onRemove(final List<DiscoveryUpstream> remove) {
+ public void onRemove(final List<DiscoveryUpstreamData> remove) {
LOG.info("shenyu {} ConnectionObserver do on remove upstreams", name);
for (Connection connection : cache.keySet()) {
SocketAddress socketAddress = connection.channel().remoteAddress();
@@ -85,10 +85,10 @@ public class ActivityConnectionObserver implements
ConnectionObserver {
* @param cacheSocketAddress cacheSocketAddress
* @return boolean
*/
- private boolean in(final List<DiscoveryUpstream> removeList, final
SocketAddress cacheSocketAddress) {
+ private boolean in(final List<DiscoveryUpstreamData> removeList, final
SocketAddress cacheSocketAddress) {
return removeList.stream().anyMatch(u -> {
String cacheUrl = cacheSocketAddress.toString().substring(1);
- String removedUrl = u.getUpstreamUrl();
+ String removedUrl = u.getUrl();
LOG.info("compare {} , {}", cacheUrl, removedUrl);
return StringUtils.equals(cacheUrl, removedUrl);
});
diff --git
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
index 2a0e7b270..3f4240993 100644
---
a/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
+++
b/shenyu-protocol/shenyu-protocol-tcp/src/main/java/org/apache/shenyu/protocol/tcp/connection/DefaultConnectionConfigProvider.java
@@ -17,6 +17,7 @@
package org.apache.shenyu.protocol.tcp.connection;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.loadbalancer.entity.Upstream;
import org.apache.shenyu.loadbalancer.factory.LoadBalancerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
@@ -48,10 +50,12 @@ public class DefaultConnectionConfigProvider implements
ClientConnectionConfigPr
@Override
public URI getProxiedService(final String ip) {
- List<Upstream> upstreamList =
UpstreamProvider.getSingleton().provide(this.pluginSelectorName)
- .stream()
- .map(dp ->
Upstream.builder().url(dp.getUpstreamUrl()).status(dp.isStatus()).weight(dp.getWeight()).protocol(dp.getProtocol()).build())
- .collect(Collectors.toList());
+ List<Upstream> upstreamList =
UpstreamProvider.getSingleton().provide(this.pluginSelectorName).stream().map(dp
-> {
+ return
Upstream.builder().url(dp.getUrl()).status(Objects.equals(dp.getStatus(),
1)).weight(dp.getWeight()).protocol(dp.getProtocol()).build();
+ }).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(upstreamList)) {
+ throw new ShenyuException("shenyu TcpProxy don't have any
upstream");
+ }
Upstream upstream = LoadBalancerFactory.selector(upstreamList,
loadBalanceAlgorithm, ip);
return cover(upstream);
}
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
b/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
index ac2b9d92a..32185a657 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-api/src/main/java/org/apache/shenyu/sync/data/api/ProxySelectorDataSubscriber.java
@@ -17,8 +17,8 @@
package org.apache.shenyu.sync.data.api;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.ProxySelectorData;
-import org.apache.shenyu.common.dto.convert.selector.DiscoveryUpstream;
import java.util.List;
@@ -31,9 +31,9 @@ public interface ProxySelectorDataSubscriber {
* On subscribe.
*
* @param proxySelectorData the proxySelector data
- * @param upstreamsList upstreamsList
+ * @param upstreamsList upstreamsList
*/
- void onSubscribe(ProxySelectorData proxySelectorData,
List<DiscoveryUpstream> upstreamsList);
+ void onSubscribe(ProxySelectorData proxySelectorData,
List<DiscoveryUpstreamData> upstreamsList);
/**
* Un subscribe.
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-apollo/src/main/java/org/apache/shenyu/sync/data/apollo/ApolloDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-apollo/src/main/java/org/apache/shenyu/sync/data/apollo/ApolloDataService.java
index ffa77ed6a..4b5b9950d 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-apollo/src/main/java/org/apache/shenyu/sync/data/apollo/ApolloDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-apollo/src/main/java/org/apache/shenyu/sync/data/apollo/ApolloDataService.java
@@ -25,7 +25,7 @@ import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
@@ -169,12 +169,12 @@ public class ApolloDataService implements SyncDataService
{
* subscriber proxy selector data.
* @return proxy selector data list
*/
- public List<ProxySelectorData> subscriberProxySelectorData() {
- List<ProxySelectorData> proxySelectorDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configService.getProperty(ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
"{}"),
- ProxySelectorData.class).values());
- proxySelectorDataList.forEach(proxySelectorData ->
proxySelectorDataSubscribers.forEach(subscriber -> {
- subscriber.unSubscribe(proxySelectorData);
- subscriber.onSubscribe(proxySelectorData,
proxySelectorData.getDiscoveryUpstreamList());
+ public List<DiscoverySyncData> subscriberProxySelectorData() {
+ List<DiscoverySyncData> proxySelectorDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configService.getProperty(ApolloPathConstants.PROXY_SELECTOR_DATA_ID,
"{}"),
+ DiscoverySyncData.class).values());
+ proxySelectorDataList.forEach(discoverySyncData ->
proxySelectorDataSubscribers.forEach(subscriber -> {
+ subscriber.unSubscribe(discoverySyncData.getProxySelectorData());
+ subscriber.onSubscribe(discoverySyncData.getProxySelectorData(),
discoverySyncData.getUpstreamDataList());
}));
return proxySelectorDataList;
}
@@ -206,8 +206,8 @@ public class ApolloDataService implements SyncDataService {
LOG.info("apollo listener authData: {}", appAuthDataList);
break;
case ApolloPathConstants.PROXY_SELECTOR_DATA_ID:
- List<ProxySelectorData> proxySelectorDataList =
subscriberProxySelectorData();
- LOG.info("apollo listener ProxySelectorData: {}",
proxySelectorDataList);
+ List<DiscoverySyncData> discoverySyncData =
subscriberProxySelectorData();
+ LOG.info("apollo listener ProxySelectorData: {}",
discoverySyncData);
break;
default:
break;
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
b/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
index 66c16e0bf..6dd635a6f 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-nacos/src/main/java/org/apache/shenyu/sync/data/nacos/handler/NacosCacheHandler.java
@@ -29,6 +29,8 @@ import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.ProxySelectorData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.MapUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
@@ -154,10 +156,12 @@ public class NacosCacheHandler {
protected void updateProxySelectorMap(final String configInfo) {
try {
- List<ProxySelectorData> proxySelectorDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
ProxySelectorData.class).values());
- proxySelectorDataList.forEach(proxySelectorData ->
proxySelectorDataSubscribers.forEach(subscriber -> {
+ List<DiscoverySyncData> discoverySyncDataList = new
ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo,
DiscoverySyncData.class).values());
+ discoverySyncDataList.forEach(discoverySyncData ->
proxySelectorDataSubscribers.forEach(subscriber -> {
+ ProxySelectorData proxySelectorData =
discoverySyncData.getProxySelectorData();
+ List<DiscoveryUpstreamData> upstreamDataList =
discoverySyncData.getUpstreamDataList();
subscriber.unSubscribe(proxySelectorData);
- subscriber.onSubscribe(proxySelectorData,
proxySelectorData.getDiscoveryUpstreamList());
+ subscriber.onSubscribe(proxySelectorData, upstreamDataList);
}));
} catch (JsonParseException e) {
LOG.error("sync proxy selector data have error", e);
diff --git
a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
index 81c3cb4b4..c9caea41b 100644
---
a/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
+++
b/shenyu-sync-data-center/shenyu-sync-data-zookeeper/src/main/java/org/apache/shenyu/sync/data/zookeeper/ZookeeperSyncDataService.java
@@ -25,12 +25,14 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.shenyu.common.constant.DefaultPathConstants;
+import org.apache.shenyu.common.dto.AppAuthData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.RuleData;
-import org.apache.shenyu.common.dto.AppAuthData;
-import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.SelectorData;
+import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
@@ -61,7 +63,6 @@ public class ZookeeperSyncDataService implements
SyncDataService {
private final List<ProxySelectorDataSubscriber>
proxySelectorDataSubscribers;
-
/**
* Instantiates a new Zookeeper cache manager.
*
@@ -89,7 +90,7 @@ public class ZookeeperSyncDataService implements
SyncDataService {
zkClient.addCache(DefaultPathConstants.PLUGIN_PARENT, new
PluginCacheListener());
zkClient.addCache(DefaultPathConstants.SELECTOR_PARENT, new
SelectorCacheListener());
zkClient.addCache(DefaultPathConstants.RULE_PARENT, new
RuleCacheListener());
- zkClient.addCache(DefaultPathConstants.PROXY_SELECTOR_DATA, new
ProxySelectorCacheListener());
+ zkClient.addCache(DefaultPathConstants.PROXY_SELECTOR, new
ProxySelectorCacheListener());
}
private void watchAppAuth() {
@@ -162,9 +163,9 @@ public class ZookeeperSyncDataService implements
SyncDataService {
.ifPresent(data -> metaDataSubscribers.forEach(e ->
e.onSubscribe(metaData)));
}
- private void cacheProxySelectorData(final ProxySelectorData
proxySelectorData) {
+ private void cacheProxySelectorData(final ProxySelectorData
proxySelectorData, final List<DiscoveryUpstreamData> discoveryUpstreamDataList)
{
Optional.ofNullable(proxySelectorData)
- .ifPresent(data -> proxySelectorDataSubscribers.forEach(e ->
e.onSubscribe(proxySelectorData,
proxySelectorData.getDiscoveryUpstreamList())));
+ .ifPresent(data -> proxySelectorDataSubscribers.forEach(e ->
e.onSubscribe(proxySelectorData, discoveryUpstreamDataList)));
}
private void unCacheMetaData(final MetaData metaData) {
@@ -330,7 +331,7 @@ public class ZookeeperSyncDataService implements
SyncDataService {
@Override
protected void event(final TreeCacheEvent.Type type, final String
path, final ChildData data) {
// if not uri register path, return.
- if (!path.contains(DefaultPathConstants.PROXY_SELECTOR_DATA)) {
+ if (!path.contains(DefaultPathConstants.PROXY_SELECTOR)) {
return;
}
String[] pathInfoArray = path.split("/");
@@ -346,12 +347,13 @@ public class ZookeeperSyncDataService implements
SyncDataService {
unCacheProxySelectorData(proxySelectorData);
return;
}
- ProxySelectorData proxySelectorData =
GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), ProxySelectorData.class);
+ DiscoverySyncData discoverySyncData =
GsonUtils.getInstance().fromJson(new String(data.getData(),
StandardCharsets.UTF_8), DiscoverySyncData.class);
+ ProxySelectorData proxySelectorData =
discoverySyncData.getProxySelectorData();
proxySelectorData.setName(proxySelectorName);
proxySelectorData.setPluginName(pluginName);
// create or update
Optional.ofNullable(data)
- .ifPresent(e -> cacheProxySelectorData(proxySelectorData));
+ .ifPresent(e -> cacheProxySelectorData(proxySelectorData,
discoverySyncData.getUpstreamDataList()));
}
}