This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 748845f041b [fix][txn] Fix deadlock when loading transaction buffer snapshot (#24401)
748845f041b is described below
commit 748845f041b6edac6a5814fa10e86cb6f575e76c
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jun 11 19:45:44 2025 +0800
[fix][txn] Fix deadlock when loading transaction buffer snapshot (#24401)
(cherry picked from commit 1b7e4a7bbd6a8e6a309a87a9c3ad19bbe837e7c0)
---
.../org/apache/pulsar/broker/PulsarService.java | 7 ++++++
.../SingleSnapshotAbortedTxnProcessorImpl.java | 2 +-
.../SnapshotSegmentAbortedTxnProcessorImpl.java | 2 +-
.../broker/transaction/buffer/impl/TableView.java | 10 +++++++++
.../java/org/apache/pulsar/utils/SimpleCache.java | 16 +++++++++++++-
.../pulsar/broker/transaction/TransactionTest.java | 5 ++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 25 ++++++++++++++++------
7 files changed, 57 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 54ba5d19470..7b45bf3cc90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -300,6 +300,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private TransactionPendingAckStoreProvider
transactionPendingAckStoreProvider;
private final ExecutorProvider transactionExecutorProvider;
+ private final ExecutorProvider transactionSnapshotRecoverExecutorProvider;
private final MonotonicClock monotonicClock;
private String brokerId;
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new
CompletableFuture<>();
@@ -370,8 +371,11 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (config.isTransactionCoordinatorEnabled()) {
this.transactionExecutorProvider = new
ExecutorProvider(this.getConfiguration()
.getNumTransactionReplayThreadPoolSize(),
"pulsar-transaction-executor");
+ this.transactionSnapshotRecoverExecutorProvider = new ExecutorProvider(this.getConfiguration()
+ .getNumTransactionReplayThreadPoolSize(), "pulsar-transaction-snapshot-recover");
} else {
this.transactionExecutorProvider = null;
+ this.transactionSnapshotRecoverExecutorProvider = null;
}
this.ioEventLoopGroup =
EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(),
config.isEnableBusyWait(),
@@ -659,6 +663,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
if (transactionExecutorProvider != null) {
transactionExecutorProvider.shutdownNow();
}
+ if (transactionSnapshotRecoverExecutorProvider != null) {
+ transactionSnapshotRecoverExecutorProvider.shutdownNow();
+ }
if (transactionTimer != null) {
transactionTimer.stop();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index a0ffa121b89..86ae3ea4824 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -87,7 +87,7 @@ public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcesso
public CompletableFuture<Position> recoverFromSnapshot() {
final var future = new CompletableFuture<Position>();
final var pulsar = topic.getBrokerService().getPulsar();
- pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(() -> {
try {
final var snapshot =
pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
.getTableView().readLatest(topic.getName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 779d083289b..1325ef482f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -228,7 +228,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess
public CompletableFuture<Position> recoverFromSnapshot() {
final var pulsar = topic.getBrokerService().getPulsar();
final var future = new CompletableFuture<Position>();
- pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(() -> {
try {
final var indexes =
pulsar.getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
index 40adec74884..3b0413210f1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -58,6 +58,16 @@ public class TableView<T> {
}
public T readLatest(String topic) throws Exception {
+ try {
+ return internalReadLatest(topic);
+ } catch (Exception e) {
+ final var namespace = TopicName.get(topic).getNamespaceObject();
+ readers.remove(namespace);
+ throw e;
+ }
+ }
+
+ private T internalReadLatest(String topic) throws Exception {
final var reader = getReader(topic);
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
index 6a3a6721198..6bd706068d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
@@ -41,7 +41,7 @@ public class SimpleCache<K, V> {
boolean tryExpire() {
if (System.currentTimeMillis() >= deadlineMs) {
- expireCallback.accept(value);
+ cancel();
return true;
} else {
return false;
@@ -51,6 +51,10 @@ public class SimpleCache<K, V> {
void updateDeadline() {
deadlineMs = System.currentTimeMillis() + timeoutMs;
}
+
+ void cancel() {
+ expireCallback.accept(value);
+ }
}
public SimpleCache(final ScheduledExecutorService scheduler, final long
timeoutMs, final long frequencyMs) {
@@ -80,4 +84,14 @@ public class SimpleCache<K, V> {
cache.put(key, newValue);
return newValue.value;
}
+
+ public void remove(final K key) {
+ final ExpirableValue<V> value;
+ synchronized (this) {
+ value = cache.remove(key);
+ }
+ if (value != null) {
+ value.cancel();
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 35c9048ebb5..0e03cc6dc8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -177,7 +177,10 @@ public class TransactionTest extends TransactionTestBase {
@BeforeClass
protected void setup() throws Exception {
- setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
+ // Use a single transaction thread to reproduce possible deadlock easily
+ conf.setNumTransactionReplayThreadPoolSize(1);
+ conf.setManagedLedgerNumSchedulerThreads(1);
+ setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
}
@AfterClass(alwaysRun = true)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b4a9ec610e8..dfc217cf574 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -241,6 +241,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final AtomicInteger previousExceptionCount = new AtomicInteger();
private volatile boolean hasSoughtByTimestamp = false;
+ // This field will be set after the state becomes Failed, then the following operations will fail immediately
+ private volatile Throwable failReason = null;
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
@@ -980,11 +982,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
} else if (!subscribeFuture.isDone()) {
// unable to create new consumer, fail operation
setState(State.Failed);
+ final Throwable throwable = PulsarClientException.wrap(e, String.format("Failed to subscribe the "
+ + "topic %s with subscription name %s when connecting to the broker", topicName.toString(),
+ subscription));
+ fail(throwable);
+
closeConsumerTasks();
- subscribeFuture.completeExceptionally(
- PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s "
- + "with subscription name %s when connecting to the broker",
- topicName.toString(), subscription)));
+ subscribeFuture.completeExceptionally(throwable);
client.cleanupConsumer(this);
} else if (isUnrecoverableError(e.getCause())) {
closeWhenReceivedUnrecoverableError(e.getCause(), cnx);
@@ -1025,7 +1029,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
topic, subscription, cnxStr, t.getClass().getName(),
t.getMessage());
closeAsync().whenComplete((__, ex) -> {
if (ex == null) {
- setState(State.Failed);
+ fail(t);
return;
}
log.error("[{}][{}] {} Failed to close consumer after got an error
that does not support to retry: {} {}",
@@ -1119,7 +1123,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (nonRetriableError || timeout) {
exception.setPreviousExceptionCount(previousExceptionCount);
if (subscribeFuture.completeExceptionally(exception)) {
- setState(State.Failed);
+ fail(exception);
if (nonRetriableError) {
log.info("[{}] Consumer creation failed for consumer {}
with unretriableError {}",
topic, consumerId, exception.getMessage());
@@ -2870,6 +2874,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return null;
});
} else {
+ if (failReason != null) {
+ future.completeExceptionally(failReason);
+ return;
+ }
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(
@@ -3301,4 +3309,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
public Producer<byte[]> getDeadLetterProducer() throws ExecutionException,
InterruptedException {
return (deadLetterProducer == null || !deadLetterProducer.isDone()) ?
null : deadLetterProducer.get();
}
+
+ private void fail(Throwable throwable) {
+ setState(State.Failed);
+ failReason = throwable;
+ }
}