This is an automated email from the ASF dual-hosted git repository.
zhangzicheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 3a69b0b204 [ISSUE #5311] less concurrency (#5587)
3a69b0b204 is described below
commit 3a69b0b204cef59da93571cee64068a9db6cf1ba
Author: loongs-zhang <[email protected]>
AuthorDate: Sat Jul 6 11:18:08 2024 +0800
[ISSUE #5311] less concurrency (#5587)
* [ISSUE #5311] less concurrency
* fix grpc CI
* try fix grpc e2e CI
* rollback
---------
Co-authored-by: moremind <[email protected]>
---
.../admin/service/impl/UpstreamCheckService.java | 65 +++++++++++++++++++---
.../shenyu/admin/transfer/DiscoveryTransfer.java | 14 +++++
.../admin/service/UpstreamCheckServiceTest.java | 22 +++++++-
.../handler/GrpcDiscoveryUpstreamDataHandler.java | 5 +-
4 files changed, 94 insertions(+), 12 deletions(-)
diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
index fdef9ef6e6..305f18fa02 100644
--- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
+++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
@@ -33,13 +33,17 @@ import org.apache.shenyu.admin.model.entity.SelectorDO;
import org.apache.shenyu.admin.model.event.selector.SelectorCreatedEvent;
import org.apache.shenyu.admin.model.event.selector.SelectorUpdatedEvent;
import org.apache.shenyu.admin.model.query.SelectorConditionQuery;
+import org.apache.shenyu.admin.service.DiscoveryUpstreamService;
import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor;
import org.apache.shenyu.admin.transfer.ConditionTransfer;
+import org.apache.shenyu.admin.transfer.DiscoveryTransfer;
import org.apache.shenyu.admin.utils.CommonUpstreamUtils;
import org.apache.shenyu.admin.utils.SelectorUtil;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.ConditionData;
+import org.apache.shenyu.common.dto.DiscoverySyncData;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
import org.apache.shenyu.common.dto.convert.selector.DivideUpstream;
@@ -59,6 +63,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -111,6 +116,8 @@ public class UpstreamCheckService {
private final SelectorHandleConverterFactor converterFactor;
+ private final DiscoveryUpstreamService discoveryUpstreamService;
+
private ScheduledThreadPoolExecutor executor;
private ScheduledFuture<?> scheduledFuture;
@@ -134,12 +141,14 @@ public class UpstreamCheckService {
final PluginMapper pluginMapper,
final SelectorConditionMapper
selectorConditionMapper,
final ShenyuRegisterCenterConfig
shenyuRegisterCenterConfig,
- final SelectorHandleConverterFactor
converterFactor) {
+ final SelectorHandleConverterFactor
converterFactor,
+ final DiscoveryUpstreamService
discoveryUpstreamService) {
this.selectorMapper = selectorMapper;
this.eventPublisher = eventPublisher;
this.pluginMapper = pluginMapper;
this.selectorConditionMapper = selectorConditionMapper;
this.converterFactor = converterFactor;
+ this.discoveryUpstreamService = discoveryUpstreamService;
Properties props = shenyuRegisterCenterConfig.getProps();
this.checked =
Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED,
Constants.DEFAULT_CHECK_VALUE));
this.scheduledThreads =
Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS,
Constants.ZOMBIE_CHECK_THREADS_VALUE));
@@ -199,6 +208,15 @@ public class UpstreamCheckService {
return;
}
+ Optional.ofNullable(submitJust(selectorId, commonUpstream))
+ .ifPresent(upstreams -> executor.execute(() ->
updateHandler(selectorId, upstreams, upstreams)));
+ }
+
+ private List<CommonUpstream> submitJust(final String selectorId, final
CommonUpstream commonUpstream) {
+ if (!REGISTER_TYPE_HTTP.equalsIgnoreCase(registerType) || !checked) {
+ return null;
+ }
+
List<CommonUpstream> upstreams =
MapUtils.computeIfAbsent(UPSTREAM_MAP, selectorId, k -> new
CopyOnWriteArrayList<>());
if (commonUpstream.isStatus()) {
Optional<CommonUpstream> exists = upstreams.stream().filter(item
-> StringUtils.isNotBlank(item.getUpstreamUrl())
@@ -213,7 +231,7 @@ public class UpstreamCheckService {
upstreams.removeIf(item -> item.equals(commonUpstream));
PENDING_SYNC.add(NumberUtils.INTEGER_ZERO);
}
- executor.execute(() -> updateHandler(selectorId, upstreams,
upstreams));
+ return upstreams;
}
/**
@@ -297,7 +315,8 @@ public class UpstreamCheckService {
commonUpstream.setStatus(true);
LOG.info("UpstreamCacheManager check zombie upstream success the
url: {}, host: {} ", commonUpstream.getUpstreamUrl(),
commonUpstream.getUpstreamHost());
List<CommonUpstream> old =
ListUtils.unmodifiableList(UPSTREAM_MAP.getOrDefault(selectorId,
Collections.emptyList()));
- this.submit(selectorId, commonUpstream);
+ // fix https://github.com/apache/shenyu/issues/5311
+ this.submitJust(selectorId, commonUpstream);
updateHandler(selectorId, old, UPSTREAM_MAP.get(selectorId));
} else {
LOG.error("check zombie upstream the url={} is fail",
commonUpstream.getUpstreamUrl());
@@ -369,18 +388,38 @@ public class UpstreamCheckService {
}
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
- String handler =
converterFactor.newInstance(pluginDO.getName()).handler(selectorDO.getHandle(),
aliveList);
+ if (Objects.isNull(pluginDO)) {
+ return;
+ }
+ String pluginName = pluginDO.getName();
+ String handler =
converterFactor.newInstance(pluginName).handler(selectorDO.getHandle(),
aliveList);
selectorDO.setHandle(handler);
selectorMapper.updateSelective(selectorDO);
List<ConditionData> conditionDataList =
ConditionTransfer.INSTANCE.mapToSelectorDOS(
selectorConditionMapper.selectByQuery(new
SelectorConditionQuery(selectorDO.getId())));
- SelectorData selectorData = SelectorDO.transFrom(selectorDO,
pluginDO.getName(), conditionDataList);
+ SelectorData selectorData = SelectorDO.transFrom(selectorDO,
pluginName, conditionDataList);
selectorData.setHandle(handler);
// publish change event.
eventPublisher.publishEvent(new
DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(selectorData)));
+ // publish discovery change event.
+ List<DiscoveryUpstreamData> discoveryUpstreamDataList =
discoveryUpstreamService.findBySelectorId(selectorId);
+ discoveryUpstreamDataList.removeIf(u -> {
+ for (CommonUpstream alive : aliveList) {
+ if (alive.getUpstreamUrl().equals(u.getUrl())) {
+ return false;
+ }
+ }
+ return true;
+ });
+ DiscoverySyncData discoverySyncData = new DiscoverySyncData();
+ discoverySyncData.setUpstreamDataList(discoveryUpstreamDataList);
+ discoverySyncData.setPluginName(pluginName);
+ discoverySyncData.setSelectorId(selectorId);
+ discoverySyncData.setSelectorName(selectorDO.getName());
+ eventPublisher.publishEvent(new
DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.UPDATE,
Collections.singletonList(discoverySyncData)));
}
/**
@@ -396,12 +435,19 @@ public class UpstreamCheckService {
final List<SelectorDO> selectorDOList =
selectorMapper.findByPluginIds(new ArrayList<>(pluginMap.keySet()));
long currentTimeMillis = System.currentTimeMillis();
Optional.ofNullable(selectorDOList).orElseGet(ArrayList::new).stream()
- .filter(selectorDO -> Objects.nonNull(selectorDO) &&
StringUtils.isNotEmpty(selectorDO.getHandle()))
+ .filter(Objects::nonNull)
.forEach(selectorDO -> {
String name = pluginMap.get(selectorDO.getPluginId());
- List<CommonUpstream> commonUpstreams =
converterFactor.newInstance(name).convertUpstream(selectorDO.getHandle())
- .stream().filter(upstream -> upstream.isStatus()
|| upstream.getTimestamp() > currentTimeMillis -
TimeUnit.SECONDS.toMillis(zombieRemovalTimes))
- .collect(Collectors.toList());
+ List<CommonUpstream> commonUpstreams = new LinkedList<>();
+
discoveryUpstreamService.findBySelectorId(selectorDO.getId()).stream()
+
.map(DiscoveryTransfer.INSTANCE::mapToCommonUpstream)
+ .forEach(commonUpstreams::add);
+ String handle = selectorDO.getHandle();
+ if (StringUtils.isNotEmpty(handle)) {
+
commonUpstreams.addAll(converterFactor.newInstance(name).convertUpstream(handle)
+ .stream().filter(upstream ->
upstream.isStatus() || upstream.getTimestamp() > currentTimeMillis -
TimeUnit.SECONDS.toMillis(zombieRemovalTimes))
+ .collect(Collectors.toList()));
+ }
if (CollectionUtils.isNotEmpty(commonUpstreams)) {
UPSTREAM_MAP.put(selectorDO.getId(), commonUpstreams);
PENDING_SYNC.add(NumberUtils.INTEGER_ZERO);
@@ -439,6 +485,7 @@ public class UpstreamCheckService {
/**
* get the zombie removal time value.
+ *
* @return zombie removal time value
*/
public static int getZombieRemovalTimes() {
diff --git a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
index 4a40cb0df3..bd60b69518 100644
--- a/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
+++ b/shenyu-admin/src/main/java/org/apache/shenyu/admin/transfer/DiscoveryTransfer.java
@@ -33,6 +33,7 @@ import org.apache.shenyu.admin.model.vo.DiscoveryUpstreamVO;
import org.apache.shenyu.admin.model.vo.DiscoveryVO;
import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.ProxySelectorData;
+import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
import org.apache.shenyu.common.utils.GsonUtils;
import java.util.Optional;
@@ -65,6 +66,19 @@ public enum DiscoveryTransfer {
.dateUpdated(data.getDateUpdated())
.dateCreated(data.getDateCreated()).build()).orElse(null);
}
+
+ /**
+ * mapToCommonUpstream.
+ *
+ * @param discoveryUpstreamData discoveryUpstreamData
+ * @return CommonUpstream
+ */
+ public CommonUpstream mapToCommonUpstream(DiscoveryUpstreamData
discoveryUpstreamData) {
+ return Optional.ofNullable(discoveryUpstreamData).map(data -> {
+ String url = data.getUrl();
+ return new CommonUpstream(data.getProtocol(), url.split(":")[0],
url, false, data.getDateCreated().getTime());
+ }).orElse(null);
+ }
/**
* mapToVo.
diff --git a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
index fcd8a64df1..01bb6f04af 100644
--- a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
+++ b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/UpstreamCheckServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.shenyu.admin.service.converter.SelectorHandleConverterFactor;
import org.apache.shenyu.admin.service.impl.UpstreamCheckService;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.common.dto.DiscoveryUpstreamData;
import org.apache.shenyu.common.dto.convert.selector.DivideUpstream;
import org.apache.shenyu.common.dto.convert.selector.ZombieUpstream;
import org.apache.shenyu.common.enums.PluginEnum;
@@ -50,6 +51,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.test.util.ReflectionTestUtils;
+import java.sql.Timestamp;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -103,6 +105,9 @@ public final class UpstreamCheckServiceTest {
private SelectorConditionMapper selectorConditionMapper;
private SelectorHandleConverterFactor converterFactor;
+
+ @Mock
+ private DiscoveryUpstreamService discoveryUpstreamService;
private final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig = new
ShenyuRegisterCenterConfig();
@@ -133,7 +138,8 @@ public final class UpstreamCheckServiceTest {
Map<String, SelectorHandleConverter> maps = new HashMap<>();
maps.put(PluginEnum.DIVIDE.getName(), new
DivideSelectorHandleConverter());
converterFactor = new SelectorHandleConverterFactor(maps);
- upstreamCheckService = new UpstreamCheckService(selectorMapper,
eventPublisher, pluginMapper, selectorConditionMapper,
shenyuRegisterCenterConfig, converterFactor);
+ upstreamCheckService = new UpstreamCheckService(selectorMapper,
eventPublisher, pluginMapper, selectorConditionMapper,
+ shenyuRegisterCenterConfig, converterFactor,
discoveryUpstreamService);
}
@Test
@@ -242,11 +248,22 @@ public final class UpstreamCheckServiceTest {
.name(MOCK_SELECTOR_NAME_OTHER)
.handle("[{\"upstreamHost\":\"localhost\",\"protocol\":\"http://\",\"localhost\":\"divide-upstream-60\",\"weight\":60}]")
.build();
+ DiscoveryUpstreamData discoveryUpstreamData =
DiscoveryUpstreamData.builder()
+ .dateCreated(new Timestamp(System.currentTimeMillis()))
+ .protocol("http")
+ .url("127.0.0.1:8080")
+ .props("{}")
+ .discoveryHandlerId("1")
+ .status(0)
+ .build();
when(pluginMapper.selectByNames(anyList())).thenReturn(Lists.newArrayList(pluginDO));
when(selectorMapper.findByPluginIds(anyList())).thenReturn(Lists.newArrayList(selectorDOWithUrlError,
selectorDOWithUrlReachable));
+
when(discoveryUpstreamService.findBySelectorId(anyString())).thenReturn(Lists.newArrayList(discoveryUpstreamData));
upstreamCheckService.fetchUpstreamData();
assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME));
+ assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME).size());
assertTrue(upstreamMap.containsKey(MOCK_SELECTOR_NAME_OTHER));
+ assertEquals(2, upstreamMap.get(MOCK_SELECTOR_NAME_OTHER).size());
}
@Test
@@ -254,7 +271,8 @@ public final class UpstreamCheckServiceTest {
Properties properties = new Properties();
properties.setProperty(Constants.IS_CHECKED, "true");
shenyuRegisterCenterConfig.setProps(properties);
- upstreamCheckService = new UpstreamCheckService(selectorMapper,
eventPublisher, pluginMapper, selectorConditionMapper,
shenyuRegisterCenterConfig, converterFactor);
+ upstreamCheckService = new UpstreamCheckService(selectorMapper,
eventPublisher, pluginMapper, selectorConditionMapper,
+ shenyuRegisterCenterConfig, converterFactor,
discoveryUpstreamService);
upstreamCheckService.close();
}
diff --git a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java
index 109fa86239..f9e7645d1a 100644
--- a/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java
+++ b/shenyu-plugin/shenyu-plugin-proxy/shenyu-plugin-rpc/shenyu-plugin-grpc/src/main/java/org/apache/shenyu/plugin/grpc/handler/GrpcDiscoveryUpstreamDataHandler.java
@@ -23,6 +23,7 @@ import org.apache.shenyu.common.dto.convert.selector.GrpcUpstream;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.plugin.base.handler.DiscoveryUpstreamDataHandler;
import org.apache.shenyu.plugin.grpc.cache.ApplicationConfigCache;
+import org.apache.shenyu.plugin.grpc.cache.GrpcClientCache;
import org.springframework.util.ObjectUtils;
import java.sql.Timestamp;
@@ -42,7 +43,9 @@ public class GrpcDiscoveryUpstreamDataHandler implements DiscoveryUpstreamDataHa
if (Objects.isNull(discoverySyncData) ||
Objects.isNull(discoverySyncData.getSelectorId())) {
return;
}
-
ApplicationConfigCache.getInstance().handlerUpstream(discoverySyncData.getSelectorId(),
convertUpstreamList(discoverySyncData.getUpstreamDataList()));
+ final String selectorId = discoverySyncData.getSelectorId();
+ ApplicationConfigCache.getInstance().handlerUpstream(selectorId,
convertUpstreamList(discoverySyncData.getUpstreamDataList()));
+ GrpcClientCache.initGrpcClient(selectorId);
}
private List<GrpcUpstream> convertUpstreamList(final
List<DiscoveryUpstreamData> upstreamList) {