This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e98e8443c52 [fix][broker] Fix stack overflow caused by race condition when closing a connection (#24934)
e98e8443c52 is described below

commit e98e8443c526a025914e624686775867f7b28088
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 3 23:46:02 2025 +0800

    [fix][broker] Fix stack overflow caused by race condition when closing a connection (#24934)
    
    (cherry picked from commit 1dfe07eb9b387c2b815b39096f370c639f6dcde5)
---
 .../broker/service/AbstractBaseDispatcher.java     |  3 +-
 .../AbstractDispatcherSingleActiveConsumer.java    | 29 +++++++-
 .../NonPersistentDispatcherMultipleConsumers.java  |  2 +
 ...onPersistentDispatcherSingleActiveConsumer.java |  4 ++
 .../PersistentDispatcherMultipleConsumers.java     |  2 +
 .../PersistentDispatcherSingleActiveConsumer.java  |  2 +
 .../broker/service/AbstractBaseDispatcherTest.java |  5 ++
 ...rsistentDispatcherSingleActiveConsumerTest.java | 81 ++++++++++++++++++++++
 8 files changed, 125 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index eb747a6535b..0128234bec1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -74,7 +74,6 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
         this.dispatchThrottlingOnBatchMessageEnabled = 
serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
     }
 
-
     /**
      * Filter messages that are being sent to a consumers.
      * <p>
@@ -366,6 +365,8 @@ public abstract class AbstractBaseDispatcher extends 
EntryFilterSupport implemen
 
     protected abstract void reScheduleRead();
 
+    public abstract String getName();
+
     protected boolean reachDispatchRateLimit(DispatchRateLimiter 
dispatchRateLimiter) {
         if (dispatchRateLimiter.isDispatchRateLimitingEnabled()) {
             if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 1d71053c8e0..5f742c29f83 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
 
 public abstract class AbstractDispatcherSingleActiveConsumer extends 
AbstractBaseDispatcher {
 
+    private static final int MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE = 5;
     protected final String topicName;
     protected static final 
AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer>
             ACTIVE_CONSUMER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
@@ -158,7 +160,11 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         return Collections.unmodifiableNavigableMap(hashRing);
     }
 
-    public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) 
{
+    public CompletableFuture<Void> addConsumer(Consumer consumer) {
+        return internalAddConsumer(consumer, 0);
+    }
+
+    private synchronized CompletableFuture<Void> internalAddConsumer(Consumer 
consumer, int retryCount) {
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer {}", 
this.topicName, consumer);
             consumer.disconnect();
@@ -168,12 +174,31 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer extends AbstractBas
         if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
             Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
             if (actConsumer != null) {
+                final var callerThread = Thread.currentThread();
                 return 
actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive 
-> {
                     if (actConsumerStillAlive == null || 
actConsumerStillAlive) {
                         return FutureUtil.failedFuture(new 
ConsumerBusyException("Exclusive consumer is already"
                                 + " connected"));
+                    } else if (retryCount >= 
MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE) {
+                        log.warn("[{}] The active consumer's connection is 
still inactive after all retries {}, skip "
+                                        + "adding new consumer {}", getName(), 
actConsumer, consumer);
+                        return FutureUtil.failedFuture(new 
ConsumerBusyException("Exclusive consumer is already"
+                                + " connected after " + 
MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE + " attempts"));
                     } else {
-                        return addConsumer(consumer);
+                        if (Thread.currentThread().equals(callerThread)) {
+                            // A race condition happened in 
`ServerCnx#channelInactive`
+                            // 1. `isActive` was set to false
+                            // 2. `consumer.close()` is called
+                            // We should wait until the consumer is closed, 
retry for some times
+                            log.warn("[{}] race condition happened that cnx of 
the active consumer ({}) is inactive "
+                                    + "but it's not removed, retrying", 
getName(), actConsumer);
+                            final var future = new CompletableFuture<Void>();
+                            CompletableFuture.delayedExecutor(100, 
TimeUnit.MILLISECONDS)
+                                    .execute(() -> future.complete(null));
+                            return future.thenCompose(__ -> 
internalAddConsumer(consumer, retryCount + 1));
+                        } else {
+                            return internalAddConsumer(consumer, retryCount + 
1);
+                        }
                     }
                 });
             } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index ab0ca14e068..54a1d2b3be5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -48,6 +49,7 @@ public class NonPersistentDispatcherMultipleConsumers extends 
AbstractDispatcher
     protected final Subscription subscription;
 
     private CompletableFuture<Void> closeFuture = null;
+    @Getter
     protected final String name;
     protected final Rate msgDrop;
     protected static final 
AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 862f1620c13..e93389d8c72 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.nonpersistent;
 
 import java.util.List;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
@@ -38,6 +39,8 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
 
     private final NonPersistentTopic topic;
     private final Rate msgDrop;
+    @Getter
+    protected final String name;
     private final Subscription subscription;
     private final RedeliveryTracker redeliveryTracker;
 
@@ -47,6 +50,7 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
                 topic.getBrokerService().pulsar().getConfiguration(), null);
         this.topic = topic;
         this.subscription = subscription;
+        this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
         this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 245ccaf2ee3..b5a1e163771 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Predicate;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -99,6 +100,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     protected volatile boolean havePendingReplayRead = false;
     protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
+    @Getter
     protected final String name;
     private boolean sendInProgress = false;
     protected static final 
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index af9f8ae8d6f..945a6272f10 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -63,6 +64,7 @@ public class PersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcher
     protected final PersistentTopic topic;
     protected final Executor executor;
 
+    @Getter
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index fa551dc5f49..fcd2b6a46fc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -257,6 +257,11 @@ public class AbstractBaseDispatcherTest {
 
         }
 
+        @Override
+        public String getName() {
+            return "AbstractBaseDispatcherTestHelper for subscription" + 
subscription.getName();
+        }
+
         @Override
         public CompletableFuture<Void> addConsumer(Consumer consumer) {
             return CompletableFuture.completedFuture(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
index 238192cd9d8..6dd506fa1ed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
@@ -18,7 +18,13 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -26,22 +32,29 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.TopicName;
 import org.mockito.Mockito;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
 @Test(groups = "broker-api")
 public class PersistentDispatcherSingleActiveConsumerTest extends 
ProducerConsumerBase {
+
     @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
@@ -127,4 +140,72 @@ public class PersistentDispatcherSingleActiveConsumerTest 
extends ProducerConsum
         // Verify: the topic can be deleted successfully.
         admin.topics().delete(topicName, false);
     }
+
+    @DataProvider
+    public static Object[][] closeDelayMs() {
+        return new Object[][] { { 500 }, { 2000 } };
+    }
+
+    @Test(dataProvider = "closeDelayMs")
+    public void testOverrideInactiveConsumer(long closeDelayMs) throws 
Exception {
+        final var interceptor = new Interceptor();
+        pulsar.getBrokerService().setInterceptor(interceptor);
+        final var topic = "test-override-inactive-consumer-" + closeDelayMs;
+        @Cleanup final var client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup final var consumer = 
client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
+        final var dispatcher = ((PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(TopicName.get(topic)
+                
.toString()).get().orElseThrow()).getSubscription("sub").dispatcher;
+        Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+
+        // Generally `isActive` could only be false after `channelInactive` is 
called, setting it with false directly
+        // to avoid race condition.
+        final var latch = new CountDownLatch(1);
+        interceptor.latch.set(latch);
+        interceptor.injectCloseLatency.set(true);
+        interceptor.delayMs = closeDelayMs;
+        // Simulate the real case because `channelInactive` is always called 
in the event loop thread
+        final var cnx = (ServerCnx) dispatcher.getConsumers().get(0).cnx();
+        cnx.ctx().executor().execute(() -> {
+            try {
+                cnx.channelInactive(cnx.ctx());
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        @Cleanup final var mockConsumer = Mockito.mock(Consumer.class);
+        Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
+        if (closeDelayMs < 1000) {
+            dispatcher.addConsumer(mockConsumer).get();
+            Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+            Assert.assertSame(mockConsumer, dispatcher.getConsumers().get(0));
+        } else {
+            try {
+                dispatcher.addConsumer(mockConsumer).get();
+                Assert.fail();
+            } catch (ExecutionException e) {
+                Assert.assertTrue(e.getCause() instanceof 
BrokerServiceException.ConsumerBusyException);
+            }
+        }
+    }
+
+    private static class Interceptor extends MockBrokerInterceptor {
+
+        final AtomicBoolean injectCloseLatency = new AtomicBoolean(false);
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
+        long delayMs = 500;
+
+        @Override
+        public void onConnectionClosed(ServerCnx cnx) {
+            if (injectCloseLatency.compareAndSet(true, false)) {
+                
Optional.ofNullable(latch.get()).ifPresent(CountDownLatch::countDown);
+                latch.set(null);
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
 }

Reply via email to