This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9a1c29652ad5311eee1faa04a0895f1ab287e63f Author: ken <[email protected]> AuthorDate: Mon Nov 3 17:48:20 2025 +0800 [fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (#24898) Co-authored-by: fanjianye <[email protected]> (cherry picked from commit b297f1f5f0332a892da742ca6e2ff87d2600296f) --- .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 8 +++++++- .../broker/transaction/buffer/impl/TransactionBufferDisable.java | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1f424df17a1..2f0255f4f2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4140,6 +4140,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if (lastDispatchablePosition != null) { return CompletableFuture.completedFuture(lastDispatchablePosition); } + Position lastPosition; + if (transactionBuffer instanceof TransactionBufferDisable) { + lastPosition = getLastPosition(); + } else { + lastPosition = getMaxReadPosition(); + } return ledger.getLastDispatchablePosition(entry -> { MessageMetadata md = entry.getMessageMetadata(); if (md == null) { @@ -4154,7 +4160,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return !isTxnAborted(txnID, entry.getPosition()); } return true; - }, getMaxReadPosition()).thenApply(position -> { + }, lastPosition).thenApply(position -> { // Update lastDispatchablePosition to the given position updateLastDispatchablePosition(position); return position; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index d4fd071fef8..f6e2ad04e50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -108,7 +109,7 @@ public class TransactionBufferDisable implements TransactionBuffer { @Override public Position getMaxReadPosition() { - return topic.getLastPosition(); + return PositionFactory.LATEST; } @Override
