This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 748845f041b [fix][txn] Fix deadlock when loading transaction buffer snapshot (#24401)
748845f041b is described below

commit 748845f041b6edac6a5814fa10e86cb6f575e76c
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jun 11 19:45:44 2025 +0800

    [fix][txn] Fix deadlock when loading transaction buffer snapshot (#24401)
    
    (cherry picked from commit 1b7e4a7bbd6a8e6a309a87a9c3ad19bbe837e7c0)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  7 ++++++
 .../SingleSnapshotAbortedTxnProcessorImpl.java     |  2 +-
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    |  2 +-
 .../broker/transaction/buffer/impl/TableView.java  | 10 +++++++++
 .../java/org/apache/pulsar/utils/SimpleCache.java  | 16 +++++++++++++-
 .../pulsar/broker/transaction/TransactionTest.java |  5 ++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 25 ++++++++++++++++------
 7 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 54ba5d19470..7b45bf3cc90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -300,6 +300,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
     private TransactionPendingAckStoreProvider 
transactionPendingAckStoreProvider;
     private final ExecutorProvider transactionExecutorProvider;
+    private final ExecutorProvider transactionSnapshotRecoverExecutorProvider;
     private final MonotonicClock monotonicClock;
     private String brokerId;
     private final CompletableFuture<Void> readyForIncomingRequestsFuture = new 
CompletableFuture<>();
@@ -370,8 +371,11 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         if (config.isTransactionCoordinatorEnabled()) {
             this.transactionExecutorProvider = new 
ExecutorProvider(this.getConfiguration()
                     .getNumTransactionReplayThreadPoolSize(), 
"pulsar-transaction-executor");
+            this.transactionSnapshotRecoverExecutorProvider = new 
ExecutorProvider(this.getConfiguration()
+                    .getNumTransactionReplayThreadPoolSize(), 
"pulsar-transaction-snapshot-recover");
         } else {
             this.transactionExecutorProvider = null;
+            this.transactionSnapshotRecoverExecutorProvider = null;
         }
 
         this.ioEventLoopGroup = 
EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), 
config.isEnableBusyWait(),
@@ -659,6 +663,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             if (transactionExecutorProvider != null) {
                 transactionExecutorProvider.shutdownNow();
             }
+            if (transactionSnapshotRecoverExecutorProvider != null) {
+                transactionSnapshotRecoverExecutorProvider.shutdownNow();
+            }
             if (transactionTimer != null) {
                 transactionTimer.stop();
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index a0ffa121b89..86ae3ea4824 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -87,7 +87,7 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements 
AbortedTxnProcesso
     public CompletableFuture<Position> recoverFromSnapshot() {
         final var future = new CompletableFuture<Position>();
         final var pulsar = topic.getBrokerService().getPulsar();
-        pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() 
-> {
+        
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(()
 -> {
             try {
                 final var snapshot = 
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
                         .getTableView().readLatest(topic.getName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 779d083289b..1325ef482f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -228,7 +228,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
     public CompletableFuture<Position> recoverFromSnapshot() {
         final var pulsar = topic.getBrokerService().getPulsar();
         final var future = new CompletableFuture<Position>();
-        pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() 
-> {
+        
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(()
 -> {
             try {
                 final var indexes = 
pulsar.getTransactionBufferSnapshotServiceFactory()
                         
.getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
index 40adec74884..3b0413210f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -58,6 +58,16 @@ public class TableView<T> {
     }
 
     public T readLatest(String topic) throws Exception {
+        try {
+            return internalReadLatest(topic);
+        } catch (Exception e) {
+            final var namespace = TopicName.get(topic).getNamespaceObject();
+            readers.remove(namespace);
+            throw e;
+        }
+    }
+
+    private T internalReadLatest(String topic) throws Exception {
         final var reader = getReader(topic);
         while (wait(reader.hasMoreEventsAsync(), "has more events")) {
             final var msg = wait(reader.readNextAsync(), "read message");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
index 6a3a6721198..6bd706068d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
@@ -41,7 +41,7 @@ public class SimpleCache<K, V> {
 
         boolean tryExpire() {
             if (System.currentTimeMillis() >= deadlineMs) {
-                expireCallback.accept(value);
+                cancel();
                 return true;
             } else {
                 return false;
@@ -51,6 +51,10 @@ public class SimpleCache<K, V> {
         void updateDeadline() {
             deadlineMs = System.currentTimeMillis() + timeoutMs;
         }
+
+        void cancel() {
+            expireCallback.accept(value);
+        }
     }
 
     public SimpleCache(final ScheduledExecutorService scheduler, final long 
timeoutMs, final long frequencyMs) {
@@ -80,4 +84,14 @@ public class SimpleCache<K, V> {
         cache.put(key, newValue);
         return newValue.value;
     }
+
+    public void remove(final K key) {
+        final ExpirableValue<V> value;
+        synchronized (this) {
+            value = cache.remove(key);
+        }
+        if (value != null) {
+            value.cancel();
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 35c9048ebb5..0e03cc6dc8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -177,7 +177,10 @@ public class TransactionTest extends TransactionTestBase {
 
     @BeforeClass
     protected void setup() throws Exception {
-       setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
+        // Use a single transaction thread to reproduce possible deadlock easily
+        conf.setNumTransactionReplayThreadPoolSize(1);
+        conf.setManagedLedgerNumSchedulerThreads(1);
+        setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
     }
 
     @AfterClass(alwaysRun = true)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b4a9ec610e8..dfc217cf574 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -241,6 +241,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final AtomicInteger previousExceptionCount = new AtomicInteger();
     private volatile boolean hasSoughtByTimestamp = false;
+    // This field will be set after the state becomes Failed, then the following operations will fail immediately
+    private volatile Throwable failReason = null;
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
@@ -980,11 +982,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 } else if (!subscribeFuture.isDone()) {
                     // unable to create new consumer, fail operation
                     setState(State.Failed);
+                    final Throwable throwable = PulsarClientException.wrap(e, 
String.format("Failed to subscribe the "
+                            + "topic %s with subscription name %s when 
connecting to the broker", topicName.toString(),
+                            subscription));
+                    fail(throwable);
+
                     closeConsumerTasks();
-                    subscribeFuture.completeExceptionally(
-                            PulsarClientException.wrap(e, 
String.format("Failed to subscribe the topic %s "
-                                            + "with subscription name %s when 
connecting to the broker",
-                                    topicName.toString(), subscription)));
+                    subscribeFuture.completeExceptionally(throwable);
                     client.cleanupConsumer(this);
                 } else if (isUnrecoverableError(e.getCause())) {
                     closeWhenReceivedUnrecoverableError(e.getCause(), cnx);
@@ -1025,7 +1029,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 topic, subscription, cnxStr, t.getClass().getName(), 
t.getMessage());
         closeAsync().whenComplete((__, ex) -> {
             if (ex == null) {
-                setState(State.Failed);
+                fail(t);
                 return;
             }
             log.error("[{}][{}] {} Failed to close consumer after got an error 
that does not support to retry: {} {}",
@@ -1119,7 +1123,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         if (nonRetriableError || timeout) {
             exception.setPreviousExceptionCount(previousExceptionCount);
             if (subscribeFuture.completeExceptionally(exception)) {
-                setState(State.Failed);
+                fail(exception);
                 if (nonRetriableError) {
                     log.info("[{}] Consumer creation failed for consumer {} 
with unretriableError {}",
                             topic, consumerId, exception.getMessage());
@@ -2870,6 +2874,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return null;
             });
         } else {
+            if (failReason != null) {
+                future.completeExceptionally(failReason);
+                return;
+            }
             long nextDelay = Math.min(backoff.next(), remainingTime.get());
             if (nextDelay <= 0) {
                 future.completeExceptionally(
@@ -3301,4 +3309,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     public Producer<byte[]> getDeadLetterProducer() throws ExecutionException, 
InterruptedException {
         return (deadLetterProducer == null || !deadLetterProducer.isDone()) ? 
null : deadLetterProducer.get();
     }
+
+    private void fail(Throwable throwable) {
+        setState(State.Failed);
+        failReason = throwable;
+    }
 }

Reply via email to