This is an automated email from the ASF dual-hosted git repository.
zhangzicheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 997e935fd [ISSUE #4356] optimize UpstreamCheckService (#4366)
997e935fd is described below
commit 997e935fd8283d4d4feb5de7712d616b1e50a37f
Author: iwangjie <[email protected]>
AuthorDate: Fri Feb 10 23:30:36 2023 +0800
[ISSUE #4356] optimize UpstreamCheckService (#4366)
* fix issue_4536 review commit
* checkstyle pass
* shenyu-4356 [Task] optimize UpstreamCheckService
* Reserve the judgment and remove parallelStream.
* implementation clear and simple
* delete UpstreamWithSelectorId
---------
Co-authored-by: dragon-zhang <[email protected]>
---
.../admin/service/impl/UpstreamCheckService.java | 92 ++++++++++++++++------
shenyu-admin/src/main/resources/application.yml | 1 +
.../src/main/resources/application.yml | 1 +
.../apache/shenyu/common/config/ShenyuConfig.java | 22 +++++-
.../apache/shenyu/common/constant/Constants.java | 10 +++
.../dto/convert/selector/ZombieUpstream.java | 2 -
.../shenyu/common/config/ShenyuConfigTest.java | 3 +-
.../loadbalancer/cache/UpstreamCacheManager.java | 4 +
.../loadbalancer/cache/UpstreamCheckTask.java | 24 +++++-
9 files changed, 129 insertions(+), 30 deletions(-)
diff --git
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
index c25f630d0..4cb1a81c2 100644
---
a/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
+++
b/shenyu-admin/src/main/java/org/apache/shenyu/admin/service/impl/UpstreamCheckService.java
@@ -65,9 +65,11 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -97,6 +99,8 @@ public class UpstreamCheckService {
private final boolean checked;
+ private final Integer scheduledThreads;
+
private final SelectorMapper selectorMapper;
private final ApplicationEventPublisher eventPublisher;
@@ -111,6 +115,10 @@ public class UpstreamCheckService {
private ScheduledFuture<?> scheduledFuture;
+ private ScheduledThreadPoolExecutor invokeExecutor;
+
+ private final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
/**
* Instantiates a new Upstream check service.
*
@@ -134,6 +142,7 @@ public class UpstreamCheckService {
this.converterFactor = converterFactor;
Properties props = shenyuRegisterCenterConfig.getProps();
this.checked =
Boolean.parseBoolean(props.getProperty(Constants.IS_CHECKED,
Constants.DEFAULT_CHECK_VALUE));
+ this.scheduledThreads =
Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_THREADS,
Constants.ZOMBIE_CHECK_THREADS_VALUE));
this.zombieCheckTimes =
Integer.parseInt(props.getProperty(Constants.ZOMBIE_CHECK_TIMES,
Constants.ZOMBIE_CHECK_TIMES_VALUE));
this.scheduledTime =
Integer.parseInt(props.getProperty(Constants.SCHEDULED_TIME,
Constants.SCHEDULED_TIME_VALUE));
this.registerType = shenyuRegisterCenterConfig.getRegisterType();
@@ -151,6 +160,9 @@ public class UpstreamCheckService {
this.fetchUpstreamData();
executor = new ScheduledThreadPoolExecutor(1,
ShenyuThreadFactory.create("scheduled-upstream-task", false));
scheduledFuture = executor.scheduleWithFixedDelay(this::scheduled,
10, scheduledTime, TimeUnit.SECONDS);
+
+ ThreadFactory requestFactory =
ShenyuThreadFactory.create("upstream-health-check-request", true);
+ invokeExecutor = new
ScheduledThreadPoolExecutor(this.scheduledThreads, requestFactory);
}
}
@@ -203,7 +215,7 @@ public class UpstreamCheckService {
executor.execute(() -> updateHandler(selectorId, upstreams,
upstreams));
return true;
}
-
+
/**
* If the health check passes, the service will be added to
* the normal service list; if the health check fails, the service
@@ -213,7 +225,7 @@ public class UpstreamCheckService {
* that do not register with the gateway by listening to
* {@link org.springframework.context.event.ContextRefreshedEvent},
* which will cause some problems,
- * check https://github.com/apache/shenyu/issues/3484 for more details.
+ * check <a href="https://github.com/apache/shenyu/issues/3484">...</a>
for more details.
*
* @param selectorId the selector id
* @param commonUpstream the common upstream
@@ -244,18 +256,37 @@ public class UpstreamCheckService {
private void scheduled() {
try {
- if (!ZOMBIE_SET.isEmpty()) {
- ZOMBIE_SET.parallelStream().forEach(this::checkZombie);
- }
- if (!UPSTREAM_MAP.isEmpty()) {
- UPSTREAM_MAP.forEach(this::check);
- }
+ doCheck();
+ waitFinish();
} catch (Exception e) {
LOG.error("upstream scheduled check error -------- ", e);
}
}
+ private void doCheck() {
+ // check zombie
+ if (!ZOMBIE_SET.isEmpty()) {
+ ZOMBIE_SET.forEach(this::checkZombie);
+ }
+ // check upstreams
+ if (!UPSTREAM_MAP.isEmpty()) {
+ UPSTREAM_MAP.forEach(this::check);
+ }
+ }
+
+ private void waitFinish() {
+ // wait for all submitted checks to complete
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
+ // clear the list so the next scheduled run starts empty
+ futures.clear();
+ }
+
private void checkZombie(final ZombieUpstream zombieUpstream) {
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() ->
checkZombie0(zombieUpstream), invokeExecutor);
+ futures.add(future);
+ }
+
+ private void checkZombie0(final ZombieUpstream zombieUpstream) {
ZOMBIE_SET.remove(zombieUpstream);
String selectorId = zombieUpstream.getSelectorId();
CommonUpstream commonUpstream = zombieUpstream.getCommonUpstream();
@@ -277,23 +308,36 @@ public class UpstreamCheckService {
}
private void check(final String selectorId, final List<CommonUpstream>
upstreamList) {
- List<CommonUpstream> successList =
Lists.newArrayListWithCapacity(upstreamList.size());
+ final List<CompletableFuture<CommonUpstream>> checkFutures = new
ArrayList<>(upstreamList.size());
for (CommonUpstream commonUpstream : upstreamList) {
- final boolean pass =
UpstreamCheckUtils.checkUrl(commonUpstream.getUpstreamUrl());
- if (pass) {
- if (!commonUpstream.isStatus()) {
- commonUpstream.setTimestamp(System.currentTimeMillis());
- commonUpstream.setStatus(true);
- LOG.info("UpstreamCacheManager check success the url: {},
host: {} ", commonUpstream.getUpstreamUrl(), commonUpstream.getUpstreamHost());
+ checkFutures.add(CompletableFuture.supplyAsync(() -> {
+ final boolean pass =
UpstreamCheckUtils.checkUrl(commonUpstream.getUpstreamUrl());
+ if (pass) {
+ if (!commonUpstream.isStatus()) {
+
commonUpstream.setTimestamp(System.currentTimeMillis());
+ commonUpstream.setStatus(true);
+ LOG.info("UpstreamCacheManager check success the url:
{}, host: {} ", commonUpstream.getUpstreamUrl(),
commonUpstream.getUpstreamHost());
+ }
+ return commonUpstream;
+ } else {
+ commonUpstream.setStatus(false);
+ ZOMBIE_SET.add(ZombieUpstream.transform(commonUpstream,
zombieCheckTimes, selectorId));
+ LOG.error("check the url={} is fail ",
commonUpstream.getUpstreamUrl());
}
- successList.add(commonUpstream);
- } else {
- commonUpstream.setStatus(false);
- ZOMBIE_SET.add(ZombieUpstream.transform(commonUpstream,
zombieCheckTimes, selectorId));
- LOG.error("check the url={} is fail ",
commonUpstream.getUpstreamUrl());
- }
+ return null;
+ }).exceptionally(ex -> {
+ LOG.error("An exception occurred during the check of url {}:
{}", commonUpstream.getUpstreamUrl(), ex);
+ return null;
+ }));
}
- updateHandler(selectorId, upstreamList, successList);
+
+ this.futures.add(CompletableFuture.runAsync(() -> {
+ List<CommonUpstream> successList = checkFutures.stream()
+ .map(CompletableFuture::join)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ updateHandler(selectorId, upstreamList, successList);
+ }));
}
private void updateHandler(final String selectorId, final
List<CommonUpstream> upstreamList, final List<CommonUpstream> successList) {
@@ -362,7 +406,7 @@ public class UpstreamCheckService {
}
});
}
-
+
/**
* listen {@link SelectorCreatedEvent} add data permission.
*
@@ -376,7 +420,7 @@ public class UpstreamCheckService {
replace(event.getSelector().getId(),
CommonUpstreamUtils.convertCommonUpstreamList(existDivideUpstreams));
}
}
-
+
/**
* listen {@link SelectorCreatedEvent} add data permission.
*
diff --git a/shenyu-admin/src/main/resources/application.yml
b/shenyu-admin/src/main/resources/application.yml
index b04145134..d2388e005 100755
--- a/shenyu-admin/src/main/resources/application.yml
+++ b/shenyu-admin/src/main/resources/application.yml
@@ -42,6 +42,7 @@ shenyu:
sessionTimeout: 5000
connectionTimeout: 2000
checked: true
+ zombieCheckThreads: 10
zombieCheckTimes: 5
scheduledTime: 10
nacosNameSpace: ShenyuRegisterCenter
diff --git a/shenyu-bootstrap/src/main/resources/application.yml
b/shenyu-bootstrap/src/main/resources/application.yml
index 030c40151..f90973d6f 100644
--- a/shenyu-bootstrap/src/main/resources/application.yml
+++ b/shenyu-bootstrap/src/main/resources/application.yml
@@ -238,6 +238,7 @@ shenyu:
threads: 16
upstreamCheck:
enabled: false
+ poolSize: 10
timeout: 3000
healthyThreshold: 1
unhealthyThreshold: 1
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/config/ShenyuConfig.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/config/ShenyuConfig.java
index f0d3631da..3f1bd8b68 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/config/ShenyuConfig.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/config/ShenyuConfig.java
@@ -794,6 +794,8 @@ public class ShenyuConfig {
public static class UpstreamCheck {
private boolean enabled;
+
+ private Integer poolSize = 10;
private Integer timeout = 3000;
@@ -824,7 +826,25 @@ public class ShenyuConfig {
public void setEnabled(final boolean enabled) {
this.enabled = enabled;
}
-
+
+ /**
+ * Gets the upstream check thread pool size.
+ *
+ * @return the thread pool size
+ */
+ public Integer getPoolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Sets the upstream check thread pool size.
+ *
+ * @param poolSize the thread pool size
+ */
+ public void setPoolSize(final Integer poolSize) {
+ this.poolSize = poolSize;
+ }
+
/**
* Gets timeout.
*
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index 94c487ff9..68c933077 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -437,6 +437,16 @@ public interface Constants {
*/
String DEFAULT_CHECK_VALUE = "false";
+ /**
+ * zombie check threads.
+ */
+ String ZOMBIE_CHECK_THREADS = "zombieCheckThreads";
+
+ /**
+ * default zombie check threads value.
+ */
+ String ZOMBIE_CHECK_THREADS_VALUE = "10";
+
/**
* zombie check times.
*/
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/ZombieUpstream.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/ZombieUpstream.java
index dfba6bdfd..4ef84c518 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/ZombieUpstream.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/dto/convert/selector/ZombieUpstream.java
@@ -130,7 +130,6 @@ public class ZombieUpstream {
}
ZombieUpstream that = (ZombieUpstream) o;
return new EqualsBuilder()
- .append(zombieCheckTimes, that.zombieCheckTimes)
.append(commonUpstream, that.commonUpstream)
.append(selectorId, that.selectorId)
.isEquals();
@@ -140,7 +139,6 @@ public class ZombieUpstream {
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(commonUpstream)
- .append(zombieCheckTimes)
.append(selectorId)
.toHashCode();
}
diff --git
a/shenyu-common/src/test/java/org/apache/shenyu/common/config/ShenyuConfigTest.java
b/shenyu-common/src/test/java/org/apache/shenyu/common/config/ShenyuConfigTest.java
index cce15adc4..9ff90c729 100644
---
a/shenyu-common/src/test/java/org/apache/shenyu/common/config/ShenyuConfigTest.java
+++
b/shenyu-common/src/test/java/org/apache/shenyu/common/config/ShenyuConfigTest.java
@@ -80,6 +80,7 @@ public class ShenyuConfigTest {
public void testUpstreamCheck() {
ShenyuConfig.UpstreamCheck upstreamCheck = config.getUpstreamCheck();
upstreamCheck.setEnabled(false);
+ upstreamCheck.setPoolSize(10);
upstreamCheck.setHealthyThreshold(4);
upstreamCheck.setTimeout(10);
upstreamCheck.setInterval(5);
@@ -87,7 +88,7 @@ public class ShenyuConfigTest {
upstreamCheck.setPrintEnabled(false);
upstreamCheck.setPrintInterval(5);
- notEmptyElements(upstreamCheck.getEnabled(),
upstreamCheck.getHealthyThreshold(), upstreamCheck.getTimeout(),
+ notEmptyElements(upstreamCheck.getEnabled(),
upstreamCheck.getPoolSize(), upstreamCheck.getHealthyThreshold(),
upstreamCheck.getTimeout(),
upstreamCheck.getInterval(),
upstreamCheck.getUnhealthyThreshold(), upstreamCheck.getPrintInterval(),
upstreamCheck.getPrintEnabled());
}
diff --git
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
index 899ae24db..630b9f155 100644
---
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
+++
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCacheManager.java
@@ -51,6 +51,8 @@ public final class UpstreamCacheManager {
*/
private Boolean checkEnable;
+ private int poolSize;
+
private int checkTimeout;
private int checkInterval;
@@ -74,6 +76,7 @@ public final class UpstreamCacheManager {
ShenyuConfig shenyuConfig =
Optional.ofNullable(Singleton.INST.get(ShenyuConfig.class)).orElse(new
ShenyuConfig());
UpstreamCheck upstreamCheck = shenyuConfig.getUpstreamCheck();
checkEnable = upstreamCheck.getEnabled();
+ poolSize = upstreamCheck.getPoolSize();
checkTimeout = upstreamCheck.getTimeout();
healthyThreshold = upstreamCheck.getHealthyThreshold();
unhealthyThreshold = upstreamCheck.getUnhealthyThreshold();
@@ -86,6 +89,7 @@ public final class UpstreamCacheManager {
private void createTask() {
task = new UpstreamCheckTask(checkInterval);
+ task.setPoolSize(poolSize);
task.setCheckTimeout(checkTimeout);
task.setHealthyThreshold(healthyThreshold);
task.setUnhealthyThreshold(unhealthyThreshold);
diff --git
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
index 46b9ddc76..e604a22ef 100644
---
a/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
+++
b/shenyu-loadbalancer/src/main/java/org/apache/shenyu/loadbalancer/cache/UpstreamCheckTask.java
@@ -64,6 +64,8 @@ public final class UpstreamCheckTask implements Runnable {
private ExecutorService executor;
+ private int poolSize;
+
private int checkTimeout = 3000;
private int healthyThreshold = 1;
@@ -99,7 +101,7 @@ public final class UpstreamCheckTask implements Runnable {
// executor for async requests, so that a request does not block the health check thread
ThreadFactory requestFactory =
ShenyuThreadFactory.create("upstream-health-check-request", true);
- executor = new ScheduledThreadPoolExecutor(10, requestFactory);
+ executor = new ScheduledThreadPoolExecutor(poolSize, requestFactory);
}
/**
@@ -110,7 +112,25 @@ public final class UpstreamCheckTask implements Runnable {
public void setCheckTimeout(final int checkTimeout) {
this.checkTimeout = checkTimeout;
}
-
+
+ /**
+ * Gets the health check thread pool size.
+ *
+ * @return the thread pool size
+ */
+ public int getPoolSize() {
+ return poolSize;
+ }
+
+ /**
+ * Sets the health check thread pool size.
+ *
+ * @param poolSize the thread pool size
+ */
+ public void setPoolSize(final int poolSize) {
+ this.poolSize = poolSize;
+ }
+
/**
* Set healthy threshold.
*