This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8be127fe383 [fix][client] Fix issue in auto releasing of idle
connection with topics pattern consumer (#24528)
8be127fe383 is described below
commit 8be127fe3834851008061da0de22e591b9c7b130
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jul 17 10:18:41 2025 +0300
[fix][client] Fix issue in auto releasing of idle connection with topics
pattern consumer (#24528)
---
.../apache/pulsar/broker/MultiBrokerBaseTest.java | 14 ++
.../impl/AutoCloseUselessClientConSupports.java | 2 +-
...eUselessClientConTopicsPatternConsumerTest.java | 107 +++++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 3 +
.../pulsar/client/impl/ClientCnxIdleState.java | 7 +-
.../pulsar/client/impl/ClientCnxIdleStateTest.java | 61 +++++++++
.../apache/pulsar/client/impl/ClientCnxTest.java | 11 ++
.../impl/AutoCloseUselessClientConProxyTest.java | 151 +++++++++++++++++++++
8 files changed, 354 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
index 6e4d7893adb..2a3e705dca1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java
@@ -50,6 +50,11 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
super.internalSetup();
additionalBrokersSetup();
pulsarResourcesSetup();
+ additionalSetup();
+ }
+
+ protected void additionalSetup() throws Exception {
+ // override this method to add any additional setup logic
}
protected void pulsarResourcesSetup() throws PulsarAdminException {
@@ -91,9 +96,18 @@ public abstract class MultiBrokerBaseTest extends
MockedPulsarServiceBaseTest {
@Override
public final void cleanup() throws Exception {
additionalBrokersCleanup();
+ try {
+ additionalCleanup();
+ } catch (Exception e) {
+ log.warn("Exception during additional cleanup", e);
+ }
super.internalCleanup();
}
+ protected void additionalCleanup() throws Exception {
+ // override this method to add any additional cleanup logic
+ }
+
protected void additionalBrokersCleanup() {
if (additionalBrokerAdmins != null) {
for (PulsarAdmin additionalBrokerAdmin : additionalBrokerAdmins) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
index 43f07eef17e..b8855656ad0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java
@@ -39,7 +39,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
import org.awaitility.Awaitility;
import org.testng.Assert;
-public class AutoCloseUselessClientConSupports extends MultiBrokerBaseTest {
+public abstract class AutoCloseUselessClientConSupports extends
MultiBrokerBaseTest {
protected static final int BROKER_COUNT = 5;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java
new file mode 100644
index 00000000000..981612d3930
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.testng.annotations.Test;
+
+@Test
+public class AutoCloseUselessClientConTopicsPatternConsumerTest extends
AutoCloseUselessClientConSupports {
+ private static final String TOPIC_NAME =
BrokerTestUtil.newUniqueName("pattern_");
+ private static final String TOPIC_FULL_NAME =
"persistent://public/default/" + TOPIC_NAME;
+ private static final String TOPIC_PATTERN =
"persistent://public/default/pattern_.*";
+
+ @Override
+ protected void pulsarResourcesSetup() throws PulsarAdminException {
+ super.pulsarResourcesSetup();
+ admin.topics().createNonPartitionedTopic(TOPIC_FULL_NAME);
+ }
+
+ @Test
+ public void testConnectionAutoReleaseWhileUsingTopicsPatternConsumer()
throws Exception {
+ ClientBuilderImpl clientBuilder = (ClientBuilderImpl)
PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .statsInterval(0, TimeUnit.SECONDS)
+ .connectionsPerBroker(Integer.MAX_VALUE);
+ AtomicInteger connectionCreationCounter = new AtomicInteger(0);
+ PulsarClientImpl pulsarClient =
InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> {
+ connectionCreationCounter.incrementAndGet();
+ return new ClientCnx(InstrumentProvider.NOOP, conf,
eventLoopGroup);
+ });
+
+ @Cleanup
+ Consumer consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topicsPattern(TOPIC_PATTERN)
+ .isAckReceiptEnabled(true)
+ .subscriptionName("my-subscription-x")
+ .subscribe();
+ @Cleanup
+ Producer producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(TOPIC_NAME)
+ .create();
+
+ Consumer consumer2 = pulsarClient.newConsumer(Schema.BYTES)
+ .topicsPattern(TOPIC_PATTERN)
+ .subscriptionName("my-subscription-y")
+ .subscribe();
+
+ // check that there are more than 3 connections
+ // at least 3 connections are required:
+ // 1 for "producer", 1 for "consumer", and 1 for the topic watcher of
"consumer"
+ // additional connections will be created for the second consumer and
its topic watcher
+ // there's also a connection for topic lookup
+ assertThat(pulsarClient.getCnxPool().getPoolSize()).isGreaterThan(3);
+
+ int connectionsCreatedBefore = connectionCreationCounter.get();
+
+ // trigger releasing of unused client connections
+ trigReleaseConnection(pulsarClient);
+
+ // close consumer2 that creates an extra connection due to
connectionsPerBroker set to Integer.MAX_VALUE
+ consumer2.close();
+
+ // trigger releasing of unused client connections
+ trigReleaseConnection(pulsarClient);
+
+ // verify that the number of connections is 3 now, which is the
expected number of connections
+ assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3);
+
+ // Ensure all things still works
+ ensureProducerAndConsumerWorks(producer, consumer);
+
+ // Verify that the number of connections did not increase after the
work was completed
+ assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3);
+
+ assertThat(connectionCreationCounter.get())
+ .as("No new connections should be created after releasing
unused connections since that would "
+ + "mean that an used connection was released.")
+ .isEqualTo(connectionsCreatedBefore);
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index a5829df64f2..d0361069aa7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1508,6 +1508,9 @@ public class ClientCnx extends PulsarHandler {
if (!transactionMetaStoreHandlers.isEmpty()) {
return false;
}
+ if (!topicListWatchers.isEmpty()) {
+ return false;
+ }
return true;
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java
index 59d3309f248..a3bff56d298 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java
@@ -31,7 +31,7 @@ public class ClientCnxIdleState {
private final long createTime;
/** The time when marks the connection is idle. **/
- private long idleMarkTime;
+ private volatile long idleMarkTime;
public ClientCnxIdleState(ClientCnx clientCnx){
this.clientCnx = clientCnx;
@@ -171,6 +171,11 @@ public class ClientCnxIdleState {
return;
}
if (isIdle()) {
+ // check if the connection is still idle, if not, mark it as using
+ if (!clientCnx.idleCheck() && compareAndSetIdleStat(State.IDLE,
State.USING)) {
+ idleMarkTime = 0;
+ return;
+ }
if (maxIdleSeconds * 1000 + idleMarkTime <
System.currentTimeMillis()) {
tryMarkReleasing();
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java
new file mode 100644
index 00000000000..e3d88a3823d
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertTrue;
+import java.util.concurrent.TimeUnit;
+import org.testng.annotations.Test;
+
+public class ClientCnxIdleStateTest {
+
+ @Test
+ public void testShouldNotReleaseConnectionIfIdleCheckFails() throws
InterruptedException {
+ ClientCnx clientCnx = mock(ClientCnx.class);
+ ClientCnxIdleState idleState = new ClientCnxIdleState(clientCnx);
+ int maxIdleSeconds = 1;
+
+ // initially, return true for idle check
+ doReturn(true).when(clientCnx).idleCheck();
+
+ // do the first idle detection
+ idleState.doIdleDetect(maxIdleSeconds);
+
+ // the state should be IDLE since the idle check passed
+ assertTrue(idleState.isIdle());
+
+ // Wait for more than maxIdleSeconds
+ Thread.sleep(TimeUnit.SECONDS.toMillis(maxIdleSeconds) + 1);
+
+ // now return false for idle check
+ doReturn(false).when(clientCnx).idleCheck();
+
+ // do the second idle detection
+ idleState.doIdleDetect(maxIdleSeconds);
+
+ // the state should now be USING since the idle check failed
+ assertTrue(idleState.isUsing());
+
+ // verify that idleCheck was called twice
+ verify(clientCnx, times(2)).idleCheck();
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index 534b436f894..e0721ffe905 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -289,6 +289,17 @@ public class ClientCnxTest {
eventLoop.shutdownGracefully();
}
+ @Test
+ public void testIdleCheckWithTopicListWatcher() {
+ ClientCnx cnx =
+ new ClientCnx(InstrumentProvider.NOOP, new
ClientConfigurationData(), mock(EventLoopGroup.class));
+ // idle check should return true initially
+ assertTrue(cnx.idleCheck());
+ cnx.registerTopicListWatcher(0, mock(TopicListWatcher.class));
+ // idle check should now return false since there's a registered
watcher
+ assertFalse(cnx.idleCheck());
+ }
+
@Test
public void testNoWatchersWhenNoServerSupport() {
withConnection("testNoWatchersWhenNoServerSupport", cnx -> {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java
new file mode 100644
index 00000000000..cec3392b385
--- /dev/null
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.pulsar.proxy.server.ProxyConfiguration;
+import org.apache.pulsar.proxy.server.ProxyService;
+import org.testng.annotations.Test;
+
+@Test
+public class AutoCloseUselessClientConProxyTest extends
AutoCloseUselessClientConSupports {
+ private static final String TOPIC_NAME =
BrokerTestUtil.newUniqueName("pattern_");
+ private static final String TOPIC_FULL_NAME =
"persistent://public/default/" + TOPIC_NAME;
+ private static final String TOPIC_PATTERN =
"persistent://public/default/pattern_.*";
+ private ProxyService proxyService;
+ private ProxyConfiguration proxyConfig;
+ private PulsarClient proxiedClient;
+ private AtomicInteger connectionCreationCounter = new AtomicInteger(0);
+
+ @Override
+ protected void additionalSetup() throws Exception {
+ proxyConfig = new ProxyConfiguration();
+ proxyConfig.setServicePort(Optional.ofNullable(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setMetadataStoreUrl("dummy");
+ proxyConfig.setClusterName(configClusterName);
+ startProxyService();
+ // use the same port for subsequent restarts
+ proxyConfig.setServicePort(proxyService.getListenPort());
+
+ ClientBuilderImpl clientBuilder = (ClientBuilderImpl)
PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:" +
proxyService.getListenPort().get())
+ .connectionsPerBroker(Integer.MAX_VALUE) // effectively uses a
different connection for each usage
+ .statsInterval(0, TimeUnit.SECONDS);
+ proxiedClient = InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> {
+ connectionCreationCounter.incrementAndGet();
+ return new ClientCnx(InstrumentProvider.NOOP, conf,
eventLoopGroup);
+ });
+ registerCloseable(proxiedClient);
+ }
+
+ private void startProxyService() throws Exception {
+ proxyService = BrokerTestUtil.spyWithoutRecordingInvocations(new
ProxyService(proxyConfig,
+ new
AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)),
+ AuthenticationDisabled.INSTANCE));
+ doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
+ doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
+ .createConfigurationMetadataStore();
+ proxyService.start();
+ registerCloseable(proxyService);
+ }
+
+ @Override
+ protected void pulsarResourcesSetup() throws PulsarAdminException {
+ super.pulsarResourcesSetup();
+ admin.topics().createNonPartitionedTopic(TOPIC_FULL_NAME);
+ }
+
+ @Test
+ public void
testConnectionAutoReleaseWhileUsingTopicsPatternConsumerAndProxy() throws
Exception {
+ PulsarClientImpl pulsarClient = (PulsarClientImpl) proxiedClient;
+ @Cleanup
+ Consumer consumer = pulsarClient.newConsumer(Schema.BYTES)
+ .topicsPattern(TOPIC_PATTERN)
+ .isAckReceiptEnabled(true)
+ .subscriptionName("my-subscription-x")
+ .subscribe();
+ @Cleanup
+ Producer producer = pulsarClient.newProducer(Schema.BYTES)
+ .topic(TOPIC_NAME)
+ .create();
+
+ Consumer consumer2 = pulsarClient.newConsumer(Schema.BYTES)
+ .topicsPattern(TOPIC_PATTERN)
+ .subscriptionName("my-subscription-y")
+ .subscribe();
+
+ int poolSizeAfterCreatingConsumersAndProducer =
pulsarClient.getCnxPool().getPoolSize();
+ // check that there are more than 3 connections
+ // at least 3 connections are required:
+ // 1 for "producer", 1 for "consumer", and 1 for the topic watcher of
"consumer"
+ // additional connections will be created for the second consumer and
its topic watcher
+ // there's also a connection for topic lookup
+ assertThat(poolSizeAfterCreatingConsumersAndProducer).isGreaterThan(3);
+
+ int connectionsCreatedBefore = connectionCreationCounter.get();
+
+ // trigger releasing of unused client connections
+ trigReleaseConnection(pulsarClient);
+
+ // verify that the number of connections is still more than 3, but
less than the pool size after creating
+ // consumers and producer
+ // since the lookup connection to the proxy should be closed now
+ assertThat(pulsarClient.getCnxPool().getPoolSize())
+ .isGreaterThan(3)
+ .isLessThan(poolSizeAfterCreatingConsumersAndProducer);
+
+ // close consumer2 that creates an extra connection due to
connectionsPerBroker set to Integer.MAX_VALUE
+ consumer2.close();
+
+ // trigger releasing of unused client connections
+ trigReleaseConnection(pulsarClient);
+
+ // verify that the number of connections is 3 now, which is the
expected number of connections
+ assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3);
+
+ // Ensure all things still works
+ ensureProducerAndConsumerWorks(producer, consumer);
+
+ // Verify that the number of connections did not increase after the
work was completed
+ assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3);
+
+ assertThat(connectionCreationCounter.get())
+ .as("No new connections should be created after releasing
unused connections since that would "
+ + "mean that an used connection was released.")
+ .isEqualTo(connectionsCreatedBefore);
+ }
+}