This is an automated email from the ASF dual-hosted git repository.
liuhongyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 3ef7607a8b [type:feat] shenyu client heartbeat (#5659)
3ef7607a8b is described below
commit 3ef7607a8b3e3132ef77a55cdf09921d2b64427e
Author: aias00 <[email protected]>
AuthorDate: Wed Nov 6 11:22:36 2024 +0800
[type:feat] shenyu client heartbeat (#5659)
* [type:feature] add client heartbeat
* [type:feature] add client heartbeat
* [type:feature] support build native image
* [type:feature] add client heartbeat, debug
* [type:feature] add client heartbeat, debug
* [type:feature] add client heartbeat, debug
* [type:feature] add client heartbeat
* [type:feature] add client heartbeat, debug
* [type:feature] add client heartbeat, debug
* [type:feature] add client heartbeat, debug
* [type:feature] add client heartbeat
* [type:feature] add client heartbeat, adapt to namespace
* [type:feature] shenyu client heartbeat merge
* [type:feature] client heartbeat
* [type:feature] client heartbeat
* [type:feature] client heartbeat
* [type:feature] client heartbeat
* [type:feature] client heartbeat
* [type:feature] client register heartbeat
* [type:feature] client heartbeat
* [type:feature] client heartbeat
* [type:feature] client heartbeat
---------
Co-authored-by: moremind <[email protected]>
---
.../subscriber/URIRegisterExecutorSubscriber.java | 9 +++++
.../listener/AbstractDataChangedListener.java | 3 --
.../admin/listener/DataChangedEventDispatcher.java | 4 +--
.../admin/service/impl/SelectorServiceImpl.java | 2 --
.../AbstractShenyuClientRegisterServiceImpl.java | 41 ++++++++++++++++++++--
.../FallbackShenyuClientRegisterService.java | 16 ++++++++-
.../ShenyuClientRegisterDivideServiceImpl.java | 7 +---
.../register/ShenyuClientRegisterService.java | 12 +++++++
.../shenyu/admin/utils/CommonUpstreamUtils.java | 10 ++++--
.../FallbackShenyuClientRegisterServiceTest.java | 14 ++++++--
.../admin/utils/CommonUpstreamUtilsTest.java | 5 +--
.../ShenyuClientURIExecutorSubscriber.java | 28 ++++++++++++++-
.../apache/shenyu/common/constant/Constants.java | 6 ++++
.../client/api/ShenyuClientRegisterRepository.java | 8 +++++
.../client/http/HttpClientRegisterRepository.java | 29 +++++++++++++++
.../register/client/http/utils/RegisterUtils.java | 23 ++++++++++++
.../shenyu/register/common/enums/EventType.java | 7 +++-
.../shenyu/register/common/type/DataType.java | 5 +++
18 files changed, 204 insertions(+), 25 deletions(-)
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/disruptor/subscriber/URIRegisterExecutorSubscriber.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/disruptor/subscriber/URIRegisterExecutorSubscriber.java
index 05626524fa..5ff30e5ebd 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/disruptor/subscriber/URIRegisterExecutorSubscriber.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/disruptor/subscriber/URIRegisterExecutorSubscriber.java
@@ -72,6 +72,7 @@ public class URIRegisterExecutorSubscriber implements
ExecutorTypeSubscriber<URI
Map<String, List<URIRegisterDTO>> listMap =
buildData(list);
listMap.forEach((selectorName, uriList) -> {
final List<URIRegisterDTO> register = new
LinkedList<>();
+ final List<URIRegisterDTO> heartbeat = new
LinkedList<>();
final List<URIRegisterDTO> offline = new
LinkedList<>();
for (URIRegisterDTO d : uriList) {
final EventType eventType = d.getEventType();
@@ -80,6 +81,8 @@ public class URIRegisterExecutorSubscriber implements
ExecutorTypeSubscriber<URI
register.add(d);
} else if
(EventType.OFFLINE.equals(eventType)) {
offline.add(d);
+ } else if
(EventType.HEARTBEAT.equals(eventType)) {
+ heartbeat.add(d);
}
}
if (CollectionUtils.isNotEmpty(register)) {
@@ -88,6 +91,12 @@ public class URIRegisterExecutorSubscriber implements
ExecutorTypeSubscriber<URI
.findFirst()
.ifPresent(namespaceId ->
service.registerURI(selectorName, register, namespaceId));
}
+ if (CollectionUtils.isNotEmpty(heartbeat)) {
+
heartbeat.stream().map(URIRegisterDTO::getNamespaceId)
+ .filter(StringUtils::isNotBlank)
+ .findFirst()
+ .ifPresent(namespaceId ->
service.heartbeat(selectorName, register, namespaceId));
+ }
if (CollectionUtils.isNotEmpty(offline)) {
offline.stream().map(URIRegisterDTO::getNamespaceId)
.filter(StringUtils::isNotBlank)
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
index 97362f8834..fd55ee0031 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/AbstractDataChangedListener.java
@@ -42,7 +42,6 @@ import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.utils.GsonUtils;
-import org.apache.shenyu.common.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -198,7 +197,6 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
if (CollectionUtils.isEmpty(changed)) {
return;
}
- LOG.info("onRuleChanged, changed:{}, eventType:{}",
JsonUtils.toJson(changed), JsonUtils.toJson(eventType));
String namespaceId = changed.stream().map(value ->
StringUtils.defaultString(value.getNamespaceId(),
SYS_DEFAULT_NAMESPACE_ID)).findFirst().get();
this.updateRuleCache(namespaceId);
this.afterRuleChanged(changed, eventType, namespaceId);
@@ -303,7 +301,6 @@ public abstract class AbstractDataChangedListener
implements DataChangedListener
ConfigDataCache newVal = new ConfigDataCache(configDataCacheKey, json,
DigestUtils.md5Hex(json), System.currentTimeMillis(), namespaceId);
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
LOG.info("update config cache[{}], old: {}, updated: {}", group,
oldVal, newVal);
- LOG.info("update config json: {}", json);
}
/**
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
index 869eb661c9..df3c725311 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/listener/DataChangedEventDispatcher.java
@@ -78,9 +78,7 @@ public class DataChangedEventDispatcher implements
ApplicationListener<DataChang
LOG.info("received DataChangedEvent, not master, pass");
return;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("received DataChangedEvent, dispatching, event:{}",
JsonUtils.toJson(event));
- }
+ LOG.info("received DataChangedEvent, dispatching, event:{}",
JsonUtils.toJson(event));
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>)
event.getSource(), event.getEventType());
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/SelectorServiceImpl.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/SelectorServiceImpl.java
index 9339df0f53..ca148aaec1 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/SelectorServiceImpl.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/SelectorServiceImpl.java
@@ -70,7 +70,6 @@ import org.apache.shenyu.common.enums.DataEventTypeEnum;
import org.apache.shenyu.common.enums.MatchModeEnum;
import org.apache.shenyu.common.enums.SelectorTypeEnum;
import org.apache.shenyu.common.utils.ContextPathUtils;
-import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.JsonUtils;
import org.apache.shenyu.common.utils.ListUtil;
import org.apache.shenyu.register.common.dto.MetaDataRegisterDTO;
@@ -166,7 +165,6 @@ public class SelectorServiceImpl implements SelectorService
{
public String registerDefault(final SelectorDTO selectorDTO) {
SelectorDO selectorDO = SelectorDO.buildSelectorDO(selectorDTO);
if (StringUtils.isEmpty(selectorDTO.getId())) {
- LOG.info("register default selector :{}",
GsonUtils.getInstance().toJson(selectorDTO));
selectorMapper.insertSelective(selectorDO);
createCondition(selectorDO.getId(),
selectorDTO.getSelectorConditions());
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/AbstractShenyuClientRegisterServiceImpl.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/AbstractShenyuClientRegisterServiceImpl.java
index 442809d484..0c3e6a18cf 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/AbstractShenyuClientRegisterServiceImpl.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/AbstractShenyuClientRegisterServiceImpl.java
@@ -50,6 +50,8 @@ import org.apache.shenyu.common.utils.PluginNameAdapter;
import org.apache.shenyu.register.common.dto.ApiDocRegisterDTO;
import org.apache.shenyu.register.common.dto.MetaDataRegisterDTO;
import org.apache.shenyu.register.common.dto.URIRegisterDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import java.util.Collections;
@@ -64,6 +66,8 @@ import static
org.apache.shenyu.common.constant.Constants.SYS_DEFAULT_NAMESPACE_
*/
public abstract class AbstractShenyuClientRegisterServiceImpl extends
FallbackShenyuClientRegisterService implements ShenyuClientRegisterService {
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractShenyuClientRegisterServiceImpl.class);
+
/**
* The Event publisher.
*/
@@ -166,6 +170,7 @@ public abstract class
AbstractShenyuClientRegisterServiceImpl extends FallbackSh
*
* @param selectorName the selector name
* @param uriList the uri list
+ * @param namespaceId the namespace id
* @return the string
*/
@Override
@@ -194,7 +199,39 @@ public abstract class
AbstractShenyuClientRegisterServiceImpl extends FallbackSh
}
return ShenyuResultMessage.SUCCESS;
}
-
+
+ @Override
+ public String doHeartbeat(final String selectorName, final
List<URIRegisterDTO> uriList, final String namespaceId) {
+ if (CollectionUtils.isEmpty(uriList)) {
+ return "";
+ }
+ String pluginName = PluginNameAdapter.rpcTypeAdapter(rpcType());
+ SelectorDO selectorDO =
selectorService.findByNameAndPluginNameAndNamespaceId(selectorName, pluginName,
namespaceId);
+ if (Objects.isNull(selectorDO)) {
+ throw new ShenyuException("doHeartbeat Failed to execute,wait to
retry.");
+ }
+ // update upstream
+ List<URIRegisterDTO> validUriList = uriList.stream().filter(dto ->
Objects.nonNull(dto.getPort()) &&
StringUtils.isNotBlank(dto.getHost())).toList();
+ if (CollectionUtils.isEmpty(validUriList)) {
+ return null;
+ }
+ // discovery publish change event.
+ String selectorId = selectorDO.getId();
+ // change live node status to TRUE
+ validUriList.forEach(uriRegisterDTO -> {
+ DiscoveryUpstreamDTO discoveryUpstreamDTO =
CommonUpstreamUtils.buildDefaultDiscoveryUpstreamDTO(uriRegisterDTO.getHost(),
+ uriRegisterDTO.getPort(),
+ uriRegisterDTO.getProtocol(),
+ uriRegisterDTO.getNamespaceId());
+ LOG.info("change alive selectorId={}|url={}", selectorId,
discoveryUpstreamDTO.getUrl());
+
discoveryUpstreamService.changeStatusBySelectorIdAndUrl(selectorId,
discoveryUpstreamDTO.getUrl(), Boolean.TRUE);
+ });
+ DiscoverySyncData discoverySyncData = fetch(selectorId,
selectorDO.getName(), pluginName, namespaceId);
+ eventPublisher.publishEvent(new
DataChangedEvent(ConfigGroupEnum.DISCOVER_UPSTREAM, DataEventTypeEnum.REFRESH,
Collections.singletonList(discoverySyncData)));
+
+ return ShenyuResultMessage.SUCCESS;
+ }
+
protected void doDiscoveryLocal(final SelectorDO selectorDO, final String
pluginName, final List<URIRegisterDTO> uriList) {
String discoveryHandlerId =
discoveryService.registerDefaultDiscovery(selectorDO.getId(), pluginName,
selectorDO.getNamespaceId());
for (URIRegisterDTO uriRegisterDTO : uriList) {
@@ -271,7 +308,7 @@ public abstract class
AbstractShenyuClientRegisterServiceImpl extends FallbackSh
return true;
}
return commonUpstreamList.stream().map(upstream ->
upstreamCheckService.checkAndSubmit(selectorId, upstream))
- .collect(Collectors.toList()).stream().findAny().orElse(false);
+ .toList().stream().findAny().orElse(false);
}
/**
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/FallbackShenyuClientRegisterService.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/FallbackShenyuClientRegisterService.java
index c58df729a7..8dc7bf2032 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/FallbackShenyuClientRegisterService.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/FallbackShenyuClientRegisterService.java
@@ -72,7 +72,12 @@ public abstract class FallbackShenyuClientRegisterService
implements ShenyuClien
}
return result;
}
-
+
+ @Override
+ public String heartbeat(final String selectorName, final
List<URIRegisterDTO> uriList, final String namespaceId) {
+ return doHeartbeat(selectorName, uriList, namespaceId);
+ }
+
private void addFallback(final String key, final FallbackHolder holder) {
FallbackHolder oldObj = fallsRegisters.get(key);
if (Objects.nonNull(oldObj)) {
@@ -111,6 +116,15 @@ public abstract class FallbackShenyuClientRegisterService
implements ShenyuClien
*/
abstract String doRegisterURI(String selectorName, List<URIRegisterDTO>
uriList, String namespaceId);
+ /**
+ * Heartbeat.
+ *
+ * @param selectorName the selector name
+ * @param uriList the uri list
+ * @return the string
+ */
+ abstract String doHeartbeat(String selectorName, List<URIRegisterDTO>
uriList, String namespaceId);
+
/**
* The type Fall holder.
*/
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
index 2e1be7bd73..0ec8dcc1d0 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterDivideServiceImpl.java
@@ -37,7 +37,6 @@ import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.common.utils.PluginNameAdapter;
import org.apache.shenyu.register.common.dto.MetaDataRegisterDTO;
import org.apache.shenyu.register.common.dto.URIRegisterDTO;
-import org.apache.shenyu.register.common.enums.EventType;
import org.springframework.stereotype.Service;
import java.util.Collections;
@@ -80,10 +79,6 @@ public class ShenyuClientRegisterDivideServiceImpl extends
AbstractContextPathRe
protected String buildHandle(final List<URIRegisterDTO> uriList, final
SelectorDO selectorDO) {
List<DivideUpstream> addList = buildDivideUpstreamList(uriList);
List<DivideUpstream> canAddList = new CopyOnWriteArrayList<>();
- boolean isEventDeleted = uriList.size() == 1 &&
EventType.DELETED.equals(uriList.get(0).getEventType());
- if (isEventDeleted) {
- addList.get(0).setStatus(false);
- }
List<DivideUpstream> existList =
GsonUtils.getInstance().fromCurrentList(selectorDO.getHandle(),
DivideUpstream.class);
if (CollectionUtils.isEmpty(existList)) {
canAddList = addList;
@@ -108,7 +103,7 @@ public class ShenyuClientRegisterDivideServiceImpl extends
AbstractContextPathRe
private List<DivideUpstream> buildDivideUpstreamList(final
List<URIRegisterDTO> uriList) {
return uriList.stream()
- .map(dto ->
CommonUpstreamUtils.buildDivideUpstream(dto.getProtocol(), dto.getHost(),
dto.getPort(), dto.getNamespaceId()))
+ .map(dto ->
CommonUpstreamUtils.buildDivideUpstream(dto.getProtocol(), dto.getHost(),
dto.getPort(), dto.getNamespaceId(), dto.getEventType()))
.collect(Collectors.toCollection(CopyOnWriteArrayList::new));
}
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterService.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterService.java
index 57a69b16c4..4abf235796 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterService.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/register/ShenyuClientRegisterService.java
@@ -63,6 +63,18 @@ public interface ShenyuClientRegisterService {
return Constants.SUCCESS;
}
+ /**
+ * Register heartbeat.
+ *
+ * @param selectorName the selector name
+ * @param uriList the uri list
+ * @param namespaceId the namespace id
+ * @return the string
+ */
+ default String heartbeat(final String selectorName, final
List<URIRegisterDTO> uriList, final String namespaceId) {
+ return Constants.SUCCESS;
+ }
+
/**
* offline.
*
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/utils/CommonUpstreamUtils.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/utils/CommonUpstreamUtils.java
index 9402117dc8..a6df53b953 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/utils/CommonUpstreamUtils.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/utils/CommonUpstreamUtils.java
@@ -26,6 +26,7 @@ import
org.apache.shenyu.common.dto.convert.selector.DubboUpstream;
import org.apache.shenyu.common.dto.convert.selector.GrpcUpstream;
import org.apache.shenyu.common.dto.convert.selector.TarsUpstream;
import org.apache.shenyu.common.dto.convert.selector.WebSocketUpstream;
+import org.apache.shenyu.register.common.enums.EventType;
import java.util.Collections;
import java.util.List;
@@ -100,14 +101,19 @@ public class CommonUpstreamUtils {
* @param host the host
* @param port the port
* @param namespaceId the namespaceId
+ * @param eventType the eventType
* @return the divide upstream
*/
- public static DivideUpstream buildDivideUpstream(final String protocol,
final String host, final Integer port, final String namespaceId) {
+ public static DivideUpstream buildDivideUpstream(final String protocol,
final String host, final Integer port, final String namespaceId, final
EventType eventType) {
return DivideUpstream.builder().upstreamHost(LOCALHOST)
.protocol(protocol).upstreamUrl(buildUrl(host, port))
.weight(50).warmup(Constants.WARMUP_TIME)
.timestamp(System.currentTimeMillis())
- .status(Objects.nonNull(port) && StringUtils.isNotBlank(host))
+ .status(!EventType.DELETED.equals(eventType)
+ && !EventType.OFFLINE.equals(eventType)
+ && !EventType.IGNORED.equals(eventType)
+ && Objects.nonNull(port)
+ && StringUtils.isNotBlank(host))
.namespaceId(namespaceId)
.build();
}
diff --git
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/register/FallbackShenyuClientRegisterServiceTest.java
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/register/FallbackShenyuClientRegisterServiceTest.java
index f7d94c61da..4880c749b6 100644
---
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/register/FallbackShenyuClientRegisterServiceTest.java
+++
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/service/register/FallbackShenyuClientRegisterServiceTest.java
@@ -50,7 +50,12 @@ class FallbackShenyuClientRegisterServiceTest {
String doRegisterURI(final String selectorName, final
List<URIRegisterDTO> uriList, final String namespaceId) {
return "doRegisterURI";
}
-
+
+ @Override
+ String doHeartbeat(final String selectorName, final
List<URIRegisterDTO> uriList, final String namespaceId) {
+ return "doHeartbeat";
+ }
+
@Override
public String rpcType() {
return "grpc";
@@ -73,7 +78,12 @@ class FallbackShenyuClientRegisterServiceTest {
String doRegisterURI(final String selectorName, final
List<URIRegisterDTO> uriList, final String namespaceId) {
throw new ShenyuException("Exception");
}
-
+
+ @Override
+ String doHeartbeat(final String selectorName, final
List<URIRegisterDTO> uriList, final String namespaceId) {
+ throw new ShenyuException("Exception");
+ }
+
@Override
public String rpcType() {
return "grpc";
diff --git
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/utils/CommonUpstreamUtilsTest.java
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/utils/CommonUpstreamUtilsTest.java
index 1f8964ad31..2a8168fa00 100644
---
a/shenyu-admin/src/test/java/org/apache/shenyu/admin/utils/CommonUpstreamUtilsTest.java
+++
b/shenyu-admin/src/test/java/org/apache/shenyu/admin/utils/CommonUpstreamUtilsTest.java
@@ -22,6 +22,7 @@ import
org.apache.shenyu.common.dto.convert.selector.WebSocketUpstream;
import org.apache.shenyu.common.dto.convert.selector.DubboUpstream;
import org.apache.shenyu.common.dto.convert.selector.GrpcUpstream;
import org.apache.shenyu.common.dto.convert.selector.CommonUpstream;
+import org.apache.shenyu.register.common.enums.EventType;
import org.junit.Assert;
import org.junit.Test;
@@ -57,7 +58,7 @@ public final class CommonUpstreamUtilsTest {
@Test
public void buildDivideUpstream() {
- DivideUpstream divideUpstream =
CommonUpstreamUtils.buildDivideUpstream("http", HOST, PORT,
SYS_DEFAULT_NAMESPACE_ID);
+ DivideUpstream divideUpstream =
CommonUpstreamUtils.buildDivideUpstream("http", HOST, PORT,
SYS_DEFAULT_NAMESPACE_ID, EventType.REGISTER);
Assert.assertNotNull(divideUpstream);
Assert.assertEquals(HOST + ":" + PORT,
divideUpstream.getUpstreamUrl());
Assert.assertEquals("http", divideUpstream.getProtocol());
@@ -114,7 +115,7 @@ public final class CommonUpstreamUtilsTest {
@Test
public void convertCommonUpstreamList() {
List<DivideUpstream> existDivideUpstreams = new ArrayList<>();
- DivideUpstream divideUpstream =
CommonUpstreamUtils.buildDivideUpstream("http", HOST, PORT,
SYS_DEFAULT_NAMESPACE_ID);
+ DivideUpstream divideUpstream =
CommonUpstreamUtils.buildDivideUpstream("http", HOST, PORT,
SYS_DEFAULT_NAMESPACE_ID, EventType.REGISTER);
existDivideUpstreams.add(divideUpstream);
List<CommonUpstream> commonUpstreams =
CommonUpstreamUtils.convertCommonUpstreamList(existDivideUpstreams);
diff --git
a/shenyu-client/shenyu-client-core/src/main/java/org/apache/shenyu/client/core/disruptor/subcriber/ShenyuClientURIExecutorSubscriber.java
b/shenyu-client/shenyu-client-core/src/main/java/org/apache/shenyu/client/core/disruptor/subcriber/ShenyuClientURIExecutorSubscriber.java
index e7b5eb5f89..6fa228dc88 100644
---
a/shenyu-client/shenyu-client-core/src/main/java/org/apache/shenyu/client/core/disruptor/subcriber/ShenyuClientURIExecutorSubscriber.java
+++
b/shenyu-client/shenyu-client-core/src/main/java/org/apache/shenyu/client/core/disruptor/subcriber/ShenyuClientURIExecutorSubscriber.java
@@ -18,8 +18,10 @@
package org.apache.shenyu.client.core.disruptor.subcriber;
import com.google.common.base.Stopwatch;
-import org.apache.shenyu.client.core.shutdown.ShutdownHookManager;
+import com.google.common.collect.Lists;
import org.apache.shenyu.client.core.shutdown.ShenyuClientShutdownHook;
+import org.apache.shenyu.client.core.shutdown.ShutdownHookManager;
+import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.register.client.api.ShenyuClientRegisterRepository;
import org.apache.shenyu.register.common.dto.URIRegisterDTO;
import org.apache.shenyu.register.common.enums.EventType;
@@ -32,6 +34,9 @@ import org.springframework.beans.BeanUtils;
import java.io.IOException;
import java.net.Socket;
import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
@@ -41,8 +46,12 @@ public class ShenyuClientURIExecutorSubscriber implements
ExecutorTypeSubscriber
private static final Logger LOG =
LoggerFactory.getLogger(ShenyuClientURIExecutorSubscriber.class);
+ private static final List<URIRegisterDTO> URIS = Lists.newArrayList();
+
private final ShenyuClientRegisterRepository
shenyuClientRegisterRepository;
+ private final ScheduledThreadPoolExecutor executor;
+
/**
* Instantiates a new Shenyu client uri executor subscriber.
*
@@ -50,6 +59,11 @@ public class ShenyuClientURIExecutorSubscriber implements
ExecutorTypeSubscriber
*/
public ShenyuClientURIExecutorSubscriber(final
ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
this.shenyuClientRegisterRepository = shenyuClientRegisterRepository;
+ // executor for send heartbeat
+ ThreadFactory requestFactory =
ShenyuThreadFactory.create("heartbeat-reporter", true);
+ executor = new ScheduledThreadPoolExecutor(1, requestFactory);
+
+ executor.scheduleAtFixedRate(() -> URIS.forEach(this::sendHeartbeat),
30, 30, TimeUnit.SECONDS);
}
@Override
@@ -84,12 +98,24 @@ public class ShenyuClientURIExecutorSubscriber implements
ExecutorTypeSubscriber
}
ShenyuClientShutdownHook.delayOtherHooks();
shenyuClientRegisterRepository.persistURI(uriRegisterDTO);
+
+ URIS.add(uriRegisterDTO);
+
ShutdownHookManager.get().addShutdownHook(new Thread(() -> {
final URIRegisterDTO offlineDTO = new URIRegisterDTO();
BeanUtils.copyProperties(uriRegisterDTO, offlineDTO);
offlineDTO.setEventType(EventType.OFFLINE);
shenyuClientRegisterRepository.offline(offlineDTO);
+
+ // shutdown heartbeat executor
+ if (!executor.isTerminated()) {
+ executor.shutdown();
+ }
}), 2);
}
}
+
+ private void sendHeartbeat(final URIRegisterDTO uriRegisterDTO) {
+ shenyuClientRegisterRepository.sendHeartbeat(uriRegisterDTO);
+ }
}
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index 578ed25bff..0c9ce0474a 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -252,6 +252,12 @@ public interface Constants {
*/
String URI = "uri";
+
+ /**
+ * The constant HEARTBEAT.
+ */
+ String HEARTBEAT = "heartbeat";
+
/**
* The constant header key of sign plugin version-2.
*/
diff --git
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-api/src/main/java/org/apache/shenyu/register/client/api/ShenyuClientRegisterRepository.java
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-api/src/main/java/org/apache/shenyu/register/client/api/ShenyuClientRegisterRepository.java
index 5b482f915e..43eead031a 100644
---
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-api/src/main/java/org/apache/shenyu/register/client/api/ShenyuClientRegisterRepository.java
+++
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-api/src/main/java/org/apache/shenyu/register/client/api/ShenyuClientRegisterRepository.java
@@ -60,6 +60,14 @@ public interface ShenyuClientRegisterRepository {
default void offline(URIRegisterDTO offlineDTO) {
}
+ /**
+ * Send heartbeat.
+ *
+ * @param heartbeatDTO the heartbeat dto
+ */
+ default void sendHeartbeat(URIRegisterDTO heartbeatDTO) {
+ }
+
/**
* persistApiDoc.
* @param apiDocRegisterDTO apiDocRegisterDTO
diff --git
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/HttpClientRegisterRepository.java
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/HttpClientRegisterRepository.java
index 01621658a4..17569fa9d3 100644
---
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/HttpClientRegisterRepository.java
+++
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/HttpClientRegisterRepository.java
@@ -130,6 +130,15 @@ public class HttpClientRegisterRepository extends
FailbackRegistryRepository {
doUnregister(offlineDTO);
}
+ @Override
+ public void sendHeartbeat(final URIRegisterDTO heartbeatDTO) {
+ if (RuntimeUtils.listenByOther(heartbeatDTO.getPort())) {
+ return;
+ }
+ heartbeatDTO.setEventType(EventType.HEARTBEAT);
+ doHeartbeat(heartbeatDTO, Constants.URI_PATH);
+ }
+
/**
* doPersistApiDoc.
*
@@ -187,6 +196,26 @@ public class HttpClientRegisterRepository extends
FailbackRegistryRepository {
}
}
}
+
+ private <T> void doHeartbeat(final T t, final String path) {
+ int i = 0;
+ for (String server : serverList) {
+ i++;
+ String concat = server.concat(path);
+ try {
+ String accessToken = this.accessToken.get(server);
+ if (StringUtils.isBlank(accessToken)) {
+ throw new NullPointerException("accessToken is null");
+ }
+ RegisterUtils.doHeartBeat(GsonUtils.getInstance().toJson(t),
concat, Constants.HEARTBEAT, accessToken);
+ } catch (Exception e) {
+ LOGGER.error("HeartBeat admin url :{} is fail, will retry.",
server, e);
+ if (i == serverList.size()) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
private <T> void doUnregister(final T t) {
for (String server : serverList) {
diff --git
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/utils/RegisterUtils.java
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/utils/RegisterUtils.java
index 11889d00d7..00cb8e2804 100644
---
a/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/utils/RegisterUtils.java
+++
b/shenyu-register-center/shenyu-register-client/shenyu-register-client-http/src/main/java/org/apache/shenyu/register/client/http/utils/RegisterUtils.java
@@ -82,6 +82,29 @@ public final class RegisterUtils {
LOGGER.error("{} client register error: {} ", type, json);
}
}
+
+ /**
+ * Do heartbeat.
+ *
+ * @param json the json
+ * @param url the url
+ * @param type the type
+ * @param accessToken the token
+ * @throws IOException the io exception
+ */
+ public static void doHeartBeat(final String json, final String url, final
String type, final String accessToken) throws IOException {
+ if (StringUtils.isBlank(accessToken)) {
+ LOGGER.error("{} client heartbeat error accessToken is null,
please check the config : {} ", type, json);
+ return;
+ }
+ Headers headers = new Headers.Builder().add(Constants.X_ACCESS_TOKEN,
accessToken).build();
+ String result = OkHttpTools.getInstance().post(url, json, headers);
+ if (Objects.equals(SUCCESS, result)) {
+ LOGGER.info("{} success: {} ", type, json);
+ } else {
+ LOGGER.error("{} error: {} ", type, json);
+ }
+ }
/**
* Do unregister.
diff --git
a/shenyu-register-center/shenyu-register-common/src/main/java/org/apache/shenyu/register/common/enums/EventType.java
b/shenyu-register-center/shenyu-register-common/src/main/java/org/apache/shenyu/register/common/enums/EventType.java
index 06265097dc..300985fa18 100644
---
a/shenyu-register-center/shenyu-register-common/src/main/java/org/apache/shenyu/register/common/enums/EventType.java
+++
b/shenyu-register-center/shenyu-register-common/src/main/java/org/apache/shenyu/register/common/enums/EventType.java
@@ -45,5 +45,10 @@ public enum EventType {
/**
* offline event type.
*/
- OFFLINE
+ OFFLINE,
+
+ /**
+ * heartbeat event type.
+ */
+ HEARTBEAT
}
diff --git
a/shenyu-register-center/shenyu-register-common/src/main/java/org/apache/shenyu/register/common/type/DataType.java
b/shenyu-register-center/shenyu-register-common/src/main/java/org/apache/shenyu/register/common/type/DataType.java
index 37ddf62dba..b539219621 100644
---
a/shenyu-register-center/shenyu-register-common/src/main/java/org/apache/shenyu/register/common/type/DataType.java
+++
b/shenyu-register-center/shenyu-register-common/src/main/java/org/apache/shenyu/register/common/type/DataType.java
@@ -37,6 +37,11 @@ public enum DataType {
*/
API_DOC,
+ /**
+ * Heartbeat type enum.
+ */
+ HEARTBEAT,
+
/**
* Discovery config type enum.
*/