This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fc00c02adb [improve][broker] PIP-473: MetadataPendingAckStore for 
segment subscriptions (#25772)
7fc00c02adb is described below

commit 7fc00c02adb973e8bc3f7e7df65eccd1e6517b7f
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 15 10:07:06 2026 -0700

    [improve][broker] PIP-473: MetadataPendingAckStore for segment 
subscriptions (#25772)
---
 .../buffer/impl/MetadataTransactionBuffer.java     |   7 +-
 .../transaction/metadata/TxnMetadataStore.java     |  32 +-
 .../pulsar/broker/transaction/metadata/TxnOp.java  |  10 +-
 ...patchingTransactionPendingAckStoreProvider.java |  54 ++++
 .../pendingack/impl/MetadataPendingAckStore.java   | 325 +++++++++++++++++++++
 .../impl/MetadataPendingAckStoreProvider.java      |  48 +++
 .../buffer/impl/MetadataTransactionBufferTest.java |   8 +-
 .../transaction/metadata/TxnMetadataStoreTest.java |  12 +-
 ...hingTransactionPendingAckStoreProviderTest.java |  68 +++++
 .../impl/MetadataPendingAckStoreTest.java          | 276 +++++++++++++++++
 10 files changed, 825 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 42f38a78428..b6b8d0328ab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -152,7 +152,10 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
 
             @Override
             public void onError(Throwable throwable) {
-                // Future propagation handled by listWritesBySegment's 
returned future.
+                // Recovery still fails loudly via the scan's returned future 
and the terminal
+                // whenComplete below; logging here captures the cause with 
segment context.
+                log.warn().attr("segment", segmentName).exception(throwable)
+                        .log("TB recovery scan errored");
             }
 
             @Override
@@ -261,7 +264,7 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
 
     private CompletableFuture<Position> recordOp(TxnID txnId, String txnIdKey, 
Position position) {
         TxnOp op = new TxnOp(TxnOpKind.WRITE, segmentName, null,
-                position.getLedgerId(), position.getEntryId());
+                position.getLedgerId(), position.getEntryId(), null);
         return txnStore.appendOp(txnIdKey, op).thenApply(stat -> {
             synchronized (lock) {
                 TxnEntry entry = txns.get(txnIdKey);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 8e20ed305f8..bdc0bda5fd6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
+import lombok.CustomLog;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -49,6 +50,7 @@ import org.apache.pulsar.metadata.api.Stat;
  * <p>The façade is stateless apart from holding the store reference — index 
population happens via
  * options on writes, so there is no explicit registration step.
  */
+@CustomLog
 public class TxnMetadataStore {
 
     /** Sequence-keys delta used by all append-only streams in this layout. */
@@ -162,8 +164,29 @@ public class TxnMetadataStore {
      * {@link MetadataStore#deleteIfExists}.
      */
     public CompletableFuture<Void> deleteWriteOpsForSegmentAndTxn(String 
segment, String txnId) {
+        return scanAndDeleteOpsForTxn(txnId, collector -> 
listWritesBySegment(segment, collector));
+    }
+
+    /**
+     * Delete every {@code /txn-op} ack record for {@code (segment, 
subscription, txnId)} — used by
+     * the PendingAckStore once an event tells it the txn is terminal. Same 
path-extraction +
+     * best-effort semantics as {@link #deleteWriteOpsForSegmentAndTxn}.
+     */
+    public CompletableFuture<Void> 
deleteAckOpsForSegmentSubscriptionAndTxn(String segment, String subscription,
+                                                                            
String txnId) {
+        return scanAndDeleteOpsForTxn(txnId,
+                collector -> listAcksBySegmentSubscription(segment, 
subscription, collector));
+    }
+
+    /**
+     * Shared implementation: invoke the supplied scan with a collector that 
captures only paths
+     * matching {@code txnId}, then delete each captured path with the 
txn-scoped partition key.
+     */
+    private CompletableFuture<Void> scanAndDeleteOpsForTxn(
+            String txnId,
+            java.util.function.Function<ScanConsumer, CompletableFuture<Void>> 
scan) {
         java.util.List<String> paths = new java.util.ArrayList<>();
-        return listWritesBySegment(segment, new ScanConsumer() {
+        ScanConsumer collector = new ScanConsumer() {
             @Override
             public void onNext(org.apache.pulsar.metadata.api.GetResult r) {
                 if 
(txnId.equals(TxnPaths.txnIdFromOpPath(r.getStat().getPath()))) {
@@ -173,12 +196,17 @@ public class TxnMetadataStore {
 
             @Override
             public void onError(Throwable throwable) {
+                // The caller observes failure via the scan's returned future; 
logging here so
+                // the cause is visible alongside the txnId context.
+                log.warn().attr("txnId", txnId).exception(throwable)
+                        .log("Op-record cleanup scan errored");
             }
 
             @Override
             public void onCompleted() {
             }
-        }).thenCompose(__ -> {
+        };
+        return scan.apply(collector).thenCompose(__ -> {
             Set<Option> opts = Set.of(new Option.PartitionKey(txnId));
             CompletableFuture<?>[] deletes = new 
CompletableFuture<?>[paths.size()];
             for (int i = 0; i < paths.size(); i++) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
index 04e4f25807f..8941a098e82 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
@@ -27,7 +27,8 @@ import lombok.NoArgsConstructor;
  * {@code /txn-op/&lt;txnId&gt;-&lt;seq&gt;} (with {@code partitionKey = 
txnId}) every time a
  * participant applies a transactional operation on a segment.
  *
- * <p>{@link #kind} discriminates writes from acks. {@link #subscription} is 
set only for acks.
+ * <p>{@link #kind} discriminates writes from acks. {@link #subscription} and 
{@link #cumulative}
+ * are only set on {@link TxnOpKind#ACK} entries.
  */
 @Data
 @NoArgsConstructor
@@ -48,4 +49,11 @@ public class TxnOp {
 
     /** Managed-ledger entry id of the entry this op refers to. */
     private long entryId;
+
+    /**
+     * For {@link TxnOpKind#ACK} entries only: {@code true} if this is a 
cumulative ack
+     * (markDelete up-to-and-including the position), {@code false} (or 
omitted) for individual
+     * acks. Always {@code null} for {@link TxnOpKind#WRITE} entries.
+     */
+    private Boolean cumulative;
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProvider.java
new file mode 100644
index 00000000000..821fd997f39
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * {@link TransactionPendingAckStoreProvider} that picks a delegate provider from the topic
+ * domain of the subscription it is asked about.
+ *
+ * <p>Subscriptions on {@code segment://} topics (PIP-473) are served by
+ * {@link MetadataPendingAckStoreProvider}, which yields a {@link MetadataPendingAckStore}. Every
+ * other topic ({@code persistent://} / {@code topic://}) keeps going through
+ * {@link MLPendingAckStoreProvider} and therefore {@link MLPendingAckStore}.
+ *
+ * <p>Available but not the configured default — operators (and P5.4) flip
+ * {@code transactionPendingAckStoreProviderClassName} to opt segment topics into the
+ * metadata-driven implementation once the v5 TC is in place.
+ */
+public class DispatchingTransactionPendingAckStoreProvider implements TransactionPendingAckStoreProvider {
+
+    private final TransactionPendingAckStoreProvider legacy = new MLPendingAckStoreProvider();
+    private final TransactionPendingAckStoreProvider metadata = new MetadataPendingAckStoreProvider();
+
+    @Override
+    public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
+        TransactionPendingAckStoreProvider target = delegateFor(subscription);
+        return target.newPendingAckStore(subscription);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) {
+        TransactionPendingAckStoreProvider target = delegateFor(subscription);
+        return target.checkInitializedBefore(subscription);
+    }
+
+    /** Choose the metadata-backed provider for segment topics and the managed-ledger one otherwise. */
+    private TransactionPendingAckStoreProvider delegateFor(PersistentSubscription subscription) {
+        TopicName topicName = TopicName.get(subscription.getTopicName());
+        if (topicName.isSegment()) {
+            return metadata;
+        }
+        return legacy;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
new file mode 100644
index 00000000000..546209c8276
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import lombok.CustomLog;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+
+/**
+ * {@link PendingAckStore} for {@code segment://} subscriptions that reads 
truth from the
+ * metadata-store transaction layout (PIP-473) rather than from a 
per-subscription log.
+ *
+ * <p>Lifecycle:
+ * <ul>
+ *   <li><b>Ack</b> — {@link #appendIndividualAck} / {@link 
#appendCumulativeAck} write a
+ *       {@link TxnOp} record under {@code /txn-op/<txnId>-<seq>} with
+ *       {@code kind=ACK, segment, subscription, ledgerId, entryId, 
cumulative}. The associated
+ *       {@code PendingAckHandle} keeps the in-memory state via the legacy 
interface.</li>
+ *   <li><b>Commit / Abort marks</b> — {@link #appendCommitMark} / {@link 
#appendAbortMark} are
+ *       <b>no-ops</b>. In v5 the TC owns the lifecycle: it CAS-updates the 
header and publishes
+ *       subscription events; this store consumes those events.</li>
+ *   <li><b>State transitions</b> — driven by {@code 
/txn-subscription-events/&lt;seg&gt;:&lt;sub&gt;-*}
+ *       sequence events. The events are wake-ups; the truth is the header. On 
each notification
+ *       we re-read headers for every currently-open txn this subscription is 
involved in and
+ *       call {@code PendingAckHandleImpl.commitTxn} / {@code abortTxn} for 
those that have gone
+ *       terminal — then delete the corresponding {@code /txn-op} ack 
records.</li>
+ *   <li><b>Recovery</b> (Option C) — on {@link #replayAsync}, subscribe to 
the event stream,
+ *       scan {@code idx:acks-by-segment-subscription} for this {@code 
(segment, sub)}, group by
+ *       {@code txnId}, fetch each header, and seed the in-memory open-txn 
set; then mark the
+ *       handle ready and drain any events that fired during recovery.</li>
+ * </ul>
+ *
+ * <p>Same TOCTOU / unbounded-cache caveats documented on {@code 
MetadataTransactionBuffer} apply
+ * here; see that class's javadoc for the TC ordering contract.
+ */
+@CustomLog
+public class MetadataPendingAckStore implements PendingAckStore {
+
+    private final PersistentSubscription subscription;
+    private final TxnMetadataStore txnStore;
+    private final String segmentName;
+    private final String subscriptionName;
+
+    private final Object lock = new Object();
+    /** Set of txnIdKeys we believe are OPEN — populated on append + recovery, 
drained by reconcile. */
+    private final Set<String> openTxns = new HashSet<>();
+
+    private final CompletableFuture<Void> recoveryFuture = new 
CompletableFuture<>();
+    private volatile AutoCloseable eventSubscription;
+    private volatile PendingAckHandleImpl handle;
+    private volatile boolean closed;
+
+    /**
+     * Create a store bound to one subscription.
+     *
+     * @param subscription subscription this store serves; its topic name becomes the segment name
+     *                     and its name becomes the subscription name used in metadata-store calls
+     * @param txnStore     façade through which all transaction metadata is read and written
+     */
+    public MetadataPendingAckStore(PersistentSubscription subscription, 
TxnMetadataStore txnStore) {
+        this.subscription = subscription;
+        this.txnStore = txnStore;
+        this.segmentName = subscription.getTopicName();
+        this.subscriptionName = subscription.getName();
+    }
+
+    // ---- Replay / recovery -------------------------------------------------
+
+    /**
+     * Rebuild the in-memory open-txn set from the metadata store and then mark the handle ready.
+     *
+     * <p>Sequence:
+     * <ol>
+     *   <li>Remember {@code pendingAckHandle} and subscribe to the subscription-event stream; each
+     *       event invokes {@link #triggerReconcile}. A {@link MetadataStoreException} from the
+     *       subscribe call fails {@link #recoveryFuture} and the handle, and the method returns.</li>
+     *   <li>Scan the ack op records for this {@code (segment, subscription)} and collect the
+     *       distinct txn id keys of records whose kind is {@link TxnOpKind#ACK}.</li>
+     *   <li>Read the header of every collected txn. A missing header is treated as
+     *       {@link TxnState#ABORTED}. Each key is added to {@link #openTxns}; terminal ones are
+     *       handed to {@link #applyTerminal} straight away.</li>
+     *   <li>On failure: log, close the event subscription, fail {@link #recoveryFuture} and the
+     *       handle. On success: complete {@link #recoveryFuture}, switch the handle to ready and
+     *       run one reconcile pass.</li>
+     * </ol>
+     *
+     * @param pendingAckHandle handle that is notified of readiness / failure and later receives
+     *                         commit and abort calls
+     * @param executorService  not used by this implementation
+     */
+    @Override
+    public void replayAsync(PendingAckHandleImpl pendingAckHandle, 
ExecutorService executorService) {
+        this.handle = pendingAckHandle;
+        try {
+            // Subscribe before scanning. Events delivered while recovery is still running are
+            // ignored by triggerReconcile (it checks recoveryFuture), and the reconcile pass at
+            // the end of recovery re-reads every open txn instead.
+            this.eventSubscription = txnStore.subscribeSubscriptionEvents(
+                    segmentName, subscriptionName, path -> triggerReconcile());
+        } catch (MetadataStoreException e) {
+            recoveryFuture.completeExceptionally(e);
+            pendingAckHandle.exceptionHandleFuture(e);
+            return;
+        }
+
+        // Scan all ack op records for this (segment, sub) and seed openTxns from the headers.
+        // onError below only logs, so the cause is captured with segment/subscription context.
+        // NOTE(review): recovery failure is expected to surface through the scan's returned
+        // future and the terminal whenComplete below — confirm against the scan contract.
+        Set<String> txnIdKeys = ConcurrentHashMap.newKeySet();
+        txnStore.listAcksBySegmentSubscription(segmentName, subscriptionName, 
new ScanConsumer() {
+            @Override
+            public void onNext(GetResult r) {
+                TxnOp op = TxnMetadataStore.fromJson(r.getValue(), 
TxnOp.class);
+                if (op.getKind() != TxnOpKind.ACK) {
+                    return;
+                }
+                String txnIdKey = 
org.apache.pulsar.broker.transaction.metadata.TxnPaths
+                        .txnIdFromOpPath(r.getStat().getPath());
+                if (txnIdKey != null) {
+                    txnIdKeys.add(txnIdKey);
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                log.warn().attr("segment", segmentName).attr("sub", 
subscriptionName)
+                        .exception(throwable).log("PendingAckStore recovery 
scan errored");
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        })
+        .thenCompose(__ -> {
+            List<CompletableFuture<Void>> reads = new 
ArrayList<>(txnIdKeys.size());
+            for (String txnIdKey : txnIdKeys) {
+                reads.add(txnStore.getHeader(txnIdKey).thenAccept(opt -> {
+                    // A header that no longer exists is treated as an aborted txn.
+                    TxnState state = opt.map(v -> 
v.value().getState()).orElse(TxnState.ABORTED);
+                    // Always add first: applyTerminal only acts on keys it can remove from openTxns.
+                    synchronized (lock) {
+                        openTxns.add(txnIdKey);
+                    }
+                    if (state.isTerminal()) {
+                        // Schedule cleanup — applyTerminal's "remove → applied" gate fires once
+                        // and processes (handle commit/abort + delete op records).
+                        applyTerminal(txnIdKey, state);
+                    }
+                }));
+            }
+            return FutureUtil.waitForAll(reads);
+        })
+        .whenComplete((__, err) -> {
+            if (err != null) {
+                log.error().attr("segment", segmentName).attr("sub", 
subscriptionName)
+                        .exception(err).log("PendingAckStore recovery failed");
+                closeSubscriptionQuietly();
+                recoveryFuture.completeExceptionally(err);
+                pendingAckHandle.exceptionHandleFuture(err);
+                return;
+            }
+            recoveryFuture.complete(null);
+            pendingAckHandle.changeToReadyState();
+            // Events that fired during recovery were ignored (recovery was not done yet), so run
+            // one reconcile pass now to catch up on every txn still in openTxns.
+            triggerReconcile();
+        });
+    }
+
+    // ---- Ack append --------------------------------------------------------
+
+    /**
+     * Persist one ack op record per position for an individual (non-cumulative) transactional ack.
+     *
+     * @param txnID     transaction the acks belong to
+     * @param positions positions to record; only the left element (the {@link Position}) of each
+     *                  pair is written, the {@code Integer} is not persisted by this store
+     * @return future completed once every record is appended and the txn is tracked as open
+     */
+    @Override
+    public CompletableFuture<Void> appendIndividualAck(TxnID txnID,
+                                                       
List<MutablePair<Position, Integer>> positions) {
+        return appendAcks(txnID, positions, false);
+    }
+
+    /**
+     * Persist a single ack op record flagged as cumulative for {@code position}.
+     *
+     * @param txnID    transaction the ack belongs to
+     * @param position position recorded in the op
+     * @return future completed once the record is appended and the txn is tracked as open
+     */
+    @Override
+    public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, Position 
position) {
+        // Wrapped in a one-element list so it shares appendAcks; the paired 1 is a placeholder
+        // that appendAcks never reads.
+        return appendAcks(txnID, 
Collections.singletonList(MutablePair.of(position, 1)), true);
+    }
+
+    /**
+     * Write one {@link TxnOpKind#ACK} op record per position and, once all writes have succeeded,
+     * remember the txn in {@link #openTxns} so reconcile passes will watch its header.
+     *
+     * @param txnID      transaction the acks belong to
+     * @param positions  positions to record; only the left element of each pair is used
+     * @param cumulative whether the records are flagged cumulative ({@code Boolean.TRUE}) or left
+     *                   unflagged ({@code null})
+     * @return future failed with {@link IllegalStateException} when the store is closed, otherwise
+     *         completed when all appends have completed
+     */
+    private CompletableFuture<Void> appendAcks(TxnID txnID, List<MutablePair<Position, Integer>> positions,
+                                               boolean cumulative) {
+        if (closed) {
+            return FutureUtil.failedFuture(new IllegalStateException("PendingAckStore is closed"));
+        }
+        final String key = TxnIds.toKey(txnID);
+        final Boolean cumulativeFlag = cumulative ? Boolean.TRUE : null;
+        // PendingAckHandleImpl has already rejected terminal txns before calling in, so the header
+        // is not re-checked here; each position simply becomes its own op record.
+        List<CompletableFuture<?>> pending = new ArrayList<>(positions.size());
+        for (MutablePair<Position, Integer> entry : positions) {
+            Position target = entry.getLeft();
+            TxnOp record = new TxnOp(TxnOpKind.ACK, segmentName, subscriptionName,
+                    target.getLedgerId(), target.getEntryId(), cumulativeFlag);
+            pending.add(txnStore.appendOp(key, record));
+        }
+        CompletableFuture<Void> allAppended =
+                CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
+        return allAppended.thenRun(() -> {
+            synchronized (lock) {
+                openTxns.add(key);
+            }
+        });
+    }
+
+    /**
+     * Does nothing and returns an already-completed future; both arguments are ignored.
+     *
+     * @param txnID   ignored
+     * @param ackType ignored
+     * @return an already-completed future
+     */
+    @Override
+    public CompletableFuture<Void> appendCommitMark(TxnID txnID, AckType 
ackType) {
+        // No-op for the metadata-driven store: v5 commits are driven by the TC writing the
+        // /txn header CAS and publishing the subscription event, not by direct SPI calls.
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Does nothing and returns an already-completed future; both arguments are ignored.
+     *
+     * @param txnID   ignored
+     * @param ackType ignored
+     * @return an already-completed future
+     */
+    @Override
+    public CompletableFuture<Void> appendAbortMark(TxnID txnID, AckType 
ackType) {
+        // No-op (see appendCommitMark).
+        return CompletableFuture.completedFuture(null);
+    }
+
+    // ---- Reconcile (event-driven) -----------------------------------------
+
+    /**
+     * Re-read the header of every txn currently in {@link #openTxns} and hand the ones that have
+     * reached a terminal state to {@link #applyTerminal}. A missing header counts as
+     * {@link TxnState#ABORTED}.
+     *
+     * <p>Does nothing when the store is closed, when recovery has not finished yet, or when
+     * recovery failed. Read errors are logged and otherwise ignored.
+     */
+    private void triggerReconcile() {
+        if (closed) {
+            return;
+        }
+        // Only reconcile once recovery has completed, and only if it completed successfully.
+        if (!recoveryFuture.isDone() || recoveryFuture.isCompletedExceptionally()) {
+            return;
+        }
+        // Work on a copy so header reads happen outside the lock.
+        final Set<String> candidates;
+        synchronized (lock) {
+            candidates = new HashSet<>(openTxns);
+        }
+        if (candidates.isEmpty()) {
+            return;
+        }
+        List<CompletableFuture<Void>> headerChecks = new ArrayList<>(candidates.size());
+        for (String key : candidates) {
+            CompletableFuture<Void> check = txnStore.getHeader(key).thenAccept(header -> {
+                TxnState current = header.map(v -> v.value().getState()).orElse(TxnState.ABORTED);
+                if (current.isTerminal()) {
+                    applyTerminal(key, current);
+                }
+            });
+            headerChecks.add(check);
+        }
+        FutureUtil.waitForAll(headerChecks).whenComplete((ignored, failure) -> {
+            if (failure != null) {
+                log.warn().attr("segment", segmentName).attr("sub", subscriptionName)
+                        .exception(failure).log("Reconcile encountered error");
+            }
+        });
+    }
+
+    /**
+     * Apply a terminal state for {@code txnIdKey}.
+     *
+     * <p>Steps, in order:
+     * <ol>
+     *   <li>Remove the key from {@link #openTxns}. This is the once-only gate: when the key is
+     *       not present the method returns immediately, so overlapping callers do not apply the
+     *       same txn twice.</li>
+     *   <li>Call the handle — {@code commitTxn} for {@link TxnState#COMMITTED}, {@code abortTxn}
+     *       for any other state. When no handle is attached the call is skipped.</li>
+     *   <li>If the handle call failed, put the key back into {@link #openTxns} so a later
+     *       reconcile pass retries it.</li>
+     *   <li>Otherwise delete the matching ack op records. A failure here is only logged: the key
+     *       is no longer in {@link #openTxns}, so reconcile passes will not revisit it. Leftover
+     *       records are found again by the scan in {@link #replayAsync} at the next recovery.</li>
+     * </ol>
+     *
+     * @param txnIdKey metadata-store key of the transaction
+     * @param state    terminal state read from the txn header (or assumed for a missing header)
+     */
+    private void applyTerminal(String txnIdKey, TxnState state) {
+        boolean removed;
+        synchronized (lock) {
+            removed = openTxns.remove(txnIdKey);
+        }
+        if (!removed) {
+            return;
+        }
+        TxnID txnID = TxnIds.fromKey(txnIdKey);
+        PendingAckHandleImpl h = handle;
+        CompletableFuture<Void> handleCall;
+        if (h == null) {
+            handleCall = CompletableFuture.completedFuture(null);
+        } else if (state == TxnState.COMMITTED) {
+            handleCall = h.commitTxn(txnID, Map.of(), 0L);
+        } else {
+            handleCall = h.abortTxn(txnID, null, 0L);
+        }
+        handleCall.whenComplete((__, err) -> {
+            if (err != null) {
+                log.warn().attr("segment", segmentName).attr("sub", subscriptionName).attr("txn", txnID)
+                        .exception(err).log("Handle terminal-callback failed; will retry on next reconcile");
+                // Re-add so a future event can retry.
+                synchronized (lock) {
+                    openTxns.add(txnIdKey);
+                }
+                return;
+            }
+            // Successful handle call → clean up the ack op records on disk.
+            txnStore.deleteAckOpsForSegmentSubscriptionAndTxn(segmentName, subscriptionName, txnIdKey)
+                    .exceptionally(cleanupErr -> {
+                        // The txn has already left openTxns, so no reconcile pass retries this
+                        // delete; the message must not promise one.
+                        log.warn().attr("segment", segmentName).attr("sub", subscriptionName)
+                                .attr("txn", txnID).exception(cleanupErr)
+                                .log("Op-record cleanup failed; leftover records will be retried on next recovery");
+                        return null;
+                    });
+        });
+    }
+
+    // ---- Close -------------------------------------------------------------
+
+    /**
+     * Mark the store closed and release the subscription-event listener.
+     *
+     * <p>After this call {@code appendIndividualAck} / {@code appendCumulativeAck} return a failed
+     * future and reconcile passes do nothing. Op records already written are left in place.
+     *
+     * @return an already-completed future; the close work is done synchronously
+     */
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        closed = true;
+        closeSubscriptionQuietly();
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
+     * Close the subscription-event listener, if one is registered, logging instead of propagating
+     * any failure.
+     *
+     * <p>Both {@link #closeAsync} and the recovery-failure path of {@link #replayAsync} call this,
+     * and they may run on different threads. The handle is detached under {@link #lock} so only
+     * one caller ever obtains it and {@code close()} runs at most once per handle; the close
+     * itself happens outside the lock.
+     */
+    private void closeSubscriptionQuietly() {
+        AutoCloseable toClose;
+        synchronized (lock) {
+            toClose = eventSubscription;
+            eventSubscription = null;
+        }
+        if (toClose == null) {
+            return;
+        }
+        try {
+            toClose.close();
+        } catch (Throwable t) {
+            log.warn().attr("segment", segmentName).attr("sub", subscriptionName).exception(t)
+                    .log("Failed to close event subscription");
+        }
+    }
+
+    // ---- Test seams --------------------------------------------------------
+
+    /**
+     * Expose the recovery future. Intended for tests.
+     *
+     * @return future that completes normally once initial recovery in {@code replayAsync} has
+     *         finished, or exceptionally when subscribing to events, scanning, or reading headers
+     *         failed
+     */
+    public CompletableFuture<Void> recoveryFuture() {
+        return recoveryFuture;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
new file mode 100644
index 00000000000..2687c5d981a
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+
+/**
+ * {@link TransactionPendingAckStoreProvider} that hands out {@link MetadataPendingAckStore}
+ * instances, each wired to a {@link TxnMetadataStore} built over the broker's local
+ * {@code MetadataStore}. Intended for {@code segment://} subscriptions. The dispatching provider
+ * routes here when the topic is a segment topic.
+ */
+public class MetadataPendingAckStoreProvider implements TransactionPendingAckStoreProvider {
+
+    @Override
+    public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
+        // One façade per store, over the local metadata store reached through the subscription's topic.
+        PendingAckStore store = new MetadataPendingAckStore(
+                subscription,
+                new TxnMetadataStore(
+                        subscription.getTopic().getBrokerService().getPulsar().getLocalMetadataStore()));
+        return CompletableFuture.completedFuture(store);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) {
+        // The metadata layout is global — there is no per-subscription "initialized" log to
+        // check. State (open txns, leftover op records) is rebuilt on demand from the
+        // /txn-op + /txn records at replay time, so the answer is always true.
+        return CompletableFuture.completedFuture(Boolean.TRUE);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index 5e4dfe736b7..44b6e01aaa4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -189,19 +189,19 @@ public class MetadataTransactionBufferTest {
 
         createOpenHeader(openTxn);
         txnStore.appendOp(TxnIds.toKey(openTxn),
-                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 1L)).get();
+                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 1L, null)).get();
         txnStore.appendOp(TxnIds.toKey(openTxn),
-                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 2L)).get();
+                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 2L, null)).get();
 
         txnStore.createHeader(TxnIds.toKey(committedTxn),
                 new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
                         Instant.ofEpochMilli(1000), 
Instant.ofEpochMilli(2000))).get();
         txnStore.appendOp(TxnIds.toKey(committedTxn),
-                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 3L)).get();
+                new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 3L, null)).get();
 
         createOpenHeader(otherSegTxn);
         txnStore.appendOp(TxnIds.toKey(otherSegTxn),
-                new TxnOp(TxnOpKind.WRITE, 
"segment://public/default/topic/other-seg", null, 5L, 9L)).get();
+                new TxnOp(TxnOpKind.WRITE, 
"segment://public/default/topic/other-seg", null, 5L, 9L, null)).get();
 
         MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, 
txnStore);
         tb.checkIfTBRecoverCompletely().get();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
index 39d2e812431..5a9a9849349 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
@@ -98,9 +98,9 @@ public class TxnMetadataStoreTest {
         // path components — segment names contain "://" and "/" which would otherwise break ZK.
         String segA = "segment://public/default/topic/0000-7fff-0";
         String segB = "segment://public/default/topic/8000-ffff-0";
-        TxnOp w1 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 1L);
-        TxnOp w2 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 2L);
-        TxnOp wOther = new TxnOp(TxnOpKind.WRITE, segB, null, 2L, 1L);
+        TxnOp w1 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 1L, null);
+        TxnOp w2 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 2L, null);
+        TxnOp wOther = new TxnOp(TxnOpKind.WRITE, segB, null, 2L, 1L, null);
         Stat s1 = txn.appendOp(txnId, w1).get();
         Stat s2 = txn.appendOp(txnId, w2).get();
         txn.appendOp(txnId, wOther).get();
@@ -118,9 +118,9 @@ public class TxnMetadataStoreTest {
 
         String txnId = "tx-a";
         String segA = "segment://public/default/topic/0000-7fff-0";
-        TxnOp a1 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 5L);
-        TxnOp a2 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 6L);
-        TxnOp aOther = new TxnOp(TxnOpKind.ACK, segA, "sub/y", 1L, 7L);
+        TxnOp a1 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 5L, false);
+        TxnOp a2 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 6L, false);
+        TxnOp aOther = new TxnOp(TxnOpKind.ACK, segA, "sub/y", 1L, 7L, false);
         txn.appendOp(txnId, a1).get();
         txn.appendOp(txnId, a2).get();
         txn.appendOp(txnId, aOther).get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProviderTest.java
new file mode 100644
index 00000000000..a89e1c42ac3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProviderTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+/**
+ * Routing tests for {@link DispatchingTransactionPendingAckStoreProvider}: {@code segment://}
+ * subscriptions get {@link MetadataPendingAckStore}; everything else falls through to the legacy
+ * {@link MLPendingAckStore}.
+ */
+public class DispatchingTransactionPendingAckStoreProviderTest {
+
+    @Test
+    public void routesSegmentSubscriptionToMetadataStore() throws Exception {
+        try (MetadataStoreExtended store = MetadataStoreExtended.create("memory:dispatcher-test",
+                MetadataStoreConfig.builder().fsyncEnable(false).build())) {
+            PersistentSubscription sub = mockSubscription("segment://public/default/topic/0000-ffff-0", store);
+            PendingAckStore pas = new DispatchingTransactionPendingAckStoreProvider()
+                    .newPendingAckStore(sub).get();
+            assertThat(pas).isInstanceOf(MetadataPendingAckStore.class);
+            pas.closeAsync().get();
+        }
+    }
+
+    @Test
+    public void initializedBeforeIsRoutedSegmentToMetadata() throws Exception {
+        try (MetadataStoreExtended store = MetadataStoreExtended.create("memory:dispatcher-init",
+                MetadataStoreConfig.builder().fsyncEnable(false).build())) {
+            PersistentSubscription sub = mockSubscription("segment://public/default/topic/0000-ffff-0", store);
+            // MetadataPendingAckStoreProvider returns true unconditionally — verify routing reaches it.
+            assertThat(new DispatchingTransactionPendingAckStoreProvider()
+                    .checkInitializedBefore(sub).get()).isTrue();
+        }
+    }
+
+    private static PersistentSubscription mockSubscription(String topicName, MetadataStoreExtended store) {
+        PersistentSubscription sub = mock(PersistentSubscription.class, RETURNS_DEEP_STUBS);
+        when(sub.getTopicName()).thenReturn(topicName);
+        when(sub.getName()).thenReturn("my-sub");
+        when(sub.getTopic().getBrokerService().getPulsar().getLocalMetadataStore()).thenReturn(store);
+        return sub;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
new file mode 100644
index 00000000000..bb86ede033b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link MetadataPendingAckStore} against the in-memory {@code MetadataStore}.
+ * The {@link PersistentSubscription} and {@link PendingAckHandleImpl} collaborators are mocked so
+ * the test focuses on the store's state machine, append behaviour, and event-driven reconcile.
+ */
+public class MetadataPendingAckStoreTest {
+
+    private static final String SEGMENT = "segment://public/default/topic/0000-ffff-0";
+    private static final String SUB = "my-sub";
+
+    private MetadataStore store;
+    private TxnMetadataStore txnStore;
+    private PersistentSubscription subscription;
+    private PendingAckHandleImpl handle;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = MetadataStoreFactory.create("memory:local",
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+        txnStore = new TxnMetadataStore(store);
+        subscription = mock(PersistentSubscription.class);
+        when(subscription.getTopicName()).thenReturn(SEGMENT);
+        when(subscription.getName()).thenReturn(SUB);
+        handle = mock(PendingAckHandleImpl.class);
+        when(handle.commitTxn(any(), any(), org.mockito.ArgumentMatchers.anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(handle.abortTxn(any(), any(), org.mockito.ArgumentMatchers.anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    @Test
+    public void appendIndividualAck_writesTxnOpsAndTracksOpenTxn() throws Exception {
+        MetadataPendingAckStore ackStore = new MetadataPendingAckStore(subscription, txnStore);
+        ackStore.replayAsync(handle, null);
+        ackStore.recoveryFuture().get(5, TimeUnit.SECONDS);
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        List<MutablePair<Position, Integer>> positions = new ArrayList<>();
+        positions.add(MutablePair.of(PositionFactory.create(1, 1), 1));
+        positions.add(MutablePair.of(PositionFactory.create(1, 2), 1));
+        ackStore.appendIndividualAck(txnId, positions).get();
+
+        // Two ack op records on disk for this (segment, sub, txn).
+        List<TxnOp> hits = new ArrayList<>();
+        txnStore.listAcksBySegmentSubscription(SEGMENT, SUB,
+                collectOps(hits)).get();
+        assertThat(hits).hasSize(2);
+        assertThat(hits).allMatch(o -> o.getKind() == TxnOpKind.ACK
+                && SEGMENT.equals(o.getSegment())
+                && SUB.equals(o.getSubscription())
+                && !Boolean.TRUE.equals(o.getCumulative()));
+    }
+
+    @Test
+    public void appendCumulativeAck_writesSingleRecordWithCumulativeTrue() throws Exception {
+        MetadataPendingAckStore ackStore = new MetadataPendingAckStore(subscription, txnStore);
+        ackStore.replayAsync(handle, null);
+        ackStore.recoveryFuture().get(5, TimeUnit.SECONDS);
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        ackStore.appendCumulativeAck(txnId, PositionFactory.create(5, 9)).get();
+
+        List<TxnOp> hits = new ArrayList<>();
+        txnStore.listAcksBySegmentSubscription(SEGMENT, SUB, collectOps(hits)).get();
+        assertThat(hits).hasSize(1);
+        assertThat(hits.get(0).getCumulative()).isTrue();
+    }
+
+    @Test
+    public void appendCommitMarkAndAbortMark_areNoOps() throws Exception {
+        MetadataPendingAckStore ackStore = new MetadataPendingAckStore(subscription, txnStore);
+        ackStore.replayAsync(handle, null);
+        ackStore.recoveryFuture().get(5, TimeUnit.SECONDS);
+
+        // The TC drives commits/aborts via subscription events; SPI-level marks are intentional
+        // no-ops on the metadata-driven store.
+        ackStore.appendCommitMark(new TxnID(1, 1), null).get();
+        ackStore.appendAbortMark(new TxnID(1, 1), null).get();
+    }
+
+    @Test
+    public void commitEvent_callsHandleCommitTxn_andDeletesOpRecords() throws Exception {
+        MetadataPendingAckStore ackStore = new MetadataPendingAckStore(subscription, txnStore);
+        ackStore.replayAsync(handle, null);
+        ackStore.recoveryFuture().get(5, TimeUnit.SECONDS);
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        ackStore.appendIndividualAck(txnId, Collections.singletonList(
+                MutablePair.of(PositionFactory.create(3, 5), 1))).get();
+
+        // TC flips header to COMMITTED + publishes the subscription event.
+        commitHeader(txnId);
+        txnStore.publishSubscriptionEvent(SEGMENT, SUB,
+                new org.apache.pulsar.broker.transaction.metadata.TxnEvent(
+                        TxnIds.toKey(txnId), TxnState.COMMITTED)).get();
+
+        Awaitility.await().untilAsserted(() ->
+                verify(handle).commitTxn(eq(txnId), eq(Map.of()), eq(0L)));
+
+        // Op records cleaned up.
+        Awaitility.await().untilAsserted(() -> {
+            List<TxnOp> remaining = new ArrayList<>();
+            txnStore.listAcksBySegmentSubscription(SEGMENT, SUB, collectOps(remaining)).get();
+            assertThat(remaining).isEmpty();
+        });
+    }
+
+    @Test
+    public void abortEvent_callsHandleAbortTxn_andDeletesOpRecords() throws Exception {
+        MetadataPendingAckStore ackStore = new MetadataPendingAckStore(subscription, txnStore);
+        ackStore.replayAsync(handle, null);
+        ackStore.recoveryFuture().get(5, TimeUnit.SECONDS);
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        ackStore.appendIndividualAck(txnId, Collections.singletonList(
+                MutablePair.of(PositionFactory.create(3, 5), 1))).get();
+
+        abortHeader(txnId);
+        txnStore.publishSubscriptionEvent(SEGMENT, SUB,
+                new org.apache.pulsar.broker.transaction.metadata.TxnEvent(
+                        TxnIds.toKey(txnId), TxnState.ABORTED)).get();
+
+        Awaitility.await().untilAsserted(() ->
+                verify(handle).abortTxn(eq(txnId), eq(null), eq(0L)));
+
+        Awaitility.await().untilAsserted(() -> {
+            List<TxnOp> remaining = new ArrayList<>();
+            txnStore.listAcksBySegmentSubscription(SEGMENT, SUB, collectOps(remaining)).get();
+            assertThat(remaining).isEmpty();
+        });
+    }
+
+    @Test
+    public void recovery_rebuildsOpenTxnStateFromAckRecords() throws Exception {
+        // Pre-populate: an OPEN txn with two acks on this (seg, sub), and a terminal txn with a
+        // lingering ack that the store should drive to cleanup during recovery.
+        TxnID openTxn = new TxnID(1, 1);
+        TxnID committedTxn = new TxnID(1, 2);
+
+        createOpenHeader(openTxn);
+        txnStore.appendOp(TxnIds.toKey(openTxn),
+                new TxnOp(TxnOpKind.ACK, SEGMENT, SUB, 7L, 1L, false)).get();
+        txnStore.appendOp(TxnIds.toKey(openTxn),
+                new TxnOp(TxnOpKind.ACK, SEGMENT, SUB, 7L, 2L, false)).get();
+
+        txnStore.createHeader(TxnIds.toKey(committedTxn),
+                new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
+                        Instant.ofEpochMilli(1000), Instant.ofEpochMilli(2000))).get();
+        txnStore.appendOp(TxnIds.toKey(committedTxn),
+                new TxnOp(TxnOpKind.ACK, SEGMENT, SUB, 7L, 3L, false)).get();
+
+        MetadataPendingAckStore ackStore = new MetadataPendingAckStore(subscription, txnStore);
+        ackStore.replayAsync(handle, null);
+        ackStore.recoveryFuture().get(5, TimeUnit.SECONDS);
+
+        // Terminal txn was discovered during recovery → commitTxn fired and its op record cleaned up.
+        Awaitility.await().untilAsserted(() ->
+                verify(handle).commitTxn(eq(committedTxn), eq(Map.of()), eq(0L)));
+        Awaitility.await().untilAsserted(() -> {
+            List<TxnOp> remaining = new ArrayList<>();
+            txnStore.listAcksBySegmentSubscription(SEGMENT, SUB, collectOps(remaining)).get();
+            // Only the open txn's two ack records remain.
+            assertThat(remaining).hasSize(2);
+            assertThat(remaining).allMatch(o ->
+                    SEGMENT.equals(o.getSegment()) && SUB.equals(o.getSubscription()));
+        });
+    }
+
+    // ---- helpers ----------------------------------------------------------
+
+    private static org.apache.pulsar.metadata.api.ScanConsumer collectOps(List<TxnOp> out) {
+        return new org.apache.pulsar.metadata.api.ScanConsumer() {
+            @Override
+            public void onNext(org.apache.pulsar.metadata.api.GetResult r) {
+                out.add(TxnMetadataStore.fromJson(r.getValue(), TxnOp.class));
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        };
+    }
+
+    private void createOpenHeader(TxnID txnId) throws Exception {
+        txnStore.createHeader(TxnIds.toKey(txnId),
+                new TxnHeader(TxnState.OPEN, Duration.ofMillis(60_000),
+                        Instant.ofEpochMilli(1000), null)).get();
+    }
+
+    private void commitHeader(TxnID txnId) throws Exception {
+        Stat created;
+        var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
+        var h = v.value();
+        created = txnStore.updateHeader(TxnIds.toKey(txnId),
+                new TxnHeader(TxnState.COMMITTED, h.getTimeout(), h.getCreatedAt(), Instant.now()),
+                v.version()).get();
+        assertThat(created.getVersion()).isPositive();
+    }
+
+    private void abortHeader(TxnID txnId) throws Exception {
+        var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
+        var h = v.value();
+        txnStore.updateHeader(TxnIds.toKey(txnId),
+                new TxnHeader(TxnState.ABORTED, h.getTimeout(), h.getCreatedAt(), Instant.now()),
+                v.version()).get();
+    }
+}

Reply via email to