This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f16b757d5b9e179682407518b52c31447b8d1bac
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jul 17 10:18:41 2025 +0300

    [fix][client] Fix issue in auto releasing of idle connection with topics 
pattern consumer (#24528)
    
    (cherry picked from commit 8be127fe3834851008061da0de22e591b9c7b130)
---
 .../apache/pulsar/broker/MultiBrokerBaseTest.java  |  14 ++
 .../impl/AutoCloseUselessClientConSupports.java    |   2 +-
 ...eUselessClientConTopicsPatternConsumerTest.java | 106 +++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   3 +
 .../pulsar/client/impl/ClientCnxIdleState.java     |   7 +-
 .../pulsar/client/impl/ClientCnxIdleStateTest.java |  61 +++++++++
 .../apache/pulsar/client/impl/ClientCnxTest.java   |  11 ++
 .../impl/AutoCloseUselessClientConProxyTest.java   | 150 +++++++++++++++++++++
 8 files changed, 352 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index 6e4d7893adb..2a3e705dca1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -50,6 +50,11 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
         super.internalSetup();
         additionalBrokersSetup();
         pulsarResourcesSetup();
+        additionalSetup();
+    }
+
+    protected void additionalSetup() throws Exception {
+        // override this method to add any additional setup logic
     }
 
     protected void pulsarResourcesSetup() throws PulsarAdminException {
@@ -91,9 +96,18 @@ public abstract class MultiBrokerBaseTest extends 
MockedPulsarServiceBaseTest {
     @Override
     public final void cleanup() throws Exception {
         additionalBrokersCleanup();
+        try {
+            additionalCleanup();
+        } catch (Exception e) {
+            log.warn("Exception during additional cleanup", e);
+        }
         super.internalCleanup();
     }
 
+    protected void additionalCleanup() throws Exception {
+        // override this method to add any additional cleanup logic
+    }
+
     protected void additionalBrokersCleanup() {
         if (additionalBrokerAdmins != null) {
             for (PulsarAdmin additionalBrokerAdmin : additionalBrokerAdmins) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
index e03b1709137..29dc36959f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 
-public class AutoCloseUselessClientConSupports extends MultiBrokerBaseTest {
+public abstract class AutoCloseUselessClientConSupports extends 
MultiBrokerBaseTest {
 
     protected int BROKER_COUNT = 5;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java
new file mode 100644
index 00000000000..b68cca0d50f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.Test;
+
+@Test
+public class AutoCloseUselessClientConTopicsPatternConsumerTest extends 
AutoCloseUselessClientConSupports {
+    private static final String TOPIC_NAME = 
BrokerTestUtil.newUniqueName("pattern_");
+    private static final String TOPIC_FULL_NAME = 
"persistent://public/default/" + TOPIC_NAME;
+    private static final String TOPIC_PATTERN = 
"persistent://public/default/pattern_.*";
+
+    @Override
+    protected void pulsarResourcesSetup() throws PulsarAdminException {
+        super.pulsarResourcesSetup();
+        admin.topics().createNonPartitionedTopic(TOPIC_FULL_NAME);
+    }
+
+    @Test
+    public void testConnectionAutoReleaseWhileUsingTopicsPatternConsumer() 
throws Exception {
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .connectionsPerBroker(Integer.MAX_VALUE);
+        AtomicInteger connectionCreationCounter = new AtomicInteger(0);
+        PulsarClientImpl pulsarClient = 
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> {
+            connectionCreationCounter.incrementAndGet();
+            return new ClientCnx(conf, eventLoopGroup);
+        });
+
+        @Cleanup
+        Consumer consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topicsPattern(TOPIC_PATTERN)
+                .isAckReceiptEnabled(true)
+                .subscriptionName("my-subscription-x")
+                .subscribe();
+        @Cleanup
+        Producer producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(TOPIC_NAME)
+                .create();
+
+        Consumer consumer2 = pulsarClient.newConsumer(Schema.BYTES)
+                .topicsPattern(TOPIC_PATTERN)
+                .subscriptionName("my-subscription-y")
+                .subscribe();
+
+        // check that there are more than 3 connections
+        // at least 3 connections are required:
+        // 1 for "producer", 1 for "consumer", and 1 for the topic watcher of 
"consumer"
+        // additional connections will be created for the second consumer and 
its topic watcher
+        // there's also a connection for topic lookup
+        assertThat(pulsarClient.getCnxPool().getPoolSize()).isGreaterThan(3);
+
+        int connectionsCreatedBefore = connectionCreationCounter.get();
+
+        // trigger releasing of unused client connections
+        trigReleaseConnection(pulsarClient);
+
+        // close consumer2 that creates an extra connection due to 
connectionsPerBroker set to Integer.MAX_VALUE
+        consumer2.close();
+
+        // trigger releasing of unused client connections
+        trigReleaseConnection(pulsarClient);
+
+        // verify that the number of connections is 3 now, which is the 
expected number of connections
+        assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3);
+
+        // Ensure everything still works
+        ensureProducerAndConsumerWorks(producer, consumer);
+
+        // Verify that the number of connections did not increase after the 
work was completed
+        assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3);
+
+        assertThat(connectionCreationCounter.get())
+                .as("No new connections should be created after releasing 
unused connections since that would "
+                        + "mean that an used connection was released.")
+                .isEqualTo(connectionsCreatedBefore);
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 00ecd3d6263..f798980c299 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1444,6 +1444,9 @@ public class ClientCnx extends PulsarHandler {
         if (!transactionMetaStoreHandlers.isEmpty()) {
             return false;
         }
+        if (!topicListWatchers.isEmpty()) {
+            return false;
+        }
         return true;
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java
index 59d3309f248..a3bff56d298 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java
@@ -31,7 +31,7 @@ public class ClientCnxIdleState {
     private final long createTime;
 
     /** The time when marks the connection is idle. **/
-    private long idleMarkTime;
+    private volatile long idleMarkTime;
 
     public ClientCnxIdleState(ClientCnx clientCnx){
         this.clientCnx = clientCnx;
@@ -171,6 +171,11 @@ public class ClientCnxIdleState {
             return;
         }
         if (isIdle()) {
+            // check if the connection is still idle, if not, mark it as using
+            if (!clientCnx.idleCheck() && compareAndSetIdleStat(State.IDLE, 
State.USING)) {
+                idleMarkTime = 0;
+                return;
+            }
             if (maxIdleSeconds * 1000 + idleMarkTime < 
System.currentTimeMillis()) {
                 tryMarkReleasing();
             }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java
new file mode 100644
index 00000000000..e3d88a3823d
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertTrue;
+import java.util.concurrent.TimeUnit;
+import org.testng.annotations.Test;
+
+public class ClientCnxIdleStateTest {
+
+    @Test
+    public void testShouldNotReleaseConnectionIfIdleCheckFails() throws 
InterruptedException {
+        ClientCnx clientCnx = mock(ClientCnx.class);
+        ClientCnxIdleState idleState = new ClientCnxIdleState(clientCnx);
+        int maxIdleSeconds = 1;
+
+        // initially, return true for idle check
+        doReturn(true).when(clientCnx).idleCheck();
+
+        // do the first idle detection
+        idleState.doIdleDetect(maxIdleSeconds);
+
+        // the state should be IDLE since the idle check passed
+        assertTrue(idleState.isIdle());
+
+        // Wait for more than maxIdleSeconds
+        Thread.sleep(TimeUnit.SECONDS.toMillis(maxIdleSeconds) + 1);
+
+        // now return false for idle check
+        doReturn(false).when(clientCnx).idleCheck();
+
+        // do the second idle detection
+        idleState.doIdleDetect(maxIdleSeconds);
+
+        // the state should now be USING since the idle check failed
+        assertTrue(idleState.isUsing());
+
+        // verify that idleCheck was called twice
+        verify(clientCnx, times(2)).idleCheck();
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index d5fbfd22321..c29c9993a39 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -270,6 +270,17 @@ public class ClientCnxTest {
         eventLoop.shutdownGracefully();
     }
 
+    @Test
+    public void testIdleCheckWithTopicListWatcher() {
+        ClientCnx cnx =
+                new ClientCnx(new ClientConfigurationData(), 
mock(EventLoopGroup.class));
+        // idle check should return true initially
+        assertTrue(cnx.idleCheck());
+        cnx.registerTopicListWatcher(0, mock(TopicListWatcher.class));
+        // idle check should now return false since there's a registered 
watcher
+        assertFalse(cnx.idleCheck());
+    }
+
     @Test
     public void testNoWatchersWhenNoServerSupport() {
         withConnection("testNoWatchersWhenNoServerSupport", cnx -> {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java
new file mode 100644
index 00000000000..aac362ac2f2
--- /dev/null
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+import org.apache.pulsar.proxy.server.ProxyService;
+import org.testng.annotations.Test;
+
+@Test
+public class AutoCloseUselessClientConProxyTest extends 
AutoCloseUselessClientConSupports {
+    private static final String TOPIC_NAME = 
BrokerTestUtil.newUniqueName("pattern_");
+    private static final String TOPIC_FULL_NAME = 
"persistent://public/default/" + TOPIC_NAME;
+    private static final String TOPIC_PATTERN = 
"persistent://public/default/pattern_.*";
+    private ProxyService proxyService;
+    private ProxyConfiguration proxyConfig;
+    private PulsarClient proxiedClient;
+    private AtomicInteger connectionCreationCounter = new AtomicInteger(0);
+
+    @Override
+    protected void additionalSetup() throws Exception {
+        proxyConfig = new ProxyConfiguration();
+        proxyConfig.setServicePort(Optional.ofNullable(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setMetadataStoreUrl("dummy");
+        proxyConfig.setClusterName(configClusterName);
+        startProxyService();
+        // use the same port for subsequent restarts
+        proxyConfig.setServicePort(proxyService.getListenPort());
+
+        ClientBuilderImpl clientBuilder = (ClientBuilderImpl) 
PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:" + 
proxyService.getListenPort().get())
+                .connectionsPerBroker(Integer.MAX_VALUE) // effectively uses a 
different connection for each usage
+                .statsInterval(0, TimeUnit.SECONDS);
+        proxiedClient = InjectedClientCnxClientBuilder.create(clientBuilder, 
(conf, eventLoopGroup) -> {
+            connectionCreationCounter.incrementAndGet();
+            return new ClientCnx(conf, eventLoopGroup);
+        });
+        registerCloseable(proxiedClient);
+    }
+
+    private void startProxyService() throws Exception {
+        proxyService = BrokerTestUtil.spyWithoutRecordingInvocations(new 
ProxyService(proxyConfig,
+                new 
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
+                AuthenticationDisabled.INSTANCE));
+        doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+        doReturn(registerCloseable(new 
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+                .createConfigurationMetadataStore();
+        proxyService.start();
+        registerCloseable(proxyService);
+    }
+
+    @Override
+    protected void pulsarResourcesSetup() throws PulsarAdminException {
+        super.pulsarResourcesSetup();
+        admin.topics().createNonPartitionedTopic(TOPIC_FULL_NAME);
+    }
+
+    @Test
+    public void 
testConnectionAutoReleaseWhileUsingTopicsPatternConsumerAndProxy() throws 
Exception {
+        PulsarClientImpl pulsarClient = (PulsarClientImpl) proxiedClient;
+        @Cleanup
+        Consumer consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topicsPattern(TOPIC_PATTERN)
+                .isAckReceiptEnabled(true)
+                .subscriptionName("my-subscription-x")
+                .subscribe();
+        @Cleanup
+        Producer producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(TOPIC_NAME)
+                .create();
+
+        Consumer consumer2 = pulsarClient.newConsumer(Schema.BYTES)
+                .topicsPattern(TOPIC_PATTERN)
+                .subscriptionName("my-subscription-y")
+                .subscribe();
+
+        int poolSizeAfterCreatingConsumersAndProducer = 
pulsarClient.getCnxPool().getPoolSize();
+        // check that there are more than 3 connections
+        // at least 3 connections are required:
+        // 1 for "producer", 1 for "consumer", and 1 for the topic watcher of 
"consumer"
+        // additional connections will be created for the second consumer and 
its topic watcher
+        // there's also a connection for topic lookup
+        assertThat(poolSizeAfterCreatingConsumersAndProducer).isGreaterThan(3);
+
+        int connectionsCreatedBefore = connectionCreationCounter.get();
+
+        // trigger releasing of unused client connections
+        trigReleaseConnection(pulsarClient);
+
+        // verify that the number of connections is still more than 3, but 
less than the pool size after creating
+        // consumers and producer
+        // since the lookup connection to the proxy should be closed now
+        assertThat(pulsarClient.getCnxPool().getPoolSize())
+                .isGreaterThan(3)
+                .isLessThan(poolSizeAfterCreatingConsumersAndProducer);
+
+        // close consumer2 that creates an extra connection due to 
connectionsPerBroker set to Integer.MAX_VALUE
+        consumer2.close();
+
+        // trigger releasing of unused client connections
+        trigReleaseConnection(pulsarClient);
+
+        // verify that the number of connections is 3 now, which is the 
expected number of connections
+        assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3);
+
+        // Ensure everything still works
+        ensureProducerAndConsumerWorks(producer, consumer);
+
+        // Verify that the number of connections did not increase after the 
work was completed
+        assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3);
+
+        assertThat(connectionCreationCounter.get())
+                .as("No new connections should be created after releasing 
unused connections since that would "
+                        + "mean that an used connection was released.")
+                .isEqualTo(connectionsCreatedBefore);
+    }
+}

Reply via email to