This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eeb51c426ad5723beca567b9f7b3fe0fda604880
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 3 23:46:02 2025 +0800

    [fix][broker] Fix stack overflow caused by race condition when closing a connection (#24934)

    (cherry picked from commit 1dfe07eb9b387c2b815b39096f370c639f6dcde5)
---
 .../AbstractDispatcherSingleActiveConsumer.java    | 29 +++++++-
 ...rsistentDispatcherSingleActiveConsumerTest.java | 81 ++++++++++++++++++++++
 2 files changed, 108 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index baca6bf078c..792b75f2896 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBaseDispatcher {
 
+    private static final int MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE = 5;
     protected final String topicName;
     private volatile Consumer activeConsumer = null;
     protected final CopyOnWriteArrayList<Consumer> consumers;
@@ -161,7 +163,11 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
         return Collections.unmodifiableNavigableMap(hashRing);
     }
 
-    public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
+    public CompletableFuture<Void> addConsumer(Consumer consumer) {
+        return internalAddConsumer(consumer, 0);
+    }
+
+    private synchronized CompletableFuture<Void> internalAddConsumer(Consumer consumer, int retryCount) {
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer {}", this.topicName, consumer);
             consumer.disconnect();
@@ -171,12 +177,31 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
             Consumer actConsumer = getActiveConsumer();
             if (actConsumer != null) {
+                final var callerThread = Thread.currentThread();
                 return actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive -> {
                     if (actConsumerStillAlive.isEmpty() || actConsumerStillAlive.get()) {
                         return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already"
                                 + " connected"));
+                    } else if (retryCount >= MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE) {
+                        log.warn("[{}] The active consumer's connection is still inactive after all retries {}, skip "
+                                + "adding new consumer {}", getName(), actConsumer, consumer);
+                        return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already"
+                                + " connected after " + MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE + " attempts"));
                     } else {
-                        return addConsumer(consumer);
+                        if (Thread.currentThread().equals(callerThread)) {
+                            // A race condition happened in `ServerCnx#channelInactive`
+                            // 1. `isActive` was set to false
+                            // 2. `consumer.close()` is called
+                            // We should wait until the consumer is closed, retry for some times
+                            log.warn("[{}] race condition happened that cnx of the active consumer ({}) is inactive "
+                                    + "but it's not removed, retrying", getName(), actConsumer);
+                            final var future = new CompletableFuture<Void>();
+                            CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)
+                                    .execute(() -> future.complete(null));
+                            return future.thenCompose(__ -> internalAddConsumer(consumer, retryCount + 1));
+                        } else {
+                            return internalAddConsumer(consumer, retryCount + 1);
+                        }
                     }
                 });
             } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
index dc6d451ed0f..466f55436ea 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
@@ -18,7 +18,13 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -26,22 +32,29 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.TopicName;
 import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-api")
 public class PersistentDispatcherSingleActiveConsumerTest extends ProducerConsumerBase {
+
     @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
@@ -129,4 +142,72 @@ public class PersistentDispatcherSingleActiveConsum
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @DataProvider
+    public static Object[][] closeDelayMs() {
+        return new Object[][] { { 500 }, { 2000 } };
+    }
+
+    @Test(dataProvider = "closeDelayMs")
+    public void testOverrideInactiveConsumer(long closeDelayMs) throws Exception {
+        final var interceptor = new Interceptor();
+        pulsar.getBrokerService().setInterceptor(interceptor);
+        final var topic = "test-override-inactive-consumer-" + closeDelayMs;
+        @Cleanup final var client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup final var consumer = client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
+        final var dispatcher = ((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(TopicName.get(topic)
+                .toString()).get().orElseThrow()).getSubscription("sub").dispatcher;
+        Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+
+        // Generally `isActive` could only be false after `channelInactive` is called, setting it with false directly
+        // to avoid race condition.
+        final var latch = new CountDownLatch(1);
+        interceptor.latch.set(latch);
+        interceptor.injectCloseLatency.set(true);
+        interceptor.delayMs = closeDelayMs;
+        // Simulate the real case because `channelInactive` is always called in the event loop thread
+        final var cnx = (ServerCnx) dispatcher.getConsumers().get(0).cnx();
+        cnx.ctx().executor().execute(() -> {
+            try {
+                cnx.channelInactive(cnx.ctx());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        @Cleanup final var mockConsumer = Mockito.mock(Consumer.class);
+        Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
+        if (closeDelayMs < 1000) {
+            dispatcher.addConsumer(mockConsumer).get();
+            Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+            Assert.assertSame(mockConsumer, dispatcher.getConsumers().get(0));
+        } else {
+            try {
+                dispatcher.addConsumer(mockConsumer).get();
+                Assert.fail();
+            } catch (ExecutionException e) {
+                Assert.assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException);
+            }
+        }
+    }
+
+    private static class Interceptor extends MockBrokerInterceptor {
+
+        final AtomicBoolean injectCloseLatency = new AtomicBoolean(false);
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
+        long delayMs = 500;
+
+        @Override
+        public void onConnectionClosed(ServerCnx cnx) {
+            if (injectCloseLatency.compareAndSet(true, false)) {
+                Optional.ofNullable(latch.get()).ifPresent(CountDownLatch::countDown);
+                latch.set(null);
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
 }
