This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e159846ecc properly shutdown query client with await termination (#8922)
e159846ecc is described below
commit e159846ecc9117ac93e0b655cb03f0dc8f0a4ba6
Author: Rong Rong <[email protected]>
AuthorDate: Sat Jun 18 14:19:40 2022 -0700
properly shutdown query client with await termination (#8922)
Co-authored-by: Rong Rong <[email protected]>
---
.../broker/requesthandler/GrpcBrokerRequestHandler.java | 7 +++++++
.../apache/pinot/common/utils/grpc/GrpcQueryClient.java | 15 ++++++++++++++-
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 0bd33a9289..0b3744f018 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -78,6 +78,7 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
@Override
public synchronized void shutDown() {
+ _streamingQueryClient.shutdown();
_streamingReduceService.shutDown();
}
@@ -146,5 +147,11 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
String key = String.format("%s_%d", host, port);
return _grpcQueryClientMap.computeIfAbsent(key, k -> new
GrpcQueryClient(host, port, _config));
}
+
+ public void shutdown() {
+ for (GrpcQueryClient client : _grpcQueryClientMap.values()) {
+ client.close();
+ }
+ }
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index bbb70d85e9..df074d2581 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -27,6 +27,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManagerFactory;
@@ -35,9 +36,14 @@ import org.apache.pinot.common.proto.PinotQueryServerGrpc;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class GrpcQueryClient {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcQueryClient.class);
+ private static final int DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND = 10;
+
private final ManagedChannel _managedChannel;
private final PinotQueryServerGrpc.PinotQueryServerBlockingStub
_blockingStub;
@@ -83,7 +89,14 @@ public class GrpcQueryClient {
public void close() {
if (!_managedChannel.isShutdown()) {
- _managedChannel.shutdownNow();
+ try {
+ _managedChannel.shutdownNow();
+ if
(!_managedChannel.awaitTermination(DEFAULT_CHANNEL_SHUTDOWN_TIMEOUT_SECOND,
TimeUnit.SECONDS)) {
+ LOGGER.warn("Timed out forcefully shutting down connection: {}. ",
_managedChannel);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Unexpected exception while waiting for channel
termination", e);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]