This is an automated email from the ASF dual-hosted git repository.
hefengen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 7b6d29129c discovery Local support upstream health check in Admin
#5591 (#5596)
7b6d29129c is described below
commit 7b6d29129ca5e220b4178c3ad96030cca97855ba
Author: 杨文杰 <[email protected]>
AuthorDate: Wed Jul 24 13:49:06 2024 +0800
discovery Local support upstream health check in Admin #5591 (#5596)
* discovery Local support upstream health check in Admin
* discovery Local support upstream health check in Admin
---------
Co-authored-by: moremind <[email protected]>
Co-authored-by: loongs-zhang <[email protected]>
---
.../admin/discovery/LocalDiscoveryProcessor.java | 5 +++
.../admin/mapper/DiscoveryUpstreamMapper.java | 11 ++++++
.../shenyu/admin/model/enums/EventTypeEnum.java | 8 +++-
.../discovery/DiscoveryStreamUpdatedEvent.java | 39 +++++++++++++++++++
.../admin/service/DiscoveryUpstreamService.java | 10 +++++
.../service/impl/DiscoveryUpstreamServiceImpl.java | 14 ++++++-
.../admin/service/impl/UpstreamCheckService.java | 45 ++++++++++------------
.../mappers/discovery-upstream-sqlmap.xml | 7 ++++
8 files changed, 112 insertions(+), 27 deletions(-)
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
index 5cc7b891be..223e0873bc 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/discovery/LocalDiscoveryProcessor.java
@@ -24,6 +24,7 @@ import org.apache.shenyu.admin.model.dto.DiscoveryUpstreamDTO;
import org.apache.shenyu.admin.model.dto.ProxySelectorDTO;
import org.apache.shenyu.admin.model.entity.DiscoveryDO;
import org.apache.shenyu.admin.model.entity.DiscoveryUpstreamDO;
+import
org.apache.shenyu.admin.model.event.discovery.DiscoveryStreamUpdatedEvent;
import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
@@ -38,6 +39,8 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.shenyu.admin.model.enums.EventTypeEnum.LOCAL_DISCOVERY_UPSTREAM_UPDATE;
+
/**
* LocalDiscoveryProcessor.
*/
@@ -87,6 +90,8 @@ public class LocalDiscoveryProcessor implements
DiscoveryProcessor, ApplicationE
discoverySyncData.setUpstreamDataList(upstreamDataList);
DataChangedEvent dataChangedEvent = new
DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE,
Collections.singletonList(discoverySyncData));
eventPublisher.publishEvent(dataChangedEvent);
+ DiscoveryStreamUpdatedEvent discoveryStreamUpdatedEvent = new
DiscoveryStreamUpdatedEvent(discoverySyncData, LOCAL_DISCOVERY_UPSTREAM_UPDATE);
+ eventPublisher.publishEvent(discoveryStreamUpdatedEvent);
}
@Override
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
index 1d5958d1d7..cecf31dfdc 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/mapper/DiscoveryUpstreamMapper.java
@@ -150,4 +150,15 @@ public interface DiscoveryUpstreamMapper extends
ExistProvider {
* @return rows
*/
int updateDiscoveryHandlerIdAndUrl(DiscoveryUpstreamDO
discoveryUpstreamDO);
+
+ /**
+ * updateStatusByUrl.
+ *
+ * @param discoveryHandlerId discoveryHandlerId
+ * @param url url
+ * @param status status 0 healthy 1 unhealthy
+ * @return effect
+ */
+ int updateStatusByUrl(@Param("discoveryHandlerId") String
discoveryHandlerId, @Param("url") String url, int status);
+
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/enums/EventTypeEnum.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/enums/EventTypeEnum.java
index c9b913c8e6..9b1d857cfd 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/enums/EventTypeEnum.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/enums/EventTypeEnum.java
@@ -187,7 +187,13 @@ public enum EventTypeEnum {
/**
* user update.
*/
- USER_UPDATE("UPDATE:User", DataEventTypeEnum.UPDATE, Color.UPDATE_COLOR);
+ USER_UPDATE("UPDATE:User", DataEventTypeEnum.UPDATE, Color.UPDATE_COLOR),
+
+
+ /**
+ * local discovery upstream update.
+ */
+ LOCAL_DISCOVERY_UPSTREAM_UPDATE("UPDATE:LocalDiscoveryUpstream",
DataEventTypeEnum.UPDATE, Color.UPDATE_COLOR);
/**
* type name.
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/event/discovery/DiscoveryStreamUpdatedEvent.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/event/discovery/DiscoveryStreamUpdatedEvent.java
new file mode 100644
index 0000000000..0a213370fb
--- /dev/null
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/model/event/discovery/DiscoveryStreamUpdatedEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.admin.model.event.discovery;
+
+import org.apache.shenyu.admin.model.enums.EventTypeEnum;
+import org.apache.shenyu.admin.model.event.AdminDataModelChangedEvent;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+
+public class DiscoveryStreamUpdatedEvent extends AdminDataModelChangedEvent {
+
+ public DiscoveryStreamUpdatedEvent(final DiscoverySyncData source, final
EventTypeEnum type) {
+ super(source, null, type);
+ }
+
+ /**
+ * the created selector.
+ *
+ * @return selector
+ */
+ public DiscoverySyncData getDiscoverySyncData() {
+ return (DiscoverySyncData) getSource();
+ }
+
+}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/DiscoveryUpstreamService.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/DiscoveryUpstreamService.java
index 4636a2d547..27e1a8643e 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/DiscoveryUpstreamService.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/DiscoveryUpstreamService.java
@@ -99,6 +99,16 @@ public interface DiscoveryUpstreamService {
*/
void deleteBySelectorIdAndUrl(String selectorId, String url);
+
+ /**
+ * changeStatusBySelectorIdAndUrl.
+ *
+ * @param selectorId selectorId
+ * @param url url
+ * @param enabled enabled
+ */
+ void changeStatusBySelectorIdAndUrl(String selectorId, String url, Boolean
enabled);
+
/**
* Import the discoveryUpstream data list.
* @param discoveryUpstreamList the discoveryUpstream data
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/DiscoveryUpstreamServiceImpl.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/DiscoveryUpstreamServiceImpl.java
index f04e6d447a..488288c767 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/DiscoveryUpstreamServiceImpl.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/DiscoveryUpstreamServiceImpl.java
@@ -124,7 +124,10 @@ public class DiscoveryUpstreamServiceImpl implements
DiscoveryUpstreamService {
if (StringUtils.hasLength(discoveryUpstreamDTO.getId())) {
discoveryUpstreamMapper.updateSelective(discoveryUpstreamDO);
} else {
- discoveryUpstreamMapper.insert(discoveryUpstreamDO);
+ DiscoveryUpstreamDO existingRecord =
discoveryUpstreamMapper.selectByDiscoveryHandlerIdAndUrl(discoveryUpstreamDO.getDiscoveryHandlerId(),
discoveryUpstreamDO.getUrl());
+ if (Objects.isNull(existingRecord)) {
+ discoveryUpstreamMapper.insert(discoveryUpstreamDO);
+ }
}
}
@@ -228,6 +231,15 @@ public class DiscoveryUpstreamServiceImpl implements
DiscoveryUpstreamService {
}
}
+ @Override
+ @Transactional(rollbackFor = Exception.class)
+ public void changeStatusBySelectorIdAndUrl(final String selectorId, final
String url, final Boolean enabled) {
+ DiscoveryHandlerDO discoveryHandlerDO =
discoveryHandlerMapper.selectBySelectorId(selectorId);
+ if (Objects.nonNull(discoveryHandlerDO)) {
+
discoveryUpstreamMapper.updateStatusByUrl(discoveryHandlerDO.getId(), url,
enabled ? 0 : 1);
+ }
+ }
+
@Override
@Transactional(rollbackFor = Exception.class)
public ConfigImportResult importData(final List<DiscoveryUpstreamDTO>
discoveryUpstreamList) {
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
index 305f18fa02..679375658a 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
@@ -30,15 +30,13 @@ import
org.apache.shenyu.admin.mapper.SelectorConditionMapper;
import org.apache.shenyu.admin.mapper.SelectorMapper;
import org.apache.shenyu.admin.model.entity.PluginDO;
import org.apache.shenyu.admin.model.entity.SelectorDO;
-import org.apache.shenyu.admin.model.event.selector.SelectorCreatedEvent;
-import org.apache.shenyu.admin.model.event.selector.SelectorUpdatedEvent;
+import
org.apache.shenyu.admin.model.event.discovery.DiscoveryStreamUpdatedEvent;
import org.apache.shenyu.admin.model.query.SelectorConditionQuery;
import org.apache.shenyu.admin.service.DiscoveryUpstreamService;
import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor;
import org.apache.shenyu.admin.transfer.ConditionTransfer;
import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
import org.apache.shenyu.admin.utils.CommonUpstreamUtils;
-import org.apache.shenyu.admin.utils.SelectorUtil;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.ConditionData;
@@ -46,11 +44,11 @@ import org.apache.shenyu.common.dto.DiscoverySyncData;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
-import org.apache.shenyu.common.dto.convert.selector.DivideUpstream;
import org.apache.shenyu.common.dto.convert.selector.ZombieUpstream;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.enums.PluginEnum;
+import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.MapUtils;
import org.apache.shenyu.common.utils.UpstreamCheckUtils;
import org.apache.shenyu.register.common.config.ShenyuRegisterCenterConfig;
@@ -343,6 +341,8 @@ public class UpstreamCheckService {
} else {
commonUpstream.setStatus(false);
ZOMBIE_SET.add(ZombieUpstream.transform(commonUpstream,
zombieCheckTimes, selectorId));
+ LOG.info("change unlive selectorId={}|url={}", selectorId,
commonUpstream.getUpstreamUrl());
+
discoveryUpstreamService.changeStatusBySelectorIdAndUrl(selectorId,
commonUpstream.getUpstreamUrl(), Boolean.FALSE);
LOG.error("check the url={} is fail ",
commonUpstream.getUpstreamUrl());
}
return null;
@@ -414,6 +414,11 @@ public class UpstreamCheckService {
}
return true;
});
+ // change live node status to TRUE
+ discoveryUpstreamDataList.forEach(upstream -> {
+ LOG.info("change alive selectorId={}|url={}", selectorId,
upstream.getUrl());
+
discoveryUpstreamService.changeStatusBySelectorIdAndUrl(selectorId,
upstream.getUrl(), Boolean.TRUE);
+ });
DiscoverySyncData discoverySyncData = new DiscoverySyncData();
discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList);
discoverySyncData.setPluginName(pluginName);
@@ -456,30 +461,20 @@ public class UpstreamCheckService {
}
/**
- * listen {@link SelectorCreatedEvent} add data permission.
- *
- * @param event event
- */
- @EventListener(SelectorCreatedEvent.class)
- public void onSelectorCreated(final SelectorCreatedEvent event) {
- final PluginDO plugin =
pluginMapper.selectById(event.getSelector().getPluginId());
- List<DivideUpstream> existDivideUpstreams =
SelectorUtil.buildDivideUpstream(event.getSelector(), plugin.getName());
- if (CollectionUtils.isNotEmpty(existDivideUpstreams)) {
- replace(event.getSelector().getId(),
CommonUpstreamUtils.convertCommonUpstreamList(existDivideUpstreams));
- }
- }
-
- /**
- * listen {@link SelectorCreatedEvent} add data permission.
+ * listen {@link DiscoveryStreamUpdatedEvent} add data permission.
*
* @param event event
*/
- @EventListener(SelectorUpdatedEvent.class)
- public void onSelectorUpdated(final SelectorUpdatedEvent event) {
- final PluginDO plugin =
pluginMapper.selectById(event.getSelector().getPluginId());
- List<DivideUpstream> existDivideUpstreams =
SelectorUtil.buildDivideUpstream(event.getSelector(), plugin.getName());
- if (CollectionUtils.isNotEmpty(existDivideUpstreams)) {
- replace(event.getSelector().getId(),
CommonUpstreamUtils.convertCommonUpstreamList(existDivideUpstreams));
+ @EventListener(DiscoveryStreamUpdatedEvent.class)
+ public void onDiscoveryUpstreamUpdated(final DiscoveryStreamUpdatedEvent
event) {
+ DiscoverySyncData discoverySyncData = event.getDiscoverySyncData();
+ LOG.info("onDiscoveryUpstreamUpdated plugin={}|list={}",
discoverySyncData.getPluginName(), discoverySyncData.getUpstreamDataList());
+ if
(PluginEnum.DIVIDE.getName().equals(discoverySyncData.getPluginName())) {
+ List<DiscoveryUpstreamData> upstreamDataList =
discoverySyncData.getUpstreamDataList();
+ List<CommonUpstream> collect =
upstreamDataList.stream().map(DiscoveryTransfer.INSTANCE::mapToCommonUpstream).collect(Collectors.toList());
+ List<CommonUpstream> commonUpstreams =
CommonUpstreamUtils.convertCommonUpstreamList(collect);
+ LOG.info("UpstreamCacheManager replace selectorId={}|json={}",
discoverySyncData.getSelectorId(), GsonUtils.getGson().toJson(commonUpstreams));
+ replace(discoverySyncData.getSelectorId(), commonUpstreams);
}
}
diff --git
a/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
b/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
index d90ec8da22..fd12ab01dd 100644
--- a/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
+++ b/shenyu-admin/src/main/resources/mappers/discovery-upstream-sqlmap.xml
@@ -219,4 +219,11 @@
<delete id="deleteByUrl">
DELETE FROM discovery_upstream WHERE discovery_handler_id =
#{discoveryHandlerId} and url = #{url}
</delete>
+
+ <update id="updateStatusByUrl">
+ UPDATE discovery_upstream
+ SET status = #{status}
+ WHERE discovery_handler_id = #{discoveryHandlerId} and url = #{url}
+ </update>
+
</mapper>