This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f16b757d5b9e179682407518b52c31447b8d1bac Author: Lari Hotari <[email protected]> AuthorDate: Thu Jul 17 10:18:41 2025 +0300 [fix][client] Fix issue in auto releasing of idle connection with topics pattern consumer (#24528) (cherry picked from commit 8be127fe3834851008061da0de22e591b9c7b130) --- .../apache/pulsar/broker/MultiBrokerBaseTest.java | 14 ++ .../impl/AutoCloseUselessClientConSupports.java | 2 +- ...eUselessClientConTopicsPatternConsumerTest.java | 106 +++++++++++++++ .../org/apache/pulsar/client/impl/ClientCnx.java | 3 + .../pulsar/client/impl/ClientCnxIdleState.java | 7 +- .../pulsar/client/impl/ClientCnxIdleStateTest.java | 61 +++++++++ .../apache/pulsar/client/impl/ClientCnxTest.java | 11 ++ .../impl/AutoCloseUselessClientConProxyTest.java | 150 +++++++++++++++++++++ 8 files changed, 352 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java index 6e4d7893adb..2a3e705dca1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java @@ -50,6 +50,11 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest { super.internalSetup(); additionalBrokersSetup(); pulsarResourcesSetup(); + additionalSetup(); + } + + protected void additionalSetup() throws Exception { + // override this method to add any additional setup logic } protected void pulsarResourcesSetup() throws PulsarAdminException { @@ -91,9 +96,18 @@ public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest { @Override public final void cleanup() throws Exception { additionalBrokersCleanup(); + try { + additionalCleanup(); + } catch (Exception e) { + log.warn("Exception during additional cleanup", e); + } super.internalCleanup(); } + protected void additionalCleanup() throws Exception { + // override this method to add any additional cleanup logic + } + protected void additionalBrokersCleanup() { if (additionalBrokerAdmins != null) { for (PulsarAdmin additionalBrokerAdmin : additionalBrokerAdmins) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java index e03b1709137..29dc36959f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConSupports.java @@ -44,7 +44,7 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.awaitility.Awaitility; import org.testng.Assert; -public class AutoCloseUselessClientConSupports extends MultiBrokerBaseTest { +public abstract class AutoCloseUselessClientConSupports extends MultiBrokerBaseTest { protected int BROKER_COUNT = 5; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java new file mode 100644 index 00000000000..b68cca0d50f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConTopicsPatternConsumerTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.assertj.core.api.Assertions.assertThat; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.testng.annotations.Test; + +@Test +public class AutoCloseUselessClientConTopicsPatternConsumerTest extends AutoCloseUselessClientConSupports { + private static final String TOPIC_NAME = BrokerTestUtil.newUniqueName("pattern_"); + private static final String TOPIC_FULL_NAME = "persistent://public/default/" + TOPIC_NAME; + private static final String TOPIC_PATTERN = "persistent://public/default/pattern_.*"; + + @Override + protected void pulsarResourcesSetup() throws PulsarAdminException { + super.pulsarResourcesSetup(); + admin.topics().createNonPartitionedTopic(TOPIC_FULL_NAME); + } + + @Test + public void testConnectionAutoReleaseWhileUsingTopicsPatternConsumer() throws Exception { + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .connectionsPerBroker(Integer.MAX_VALUE); + AtomicInteger connectionCreationCounter = new AtomicInteger(0); + PulsarClientImpl pulsarClient = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> { + connectionCreationCounter.incrementAndGet(); + return new ClientCnx(conf, eventLoopGroup); + }); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topicsPattern(TOPIC_PATTERN) + .isAckReceiptEnabled(true) + .subscriptionName("my-subscription-x") + .subscribe(); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(TOPIC_NAME) + .create(); + + Consumer consumer2 = pulsarClient.newConsumer(Schema.BYTES) + .topicsPattern(TOPIC_PATTERN) + .subscriptionName("my-subscription-y") + .subscribe(); + + // check that there are more than 3 connections + // at least 3 connections are required: + // 1 for "producer", 1 for "consumer", and 1 for the topic watcher of "consumer" + // additional connections will be created for the second consumer and its topic watcher + // there's also a connection for topic lookup + assertThat(pulsarClient.getCnxPool().getPoolSize()).isGreaterThan(3); + + int connectionsCreatedBefore = connectionCreationCounter.get(); + + // trigger releasing of unused client connections + trigReleaseConnection(pulsarClient); + + // close consumer2 that creates an extra connection due to connectionsPerBroker set to Integer.MAX_VALUE + consumer2.close(); + + // trigger releasing of unused client connections + trigReleaseConnection(pulsarClient); + + // verify that the number of connections is 3 now, which is the expected number of connections + assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3); + + // Ensure all things still works + ensureProducerAndConsumerWorks(producer, consumer); + + // Verify that the number of connections did not increase after the work was completed + assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3); + + assertThat(connectionCreationCounter.get()) + .as("No new connections should be created after releasing unused connections since that would " + + "mean that an used connection was released.") + .isEqualTo(connectionsCreatedBefore); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 00ecd3d6263..f798980c299 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1444,6 +1444,9 @@ public class ClientCnx extends PulsarHandler { if (!transactionMetaStoreHandlers.isEmpty()) { return false; } + if (!topicListWatchers.isEmpty()) { + return false; + } return true; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java index 59d3309f248..a3bff56d298 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnxIdleState.java @@ -31,7 +31,7 @@ public class ClientCnxIdleState { private final long createTime; /** The time when marks the connection is idle. **/ - private long idleMarkTime; + private volatile long idleMarkTime; public ClientCnxIdleState(ClientCnx clientCnx){ this.clientCnx = clientCnx; @@ -171,6 +171,11 @@ public class ClientCnxIdleState { return; } if (isIdle()) { + // check if the connection is still idle, if not, mark it as using + if (!clientCnx.idleCheck() && compareAndSetIdleStat(State.IDLE, State.USING)) { + idleMarkTime = 0; + return; + } if (maxIdleSeconds * 1000 + idleMarkTime < System.currentTimeMillis()) { tryMarkReleasing(); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java new file mode 100644 index 00000000000..e3d88a3823d --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxIdleStateTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.assertTrue; +import java.util.concurrent.TimeUnit; +import org.testng.annotations.Test; + +public class ClientCnxIdleStateTest { + + @Test + public void testShouldNotReleaseConnectionIfIdleCheckFails() throws InterruptedException { + ClientCnx clientCnx = mock(ClientCnx.class); + ClientCnxIdleState idleState = new ClientCnxIdleState(clientCnx); + int maxIdleSeconds = 1; + + // initially, return true for idle check + doReturn(true).when(clientCnx).idleCheck(); + + // do the first idle detection + idleState.doIdleDetect(maxIdleSeconds); + + // the state should be IDLE since the idle check passed + assertTrue(idleState.isIdle()); + + // Wait for more than maxIdleSeconds + Thread.sleep(TimeUnit.SECONDS.toMillis(maxIdleSeconds) + 1); + + // now return false for idle check + doReturn(false).when(clientCnx).idleCheck(); + + // do the second idle detection + idleState.doIdleDetect(maxIdleSeconds); + + // the state should now be USING since the idle check failed + assertTrue(idleState.isUsing()); + + // verify that idleCheck was called twice + verify(clientCnx, times(2)).idleCheck(); + } +} \ No newline at end of file diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index d5fbfd22321..c29c9993a39 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -270,6 +270,17 @@ public class ClientCnxTest { eventLoop.shutdownGracefully(); } + @Test + public void testIdleCheckWithTopicListWatcher() { + ClientCnx cnx = + new ClientCnx(new ClientConfigurationData(), mock(EventLoopGroup.class)); + // idle check should return true initially + assertTrue(cnx.idleCheck()); + cnx.registerTopicListWatcher(0, mock(TopicListWatcher.class)); + // idle check should now return false since there's a registered watcher + assertFalse(cnx.idleCheck()); + } + @Test public void testNoWatchersWhenNoServerSupport() { withConnection("testNoWatchersWhenNoServerSupport", cnx -> { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java new file mode 100644 index 00000000000..aac362ac2f2 --- /dev/null +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.doReturn; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.authentication.AuthenticationService; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.pulsar.proxy.server.ProxyConfiguration; +import org.apache.pulsar.proxy.server.ProxyService; +import org.testng.annotations.Test; + +@Test +public class AutoCloseUselessClientConProxyTest extends AutoCloseUselessClientConSupports { + private static final String TOPIC_NAME = BrokerTestUtil.newUniqueName("pattern_"); + private static final String TOPIC_FULL_NAME = "persistent://public/default/" + TOPIC_NAME; + private static final String TOPIC_PATTERN = "persistent://public/default/pattern_.*"; + private ProxyService proxyService; + private ProxyConfiguration proxyConfig; + private PulsarClient proxiedClient; + private AtomicInteger connectionCreationCounter = new AtomicInteger(0); + + @Override + protected void additionalSetup() throws Exception { + proxyConfig = new ProxyConfiguration(); + proxyConfig.setServicePort(Optional.ofNullable(0)); + proxyConfig.setBrokerProxyAllowedTargetPorts("*"); + proxyConfig.setMetadataStoreUrl("dummy"); + proxyConfig.setClusterName(configClusterName); + startProxyService(); + // use the same port for subsequent restarts + proxyConfig.setServicePort(proxyService.getListenPort()); + + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder() + .serviceUrl("pulsar://localhost:" + proxyService.getListenPort().get()) + .connectionsPerBroker(Integer.MAX_VALUE) // effectively uses a different connection for each usage + .statsInterval(0, TimeUnit.SECONDS); + proxiedClient = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) -> { + connectionCreationCounter.incrementAndGet(); + return new ClientCnx(conf, eventLoopGroup); + }); + registerCloseable(proxiedClient); + } + + private void startProxyService() throws Exception { + proxyService = BrokerTestUtil.spyWithoutRecordingInvocations(new ProxyService(proxyConfig, + new AuthenticationService(PulsarConfigurationLoader.convertFrom(proxyConfig)), + AuthenticationDisabled.INSTANCE)); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore(); + doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService) + .createConfigurationMetadataStore(); + proxyService.start(); + registerCloseable(proxyService); + } + + @Override + protected void pulsarResourcesSetup() throws PulsarAdminException { + super.pulsarResourcesSetup(); + admin.topics().createNonPartitionedTopic(TOPIC_FULL_NAME); + } + + @Test + public void testConnectionAutoReleaseWhileUsingTopicsPatternConsumerAndProxy() throws Exception { + PulsarClientImpl pulsarClient = (PulsarClientImpl) proxiedClient; + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topicsPattern(TOPIC_PATTERN) + .isAckReceiptEnabled(true) + .subscriptionName("my-subscription-x") + .subscribe(); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(TOPIC_NAME) + .create(); + + Consumer consumer2 = pulsarClient.newConsumer(Schema.BYTES) + .topicsPattern(TOPIC_PATTERN) + .subscriptionName("my-subscription-y") + .subscribe(); + + int poolSizeAfterCreatingConsumersAndProducer = pulsarClient.getCnxPool().getPoolSize(); + // check that there are more than 3 connections + // at least 3 connections are required: + // 1 for "producer", 1 for "consumer", and 1 for the topic watcher of "consumer" + // additional connections will be created for the second consumer and its topic watcher + // there's also a connection for topic lookup + assertThat(poolSizeAfterCreatingConsumersAndProducer).isGreaterThan(3); + + int connectionsCreatedBefore = connectionCreationCounter.get(); + + // trigger releasing of unused client connections + trigReleaseConnection(pulsarClient); + + // verify that the number of connections is still more than 3, but less than the pool size after creating + // consumers and producer + // since the lookup connection to the proxy should be closed now + assertThat(pulsarClient.getCnxPool().getPoolSize()) + .isGreaterThan(3) + .isLessThan(poolSizeAfterCreatingConsumersAndProducer); + + // close consumer2 that creates an extra connection due to connectionsPerBroker set to Integer.MAX_VALUE + consumer2.close(); + + // trigger releasing of unused client connections + trigReleaseConnection(pulsarClient); + + // verify that the number of connections is 3 now, which is the expected number of connections + assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3); + + // Ensure all things still works + ensureProducerAndConsumerWorks(producer, consumer); + + // Verify that the number of connections did not increase after the work was completed + assertThat(pulsarClient.getCnxPool().getPoolSize()).isEqualTo(3); + + assertThat(connectionCreationCounter.get()) + .as("No new connections should be created after releasing unused connections since that would " + + "mean that an used connection was released.") + .isEqualTo(connectionsCreatedBefore); + } +}
