This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7fc00c02adb [improve][broker] PIP-473: MetadataPendingAckStore for segment subscriptions (#25772)
7fc00c02adb is described below
commit 7fc00c02adb973e8bc3f7e7df65eccd1e6517b7f
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 15 10:07:06 2026 -0700
[improve][broker] PIP-473: MetadataPendingAckStore for segment subscriptions (#25772)
---
.../buffer/impl/MetadataTransactionBuffer.java | 7 +-
.../transaction/metadata/TxnMetadataStore.java | 32 +-
.../pulsar/broker/transaction/metadata/TxnOp.java | 10 +-
...patchingTransactionPendingAckStoreProvider.java | 54 ++++
.../pendingack/impl/MetadataPendingAckStore.java | 325 +++++++++++++++++++++
.../impl/MetadataPendingAckStoreProvider.java | 48 +++
.../buffer/impl/MetadataTransactionBufferTest.java | 8 +-
.../transaction/metadata/TxnMetadataStoreTest.java | 12 +-
...hingTransactionPendingAckStoreProviderTest.java | 68 +++++
.../impl/MetadataPendingAckStoreTest.java | 276 +++++++++++++++++
10 files changed, 825 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 42f38a78428..b6b8d0328ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -152,7 +152,10 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
@Override
public void onError(Throwable throwable) {
- // Future propagation handled by listWritesBySegment's
returned future.
+ // Recovery still fails loudly via the scan's returned future
and the terminal
+ // whenComplete below; logging here captures the cause with
segment context.
+ log.warn().attr("segment", segmentName).exception(throwable)
+ .log("TB recovery scan errored");
}
@Override
@@ -261,7 +264,7 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
private CompletableFuture<Position> recordOp(TxnID txnId, String txnIdKey,
Position position) {
TxnOp op = new TxnOp(TxnOpKind.WRITE, segmentName, null,
- position.getLedgerId(), position.getEntryId());
+ position.getLedgerId(), position.getEntryId(), null);
return txnStore.appendOp(txnIdKey, op).thenApply(stat -> {
synchronized (lock) {
TxnEntry entry = txns.get(txnIdKey);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 8e20ed305f8..bdc0bda5fd6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
+import lombok.CustomLog;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -49,6 +50,7 @@ import org.apache.pulsar.metadata.api.Stat;
* <p>The façade is stateless apart from holding the store reference — index
population happens via
* options on writes, so there is no explicit registration step.
*/
+@CustomLog
public class TxnMetadataStore {
/** Sequence-keys delta used by all append-only streams in this layout. */
@@ -162,8 +164,29 @@ public class TxnMetadataStore {
* {@link MetadataStore#deleteIfExists}.
*/
public CompletableFuture<Void> deleteWriteOpsForSegmentAndTxn(String
segment, String txnId) {
+ // Scan this segment's write records; the shared helper keeps only the paths whose txn id
+ // matches txnId and deletes them.
+ return scanAndDeleteOpsForTxn(txnId, collector ->
listWritesBySegment(segment, collector));
+ }
+
+ /**
+ * Delete every {@code /txn-op} ack record for {@code (segment, subscription, txnId)} — used by
+ * the PendingAckStore once an event tells it the txn is terminal. Same path-extraction +
+ * best-effort semantics as {@link #deleteWriteOpsForSegmentAndTxn}.
+ *
+ * @param segment segment whose ack index is scanned
+ * @param subscription subscription name within {@code segment}
+ * @param txnId txn id key; only op records whose path maps to this id are deleted
+ * @return the future of the shared scan-and-delete helper; it fails when the index scan fails
+ */
+ public CompletableFuture<Void>
deleteAckOpsForSegmentSubscriptionAndTxn(String segment, String subscription,
+
String txnId) {
+ return scanAndDeleteOpsForTxn(txnId,
+ collector -> listAcksBySegmentSubscription(segment,
subscription, collector));
+ }
+
+ /**
+ * Shared implementation: invoke the supplied scan with a collector that
captures only paths
+ * matching {@code txnId}, then delete each captured path with the
txn-scoped partition key.
+ */
+ private CompletableFuture<Void> scanAndDeleteOpsForTxn(
+ String txnId,
+ java.util.function.Function<ScanConsumer, CompletableFuture<Void>>
scan) {
java.util.List<String> paths = new java.util.ArrayList<>();
- return listWritesBySegment(segment, new ScanConsumer() {
+ ScanConsumer collector = new ScanConsumer() {
@Override
public void onNext(org.apache.pulsar.metadata.api.GetResult r) {
if
(txnId.equals(TxnPaths.txnIdFromOpPath(r.getStat().getPath()))) {
@@ -173,12 +196,17 @@ public class TxnMetadataStore {
@Override
public void onError(Throwable throwable) {
+ // The caller observes failure via the scan's returned future;
logging here so
+ // the cause is visible alongside the txnId context.
+ log.warn().attr("txnId", txnId).exception(throwable)
+ .log("Op-record cleanup scan errored");
}
@Override
public void onCompleted() {
}
- }).thenCompose(__ -> {
+ };
+ return scan.apply(collector).thenCompose(__ -> {
Set<Option> opts = Set.of(new Option.PartitionKey(txnId));
CompletableFuture<?>[] deletes = new
CompletableFuture<?>[paths.size()];
for (int i = 0; i < paths.size(); i++) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
index 04e4f25807f..8941a098e82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
@@ -27,7 +27,8 @@ import lombok.NoArgsConstructor;
* {@code /txn-op/<txnId>-<seq>} (with {@code partitionKey =
txnId}) every time a
* participant applies a transactional operation on a segment.
*
- * <p>{@link #kind} discriminates writes from acks. {@link #subscription} is
set only for acks.
+ * <p>{@link #kind} discriminates writes from acks. {@link #subscription} and
{@link #cumulative}
+ * are only set on {@link TxnOpKind#ACK} entries.
*/
@Data
@NoArgsConstructor
@@ -48,4 +49,11 @@ public class TxnOp {
/** Managed-ledger entry id of the entry this op refers to. */
private long entryId;
+
+ /**
+ * For {@link TxnOpKind#ACK} entries only: {@code true} if this is a
cumulative ack
+ * (markDelete up-to-and-including the position), {@code false} (or
omitted) for individual
+ * acks. Always {@code null} for {@link TxnOpKind#WRITE} entries.
+ */
+ private Boolean cumulative;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProvider.java
new file mode 100644
index 00000000000..821fd997f39
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Routing {@link TransactionPendingAckStoreProvider}: returns {@link
MetadataPendingAckStore} for
+ * subscriptions on {@code segment://} topics (PIP-473) and falls back to
{@link MLPendingAckStore}
+ * via {@link MLPendingAckStoreProvider} for {@code persistent://} / {@code
topic://}.
+ *
+ * <p>Available but not the configured default — operators (and P5.4) flip
+ * {@code transactionPendingAckStoreProviderClassName} to opt segment topics
into the
+ * metadata-driven implementation once the v5 TC is in place.
+ */
+public class DispatchingTransactionPendingAckStoreProvider implements
TransactionPendingAckStoreProvider {
+
+ private final TransactionPendingAckStoreProvider legacy = new
MLPendingAckStoreProvider();
+ private final TransactionPendingAckStoreProvider metadata = new
MetadataPendingAckStoreProvider();
+
+ @Override
+ public CompletableFuture<PendingAckStore>
newPendingAckStore(PersistentSubscription subscription) {
+ return delegateFor(subscription).newPendingAckStore(subscription);
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
checkInitializedBefore(PersistentSubscription subscription) {
+ return delegateFor(subscription).checkInitializedBefore(subscription);
+ }
+
+ private TransactionPendingAckStoreProvider
delegateFor(PersistentSubscription subscription) {
+ return TopicName.get(subscription.getTopicName()).isSegment() ?
metadata : legacy;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
new file mode 100644
index 00000000000..546209c8276
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import lombok.CustomLog;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.CommandAck.AckType;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+
+/**
+ * {@link PendingAckStore} for {@code segment://} subscriptions that reads
truth from the
+ * metadata-store transaction layout (PIP-473) rather than from a
per-subscription log.
+ *
+ * <p>Lifecycle:
+ * <ul>
+ * <li><b>Ack</b> — {@link #appendIndividualAck} / {@link
#appendCumulativeAck} write a
+ * {@link TxnOp} record under {@code /txn-op/<txnId>-<seq>} with
+ * {@code kind=ACK, segment, subscription, ledgerId, entryId,
cumulative}. The associated
+ * {@code PendingAckHandle} keeps the in-memory state via the legacy
interface.</li>
+ * <li><b>Commit / Abort marks</b> — {@link #appendCommitMark} / {@link
#appendAbortMark} are
+ * <b>no-ops</b>. In v5 the TC owns the lifecycle: it CAS-updates the
header and publishes
+ * subscription events; this store consumes those events.</li>
+ * <li><b>State transitions</b> — driven by {@code
/txn-subscription-events/<seg>:<sub>-*}
+ * sequence events. The events are wake-ups; the truth is the header. On
each notification
+ * we re-read headers for every currently-open txn this subscription is
involved in and
+ * call {@code PendingAckHandleImpl.commitTxn} / {@code abortTxn} for
those that have gone
+ * terminal — then delete the corresponding {@code /txn-op} ack
records.</li>
+ * <li><b>Recovery</b> (Option C) — on {@link #replayAsync}, subscribe to
the event stream,
+ * scan {@code idx:acks-by-segment-subscription} for this {@code
(segment, sub)}, group by
+ * {@code txnId}, fetch each header, and seed the in-memory open-txn
set; then mark the
+ * handle ready and drain any events that fired during recovery.</li>
+ * </ul>
+ *
+ * <p>Same TOCTOU / unbounded-cache caveats documented on {@code
MetadataTransactionBuffer} apply
+ * here; see that class's javadoc for the TC ordering contract.
+ */
+@CustomLog
+public class MetadataPendingAckStore implements PendingAckStore {
+
+ private final PersistentSubscription subscription;
+ private final TxnMetadataStore txnStore;
+ private final String segmentName;
+ private final String subscriptionName;
+
+ private final Object lock = new Object();
+ /** Set of txnIdKeys we believe are OPEN — populated on append + recovery,
drained by reconcile. */
+ private final Set<String> openTxns = new HashSet<>();
+
+ private final CompletableFuture<Void> recoveryFuture = new
CompletableFuture<>();
+ private volatile AutoCloseable eventSubscription;
+ private volatile PendingAckHandleImpl handle;
+ private volatile boolean closed;
+
+ public MetadataPendingAckStore(PersistentSubscription subscription,
TxnMetadataStore txnStore) {
+ this.subscription = subscription;
+ this.txnStore = txnStore;
+ this.segmentName = subscription.getTopicName();
+ this.subscriptionName = subscription.getName();
+ }
+
+ // ---- Replay / recovery -------------------------------------------------
+
+ @Override
+ public void replayAsync(PendingAckHandleImpl pendingAckHandle,
ExecutorService executorService) {
+ this.handle = pendingAckHandle;
+ try {
+ this.eventSubscription = txnStore.subscribeSubscriptionEvents(
+ segmentName, subscriptionName, path -> triggerReconcile());
+ } catch (MetadataStoreException e) {
+ recoveryFuture.completeExceptionally(e);
+ pendingAckHandle.exceptionHandleFuture(e);
+ return;
+ }
+
+ // Scan all ack op records for this (segment, sub) and seed openTxns
from the headers.
+ // The store delivers per-entry errors via this scan's returned future
as well — recovery
+ // still fails loudly via the terminal whenComplete below — but we log
here so the cause
+ // is captured with the segment/subscription context.
+ Set<String> txnIdKeys = ConcurrentHashMap.newKeySet();
+ txnStore.listAcksBySegmentSubscription(segmentName, subscriptionName,
new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ TxnOp op = TxnMetadataStore.fromJson(r.getValue(),
TxnOp.class);
+ if (op.getKind() != TxnOpKind.ACK) {
+ return;
+ }
+ String txnIdKey =
org.apache.pulsar.broker.transaction.metadata.TxnPaths
+ .txnIdFromOpPath(r.getStat().getPath());
+ if (txnIdKey != null) {
+ txnIdKeys.add(txnIdKey);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().attr("segment", segmentName).attr("sub",
subscriptionName)
+ .exception(throwable).log("PendingAckStore recovery
scan errored");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ })
+ .thenCompose(__ -> {
+ List<CompletableFuture<Void>> reads = new
ArrayList<>(txnIdKeys.size());
+ for (String txnIdKey : txnIdKeys) {
+ reads.add(txnStore.getHeader(txnIdKey).thenAccept(opt -> {
+ TxnState state = opt.map(v ->
v.value().getState()).orElse(TxnState.ABORTED);
+ synchronized (lock) {
+ openTxns.add(txnIdKey);
+ }
+ if (state.isTerminal()) {
+ // Schedule cleanup — applyTerminal's "remove →
applied" gate fires once
+ // and processes (handle commit/abort + delete op
records).
+ applyTerminal(txnIdKey, state);
+ }
+ }));
+ }
+ return FutureUtil.waitForAll(reads);
+ })
+ .whenComplete((__, err) -> {
+ if (err != null) {
+ log.error().attr("segment", segmentName).attr("sub",
subscriptionName)
+ .exception(err).log("PendingAckStore recovery failed");
+ closeSubscriptionQuietly();
+ recoveryFuture.completeExceptionally(err);
+ pendingAckHandle.exceptionHandleFuture(err);
+ return;
+ }
+ recoveryFuture.complete(null);
+ pendingAckHandle.changeToReadyState();
+ // Drain any events that fired during recovery.
+ triggerReconcile();
+ });
+ }
+
+ // ---- Ack append --------------------------------------------------------
+
+ @Override
+ public CompletableFuture<Void> appendIndividualAck(TxnID txnID,
+
List<MutablePair<Position, Integer>> positions) {
+ return appendAcks(txnID, positions, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> appendCumulativeAck(TxnID txnID, Position
position) {
+ return appendAcks(txnID,
Collections.singletonList(MutablePair.of(position, 1)), true);
+ }
+
+ private CompletableFuture<Void> appendAcks(TxnID txnID,
List<MutablePair<Position, Integer>> positions,
+ boolean cumulative) {
+ if (closed) {
+ return FutureUtil.failedFuture(new
IllegalStateException("PendingAckStore is closed"));
+ }
+ String txnIdKey = TxnIds.toKey(txnID);
+ // Append one TxnOp per position. PendingAckHandleImpl already
validates the txn isn't
+ // terminal before calling us, so we don't repeat the header check
here.
+ CompletableFuture<?>[] appends = new
CompletableFuture<?>[positions.size()];
+ for (int i = 0; i < positions.size(); i++) {
+ Position p = positions.get(i).getLeft();
+ TxnOp op = new TxnOp(TxnOpKind.ACK, segmentName, subscriptionName,
+ p.getLedgerId(), p.getEntryId(), cumulative ? Boolean.TRUE
: null);
+ appends[i] = txnStore.appendOp(txnIdKey, op);
+ }
+ return CompletableFuture.allOf(appends).thenRun(() -> {
+ synchronized (lock) {
+ openTxns.add(txnIdKey);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> appendCommitMark(TxnID txnID, AckType
ackType) {
+ // No-op for the metadata-driven store: v5 commits are driven by the
TC writing the
+ // /txn header CAS and publishing the subscription event, not by
direct SPI calls.
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> appendAbortMark(TxnID txnID, AckType
ackType) {
+ // No-op (see appendCommitMark).
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // ---- Reconcile (event-driven) -----------------------------------------
+
+ private void triggerReconcile() {
+ if (closed || !recoveryFuture.isDone() ||
recoveryFuture.isCompletedExceptionally()) {
+ return;
+ }
+ Set<String> snapshot;
+ synchronized (lock) {
+ snapshot = new HashSet<>(openTxns);
+ }
+ if (snapshot.isEmpty()) {
+ return;
+ }
+ List<CompletableFuture<Void>> reads = new ArrayList<>(snapshot.size());
+ for (String txnIdKey : snapshot) {
+ reads.add(txnStore.getHeader(txnIdKey).thenAccept(opt -> {
+ TxnState state = opt.map(v ->
v.value().getState()).orElse(TxnState.ABORTED);
+ if (state.isTerminal()) {
+ applyTerminal(txnIdKey, state);
+ }
+ }));
+ }
+ FutureUtil.waitForAll(reads).whenComplete((__, err) -> {
+ if (err != null) {
+ log.warn().attr("segment", segmentName).attr("sub",
subscriptionName)
+ .exception(err).log("Reconcile encountered error");
+ }
+ });
+ }
+
+ /**
+ * Apply a terminal state for {@code txnIdKey} — call the handle to
advance / clear its state,
+ * delete the matching op records, drop from {@link #openTxns}.
Idempotent: if the handle
+ * doesn't know the txn (already applied / was never on this sub), the
calls are inexpensive.
+ */
+ private void applyTerminal(String txnIdKey, TxnState state) {
+ boolean removed;
+ synchronized (lock) {
+ removed = openTxns.remove(txnIdKey);
+ }
+ if (!removed) {
+ return;
+ }
+ TxnID txnID = TxnIds.fromKey(txnIdKey);
+ PendingAckHandleImpl h = handle;
+ CompletableFuture<Void> handleCall;
+ if (h == null) {
+ handleCall = CompletableFuture.completedFuture(null);
+ } else if (state == TxnState.COMMITTED) {
+ handleCall = h.commitTxn(txnID, Map.of(), 0L);
+ } else {
+ handleCall = h.abortTxn(txnID, null, 0L);
+ }
+ handleCall.whenComplete((__, err) -> {
+ if (err != null) {
+ log.warn().attr("segment", segmentName).attr("sub",
subscriptionName).attr("txn", txnID)
+ .exception(err).log("Handle terminal-callback failed;
will retry on next reconcile");
+ // Re-add so a future event can retry.
+ synchronized (lock) {
+ openTxns.add(txnIdKey);
+ }
+ return;
+ }
+ // Successful handle call → clean up the ack op records on disk.
+ txnStore.deleteAckOpsForSegmentSubscriptionAndTxn(segmentName,
subscriptionName, txnIdKey)
+ .exceptionally(cleanupErr -> {
+ log.warn().attr("segment", segmentName).attr("sub",
subscriptionName)
+ .attr("txn", txnID).exception(cleanupErr)
+ .log("Op-record cleanup failed; will retry on
next reconcile");
+ return null;
+ });
+ });
+ }
+
+ // ---- Close -------------------------------------------------------------
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ closed = true;
+ closeSubscriptionQuietly();
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private void closeSubscriptionQuietly() {
+ AutoCloseable h = eventSubscription;
+ if (h == null) {
+ return;
+ }
+ eventSubscription = null;
+ try {
+ h.close();
+ } catch (Throwable t) {
+ log.warn().attr("segment", segmentName).attr("sub",
subscriptionName).exception(t)
+ .log("Failed to close event subscription");
+ }
+ }
+
+ // ---- Test seams --------------------------------------------------------
+
+ /** @return future that completes when initial recovery has finished.
Intended for tests. */
+ public CompletableFuture<Void> recoveryFuture() {
+ return recoveryFuture;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
new file mode 100644
index 00000000000..2687c5d981a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+
+/**
+ * {@link TransactionPendingAckStoreProvider} producing {@link MetadataPendingAckStore} instances on
+ * top of the broker's local {@code MetadataStore}. It targets {@code segment://} subscriptions;
+ * {@link DispatchingTransactionPendingAckStoreProvider} sends segment topics here.
+ */
+public class MetadataPendingAckStoreProvider implements TransactionPendingAckStoreProvider {
+
+    /**
+     * Creates a store for {@code subscription} that wraps the local metadata store of the broker
+     * owning the subscription's topic. The returned future is already complete.
+     */
+    @Override
+    public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
+        PendingAckStore store = new MetadataPendingAckStore(subscription, new TxnMetadataStore(
+                subscription.getTopic().getBrokerService().getPulsar().getLocalMetadataStore()));
+        return CompletableFuture.completedFuture(store);
+    }
+
+    /**
+     * Always reports {@code true}: the metadata layout is global, so there is no per-subscription
+     * log whose existence needs checking. Open txns and leftover op records are rebuilt from the
+     * {@code /txn-op} and {@code /txn} records during replay.
+     */
+    @Override
+    public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) {
+        return CompletableFuture.completedFuture(Boolean.TRUE);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index 5e4dfe736b7..44b6e01aaa4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -189,19 +189,19 @@ public class MetadataTransactionBufferTest {
createOpenHeader(openTxn);
txnStore.appendOp(TxnIds.toKey(openTxn),
- new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 1L)).get();
+ new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 1L, null)).get();
txnStore.appendOp(TxnIds.toKey(openTxn),
- new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 2L)).get();
+ new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 2L, null)).get();
txnStore.createHeader(TxnIds.toKey(committedTxn),
new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
Instant.ofEpochMilli(1000),
Instant.ofEpochMilli(2000))).get();
txnStore.appendOp(TxnIds.toKey(committedTxn),
- new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 3L)).get();
+ new TxnOp(TxnOpKind.WRITE, SEGMENT, null, 5L, 3L, null)).get();
createOpenHeader(otherSegTxn);
txnStore.appendOp(TxnIds.toKey(otherSegTxn),
- new TxnOp(TxnOpKind.WRITE,
"segment://public/default/topic/other-seg", null, 5L, 9L)).get();
+ new TxnOp(TxnOpKind.WRITE,
"segment://public/default/topic/other-seg", null, 5L, 9L, null)).get();
MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
tb.checkIfTBRecoverCompletely().get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
index 39d2e812431..5a9a9849349 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
@@ -98,9 +98,9 @@ public class TxnMetadataStoreTest {
// path components — segment names contain "://" and "/" which would
otherwise break ZK.
String segA = "segment://public/default/topic/0000-7fff-0";
String segB = "segment://public/default/topic/8000-ffff-0";
- TxnOp w1 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 1L);
- TxnOp w2 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 2L);
- TxnOp wOther = new TxnOp(TxnOpKind.WRITE, segB, null, 2L, 1L);
+ TxnOp w1 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 1L, null);
+ TxnOp w2 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 2L, null);
+ TxnOp wOther = new TxnOp(TxnOpKind.WRITE, segB, null, 2L, 1L, null);
Stat s1 = txn.appendOp(txnId, w1).get();
Stat s2 = txn.appendOp(txnId, w2).get();
txn.appendOp(txnId, wOther).get();
@@ -118,9 +118,9 @@ public class TxnMetadataStoreTest {
String txnId = "tx-a";
String segA = "segment://public/default/topic/0000-7fff-0";
- TxnOp a1 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 5L);
- TxnOp a2 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 6L);
- TxnOp aOther = new TxnOp(TxnOpKind.ACK, segA, "sub/y", 1L, 7L);
+ TxnOp a1 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 5L, false);
+ TxnOp a2 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 6L, false);
+ TxnOp aOther = new TxnOp(TxnOpKind.ACK, segA, "sub/y", 1L, 7L, false);
txn.appendOp(txnId, a1).get();
txn.appendOp(txnId, a2).get();
txn.appendOp(txnId, aOther).get();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProviderTest.java
new file mode 100644
index 00000000000..a89e1c42ac3
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/DispatchingTransactionPendingAckStoreProviderTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+/**
+ * Checks the routing decisions made by {@link DispatchingTransactionPendingAckStoreProvider}.
+ * A subscription whose topic uses the {@code segment://} scheme is expected to be backed by a
+ * {@link MetadataPendingAckStore}; any other subscription should fall through to the legacy
+ * {@link MLPendingAckStore}.
+ */
+public class DispatchingTransactionPendingAckStoreProviderTest {
+
+    /** Topic name of a segment-scoped subscription, which must be routed to the metadata store. */
+    private static final String SEGMENT_TOPIC = "segment://public/default/topic/0000-ffff-0";
+
+    @Test
+    public void routesSegmentSubscriptionToMetadataStore() throws Exception {
+        try (MetadataStoreExtended metadataStore = newInMemoryStore("memory:dispatcher-test")) {
+            PersistentSubscription subscription = mockSubscription(SEGMENT_TOPIC, metadataStore);
+            DispatchingTransactionPendingAckStoreProvider provider =
+                    new DispatchingTransactionPendingAckStoreProvider();
+            PendingAckStore pendingAckStore = provider.newPendingAckStore(subscription).get();
+
+            assertThat(pendingAckStore).isInstanceOf(MetadataPendingAckStore.class);
+            pendingAckStore.closeAsync().get();
+        }
+    }
+
+    @Test
+    public void initializedBeforeIsRoutedSegmentToMetadata() throws Exception {
+        try (MetadataStoreExtended metadataStore = newInMemoryStore("memory:dispatcher-init")) {
+            PersistentSubscription subscription = mockSubscription(SEGMENT_TOPIC, metadataStore);
+            DispatchingTransactionPendingAckStoreProvider provider =
+                    new DispatchingTransactionPendingAckStoreProvider();
+            // MetadataPendingAckStoreProvider answers true unconditionally, so a true result here
+            // demonstrates that the segment subscription was dispatched to it.
+            Boolean initializedBefore = provider.checkInitializedBefore(subscription).get();
+
+            assertThat(initializedBefore).isTrue();
+        }
+    }
+
+    /** Opens an in-memory metadata store at {@code url} with fsync disabled. */
+    private static MetadataStoreExtended newInMemoryStore(String url) throws Exception {
+        return MetadataStoreExtended.create(url, MetadataStoreConfig.builder().fsyncEnable(false).build());
+    }
+
+    /**
+     * Builds a deep-stubbed subscription named {@code my-sub} on {@code topicName} whose
+     * topic -> broker service -> pulsar -> local metadata store chain yields {@code metadataStore}.
+     */
+    private static PersistentSubscription mockSubscription(String topicName,
+                                                           MetadataStoreExtended metadataStore) {
+        PersistentSubscription subscription = mock(PersistentSubscription.class, RETURNS_DEEP_STUBS);
+        when(subscription.getTopicName()).thenReturn(topicName);
+        when(subscription.getName()).thenReturn("my-sub");
+        // RETURNS_DEEP_STUBS lets the whole accessor chain be stubbed in a single call.
+        when(subscription.getTopic().getBrokerService().getPulsar().getLocalMetadataStore())
+                .thenReturn(metadataStore);
+        return subscription;
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
new file mode 100644
index 00000000000..bb86ede033b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.pendingack.impl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link MetadataPendingAckStore} against the in-memory {@code MetadataStore}.
+ * The {@link PersistentSubscription} and {@link PendingAckHandleImpl} collaborators are mocked so
+ * the test focuses on the store's state machine, append behaviour, and event-driven reconcile.
+ *
+ * <p>Every ack store a test opens is registered and closed in {@link #tearDown()} before the
+ * backing metadata store, so no pending-ack store outlives the metadata store it was built on.
+ */
+public class MetadataPendingAckStoreTest {
+
+    private static final String SEGMENT = "segment://public/default/topic/0000-ffff-0";
+    private static final String SUB = "my-sub";
+
+    private MetadataStore store;
+    private TxnMetadataStore txnStore;
+    private PersistentSubscription subscription;
+    private PendingAckHandleImpl handle;
+    // Ack stores opened by the current test; drained and closed in tearDown.
+    private final List<MetadataPendingAckStore> openedAckStores = new ArrayList<>();
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = MetadataStoreFactory.create("memory:local",
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+        txnStore = new TxnMetadataStore(store);
+        subscription = mock(PersistentSubscription.class);
+        when(subscription.getTopicName()).thenReturn(SEGMENT);
+        when(subscription.getName()).thenReturn(SUB);
+        handle = mock(PendingAckHandleImpl.class);
+        when(handle.commitTxn(any(), any(), org.mockito.ArgumentMatchers.anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+        when(handle.abortTxn(any(), any(), org.mockito.ArgumentMatchers.anyLong()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        try {
+            // Close ack stores first: they were built on top of `store`.
+            for (MetadataPendingAckStore ackStore : openedAckStores) {
+                ackStore.closeAsync().get(5, TimeUnit.SECONDS);
+            }
+        } finally {
+            openedAckStores.clear();
+            if (store != null) {
+                store.close();
+            }
+        }
+    }
+
+    @Test
+    public void appendIndividualAck_writesTxnOpsAndTracksOpenTxn() throws Exception {
+        MetadataPendingAckStore ackStore = newRecoveredAckStore();
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        List<MutablePair<Position, Integer>> positions = new ArrayList<>();
+        positions.add(MutablePair.of(PositionFactory.create(1, 1), 1));
+        positions.add(MutablePair.of(PositionFactory.create(1, 2), 1));
+        ackStore.appendIndividualAck(txnId, positions).get();
+
+        // Two ack op records on disk for this (segment, sub, txn).
+        List<TxnOp> hits = listAckOps();
+        assertThat(hits).hasSize(2);
+        assertThat(hits).allMatch(o -> o.getKind() == TxnOpKind.ACK
+                && SEGMENT.equals(o.getSegment())
+                && SUB.equals(o.getSubscription())
+                && !Boolean.TRUE.equals(o.getCumulative()));
+    }
+
+    @Test
+    public void appendCumulativeAck_writesSingleRecordWithCumulativeTrue() throws Exception {
+        MetadataPendingAckStore ackStore = newRecoveredAckStore();
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        ackStore.appendCumulativeAck(txnId, PositionFactory.create(5, 9)).get();
+
+        List<TxnOp> hits = listAckOps();
+        assertThat(hits).hasSize(1);
+        assertThat(hits.get(0).getCumulative()).isTrue();
+    }
+
+    @Test
+    public void appendCommitMarkAndAbortMark_areNoOps() throws Exception {
+        MetadataPendingAckStore ackStore = newRecoveredAckStore();
+
+        // The TC drives commits/aborts via subscription events; SPI-level marks are intentional
+        // no-ops on the metadata-driven store. The test passes if both futures complete normally.
+        ackStore.appendCommitMark(new TxnID(1, 1), null).get();
+        ackStore.appendAbortMark(new TxnID(1, 1), null).get();
+    }
+
+    @Test
+    public void commitEvent_callsHandleCommitTxn_andDeletesOpRecords() throws Exception {
+        MetadataPendingAckStore ackStore = newRecoveredAckStore();
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        ackStore.appendIndividualAck(txnId, Collections.singletonList(
+                MutablePair.of(PositionFactory.create(3, 5), 1))).get();
+
+        // TC flips header to COMMITTED + publishes the subscription event.
+        commitHeader(txnId);
+        txnStore.publishSubscriptionEvent(SEGMENT, SUB,
+                new org.apache.pulsar.broker.transaction.metadata.TxnEvent(
+                        TxnIds.toKey(txnId), TxnState.COMMITTED)).get();
+
+        Awaitility.await().untilAsserted(() ->
+                verify(handle).commitTxn(eq(txnId), eq(Map.of()), eq(0L)));
+
+        // Op records cleaned up.
+        Awaitility.await().untilAsserted(() -> assertThat(listAckOps()).isEmpty());
+    }
+
+    @Test
+    public void abortEvent_callsHandleAbortTxn_andDeletesOpRecords() throws Exception {
+        MetadataPendingAckStore ackStore = newRecoveredAckStore();
+
+        TxnID txnId = new TxnID(1, 1);
+        createOpenHeader(txnId);
+        ackStore.appendIndividualAck(txnId, Collections.singletonList(
+                MutablePair.of(PositionFactory.create(3, 5), 1))).get();
+
+        abortHeader(txnId);
+        txnStore.publishSubscriptionEvent(SEGMENT, SUB,
+                new org.apache.pulsar.broker.transaction.metadata.TxnEvent(
+                        TxnIds.toKey(txnId), TxnState.ABORTED)).get();
+
+        Awaitility.await().untilAsserted(() ->
+                verify(handle).abortTxn(eq(txnId), eq(null), eq(0L)));
+
+        Awaitility.await().untilAsserted(() -> assertThat(listAckOps()).isEmpty());
+    }
+
+    @Test
+    public void recovery_rebuildsOpenTxnStateFromAckRecords() throws Exception {
+        // Pre-populate: an OPEN txn with two acks on this (seg, sub), and a terminal txn with a
+        // lingering ack that the store should drive to cleanup during recovery.
+        TxnID openTxn = new TxnID(1, 1);
+        TxnID committedTxn = new TxnID(1, 2);
+
+        createOpenHeader(openTxn);
+        txnStore.appendOp(TxnIds.toKey(openTxn),
+                new TxnOp(TxnOpKind.ACK, SEGMENT, SUB, 7L, 1L, false)).get();
+        txnStore.appendOp(TxnIds.toKey(openTxn),
+                new TxnOp(TxnOpKind.ACK, SEGMENT, SUB, 7L, 2L, false)).get();
+
+        txnStore.createHeader(TxnIds.toKey(committedTxn),
+                new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
+                        Instant.ofEpochMilli(1000), Instant.ofEpochMilli(2000))).get();
+        txnStore.appendOp(TxnIds.toKey(committedTxn),
+                new TxnOp(TxnOpKind.ACK, SEGMENT, SUB, 7L, 3L, false)).get();
+
+        newRecoveredAckStore();
+
+        // Terminal txn was discovered during recovery → commitTxn fired and its op record cleaned up.
+        Awaitility.await().untilAsserted(() ->
+                verify(handle).commitTxn(eq(committedTxn), eq(Map.of()), eq(0L)));
+        Awaitility.await().untilAsserted(() -> {
+            List<TxnOp> remaining = listAckOps();
+            // Only the open txn's two ack records remain.
+            assertThat(remaining).hasSize(2);
+            assertThat(remaining).allMatch(o ->
+                    SEGMENT.equals(o.getSegment()) && SUB.equals(o.getSubscription()));
+        });
+
+        // The still-OPEN txn must not have been resolved in either direction by recovery.
+        verify(handle, org.mockito.Mockito.never())
+                .commitTxn(eq(openTxn), any(), org.mockito.ArgumentMatchers.anyLong());
+        verify(handle, org.mockito.Mockito.never())
+                .abortTxn(eq(openTxn), any(), org.mockito.ArgumentMatchers.anyLong());
+    }
+
+    // ---- helpers ----------------------------------------------------------
+
+    /**
+     * Creates a {@link MetadataPendingAckStore} for the mocked subscription, registers it for
+     * closing in {@link #tearDown()}, starts replay against the mocked handle and waits up to five
+     * seconds for recovery to complete.
+     */
+    private MetadataPendingAckStore newRecoveredAckStore() throws Exception {
+        MetadataPendingAckStore ackStore = new MetadataPendingAckStore(subscription, txnStore);
+        // Register before replaying so the store is still closed if recovery times out.
+        openedAckStores.add(ackStore);
+        ackStore.replayAsync(handle, null);
+        ackStore.recoveryFuture().get(5, TimeUnit.SECONDS);
+        return ackStore;
+    }
+
+    /** Scans and decodes every ack op record currently stored for ({@link #SEGMENT}, {@link #SUB}). */
+    private List<TxnOp> listAckOps() throws Exception {
+        List<TxnOp> ops = new ArrayList<>();
+        txnStore.listAcksBySegmentSubscription(SEGMENT, SUB, collectOps(ops)).get();
+        return ops;
+    }
+
+    /** Returns a scan consumer that decodes each scanned value as a {@link TxnOp} into {@code out}. */
+    private static org.apache.pulsar.metadata.api.ScanConsumer collectOps(List<TxnOp> out) {
+        return new org.apache.pulsar.metadata.api.ScanConsumer() {
+            @Override
+            public void onNext(org.apache.pulsar.metadata.api.GetResult r) {
+                out.add(TxnMetadataStore.fromJson(r.getValue(), TxnOp.class));
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // NOTE(review): intentionally empty — assumes the future returned by the list call
+                // also fails on a scan error (every caller here .get()s it); confirm in
+                // TxnMetadataStore.
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        };
+    }
+
+    private void createOpenHeader(TxnID txnId) throws Exception {
+        txnStore.createHeader(TxnIds.toKey(txnId),
+                new TxnHeader(TxnState.OPEN, Duration.ofMillis(60_000),
+                        Instant.ofEpochMilli(1000), null)).get();
+    }
+
+    private void commitHeader(TxnID txnId) throws Exception {
+        transitionHeader(txnId, TxnState.COMMITTED);
+    }
+
+    private void abortHeader(TxnID txnId) throws Exception {
+        transitionHeader(txnId, TxnState.ABORTED);
+    }
+
+    /**
+     * Rewrites an existing header into {@code target} state via a conditional update on the version
+     * just read. Timeout and creation time are carried over and {@code Instant.now()} is passed as
+     * the last header field (which {@link #createOpenHeader} leaves {@code null}).
+     */
+    private void transitionHeader(TxnID txnId, TxnState target) throws Exception {
+        String key = TxnIds.toKey(txnId);
+        var current = txnStore.getHeader(key).get().orElseThrow();
+        var header = current.value();
+        Stat updated = txnStore.updateHeader(key,
+                new TxnHeader(target, header.getTimeout(), header.getCreatedAt(), Instant.now()),
+                current.version()).get();
+        assertThat(updated.getVersion()).isPositive();
+    }
+}