This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3fd43353fd [ISSUE #7393] Add timeout configuration for grpc server
(#7394)
3fd43353fd is described below
commit 3fd43353fdf880deb5d63ba3ad50cc6e3259dc3a
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Sep 26 13:53:51 2023 +0800
[ISSUE #7393] Add timeout configuration for grpc server (#7394)
* Add timeout configuration for grpc server
* Add proxyConfig
---
.../main/java/org/apache/rocketmq/proxy/ProxyStartup.java | 1 +
.../java/org/apache/rocketmq/proxy/config/ProxyConfig.java | 9 +++++++++
.../main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java | 10 ++++++++--
.../org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java | 12 +++++++++++-
4 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 06d5f4525f..3b2ca99bfd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -85,6 +85,7 @@ public class ProxyStartup {
.addService(ChannelzService.newInstance(100))
.addService(ProtoReflectionService.newInstance())
.configInterceptor(accessValidators)
+
.shutdownTime(ConfigurationManager.getProxyConfig().getGrpcShutdownTimeSeconds(),
TimeUnit.SECONDS)
.build();
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index b2478fec3a..c0d00d8640 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -87,6 +87,7 @@ public class ProxyConfig implements ConfigFile {
*/
private String proxyMode = ProxyMode.CLUSTER.name();
private Integer grpcServerPort = 8081;
+ private long grpcShutdownTimeSeconds = 30;
private int grpcBossLoopNum = 1;
private int grpcWorkerLoopNum = PROCESSOR_NUMBER * 2;
private boolean enableGrpcEpoll = false;
@@ -443,6 +444,14 @@ public class ProxyConfig implements ConfigFile {
this.grpcServerPort = grpcServerPort;
}
+ public long getGrpcShutdownTimeSeconds() {
+ return grpcShutdownTimeSeconds;
+ }
+
+ public void setGrpcShutdownTimeSeconds(long grpcShutdownTimeSeconds) {
+ this.grpcShutdownTimeSeconds = grpcShutdownTimeSeconds;
+ }
+
public boolean isUseEndpointPortFromRequest() {
return useEndpointPortFromRequest;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
index 1bffa3c0be..d5b896fe14 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
@@ -29,8 +29,14 @@ public class GrpcServer implements StartAndShutdown {
private final Server server;
- protected GrpcServer(Server server) {
+ private final long timeout;
+
+ private final TimeUnit unit;
+
+ protected GrpcServer(Server server, long timeout, TimeUnit unit) {
this.server = server;
+ this.timeout = timeout;
+ this.unit = unit;
}
public void start() throws Exception {
@@ -40,7 +46,7 @@ public class GrpcServer implements StartAndShutdown {
public void shutdown() {
try {
- this.server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+ this.server.shutdown().awaitTermination(timeout, unit);
log.info("grpc server shutdown successfully.");
} catch (Exception e) {
e.printStackTrace();
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index 9cddd30137..0e79006f6b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -41,6 +41,10 @@ public class GrpcServerBuilder {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected NettyServerBuilder serverBuilder;
+ protected long time = 30;
+
+ protected TimeUnit unit = TimeUnit.SECONDS;
+
public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor,
int port) {
return new GrpcServerBuilder(executor, port);
}
@@ -77,6 +81,12 @@ public class GrpcServerBuilder {
port, bossLoopNum, workerLoopNum, maxInboundMessageSize);
}
+ public GrpcServerBuilder shutdownTime(long time, TimeUnit unit) {
+ this.time = time;
+ this.unit = unit;
+ return this;
+ }
+
public GrpcServerBuilder addService(BindableService service) {
this.serverBuilder.addService(service);
return this;
@@ -93,7 +103,7 @@ public class GrpcServerBuilder {
}
public GrpcServer build() {
- return new GrpcServer(this.serverBuilder.build());
+ return new GrpcServer(this.serverBuilder.build(), time, unit);
}
public GrpcServerBuilder configInterceptor(List<AccessValidator>
accessValidators) {