This is an automated email from the ASF dual-hosted git repository.
zhangzicheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 1278156a2 polish brpc generic (#4402)
1278156a2 is described below
commit 1278156a275fb97248040138e7f71b1ddecaa448
Author: mahaitao <[email protected]>
AuthorDate: Thu Feb 23 22:19:02 2023 +0800
polish brpc generic (#4402)
* polish brpc generic
* feat:fix
---------
Co-authored-by: mahaitao617 <[email protected]>
Co-authored-by: dragon-zhang <[email protected]>
---
.../apache/shenyu/common/constant/Constants.java | 5 +
.../plugin/brpc/cache/ApplicationConfigCache.java | 5 +-
.../shenyu/plugin/brpc/proxy/BrpcProxyService.java | 1 -
.../plugin/brpc/spi/SharedThreadPoolFactory.java | 116 +++++++++++++++++++++
....starlight.api.rpc.threadpool.ThreadPoolFactory | 17 +++
5 files changed, 141 insertions(+), 3 deletions(-)
diff --git
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
index 68c933077..eb93692a2 100644
---
a/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
+++
b/shenyu-common/src/main/java/org/apache/shenyu/common/constant/Constants.java
@@ -687,6 +687,11 @@ public interface Constants {
*/
String NAMESPACE = "namespace";
+ /**
+ * biz thread pool name set on the brpc client TransportConfig.
+ */
+ String SHARED_BIZTHREADPOOLNAME = "shared";
+
/**
* String q.
*/
diff --git
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/cache/ApplicationConfigCache.java
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/cache/ApplicationConfigCache.java
index 242b9e1a3..0283c38ce 100644
---
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/cache/ApplicationConfigCache.java
+++
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/cache/ApplicationConfigCache.java
@@ -109,7 +109,9 @@ public final class ApplicationConfigCache {
StarlightClient client = CLIENT_CACHE.get(serviceConfig);
if (Objects.isNull(client)) {
BrpcParamExtInfo brpcParamExtInfo =
GsonUtils.getInstance().fromJson(metaData.getRpcExt(), BrpcParamExtInfo.class);
- client = new SingleStarlightClient(brpcParamExtInfo.getHost(),
brpcParamExtInfo.getPort(), new TransportConfig());
+ TransportConfig transportConfig = new TransportConfig();
+
transportConfig.setBizThreadPoolName(Constants.SHARED_BIZTHREADPOOLNAME);
+ client = new SingleStarlightClient(brpcParamExtInfo.getHost(),
brpcParamExtInfo.getPort(), transportConfig);
client.init();
CLIENT_CACHE.put(serviceConfig, client);
}
@@ -281,7 +283,6 @@ public final class ApplicationConfigCache {
this.methodName = methodName;
}
-
/**
* Gets paramTypes.
*
diff --git
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/proxy/BrpcProxyService.java
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/proxy/BrpcProxyService.java
index 7a8b779c7..4fbe7b997 100644
---
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/proxy/BrpcProxyService.java
+++
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/proxy/BrpcProxyService.java
@@ -83,7 +83,6 @@ public class BrpcProxyService {
}
}
initThreadPool();
- //todo use
com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory impl it
CompletableFuture<Object> future = CompletableFuture.supplyAsync(() ->
getValue(metaData, params), threadPool);
return Mono.fromFuture(future.thenApply(ret -> {
if (Objects.isNull(ret)) {
diff --git
a/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/spi/SharedThreadPoolFactory.java
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/spi/SharedThreadPoolFactory.java
new file mode 100644
index 000000000..2303fc7f4
--- /dev/null
+++
b/shenyu-plugin/shenyu-plugin-brpc/src/main/java/org/apache/shenyu/plugin/brpc/spi/SharedThreadPoolFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.brpc.spi;
+
+import com.baidu.cloud.starlight.api.common.URI;
+import com.baidu.cloud.starlight.api.rpc.RpcService;
+import com.baidu.cloud.starlight.api.rpc.config.ServiceConfig;
+import com.baidu.cloud.starlight.api.rpc.threadpool.NamedThreadFactory;
+import com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor;
+import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ThreadPoolFactory} implementation that hands out the gateway's
+ * {@link ShenyuThreadPoolExecutor} bean as the default biz thread pool and
+ * caches one dedicated pool per {@link RpcService} whose {@link ServiceConfig}
+ * asks for a customized pool.
+ */
+public class SharedThreadPoolFactory implements ThreadPoolFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SharedThreadPoolFactory.class);
+
+    private ThreadPoolExecutor defaultThreadPool;
+
+    private final Map<RpcService, ThreadPoolExecutor> threadPoolMap = new ConcurrentHashMap<>();
+
+    public SharedThreadPoolFactory() {
+    }
+
+    /**
+     * Resolves the default pool from the Spring context.
+     *
+     * @param uri          not used by this implementation
+     * @param threadPrefix not used by this implementation
+     */
+    @Override
+    public void initDefaultThreadPool(final URI uri, final String threadPrefix) {
+        this.defaultThreadPool = SpringBeanUtils.getInstance().getBean(ShenyuThreadPoolExecutor.class);
+    }
+
+    /**
+     * Returns the pool that should run the given service.
+     *
+     * @param rpcService the service being invoked, may be {@code null}
+     * @return the cached or newly created dedicated pool when the service config
+     *         enables a customized pool; otherwise the default pool (also used when
+     *         {@code rpcService} or its config is {@code null}, or when creating the
+     *         dedicated pool fails)
+     */
+    @Override
+    public ThreadPoolExecutor getThreadPool(final RpcService rpcService) {
+        if (rpcService == null) {
+            return this.defaultThreadPool;
+        }
+        // Look the entry up once: a second get() could observe a map that
+        // close() cleared in between and hand back null.
+        ThreadPoolExecutor cached = this.threadPoolMap.get(rpcService);
+        if (cached != null) {
+            return cached;
+        }
+        ServiceConfig serviceConfig = rpcService.getServiceConfig();
+        if (serviceConfig == null || !Boolean.TRUE.equals(serviceConfig.getCustomizeThreadPool())) {
+            return this.defaultThreadPool;
+        }
+        try {
+            // computeIfAbsent creates at most one pool per service under concurrent callers.
+            return this.threadPoolMap.computeIfAbsent(rpcService, key -> newServiceThreadPool(serviceConfig));
+        } catch (Exception e) {
+            // Keep the cause in the log so a bad pool configuration can be diagnosed.
+            LOGGER.warn("Create service thread pool failed, will use default thread pool", e);
+            return this.defaultThreadPool;
+        }
+    }
+
+    /**
+     * Returns the default pool.
+     *
+     * @return the default pool, or {@code null} if
+     *         {@link #initDefaultThreadPool(URI, String)} has not been called yet
+     */
+    @Override
+    public ThreadPoolExecutor defaultThreadPool() {
+        return this.defaultThreadPool;
+    }
+
+    /**
+     * Shuts down every dedicated service pool, clears the cache and then shuts
+     * down the default pool.
+     */
+    @Override
+    public void close() {
+        for (ThreadPoolExecutor threadPool : this.threadPoolMap.values()) {
+            if (!threadPool.isShutdown()) {
+                threadPool.shutdown();
+            }
+        }
+        this.threadPoolMap.clear();
+        // NOTE(review): the default pool is a bean fetched from the Spring context in
+        // initDefaultThreadPool, so this shutdown also affects any other holder of that
+        // bean - confirm this is intended.
+        if (this.defaultThreadPool != null) {
+            this.defaultThreadPool.shutdown();
+        }
+    }
+
+    /**
+     * Builds a dedicated pool from the sizing values of the service config.
+     *
+     * @param serviceConfig the config supplying core/max size, keep-alive seconds and queue size
+     * @return a new pool whose threads are named by a "service-biz-work" {@link NamedThreadFactory}
+     */
+    private static ThreadPoolExecutor newServiceThreadPool(final ServiceConfig serviceConfig) {
+        // Unboxing an unset (null) value throws here; getThreadPool catches it and
+        // falls back to the default pool.
+        int corePoolSize = serviceConfig.getThreadPoolSize();
+        int maxThreadPoolSize = serviceConfig.getMaxThreadPoolSize();
+        long keepAliveSeconds = serviceConfig.getIdleThreadKeepAliveSecond();
+        int maxQueueSize = serviceConfig.getMaxRunnableQueueSize();
+        return new ThreadPoolExecutor(corePoolSize, maxThreadPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(maxQueueSize), new NamedThreadFactory("service-biz-work"));
+    }
+}
diff --git
a/shenyu-plugin/shenyu-plugin-brpc/src/main/resources/META-INF/services/com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory
b/shenyu-plugin/shenyu-plugin-brpc/src/main/resources/META-INF/services/com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory
new file mode 100644
index 000000000..ffcdc2705
--- /dev/null
+++
b/shenyu-plugin/shenyu-plugin-brpc/src/main/resources/META-INF/services/com.baidu.cloud.starlight.api.rpc.threadpool.ThreadPoolFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.shenyu.plugin.brpc.spi.SharedThreadPoolFactory