This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6486a55ca735e01af0b1b2af1b8ec9e9d035cf19 Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Aug 30 18:53:22 2021 -0700 Fixed Proxy leaking outbound connections (#11848) --- .../pulsar/proxy/server/ProxyConnection.java | 23 ++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 3fdb0d5e..25f88102 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -69,6 +69,7 @@ import lombok.Getter; public class ProxyConnection extends PulsarHandler implements FutureListener<Void> { // ConnectionPool is used by the proxy to issue lookup requests private PulsarClientImpl client; + private ConnectionPool connectionPool; private ProxyService service; private Authentication clientAuthentication; AuthenticationDataSource authenticationData; @@ -159,6 +160,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi } service.getClientCnxs().remove(this); LOG.info("[{}] Connection closed", remoteAddress); + + if (connectionPool != null) { + try { + connectionPool.close(); + } catch (Exception e) { + LOG.error("Failed to close connection pool {}", e.getMessage(), e); + } + } } @Override @@ -297,9 +306,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi // authn not enabled, complete if (!service.getConfiguration().isAuthenticationEnabled()) { - this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(), - new ProxyConnectionPool(clientConf, service.getWorkerGroup(), - () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer()); + this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersion)); + this.client = + new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer()); completeConnect(); return; @@ -434,9 +444,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData, final String clientAuthMethod, final int protocolVersion) throws PulsarClientException { - return new PulsarClientImpl(clientConf, service.getWorkerGroup(), - new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, - service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer()); + this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, + clientAuthMethod, protocolVersion)); + return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer()); } private static int getProtocolVersionToAdvertise(CommandConnect connect) {