This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fd0961d8f09 [improve][broker] PIP-473: MetadataTransactionBuffer for
segment topics (#25768)
fd0961d8f09 is described below
commit fd0961d8f096305cdf03defaa80e6e0c06027bb3
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 13 17:49:50 2026 -0700
[improve][broker] PIP-473: MetadataTransactionBuffer for segment topics
(#25768)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 6 +-
.../impl/DispatchingTransactionBufferProvider.java | 43 ++
.../buffer/impl/MetadataTransactionBuffer.java | 533 +++++++++++++++++++++
.../impl/MetadataTransactionBufferProvider.java | 41 ++
.../pulsar/broker/transaction/metadata/TxnIds.java | 57 +++
.../transaction/metadata/TxnMetadataStore.java | 33 ++
.../broker/transaction/metadata/TxnPaths.java | 20 +
.../DispatchingTransactionBufferProviderTest.java | 78 +++
.../buffer/impl/MetadataTransactionBufferTest.java | 274 +++++++++++
.../broker/transaction/metadata/TxnIdsTest.java | 74 +++
10 files changed, 1158 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b22f9a4bef8..4c3651bf90b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3738,7 +3738,11 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_TRANSACTION,
- doc = "Class name for transaction buffer provider"
+ doc = "Class name for transaction buffer provider. Default routes
segment:// topics to the"
+ + " legacy TopicTransactionBuffer. Set this to"
+ + "
org.apache.pulsar.broker.transaction.buffer.impl.DispatchingTransactionBufferProvider"
+ + " once the v5 transaction coordinator (PIP-473 P5) is
enabled to opt segment topics"
+ + " into MetadataTransactionBuffer."
)
private String transactionBufferProviderClassName =
"org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider";
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProvider.java
new file mode 100644
index 00000000000..b58c38bd929
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Opt-in {@link TransactionBufferProvider}: returns {@link MetadataTransactionBuffer} for
+ * {@code segment://} topics (PIP-473) and falls back to {@link TopicTransactionBuffer} for
+ * {@code persistent://} / {@code topic://}. It is not the configured default: operators set
+ * {@code transactionBufferProviderClassName} to this class to opt segment topics in.
+ */
+public class DispatchingTransactionBufferProvider implements
TransactionBufferProvider {
+
+ private final TransactionBufferProvider legacy = new
TopicTransactionBufferProvider();
+ private final TransactionBufferProvider metadata = new
MetadataTransactionBufferProvider();
+
+ @Override
+ public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+ return TopicName.get(originTopic.getName()).isSegment()
+ ? metadata.newTransactionBuffer(originTopic)
+ : legacy.newTransactionBuffer(originTopic);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
new file mode 100644
index 00000000000..42f38a78428
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.CustomLog;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnPaths;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.broker.transaction.metadata.Versioned;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
+import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+
+/**
+ * {@link TransactionBuffer} for {@code segment://} topics that reads truth
from the metadata-store
+ * transaction layout (PIP-473) rather than from a per-topic snapshot log.
+ *
+ * <p>Lifecycle:
+ * <ul>
+ * <li><b>Publish</b> — {@link #appendBufferToTxn} reads the txn header
(cache-first), appends the
+ * entry to the managed ledger, then appends a {@link TxnOp} record under
+ * {@code /txn-op/<txnId>-<seq>}. Both must succeed before we ack the
publisher.</li>
+ * <li><b>State transitions</b> — driven by {@code
/txn-segment-events/<segment>-*} sequence
+ * events. The events are wake-ups; the truth is the header. On each
notification we
+ * re-read headers for every currently-open txn and apply the resulting
state changes.</li>
+ * <li><b>Recovery</b> (Option C) — subscribe to the event stream first, then scan
+ * {@code idx:writes-by-segment} for this segment, group by {@code txnId}, fetch each header,
+ * and seed the in-memory cache; a final reconcile pass picks up events fired meanwhile.</li>
+ * </ul>
+ *
+ * <p><b>TC ordering contract.</b> There is a TOCTOU window between the header
authorization read
+ * in {@link #appendBufferToTxn} and the managed-ledger append: the TC may
flip the header (commit
+ * or abort) in between, and the entry still lands. On commit that's harmless
— the message is
+ * visible. On abort, the subsequent segment-event delivery marks the txn
ABORTED in the cache and
+ * {@link #isTxnAborted} filters it. This relies on the TC publishing the
segment event <em>after</em>
+ * the header CAS lands so a participant that lost the race always learns the
decision. The legacy
+ * {@code TopicTransactionBuffer} has the same window with marker-message
ordering.
+ *
+ * <p><b>In-memory growth.</b> Terminal txns stay in the {@code txns} cache
for the segment's
+ * lifetime so {@code isTxnAborted} can answer authoritatively for dispatcher
reads — evicting a
+ * COMMITTED entry would mean the default "unknown → aborted" filter wrongly
hides its messages.
+ * Long-running segments with high txn turnover will accumulate cached
entries. Cache pruning tied
+ * to data-ledger trimming / header GC is a P5/P6 concern.
+ */
+@CustomLog
+public class MetadataTransactionBuffer implements TransactionBuffer {
+
+ private final PersistentTopic topic;
+ private final ManagedLedger ledger;
+ private final TxnMetadataStore txnStore;
+ private final String segmentName;
+ private final TopicTransactionBuffer.MaxReadPositionCallBack
maxReadPositionCallBack;
+
+ private final CompletableFuture<Void> recoveryFuture = new
CompletableFuture<>();
+ private volatile AutoCloseable subscription;
+ private volatile boolean closed;
+
+ /** Guards {@link #txns} + {@link #maxReadPosition} + {@link
#lastDispatchable}. */
+ private final Object lock = new Object();
+
+ /** Cached per-txn state, populated by appendBufferToTxn and refreshed by
event reconcile. */
+ private final Map<String, TxnEntry> txns = new HashMap<>();
+
+ private Position maxReadPosition;
+ private Position lastDispatchable;
+
+ private final LongAdder committedCount = new LongAdder();
+ private final LongAdder abortedCount = new LongAdder();
+
+ public MetadataTransactionBuffer(PersistentTopic topic, TxnMetadataStore
txnStore) {
+ this.topic = topic;
+ this.ledger = topic.getManagedLedger();
+ this.txnStore = txnStore;
+ this.segmentName = topic.getName();
+ this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
+ this.maxReadPosition = ledger.getLastConfirmedEntry();
+ this.lastDispatchable = this.maxReadPosition;
+ recover();
+ }
+
+ // ---- Recovery ----------------------------------------------------------
+
+ private void recover() {
+ AutoCloseable handle;
+ try {
+ handle = txnStore.subscribeSegmentEvents(segmentName, path ->
triggerReconcile());
+ } catch (MetadataStoreException e) {
+ recoveryFuture.completeExceptionally(e);
+ return;
+ }
+ subscription = handle;
+
+ // Scan all /txn-op records for this segment, group by txnId.
+ Map<String, List<Position>> opsByTxn = new ConcurrentHashMap<>();
+ txnStore.listWritesBySegment(segmentName, new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ TxnOp op = TxnMetadataStore.fromJson(r.getValue(),
TxnOp.class);
+ String txnIdKey =
TxnPaths.txnIdFromOpPath(r.getStat().getPath());
+ if (txnIdKey == null) {
+ return;
+ }
+ opsByTxn.computeIfAbsent(txnIdKey, k -> new ArrayList<>())
+ .add(PositionFactory.create(op.getLedgerId(),
op.getEntryId()));
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // Future propagation handled by listWritesBySegment's
returned future.
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ })
+ .thenCompose(__ -> {
+ // Fan out one header read per distinct txnId; build initial state.
+ List<CompletableFuture<Void>> reads = new ArrayList<>();
+ opsByTxn.forEach((txnIdKey, positions) -> reads.add(
+ txnStore.getHeader(txnIdKey).thenAccept(opt ->
applyHeaderForRecovery(
+ txnIdKey, opt, positions))));
+ return FutureUtil.waitForAll(reads);
+ })
+ .whenComplete((__, err) -> {
+ if (err != null) {
+ log.error().attr("segment",
segmentName).exception(err).log("TB recovery failed");
+ // Close the subscription we opened above so the listener
doesn't outlive a
+ // failed-to-recover TB instance (closeAsync may never be
called if recovery never
+ // succeeded).
+ closeSubscriptionQuietly();
+ recoveryFuture.completeExceptionally(err);
+ return;
+ }
+ synchronized (lock) {
+ recomputeMaxReadPositionLocked();
+ }
+ recoveryFuture.complete(null);
+ // Drain any events that fired between subscribe and now —
triggerReconcile short-
+ // circuits while recoveryFuture is not done, so we explicitly
kick a reconcile pass
+ // now to pick up state transitions whose only notification landed
in that window.
+ triggerReconcile();
+ });
+ }
+
+ private void applyHeaderForRecovery(String txnIdKey,
Optional<Versioned<TxnHeader>> opt, List<Position> positions) {
+ TxnState state = opt.map(v ->
v.value().getState()).orElse(TxnState.ABORTED);
+ Position first =
positions.stream().min(Position::compareTo).orElse(null);
+ synchronized (lock) {
+ txns.put(txnIdKey, new TxnEntry(state, first));
+ }
+ // Schedule op-record cleanup for terminal txns (best-effort, async).
+ if (state.isTerminal()) {
+ cleanupOpRecords(txnIdKey);
+ }
+ }
+
+ // ---- Publish path ------------------------------------------------------
+
+ @Override
+ public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long
sequenceId, ByteBuf buffer) {
+ // Retain so the buffer survives the chain — release in the terminal
handlers.
+ buffer.retain();
+ return recoveryFuture.thenCompose(__ -> internalAppend(txnId, buffer))
+ .whenComplete((p, ex) -> buffer.release());
+ }
+
+ private CompletableFuture<Position> internalAppend(TxnID txnId, ByteBuf
buffer) {
+ if (closed) {
+ return FutureUtil.failedFuture(new
BrokerServiceException.ServiceUnitNotReadyException(
+ "Transaction buffer is closed"));
+ }
+ String txnIdKey = TxnIds.toKey(txnId);
+ return readStateCacheFirst(txnIdKey)
+ .thenCompose(state -> {
+ if (state.isTerminal()) {
+ return FutureUtil.failedFuture(new
BrokerServiceException.NotAllowedException(
+ "Transaction " + txnId + " is already " +
state + " — TxnConflict"));
+ }
+ return appendToLedger(buffer)
+ .thenCompose(position -> recordOp(txnId, txnIdKey,
position));
+ });
+ }
+
+ private CompletableFuture<TxnState> readStateCacheFirst(String txnIdKey) {
+ synchronized (lock) {
+ TxnEntry cached = txns.get(txnIdKey);
+ if (cached != null) {
+ return CompletableFuture.completedFuture(cached.state);
+ }
+ }
+ return txnStore.getHeader(txnIdKey).thenApply(opt -> {
+ TxnState state = opt.map(v ->
v.value().getState()).orElse(TxnState.ABORTED);
+ synchronized (lock) {
+ txns.putIfAbsent(txnIdKey, new TxnEntry(state, null));
+ }
+ return state;
+ });
+ }
+
+ private CompletableFuture<Position> appendToLedger(ByteBuf buffer) {
+ CompletableFuture<Position> result = new CompletableFuture<>();
+ ledger.asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
+ @Override
+ public void addComplete(Position position, ByteBuf entryData,
Object ctx) {
+ result.complete(position);
+ }
+
+ @Override
+ public void addFailed(ManagedLedgerException exception, Object
ctx) {
+ result.completeExceptionally(exception);
+ }
+ }, null);
+ return result;
+ }
+
+ private CompletableFuture<Position> recordOp(TxnID txnId, String txnIdKey,
Position position) {
+ TxnOp op = new TxnOp(TxnOpKind.WRITE, segmentName, null,
+ position.getLedgerId(), position.getEntryId());
+ return txnStore.appendOp(txnIdKey, op).thenApply(stat -> {
+ synchronized (lock) {
+ TxnEntry entry = txns.get(txnIdKey);
+ // Only an OPEN entry pins maxReadPosition. If a concurrent
reconcile flipped this
+ // txn to terminal between the cache-first authorization read
and this update, do
+ // NOT resurrect it as OPEN — the ML append still lands but
isTxnAborted will mask
+ // it for aborted txns (see class javadoc on the publish-side
TOCTOU window).
+ if (entry != null && entry.state == TxnState.OPEN) {
+ if (entry.firstPosition == null ||
position.compareTo(entry.firstPosition) < 0) {
+ entry.firstPosition = position;
+ recomputeMaxReadPositionLocked();
+ }
+ }
+ }
+ return position;
+ });
+ }
+
+ // ---- Reconcile (event-driven) -----------------------------------------
+
+ private void triggerReconcile() {
+ if (closed || !recoveryFuture.isDone()) {
+ return;
+ }
+ Set<String> snapshot;
+ synchronized (lock) {
+ snapshot = new HashSet<>(openTxnsLocked());
+ }
+ if (snapshot.isEmpty()) {
+ return;
+ }
+ List<CompletableFuture<Void>> reads = new ArrayList<>(snapshot.size());
+ for (String txnIdKey : snapshot) {
+ reads.add(txnStore.getHeader(txnIdKey).thenAccept(opt -> {
+ TxnState newState = opt.map(v ->
v.value().getState()).orElse(TxnState.ABORTED);
+ applyReconciledState(txnIdKey, newState);
+ }));
+ }
+ FutureUtil.waitForAll(reads).whenComplete((__, err) -> {
+ if (err != null) {
+ log.warn().attr("segment",
segmentName).exception(err).log("Reconcile encountered error");
+ }
+ });
+ }
+
+ private void applyReconciledState(String txnIdKey, TxnState newState) {
+ boolean cleanup = false;
+ synchronized (lock) {
+ TxnEntry entry = txns.get(txnIdKey);
+ if (entry == null || entry.state == newState) {
+ return;
+ }
+ entry.state = newState;
+ if (newState.isTerminal()) {
+ entry.firstPosition = null;
+ cleanup = true;
+ if (newState == TxnState.COMMITTED) {
+ committedCount.increment();
+ } else {
+ abortedCount.increment();
+ }
+ recomputeMaxReadPositionLocked();
+ }
+ }
+ if (cleanup) {
+ cleanupOpRecords(txnIdKey);
+ }
+ }
+
+ /**
+ * Delete every {@code /txn-op} record for {@code (this segment, txnIdKey)}. Best-effort —
+ * failures are only logged: reconcile revisits OPEN txns only, so leftovers wait for recovery.
+ */
+ private void cleanupOpRecords(String txnIdKey) {
+ txnStore.deleteWriteOpsForSegmentAndTxn(segmentName, txnIdKey)
+ .exceptionally(err -> {
+ log.warn().attr("segment", segmentName).attr("txnId",
txnIdKey).exception(err)
+ .log("Op-record cleanup failed; will retry on next
reconcile");
+ return null;
+ });
+ }
+
+ // ---- maxReadPosition ---------------------------------------------------
+
+ private Set<String> openTxnsLocked() {
+ Set<String> open = new HashSet<>();
+ for (Map.Entry<String, TxnEntry> e : txns.entrySet()) {
+ if (e.getValue().state == TxnState.OPEN) {
+ open.add(e.getKey());
+ }
+ }
+ return open;
+ }
+
+ private void recomputeMaxReadPositionLocked() {
+ Position min = null;
+ for (TxnEntry e : txns.values()) {
+ if (e.state == TxnState.OPEN && e.firstPosition != null) {
+ if (min == null || e.firstPosition.compareTo(min) < 0) {
+ min = e.firstPosition;
+ }
+ }
+ }
+ Position next = (min == null) ? lastDispatchable :
ledger.getPreviousPosition(min);
+ Position prev = maxReadPosition;
+ maxReadPosition = next;
+ // Only fire the callback on forward motion. Initial-state setup at
recovery may move
+ // the position backwards (LAC -> previous(firstOpenWrite)); that's
not a "moved forward".
+ if (next.compareTo(prev) > 0 && maxReadPositionCallBack != null) {
+ maxReadPositionCallBack.maxReadPositionMovedForward(prev, next);
+ }
+ }
+
+ // ---- SPI surface (lifecycle, queries) ---------------------------------
+
+ @Override
+ public Position getMaxReadPosition() {
+ synchronized (lock) {
+ return maxReadPosition;
+ }
+ }
+
+ @Override
+ public boolean isTxnAborted(TxnID txnID, Position readPosition) {
+ String key = TxnIds.toKey(txnID);
+ synchronized (lock) {
+ TxnEntry entry = txns.get(key);
+ if (entry == null) {
+ // No record of this txn — must be either orphan (broker crash
mid-publish) or
+ // long-aborted-and-cleaned. Filtering is the safe default.
+ return true;
+ }
+ return entry.state == TxnState.ABORTED;
+ }
+ }
+
+ @Override
+ public void syncMaxReadPositionForNormalPublish(Position position, boolean
isMarkerMessage) {
+ if (isMarkerMessage) {
+ return;
+ }
+ topic.updateLastDispatchablePosition(position);
+ synchronized (lock) {
+ lastDispatchable = position;
+ recomputeMaxReadPositionLocked();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> checkIfTBRecoverCompletely() {
+ return recoveryFuture;
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ closed = true;
+ closeSubscriptionQuietly();
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private void closeSubscriptionQuietly() {
+ AutoCloseable handle = subscription;
+ if (handle == null) {
+ return;
+ }
+ subscription = null;
+ try {
+ handle.close();
+ } catch (Throwable t) {
+ log.warn().attr("segment", segmentName).exception(t).log("Failed
to close subscription");
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
+ // No-op for the metadata-driven TB: v5 commits are driven by the TC
writing /txn header CAS
+ // and the segment-event stream, not by direct SPI calls.
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
+ // No-op (see commitTxn).
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // ---- SPI surface (snapshots / readers — unused in v5) -----------------
+
+ @Override
+ public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<TransactionBufferReader>
openTransactionBufferReader(TxnID txnID, long startSequenceId) {
+ return FutureUtil.failedFuture(new
BrokerServiceException.NotAllowedException(
+ "openTransactionBufferReader is not supported on segment
topics"));
+ }
+
+ @Override
+ public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> clearSnapshot() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> clearSnapshotAndClose() {
+ return closeAsync();
+ }
+
+ @Override
+ public AbortedTxnProcessor.SnapshotType getSnapshotType() {
+ return null;
+ }
+
+ @Override
+ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
+ return null;
+ }
+
+ @Override
+ public TransactionBufferStats getStats(boolean lowWaterMarks, boolean
segmentStats) {
+ return null;
+ }
+
+ @Override
+ public TransactionBufferStats getStats(boolean lowWaterMarks) {
+ return null;
+ }
+
+ @Override
+ public long getOngoingTxnCount() {
+ synchronized (lock) {
+ long n = 0;
+ for (TxnEntry e : txns.values()) {
+ if (e.state == TxnState.OPEN) {
+ n++;
+ }
+ }
+ return n;
+ }
+ }
+
+ @Override
+ public long getAbortedTxnCount() {
+ return abortedCount.sum();
+ }
+
+ @Override
+ public long getCommittedTxnCount() {
+ return committedCount.sum();
+ }
+
+ // ---- helpers ----------------------------------------------------------
+
+ private static final class TxnEntry {
+ TxnState state;
+ Position firstPosition; // null if no writes on this segment yet, or
after termination
+
+ TxnEntry(TxnState state, Position firstPosition) {
+ this.state = state;
+ this.firstPosition = firstPosition;
+ }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferProvider.java
new file mode 100644
index 00000000000..036098bebe4
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferProvider.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+
+/**
+ * Provider that builds a {@link MetadataTransactionBuffer} backed by the
broker's local
+ * {@code MetadataStore}. Intended for {@code segment://} topics. The
dispatching provider routes
+ * here when the topic is a segment topic.
+ */
+public class MetadataTransactionBufferProvider implements
TransactionBufferProvider {
+
+ @Override
+ public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+ PersistentTopic topic = (PersistentTopic) originTopic;
+ TxnMetadataStore txnStore = new TxnMetadataStore(
+ topic.getBrokerService().getPulsar().getLocalMetadataStore());
+ return new MetadataTransactionBuffer(topic, txnStore);
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnIds.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnIds.java
new file mode 100644
index 00000000000..b3b7c961e9e
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnIds.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+/**
+ * Round-trip between {@link TxnID} and the string form used in metadata-store
paths and
+ * partition keys (e.g. {@code /txn/<txnId>}, {@code partitionKey = txnId}).
+ *
+ * <p>Format: {@code <mostSigBits>_<leastSigBits>}. Path-friendly (no
parens/commas) and round-trips
+ * losslessly for any pair of {@code long} values — including negatives —
because {@code _} cannot
+ * appear inside a decimal long literal (Java's {@link Long#parseLong} rejects
it). {@link TxnID#toString}
+ * uses {@code (most,least)} which leaks shell-unfriendly characters into
paths; this helper is the
+ * single point that controls the on-the-wire encoding.
+ */
+public final class TxnIds {
+
+ private static final char SEP = '_';
+
+ /** @return {@code <most>_<least>}, suitable for use as a metadata-store
path segment. */
+ public static String toKey(TxnID txnId) {
+ return txnId.getMostSigBits() + String.valueOf(SEP) +
txnId.getLeastSigBits();
+ }
+
+ /**
+ * @return the {@link TxnID} parsed from {@code key}.
+ * @throws IllegalArgumentException if {@code key} is not in the expected
{@code <most>_<least>} form
+ */
+ public static TxnID fromKey(String key) {
+ int sep = key.indexOf(SEP);
+ if (sep <= 0 || sep == key.length() - 1 || key.indexOf(SEP, sep + 1)
>= 0) {
+ throw new IllegalArgumentException("Invalid txnId key: " + key);
+ }
+ long most = Long.parseLong(key, 0, sep, 10);
+ long least = Long.parseLong(key, sep + 1, key.length(), 10);
+ return new TxnID(most, least);
+ }
+
+ private TxnIds() {}
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 5c2087e3fba..8e20ed305f8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -155,6 +155,39 @@ public class TxnMetadataStore {
consumer);
}
+ /**
+ * Delete every {@code /txn-op} write record for {@code (segment, txnId)}
— used by the TB once
+ * an event tells it the txn is terminal. Path extraction follows the
layout in
+ * {@link TxnPaths#txnIdFromOpPath}. Best-effort: tolerates concurrent
deletions via
+ * {@link MetadataStore#deleteIfExists}.
+ */
+ public CompletableFuture<Void> deleteWriteOpsForSegmentAndTxn(String
segment, String txnId) {
+ java.util.List<String> paths = new java.util.ArrayList<>();
+ return listWritesBySegment(segment, new ScanConsumer() {
+ @Override
+ public void onNext(org.apache.pulsar.metadata.api.GetResult r) {
+ if
(txnId.equals(TxnPaths.txnIdFromOpPath(r.getStat().getPath()))) {
+ paths.add(r.getStat().getPath());
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).thenCompose(__ -> {
+ Set<Option> opts = Set.of(new Option.PartitionKey(txnId));
+ CompletableFuture<?>[] deletes = new
CompletableFuture<?>[paths.size()];
+ for (int i = 0; i < paths.size(); i++) {
+ deletes[i] = store.deleteIfExists(paths.get(i),
Optional.empty(), opts);
+ }
+ return CompletableFuture.allOf(deletes);
+ });
+ }
+
/**
* Stream open transactions whose deadline falls in {@code
[fromMsInclusive, toMsInclusive]}.
* Pass {@code null} on either bound for an unbounded range.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index c8952af1048..d90c7a470cf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -140,5 +140,25 @@ public final class TxnPaths {
return state.name() + ":" + longKey(finalizedMs);
}
+ /**
+ * Extract the {@code txnId} key from a path under {@link #TXN_OP_PREFIX}. The path layout is
+ * {@code /txn-op/<txnId>-<paddedSeq>}, and the key is everything in the last path component
+ * before its last dash.
+ *
+ * <p>Keys built by {@link TxnIds#toKey} look like {@code <most>_<least>}, where either part may
+ * carry a leading minus sign, so the key itself can contain dashes. Splitting on the last dash
+ * still returns the whole key as long as {@code <paddedSeq>} contains no dash (assumed here;
+ * confirm against the code that builds the op path).
+ *
+ * @param opPath op record path; must not be {@code null}
+ * @return the txnId key, or {@code null} if {@code opPath} doesn't have the expected shape
+ * (no {@code /}, or no dash after the first character of the last path component)
+ */
+ public static String txnIdFromOpPath(String opPath) {
+ int lastSlash = opPath.lastIndexOf('/');
+ if (lastSlash < 0) {
+ return null;
+ }
+ String name = opPath.substring(lastSlash + 1);
+ int dash = name.lastIndexOf('-');
+ if (dash <= 0) { // no dash, or only a leading dash that would yield an empty key
+ return null;
+ }
+ return name.substring(0, dash);
+ }
+
private TxnPaths() {}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProviderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProviderTest.java
new file mode 100644
index 00000000000..fccba498973
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/DispatchingTransactionBufferProviderTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+/**
+ * Routing tests for {@link DispatchingTransactionBufferProvider}: {@code segment://} topics
+ * get {@link MetadataTransactionBuffer}, everything else falls through to the legacy
+ * {@link TopicTransactionBuffer}.
+ */
+public class DispatchingTransactionBufferProviderTest {
+
+ @Test
+ public void routesSegmentTopicToMetadataBuffer() throws Exception {
+ try (MetadataStoreExtended store =
MetadataStoreExtended.create("memory:dispatcher-test",
+ MetadataStoreConfig.builder().fsyncEnable(false).build())) {
+ PersistentTopic topic =
mockTopic("segment://public/default/topic/0000-ffff-0", store);
+ TransactionBuffer tb = new
DispatchingTransactionBufferProvider().newTransactionBuffer(topic);
+ assertThat(tb).isInstanceOf(MetadataTransactionBuffer.class);
+ tb.closeAsync().get();
+ }
+ }
+
+ @Test
+ public void routesPersistentTopicToLegacyBuffer() {
+ PersistentTopic topic = mockTopic("persistent://public/default/topic",
null);
+ TransactionBuffer tb = new
DispatchingTransactionBufferProvider().newTransactionBuffer(topic);
+ assertThat(tb).isInstanceOf(TopicTransactionBuffer.class);
+ }
+
+ private static PersistentTopic mockTopic(String name,
MetadataStoreExtended store) {
+ PersistentTopic t = mock(PersistentTopic.class, RETURNS_DEEP_STUBS);
+ when(t.getName()).thenReturn(name);
+ ManagedLedger ml = mock(ManagedLedger.class);
+ when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(0,
0));
+ doAnswer(inv -> {
+ AsyncCallbacks.AddEntryCallback cb = inv.getArgument(1);
+ cb.addComplete(PositionFactory.create(0, 0), inv.getArgument(0),
inv.getArgument(2));
+ return null;
+ }).when(ml).asyncAddEntry(any(ByteBuf.class), any(), any());
+ when(t.getManagedLedger()).thenReturn(ml);
+ if (store != null) {
+
when(t.getBrokerService().getPulsar().getLocalMetadataStore()).thenReturn(store);
+ }
+ return t;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
new file mode 100644
index 00000000000..5e4dfe736b7
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link MetadataTransactionBuffer} against the in-memory
{@code MetadataStore}.
+ * The TB's collaborators ({@link PersistentTopic}, {@link ManagedLedger}) are
mocked so the test
+ * focuses on the buffer's state machine and metadata interactions.
+ */
+public class MetadataTransactionBufferTest {
+
+ private static final String SEGMENT =
"segment://public/default/topic/0000-ffff-0";
+
+ private MetadataStore store;
+ private TxnMetadataStore txnStore;
+ private ManagedLedger ledger;
+ private PersistentTopic topic;
+ private AtomicLong nextEntryId;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ store = MetadataStoreFactory.create("memory:local",
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+ txnStore = new TxnMetadataStore(store);
+ nextEntryId = new AtomicLong(1);
+ ledger = mockManagedLedger();
+ topic = mockTopic(SEGMENT, ledger);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (store != null) {
+ store.close();
+ }
+ }
+
+ @Test
+ public void openTxnAppend_pinsMaxReadPosition() throws Exception {
+ TxnID txnId = new TxnID(1, 1);
+ createOpenHeader(txnId);
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+
+ Position firstPos = tb.appendBufferToTxn(txnId, 0, payload("a")).get();
+
+ // maxReadPosition should be just before the first txn entry.
+
assertThat(tb.getMaxReadPosition()).isEqualTo(ledger.getPreviousPosition(firstPos));
+ assertThat(tb.isTxnAborted(txnId, firstPos)).isFalse();
+ assertThat(tb.getOngoingTxnCount()).isOne();
+ }
+
+ @Test
+ public void concurrentOpenTxns_minPositionPins() throws Exception {
+ TxnID t1 = new TxnID(1, 1);
+ TxnID t2 = new TxnID(1, 2);
+ createOpenHeader(t1);
+ createOpenHeader(t2);
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+
+ Position p1 = tb.appendBufferToTxn(t1, 0, payload("a")).get();
+ tb.appendBufferToTxn(t2, 0, payload("b")).get();
+
+
assertThat(tb.getMaxReadPosition()).isEqualTo(ledger.getPreviousPosition(p1));
+ }
+
+ @Test
+ public void commitEvent_dropsTxn_advancesMaxReadPosition() throws
Exception {
+ TxnID txnId = new TxnID(1, 1);
+ createOpenHeader(txnId);
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+ tb.appendBufferToTxn(txnId, 0, payload("a")).get();
+
+ // TC-side: flip header to COMMITTED + publish segment event.
+ commitTxn(txnId);
+ txnStore.publishSegmentEvent(SEGMENT, new
TxnEvent(TxnIds.toKey(txnId), TxnState.COMMITTED)).get();
+
+ Awaitility.await().untilAsserted(() -> {
+ assertThat(tb.getOngoingTxnCount()).isZero();
+ assertThat(tb.getCommittedTxnCount()).isOne();
+ });
+
+ // No OPEN txns pin the read position — should now sit at LAC.
+
assertThat(tb.getMaxReadPosition()).isEqualTo(ledger.getLastConfirmedEntry());
+ }
+
+ @Test
+ public void abortEvent_marksAborted_isTxnAbortedTrue() throws Exception {
+ TxnID txnId = new TxnID(1, 1);
+ createOpenHeader(txnId);
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+ Position firstPos = tb.appendBufferToTxn(txnId, 0, payload("a")).get();
+
+ abortTxn(txnId);
+ txnStore.publishSegmentEvent(SEGMENT, new
TxnEvent(TxnIds.toKey(txnId), TxnState.ABORTED)).get();
+
+ Awaitility.await().untilAsserted(() -> {
+ assertThat(tb.isTxnAborted(txnId, firstPos)).isTrue();
+ assertThat(tb.getAbortedTxnCount()).isOne();
+ assertThat(tb.getOngoingTxnCount()).isZero();
+ });
+ }
+
+ @Test
+ public void appendToCommittedTxn_failsTxnConflict() throws Exception {
+ TxnID txnId = new TxnID(1, 1);
+ // Pre-set header to COMMITTED — txn is terminal before any append.
+ Stat created = txnStore.createHeader(TxnIds.toKey(txnId),
+ new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
+ Instant.ofEpochMilli(1000),
Instant.ofEpochMilli(2000))).get();
+ assertThat(created.getVersion()).isZero();
+
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+
+ assertThatThrownBy(() -> tb.appendBufferToTxn(txnId, 0,
payload("a")).get())
+
.hasCauseInstanceOf(BrokerServiceException.NotAllowedException.class);
+ }
+
+ @Test
+ public void unknownTxn_isTxnAbortedReturnsTrue() throws Exception {
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+
+ // Never seen — must be filtered as aborted (orphan or long-cleaned).
+ assertThat(tb.isTxnAborted(new TxnID(99, 99),
PositionFactory.create(1, 0))).isTrue();
+ }
+
+ @Test
+ public void recovery_rebuildsOpenTxnStateFromOpRecords() throws Exception {
+ // Pre-populate: one OPEN txn with two writes on this segment, one
COMMITTED txn with one
+ // (lingering) write, and one txn touching a different segment (must
not appear here).
+ TxnID openTxn = new TxnID(1, 1);
+ TxnID committedTxn = new TxnID(1, 2);
+ TxnID otherSegTxn = new TxnID(1, 3);
+
+ createOpenHeader(openTxn);
+ txnStore.appendOp(TxnIds.toKey(openTxn),
+ new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 1L)).get();
+ txnStore.appendOp(TxnIds.toKey(openTxn),
+ new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 2L)).get();
+
+ txnStore.createHeader(TxnIds.toKey(committedTxn),
+ new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
+ Instant.ofEpochMilli(1000),
Instant.ofEpochMilli(2000))).get();
+ txnStore.appendOp(TxnIds.toKey(committedTxn),
+ new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 3L)).get();
+
+ createOpenHeader(otherSegTxn);
+ txnStore.appendOp(TxnIds.toKey(otherSegTxn),
+ new TxnOp(TxnOpKind.WRITE,
"segment://public/default/topic/other-seg", null, 5L, 9L)).get();
+
+ MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
+ tb.checkIfTBRecoverCompletely().get();
+
+ // openTxn pins max read position at min(5:1, 5:2) - 1 = 5:0.
+
assertThat(tb.getMaxReadPosition()).isEqualTo(PositionFactory.create(5, 0));
+ assertThat(tb.getOngoingTxnCount()).isOne();
+ assertThat(tb.isTxnAborted(openTxn, PositionFactory.create(5,
1))).isFalse();
+ // Committed-txn lingering writes should not pin max read position;
the txn isn't in OPEN set.
+ // (Cleanup is async; we don't assert on its completion here.)
+ }
+
+ // ---- helpers -----------------------------------------------------------
+
+ /**
+ * Wrap the bytes of {@code s} in a fresh {@link ByteBuf} for use as a message payload.
+ * The charset is pinned to UTF-8 so the payload bytes do not depend on the JVM default.
+ */
+ private static ByteBuf payload(String s) {
+ return Unpooled.copiedBuffer(s.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+ private void createOpenHeader(TxnID txnId) throws Exception {
+ txnStore.createHeader(TxnIds.toKey(txnId),
+ new TxnHeader(TxnState.OPEN, Duration.ofMillis(60_000),
+ Instant.ofEpochMilli(1000), null)).get();
+ }
+
+ /** TC-side helper: flip the stored header of {@code txnId} to {@link TxnState#COMMITTED}. */
+ private void commitTxn(TxnID txnId) throws Exception {
+ finalizeTxn(txnId, TxnState.COMMITTED);
+ }
+
+ /** TC-side helper: flip the stored header of {@code txnId} to {@link TxnState#ABORTED}. */
+ private void abortTxn(TxnID txnId) throws Exception {
+ finalizeTxn(txnId, TxnState.ABORTED);
+ }
+
+ /**
+ * Read the current header of {@code txnId} and rewrite it with {@code terminalState}, keeping
+ * the original timeout and creation time and stamping the finalization time with "now". The
+ * version that was just read is handed back to {@code updateHeader} (presumably as the
+ * expected version for a conditional write).
+ */
+ private void finalizeTxn(TxnID txnId, TxnState terminalState) throws Exception {
+ var key = TxnIds.toKey(txnId);
+ var v = txnStore.getHeader(key).get().orElseThrow();
+ var h = v.value();
+ txnStore.updateHeader(key,
+ new TxnHeader(terminalState, h.getTimeout(), h.getCreatedAt(), Instant.now()),
+ v.version()).get();
+ }
+
+ private ManagedLedger mockManagedLedger() {
+ ManagedLedger ml = mock(ManagedLedger.class);
+ when(ml.getLastConfirmedEntry()).thenReturn(PositionFactory.create(10,
0));
+ // asyncAddEntry: synthesize a unique increasing position, invoke
addComplete.
+ doAnswer(inv -> {
+ AsyncCallbacks.AddEntryCallback cb = inv.getArgument(1);
+ Position p = PositionFactory.create(10,
nextEntryId.getAndIncrement());
+ cb.addComplete(p, inv.getArgument(0), inv.getArgument(2));
+ return null;
+ }).when(ml).asyncAddEntry(any(ByteBuf.class), any(), any());
+ when(ml.getPreviousPosition(any())).thenAnswer(inv -> {
+ Position p = inv.getArgument(0);
+ return PositionFactory.create(p.getLedgerId(), p.getEntryId() - 1);
+ });
+ return ml;
+ }
+
+ private PersistentTopic mockTopic(String name, ManagedLedger ml) {
+ PersistentTopic t = mock(PersistentTopic.class);
+ when(t.getName()).thenReturn(name);
+ when(t.getManagedLedger()).thenReturn(ml);
+ when(t.getMaxReadPositionCallBack()).thenReturn(null);
+ return t;
+ }
+
+ @SuppressWarnings("unused") // never called; only keeps the java.util.Optional import in use
+ private static <T> Optional<T> emptyOpt() {
+ return Optional.empty();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnIdsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnIdsTest.java
new file mode 100644
index 00000000000..20dd77ea0a3
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnIdsTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.testng.annotations.Test;
+
+public class TxnIdsTest {
+
+ @Test
+ public void roundTripPositive() {
+ TxnID id = new TxnID(1, 2);
+ assertThat(TxnIds.toKey(id)).isEqualTo("1_2");
+ assertThat(TxnIds.fromKey("1_2")).isEqualTo(id);
+ }
+
+ @Test
+ public void roundTripNegativeMostSigBits() {
+ TxnID id = new TxnID(-1, 1);
+ assertThat(TxnIds.toKey(id)).isEqualTo("-1_1");
+ assertThat(TxnIds.fromKey("-1_1")).isEqualTo(id);
+ }
+
+ @Test
+ public void roundTripBothNegative() {
+ TxnID id = new TxnID(-7, -42);
+ assertThat(TxnIds.toKey(id)).isEqualTo("-7_-42");
+ assertThat(TxnIds.fromKey("-7_-42")).isEqualTo(id);
+ }
+
+ @Test
+ public void roundTripExtremes() {
+ TxnID a = new TxnID(Long.MIN_VALUE, Long.MAX_VALUE);
+ TxnID b = new TxnID(Long.MAX_VALUE, Long.MIN_VALUE);
+ assertThat(TxnIds.fromKey(TxnIds.toKey(a))).isEqualTo(a);
+ assertThat(TxnIds.fromKey(TxnIds.toKey(b))).isEqualTo(b);
+ }
+
+ @Test
+ public void fromKeyRejectsMissingSeparator() {
+ assertThatThrownBy(() -> TxnIds.fromKey("nope"))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromKeyRejectsTrailingSeparator() {
+ assertThatThrownBy(() -> TxnIds.fromKey("1_"))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void fromKeyRejectsExtraSeparators() {
+ assertThatThrownBy(() -> TxnIds.fromKey("1_2_3"))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+}