This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bc3dc7727b1 [fix] [client] Fix resource leak in Pulsar Client since
HttpLookupService doesn't get closed (#22858)
bc3dc7727b1 is described below
commit bc3dc7727b132dd88aa84f6befef42ea0646ec50
Author: fengyubiao <[email protected]>
AuthorDate: Tue Jun 18 14:33:33 2024 +0800
[fix] [client] Fix resource leak in Pulsar Client since HttpLookupService
doesn't get closed (#22858)
---
.../admin/PulsarClientImplMultiBrokersTest.java | 79 ++++++++++++++++++++++
.../pulsar/client/impl/PulsarClientImpl.java | 22 ++++++
2 files changed, 101 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java
new file mode 100644
index 00000000000..29604d0440b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PulsarClientImplMultiBrokersTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.testng.Assert.fail;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.MultiBrokerBaseTest;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.annotations.Test;
+
+/**
+ * Test multi-broker admin api.
+ */
+@Slf4j
+@Test(groups = "broker-admin")
+public class PulsarClientImplMultiBrokersTest extends MultiBrokerBaseTest {
+ @Override
+ protected int numberOfAdditionalBrokers() {
+ return 3;
+ }
+
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ this.conf.setManagedLedgerMaxEntriesPerLedger(10);
+ }
+
+ @Override
+ protected void onCleanup() {
+ super.onCleanup();
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testReleaseUrlLookupServices() throws Exception {
+ PulsarClientImpl pulsarClient = (PulsarClientImpl)
additionalBrokerClients.get(0);
+ Map<String, LookupService> urlLookupMap =
WhiteboxImpl.getInternalState(pulsarClient, "urlLookupMap");
+ assertEquals(urlLookupMap.size(), 0);
+ for (PulsarService pulsar : additionalBrokers) {
+ pulsarClient.getLookup(pulsar.getBrokerServiceUrl());
+ pulsarClient.getLookup(pulsar.getWebServiceAddress());
+ }
+ assertEquals(urlLookupMap.size(), additionalBrokers.size() * 2);
+ // Verify: lookup services will be release.
+ pulsarClient.close();
+ assertEquals(urlLookupMap.size(), 0);
+ try {
+ for (PulsarService pulsar : additionalBrokers) {
+ pulsarClient.getLookup(pulsar.getBrokerServiceUrl());
+ pulsarClient.getLookup(pulsar.getWebServiceAddress());
+ }
+ fail("Expected a error when calling pulsarClient.getLookup if
getLookup was closed");
+ } catch (IllegalStateException illegalArgumentException) {
+ assertTrue(illegalArgumentException.getMessage().contains("has
been closed"));
+ }
+ assertEquals(urlLookupMap.size(), 0);
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e8107efe98e..f4afb2931cc 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -33,6 +33,7 @@ import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -744,6 +745,21 @@ public class PulsarClientImpl implements PulsarClient {
}
}
+ private void closeUrlLookupMap() {
+ Map<String, LookupService> closedUrlLookupServices = new
HashMap(urlLookupMap.size());
+ urlLookupMap.entrySet().forEach(e -> {
+ try {
+ e.getValue().close();
+ } catch (Exception ex) {
+ log.error("Error closing lookup service {}", e.getKey(), ex);
+ }
+ closedUrlLookupServices.put(e.getKey(), e.getValue());
+ });
+ closedUrlLookupServices.entrySet().forEach(e -> {
+ urlLookupMap.remove(e.getKey(), e.getValue());
+ });
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
log.info("Client closing. URL: {}", lookup.getServiceUrl());
@@ -754,6 +770,8 @@ public class PulsarClientImpl implements PulsarClient {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
+ closeUrlLookupMap();
+
producers.forEach(p -> futures.add(p.closeAsync().handle((__, t) -> {
if (t != null) {
log.error("Error closing producer {}", p, t);
@@ -982,6 +1000,10 @@ public class PulsarClientImpl implements PulsarClient {
public LookupService getLookup(String serviceUrl) {
return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
+ if (isClosed()) {
+ throw new IllegalStateException("Pulsar client has been
closed, can not build LookupService when"
+ + " calling get lookup with an url");
+ }
try {
return createLookup(serviceUrl);
} catch (PulsarClientException e) {