This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new f838db190e4 [improve][broker][branch-3.3] Optimize
PersistentTopic.getLastDispatchablePosition (#22707) (#23826)
f838db190e4 is described below
commit f838db190e4c28254ffc138d1ecf52a908d26df3
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jan 9 02:15:56 2025 +0800
[improve][broker][branch-3.3] Optimize
PersistentTopic.getLastDispatchablePosition (#22707) (#23826)
Co-authored-by: 道君 <[email protected]>
---
.../broker/service/persistent/PersistentTopic.java | 66 ++++++++++++++++++----
.../buffer/impl/InMemTransactionBuffer.java | 14 ++++-
.../buffer/impl/TopicTransactionBuffer.java | 11 ++++
.../buffer/impl/TransactionBufferDisable.java | 14 ++++-
4 files changed, 89 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f75cac8a0f3..38e2a063065 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -316,6 +316,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
Long estimatedOldestUnacknowledgedMessageTimestamp;
}
+ // The last position that can be dispatched to consumers
+ private volatile Position lastDispatchablePosition;
+
/***
* We use 3 futures to prevent a new closing if there is an in-progress
deletion or closing. We make Pulsar return
* the in-progress one when it is called the second time.
@@ -3889,18 +3892,57 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
- return
ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger,
entry -> {
- MessageMetadata md =
Commands.parseMessageMetadata(entry.getDataBuffer());
- // If a messages has marker will filter by
AbstractBaseDispatcher.filterEntriesForConsumer
- if (Markers.isServerOnlyMarker(md)) {
- return false;
- } else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
- // Filter-out transaction aborted messages.
- TxnID txnID = new TxnID(md.getTxnidMostBits(),
md.getTxnidLeastBits());
- return !isTxnAborted(txnID, (PositionImpl)
entry.getPosition());
- }
- return true;
- }, getMaxReadPosition());
+ if (lastDispatchablePosition != null) {
+ return CompletableFuture.completedFuture(lastDispatchablePosition);
+ }
+ return ManagedLedgerImplUtils
+ .asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry
-> {
+ MessageMetadata md =
Commands.parseMessageMetadata(entry.getDataBuffer());
+ // If a messages has marker will filter by
AbstractBaseDispatcher.filterEntriesForConsumer
+ if (Markers.isServerOnlyMarker(md)) {
+ return false;
+ } else if (md.hasTxnidMostBits() &&
md.hasTxnidLeastBits()) {
+ // Filter-out transaction aborted messages.
+ TxnID txnID = new TxnID(md.getTxnidMostBits(),
md.getTxnidLeastBits());
+ return !isTxnAborted(txnID, (PositionImpl)
entry.getPosition());
+ }
+ return true;
+ }, getMaxReadPosition())
+ .thenApply(position -> {
+ // Update lastDispatchablePosition to the given position
+ updateLastDispatchablePosition(position);
+ return position;
+ });
+ }
+
+ /**
+ * Update lastDispatchablePosition if the given position is greater than
the lastDispatchablePosition.
+ *
+ * @param position
+ */
+ public synchronized void updateLastDispatchablePosition(Position position)
{
+ // Update lastDispatchablePosition to null if the position is null,
fallback to
+ // ManagedLedgerImplUtils#asyncGetLastValidPosition
+ if (position == null) {
+ lastDispatchablePosition = null;
+ return;
+ }
+
+ PositionImpl position0 = (PositionImpl) position;
+ // If the position is greater than the maxReadPosition, ignore
+ if (position0.compareTo(getMaxReadPosition()) > 0) {
+ return;
+ }
+ // If the lastDispatchablePosition is null, set it to the position
+ if (lastDispatchablePosition == null) {
+ lastDispatchablePosition = position;
+ return;
+ }
+ // If the position is greater than the lastDispatchablePosition,
update it
+ PositionImpl lastDispatchablePosition0 = (PositionImpl)
lastDispatchablePosition;
+ if (position0.compareTo(lastDispatchablePosition0) > 0) {
+ lastDispatchablePosition = position;
+ }
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index bab7b64c608..533d0716d41 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -377,8 +377,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position,
boolean isMarkerMessage) {
- if (!isMarkerMessage && maxReadPositionCallBack != null) {
- maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ if (!isMarkerMessage) {
+ updateLastDispatchablePosition(position);
+ if (maxReadPositionCallBack != null) {
+ maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ }
}
}
@@ -436,4 +439,11 @@ class InMemTransactionBuffer implements TransactionBuffer {
.filter(txnBuffer ->
txnBuffer.status.equals(TxnStatus.COMMITTED))
.count();
}
+
+ // ThreadSafe
+ private void updateLastDispatchablePosition(Position position) {
+ if (topic instanceof PersistentTopic t) {
+ t.updateLastDispatchablePosition(position);
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 43666aae1ab..7db68a522df 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -297,6 +297,11 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
+ // ThreadSafe
+ private void updateLastDispatchablePosition(Position position) {
+ topic.updateLastDispatchablePosition(position);
+ }
+
@Override
public CompletableFuture<TransactionBufferReader>
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
return null;
@@ -459,6 +464,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
} else {
updateMaxReadPosition((PositionImpl)
topic.getManagedLedger().getLastConfirmedEntry(), false);
}
+ // Update the last dispatchable position to null if there is a TXN
finished.
+ updateLastDispatchablePosition(null);
}
/**
@@ -523,6 +530,10 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
}
}
+ // If the message is a normal message, update the last dispatchable
position.
+ if (!isMarkerMessage) {
+ updateLastDispatchablePosition(position);
+ }
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index ebd61dbaa82..6f5dc0cd4d0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -99,8 +99,11 @@ public class TransactionBufferDisable implements
TransactionBuffer {
@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position,
boolean isMarkerMessage) {
- if (!isMarkerMessage && maxReadPositionCallBack != null) {
- maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ if (!isMarkerMessage) {
+ updateLastDispatchablePosition(position);
+ if (maxReadPositionCallBack != null) {
+ maxReadPositionCallBack.maxReadPositionMovedForward(null,
position);
+ }
}
}
@@ -148,4 +151,11 @@ public class TransactionBufferDisable implements
TransactionBuffer {
public long getCommittedTxnCount() {
return 0;
}
+
+ // ThreadSafe
+ private void updateLastDispatchablePosition(Position position) {
+ if (topic instanceof PersistentTopic t) {
+ t.updateLastDispatchablePosition(position);
+ }
+ }
}