This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e98e8443c52 [fix][broker] Fix stack overflow caused by race condition
when closing a connection (#24934)
e98e8443c52 is described below
commit e98e8443c526a025914e624686775867f7b28088
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Nov 3 23:46:02 2025 +0800
[fix][broker] Fix stack overflow caused by race condition when closing a
connection (#24934)
(cherry picked from commit 1dfe07eb9b387c2b815b39096f370c639f6dcde5)
---
.../broker/service/AbstractBaseDispatcher.java | 3 +-
.../AbstractDispatcherSingleActiveConsumer.java | 29 +++++++-
.../NonPersistentDispatcherMultipleConsumers.java | 2 +
...onPersistentDispatcherSingleActiveConsumer.java | 4 ++
.../PersistentDispatcherMultipleConsumers.java | 2 +
.../PersistentDispatcherSingleActiveConsumer.java | 2 +
.../broker/service/AbstractBaseDispatcherTest.java | 5 ++
...rsistentDispatcherSingleActiveConsumerTest.java | 81 ++++++++++++++++++++++
8 files changed, 125 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index eb747a6535b..0128234bec1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -74,7 +74,6 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
this.dispatchThrottlingOnBatchMessageEnabled =
serviceConfig.isDispatchThrottlingOnBatchMessageEnabled();
}
-
/**
* Filter messages that are being sent to a consumers.
* <p>
@@ -366,6 +365,8 @@ public abstract class AbstractBaseDispatcher extends
EntryFilterSupport implemen
protected abstract void reScheduleRead();
+ public abstract String getName();
+
protected boolean reachDispatchRateLimit(DispatchRateLimiter
dispatchRateLimiter) {
if (dispatchRateLimiter.isDispatchRateLimitingEnabled()) {
if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 1d71053c8e0..5f742c29f83 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractDispatcherSingleActiveConsumer extends
AbstractBaseDispatcher {
+ private static final int MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE = 5;
protected final String topicName;
protected static final
AtomicReferenceFieldUpdater<AbstractDispatcherSingleActiveConsumer, Consumer>
ACTIVE_CONSUMER_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
@@ -158,7 +160,11 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
return Collections.unmodifiableNavigableMap(hashRing);
}
- public synchronized CompletableFuture<Void> addConsumer(Consumer consumer)
{
+ public CompletableFuture<Void> addConsumer(Consumer consumer) { // public entry point; delegates to internalAddConsumer
+ return internalAddConsumer(consumer, 0); // first attempt, so the retry counter starts at 0
+ }
+
+ private synchronized CompletableFuture<Void> internalAddConsumer(Consumer
consumer, int retryCount) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer {}",
this.topicName, consumer);
consumer.disconnect();
@@ -168,12 +174,31 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (actConsumer != null) {
+ final var callerThread = Thread.currentThread();
return
actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive
-> {
if (actConsumerStillAlive == null ||
actConsumerStillAlive) {
return FutureUtil.failedFuture(new
ConsumerBusyException("Exclusive consumer is already"
+ " connected"));
+ } else if (retryCount >=
MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE) {
+ log.warn("[{}] The active consumer's connection is
still inactive after all retries {}, skip "
+ + "adding new consumer {}", getName(),
actConsumer, consumer);
+ return FutureUtil.failedFuture(new
ConsumerBusyException("Exclusive consumer is already"
+ + " connected after " +
MAX_RETRY_COUNT_FOR_ADD_CONSUMER_RACE + " attempts"));
} else {
- return addConsumer(consumer);
+ if (Thread.currentThread().equals(callerThread)) {
+ // A race condition happened in
`ServerCnx#channelInactive`
+ // 1. `isActive` was set to false
+ // 2. `consumer.close()` is called
+ // We should wait until the consumer is closed,
retry for some times
+ log.warn("[{}] race condition happened that cnx of
the active consumer ({}) is inactive "
+ + "but it's not removed, retrying",
getName(), actConsumer);
+ final var future = new CompletableFuture<Void>();
+ CompletableFuture.delayedExecutor(100,
TimeUnit.MILLISECONDS)
+ .execute(() -> future.complete(null));
+ return future.thenCompose(__ ->
internalAddConsumer(consumer, retryCount + 1));
+ } else {
+ return internalAddConsumer(consumer, retryCount +
1);
+ }
}
});
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index ab0ca14e068..54a1d2b3be5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import lombok.Getter;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -48,6 +49,7 @@ public class NonPersistentDispatcherMultipleConsumers extends
AbstractDispatcher
protected final Subscription subscription;
private CompletableFuture<Void> closeFuture = null;
+ @Getter
protected final String name;
protected final Rate msgDrop;
protected static final
AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 862f1620c13..e93389d8c72 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.nonpersistent;
import java.util.List;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
@@ -38,6 +39,8 @@ public final class
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
private final NonPersistentTopic topic;
private final Rate msgDrop;
+ @Getter
+ protected final String name;
private final Subscription subscription;
private final RedeliveryTracker redeliveryTracker;
@@ -47,6 +50,7 @@ public final class
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
topic.getBrokerService().pulsar().getConfiguration(), null);
this.topic = topic;
this.subscription = subscription;
+ this.name = topic.getName() + " / " + subscription.getName();
this.msgDrop = new Rate();
this.redeliveryTracker =
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 245ccaf2ee3..b5a1e163771 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
+import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -99,6 +100,7 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
protected volatile boolean havePendingReplayRead = false;
protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
+ @Getter
protected final String name;
private boolean sendInProgress = false;
protected static final
AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index af9f8ae8d6f..945a6272f10 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -63,6 +64,7 @@ public class PersistentDispatcherSingleActiveConsumer extends
AbstractDispatcher
protected final PersistentTopic topic;
protected final Executor executor;
+ @Getter
protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index fa551dc5f49..fcd2b6a46fc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -257,6 +257,11 @@ public class AbstractBaseDispatcherTest {
}
+        @Override
+        public String getName() { // label for this test-only dispatcher
+            return "AbstractBaseDispatcherTestHelper for subscription " + subscription.getName();
+        }
+
@Override
public CompletableFuture<Void> addConsumer(Consumer consumer) {
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
index 238192cd9d8..6dd506fa1ed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
@@ -18,7 +18,13 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -26,22 +32,29 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
+import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.naming.TopicName;
import org.mockito.Mockito;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-api")
public class PersistentDispatcherSingleActiveConsumerTest extends
ProducerConsumerBase {
+
@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
@@ -127,4 +140,72 @@ public class PersistentDispatcherSingleActiveConsumerTest
extends ProducerConsum
// Verify: the topic can be deleted successfully.
admin.topics().delete(topicName, false);
}
+
+ @DataProvider
+ public static Object[][] closeDelayMs() { // delays (ms) fed to testOverrideInactiveConsumer, one run each
+ return new Object[][] { { 500 }, { 2000 } }; // 500 takes the test's "< 1000" success branch, 2000 the failure branch
+ }
+
+ @Test(dataProvider = "closeDelayMs") // checks addConsumer while the active consumer's connection is still closing
+ public void testOverrideInactiveConsumer(long closeDelayMs) throws
Exception {
+ final var interceptor = new Interceptor();
+ pulsar.getBrokerService().setInterceptor(interceptor); // lets the test hook onConnectionClosed
+ final var topic = "test-override-inactive-consumer-" + closeDelayMs;
+ @Cleanup final var client =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+ @Cleanup final var consumer =
client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
+ final var dispatcher = ((PersistentTopic)
pulsar.getBrokerService().getTopicIfExists(TopicName.get(topic)
+
.toString()).get().orElseThrow()).getSubscription("sub").dispatcher;
+ Assert.assertEquals(dispatcher.getConsumers().size(), 1); // only the real client's consumer is attached so far
+
+ // Generally `isActive` could only be false after `channelInactive` is
called, setting it with false directly
+ // to avoid race condition.
+ final var latch = new CountDownLatch(1);
+ interceptor.latch.set(latch);
+ interceptor.injectCloseLatency.set(true); // arm the one-shot delay in Interceptor#onConnectionClosed
+ interceptor.delayMs = closeDelayMs;
+ // Simulate the real case because `channelInactive` is always called
in the event loop thread
+ final var cnx = (ServerCnx) dispatcher.getConsumers().get(0).cnx();
+ cnx.ctx().executor().execute(() -> {
+ try {
+ cnx.channelInactive(cnx.ctx());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ @Cleanup final var mockConsumer = Mockito.mock(Consumer.class);
+ Assert.assertTrue(latch.await(1, TimeUnit.SECONDS)); // the latch is counted down inside onConnectionClosed
+ if (closeDelayMs < 1000) { // short close delay: adding the new consumer is expected to succeed
+ dispatcher.addConsumer(mockConsumer).get();
+ Assert.assertEquals(dispatcher.getConsumers().size(), 1);
+ Assert.assertSame(mockConsumer, dispatcher.getConsumers().get(0)); // the mock is now the only consumer
+ } else { // long close delay: adding the new consumer is expected to be rejected
+ try {
+ dispatcher.addConsumer(mockConsumer).get();
+ Assert.fail(); // reached only if addConsumer unexpectedly succeeded
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof
BrokerServiceException.ConsumerBusyException);
+ }
+ }
+ }
+
+    private static class Interceptor extends MockBrokerInterceptor {
+        // Test helper: stalls the first connection close observed after injectCloseLatency is armed.
+        final AtomicBoolean injectCloseLatency = new AtomicBoolean(false);
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
+        volatile long delayMs = 500; // volatile: set by the test thread, read by the thread running the hook
+        // One-shot: take and clear the latch atomically, count it down, then sleep for delayMs.
+        @Override
+        public void onConnectionClosed(ServerCnx cnx) {
+            if (injectCloseLatency.compareAndSet(true, false)) {
+                Optional.ofNullable(latch.getAndSet(null)).ifPresent(CountDownLatch::countDown);
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // restore the flag cleared by sleep()
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
}