This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e90a7c56ade [improve][broker] PIP-473: transaction metadata data layer (#25754)
e90a7c56ade is described below

commit e90a7c56ade8c9034bc6d43d2549f15000166dc4
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 12 16:49:19 2026 -0700

    [improve][broker] PIP-473: transaction metadata data layer (#25754)
---
 .../broker/transaction/metadata/TxnEvent.java      |  48 ++++
 .../broker/transaction/metadata/TxnHeader.java     |  52 ++++
 .../transaction/metadata/TxnMetadataStore.java     | 264 +++++++++++++++++++++
 .../pulsar/broker/transaction/metadata/TxnOp.java  |  51 ++++
 .../broker/transaction/metadata/TxnOpKind.java     |  32 +++
 .../broker/transaction/metadata/TxnPaths.java      | 144 +++++++++++
 .../broker/transaction/metadata/TxnState.java      |  41 ++++
 .../broker/transaction/metadata/Versioned.java     |  29 +++
 .../broker/transaction/metadata/package-info.java  |  29 +++
 .../transaction/metadata/TxnMetadataStoreTest.java | 258 ++++++++++++++++++++
 .../metadata/impl/AbstractMetadataStore.java       |  29 ++-
 .../metadata/impl/LocalMemoryMetadataStore.java    |   4 +
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |   4 +
 .../metadata/MetadataStoreScanChildrenTest.java    |  30 +++
 14 files changed, 1014 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
new file mode 100644
index 00000000000..6e2224e5342
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Per-participant notification record published by the TC after the end-txn CAS lands. One
+ * {@code TxnEvent} is appended per affected segment for writes, and per (segment, subscription)
+ * for acks, under the participant's notification prefix:
+ *
+ * <pre>
+ *   /txn-segment-events/&lt;segment&gt;-&lt;seq&gt;              partitionKey=&lt;segment&gt;
+ *   /txn-subscription-events/&lt;segment&gt;:&lt;sub&gt;-&lt;seq&gt;  partitionKey=&lt;segment&gt;:&lt;sub&gt;
+ * </pre>
+ *
+ * <p>The segment and subscription names in these paths and partition keys are the URL-encoded forms
+ * produced by {@link TxnPaths#segmentKey} and {@link TxnPaths#ackIndexKey}.
+ *
+ * <p>Each participant subscribes to its own ordered stream via {@code subscribeSequence} and
+ * applies the decision locally — see PIP-473's notification mechanism for the protocol.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TxnEvent {
+
+    /** The transaction this event refers to. */
+    private String txnId;
+
+    /**
+     * Final decision — expected to be {@link TxnState#COMMITTED} or {@link TxnState#ABORTED}.
+     * This class does not validate it; publishers are responsible for passing a terminal state.
+     */
+    private TxnState decision;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
new file mode 100644
index 00000000000..381b6bbfbbc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import java.time.Duration;
+import java.time.Instant;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Header record stored at {@code /txn/<txnId>}. Linearization point for the transaction
+ * lifecycle — the v5 TC's {@code endTxn} is a single CAS on this record's {@link #state}.
+ *
+ * <p>Serialized as JSON via {@link org.apache.pulsar.common.util.ObjectMapperFactory}.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TxnHeader {
+
+    /** Current transaction lifecycle state. */
+    private TxnState state;
+
+    /** Relative timeout — duration from {@link #createdAt} after which an OPEN txn is swept. */
+    private Duration timeout;
+
+    /** When the transaction was created. */
+    private Instant createdAt;
+
+    /**
+     * When the transaction was finalized (committed or aborted). {@code null} while OPEN.
+     *
+     * <p>NOTE(review): {@code TxnMetadataStore} derives the final-state index key from this field on
+     * the same write that flips {@link #state}, using 0 when it is {@code null}. It therefore likely
+     * needs to be populated in that same write rather than in a follow-up update. Confirm against
+     * the TC implementation.
+     */
+    private Instant finalizedAt;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
new file mode 100644
index 00000000000..5c2087e3fba
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.Stat;
+
+/**
+ * Typed façade over a {@link MetadataStore} that implements the PIP-473 transaction data layout.
+ *
+ * <p>Responsibilities:
+ * <ul>
+ *   <li>JSON serde for {@link TxnHeader}, {@link TxnOp}, {@link TxnEvent}.</li>
+ *   <li>Attach {@link Option.PartitionKey} so all records for one txnId / segment / (segment, sub)
+ *       co-locate on sharded backends (Oxia).</li>
+ *   <li>Attach {@link Option.SecondaryIndex} entries so the runtime queries hit native indexes when
+ *       available, with a deserializing fallback predicate for stores that don't have them.</li>
+ *   <li>Append op-log and event records via {@link Option.SequenceKeysDeltas}.</li>
+ * </ul>
+ *
+ * <p>The façade is stateless apart from holding the store reference — index population happens via
+ * options on writes, so there is no explicit registration step.
+ */
+public class TxnMetadataStore {
+
+    /** Sequence-keys delta used by all append-only streams in this layout. */
+    private static final Option.SequenceKeysDeltas APPEND_DELTAS =
+            new Option.SequenceKeysDeltas(List.of(1L));
+
+    private final MetadataStore store;
+
+    public TxnMetadataStore(MetadataStore store) {
+        this.store = store;
+    }
+
+    // ---- Header CRUD -------------------------------------------------------
+
+    /**
+     * Read the header at {@code /txn/<txnId>}.
+     *
+     * @return the header together with its store version, or empty if not present
+     */
+    public CompletableFuture<Optional<Versioned<TxnHeader>>> getHeader(String txnId) {
+        return store.get(TxnPaths.header(txnId), Set.of(new Option.PartitionKey(txnId)))
+                .thenApply(opt -> opt.map(gr ->
+                        new Versioned<>(fromJson(gr.getValue(), TxnHeader.class), gr.getStat().getVersion())));
+    }
+
+    /**
+     * Create the txn header at version -1 (must not exist). Adds the secondary-index entry matching
+     * the header's state — for an OPEN header that is the deadline entry the timeout sweeper
+     * range-scans.
+     */
+    public CompletableFuture<Stat> createHeader(String txnId, TxnHeader header) {
+        return store.put(TxnPaths.header(txnId), toJson(header), Optional.of(-1L),
+                headerOptions(txnId, header));
+    }
+
+    /**
+     * CAS-update the txn header. Pass the {@code version} returned by a previous {@link #getHeader}.
+     * Index entries are recomputed by the store based on the options on this write.
+     */
+    public CompletableFuture<Stat> updateHeader(String txnId, TxnHeader header, long expectedVersion) {
+        return store.put(TxnPaths.header(txnId), toJson(header), Optional.of(expectedVersion),
+                headerOptions(txnId, header));
+    }
+
+    /** Delete the txn header with CAS on the expected version. Tolerates a NotFound result. */
+    public CompletableFuture<Void> deleteHeader(String txnId, long expectedVersion) {
+        return store.deleteIfExists(TxnPaths.header(txnId), Optional.of(expectedVersion),
+                Set.of(new Option.PartitionKey(txnId)));
+    }
+
+    /**
+     * Build the write options for a header: its partition key plus the single secondary-index entry
+     * matching its state. Terminal headers are indexed by final state and finalization time; OPEN
+     * headers are indexed by deadline.
+     */
+    private static Set<Option> headerOptions(String txnId, TxnHeader header) {
+        Option.SecondaryIndex idx;
+        if (header.getState().isTerminal()) {
+            // NOTE(review): a terminal header without finalizedAt is indexed at 0, while the fallback
+            // predicate in listFinalizedByStateAndTimeRange skips such headers. If native-index stores
+            // do not apply that predicate, the two store kinds disagree on these headers — confirm.
+            long finalizedMs = header.getFinalizedAt() == null ? 0L : header.getFinalizedAt().toEpochMilli();
+            idx = new Option.SecondaryIndex(TxnPaths.IDX_TXN_BY_FINAL_STATE,
+                    TxnPaths.finalStateKey(header.getState(), finalizedMs));
+        } else {
+            idx = new Option.SecondaryIndex(TxnPaths.IDX_TXN_BY_DEADLINE, TxnPaths.longKey(deadlineMs(header)));
+        }
+        return Set.of(new Option.PartitionKey(txnId), idx);
+    }
+
+    /**
+     * Absolute deadline of {@code header} in epoch milliseconds ({@code createdAt + timeout}). Shared
+     * by the deadline index key and the deadline-scan fallback predicate so the two cannot drift apart.
+     */
+    private static long deadlineMs(TxnHeader header) {
+        return header.getCreatedAt().toEpochMilli() + header.getTimeout().toMillis();
+    }
+
+    // ---- Op-log append -----------------------------------------------------
+
+    /**
+     * Append a {@link TxnOp} under {@code /txn-op/<txnId>-<seq>}. Adds the per-kind secondary-index
+     * entry — {@link TxnPaths#IDX_WRITES_BY_SEGMENT} for writes,
+     * {@link TxnPaths#IDX_ACKS_BY_SEGMENT_SUBSCRIPTION} for acks. Returns the {@link Stat} whose
+     * {@code path} carries the generated sequence key.
+     */
+    public CompletableFuture<Stat> appendOp(String txnId, TxnOp op) {
+        Option.SecondaryIndex idx = switch (op.getKind()) {
+            case WRITE -> new Option.SecondaryIndex(TxnPaths.IDX_WRITES_BY_SEGMENT,
+                    TxnPaths.segmentKey(op.getSegment()));
+            case ACK -> new Option.SecondaryIndex(TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION,
+                    TxnPaths.ackIndexKey(op.getSegment(), op.getSubscription()));
+        };
+        Set<Option> opts = Set.of(new Option.PartitionKey(txnId), idx, APPEND_DELTAS);
+        return store.put(TxnPaths.opParent(txnId), toJson(op), Optional.empty(), opts);
+    }
+
+    // ---- Index queries -----------------------------------------------------
+
+    /** Stream all write ops targeting {@code segment}. */
+    public CompletableFuture<Void> listWritesBySegment(String segment, ScanConsumer consumer) {
+        String key = TxnPaths.segmentKey(segment);
+        return store.scanByIndex(TxnPaths.TXN_OP_PREFIX, TxnPaths.IDX_WRITES_BY_SEGMENT,
+                key, key,
+                gr -> {
+                    TxnOp op = fromJson(gr.getValue(), TxnOp.class);
+                    return op.getKind() == TxnOpKind.WRITE && segment.equals(op.getSegment());
+                },
+                consumer);
+    }
+
+    /** Stream all ack ops targeting {@code (segment, subscription)}. */
+    public CompletableFuture<Void> listAcksBySegmentSubscription(String segment, String subscription,
+                                                                 ScanConsumer consumer) {
+        String key = TxnPaths.ackIndexKey(segment, subscription);
+        return store.scanByIndex(TxnPaths.TXN_OP_PREFIX, TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION,
+                key, key,
+                gr -> {
+                    TxnOp op = fromJson(gr.getValue(), TxnOp.class);
+                    return op.getKind() == TxnOpKind.ACK
+                            && segment.equals(op.getSegment())
+                            && subscription.equals(op.getSubscription());
+                },
+                consumer);
+    }
+
+    /**
+     * Stream open transactions whose deadline falls in {@code [fromMsInclusive, toMsInclusive]}.
+     * Pass {@code null} on either bound for an unbounded range.
+     */
+    public CompletableFuture<Void> listOpenByDeadlineRange(Long fromMsInclusive, Long toMsInclusive,
+                                                           ScanConsumer consumer) {
+        String from = fromMsInclusive == null ? null : TxnPaths.longKey(fromMsInclusive);
+        String to = toMsInclusive == null ? null : TxnPaths.longKey(toMsInclusive);
+        return store.scanByIndex(TxnPaths.TXN_HEADER_PREFIX, TxnPaths.IDX_TXN_BY_DEADLINE,
+                from, to,
+                gr -> {
+                    TxnHeader h = fromJson(gr.getValue(), TxnHeader.class);
+                    if (h.getState().isTerminal()) {
+                        return false;
+                    }
+                    // Same computation as the deadline index key written by headerOptions.
+                    long deadline = deadlineMs(h);
+                    return (fromMsInclusive == null || deadline >= fromMsInclusive)
+                            && (toMsInclusive == null || deadline <= toMsInclusive);
+                },
+                consumer);
+    }
+
+    /**
+     * Stream terminal transactions in {@code state} whose finalization time falls in
+     * {@code [fromMsInclusive, toMsInclusive]}. Pass {@code null} on either bound for unbounded.
+     * The fallback predicate skips headers whose {@code finalizedAt} is {@code null}.
+     */
+    public CompletableFuture<Void> listFinalizedByStateAndTimeRange(TxnState state,
+                                                                    Long fromMsInclusive, Long toMsInclusive,
+                                                                    ScanConsumer consumer) {
+        String from = fromMsInclusive == null ? state.name() + ":" : TxnPaths.finalStateKey(state, fromMsInclusive);
+        String to = toMsInclusive == null
+                ? state.name() + ":" + TxnPaths.MAX_LONG_KEY
+                : TxnPaths.finalStateKey(state, toMsInclusive);
+        return store.scanByIndex(TxnPaths.TXN_HEADER_PREFIX, TxnPaths.IDX_TXN_BY_FINAL_STATE,
+                from, to,
+                gr -> {
+                    TxnHeader h = fromJson(gr.getValue(), TxnHeader.class);
+                    if (h.getState() != state) {
+                        return false;
+                    }
+                    if (h.getFinalizedAt() == null) {
+                        return false;
+                    }
+                    long finalized = h.getFinalizedAt().toEpochMilli();
+                    return (fromMsInclusive == null || finalized >= fromMsInclusive)
+                            && (toMsInclusive == null || finalized <= toMsInclusive);
+                },
+                consumer);
+    }
+
+    // ---- Event publishing & subscription ----------------------------------
+
+    /** Append a per-segment notification event. */
+    public CompletableFuture<Stat> publishSegmentEvent(String segment, TxnEvent event) {
+        Set<Option> opts = Set.of(new Option.PartitionKey(TxnPaths.segmentKey(segment)), APPEND_DELTAS);
+        return store.put(TxnPaths.segmentEventsParent(segment), toJson(event), Optional.empty(), opts);
+    }
+
+    /** Append a per-(segment, subscription) notification event. */
+    public CompletableFuture<Stat> publishSubscriptionEvent(String segment, String subscription,
+                                                            TxnEvent event) {
+        String pk = TxnPaths.ackIndexKey(segment, subscription);
+        Set<Option> opts = Set.of(new Option.PartitionKey(pk), APPEND_DELTAS);
+        return store.put(TxnPaths.subscriptionEventsParent(segment, subscription), toJson(event),
+                Optional.empty(), opts);
+    }
+
+    /**
+     * Subscribe to new {@link TxnEvent} entries for {@code segment}. The {@code listener} receives the
+     * full generated path of the latest sequence key. Read that path with
+     * {@link MetadataStore#get(String, Set)} and decode the resulting {@code GetResult.getValue()}
+     * bytes with {@link #fromJson}.
+     *
+     * @return the {@link AutoCloseable} handle returned by the store's {@code subscribeSequence}
+     * @throws MetadataStoreException propagated from {@code subscribeSequence}
+     */
+    public AutoCloseable subscribeSegmentEvents(String segment, Consumer<String> listener)
+            throws MetadataStoreException {
+        return store.subscribeSequence(TxnPaths.segmentEventsParent(segment), listener,
+                Set.of(new Option.PartitionKey(TxnPaths.segmentKey(segment))));
+    }
+
+    /**
+     * Subscribe to new {@link TxnEvent} entries for {@code (segment, subscription)}. The
+     * {@code listener} contract is the same as for {@link #subscribeSegmentEvents}.
+     *
+     * @return the {@link AutoCloseable} handle returned by the store's {@code subscribeSequence}
+     * @throws MetadataStoreException propagated from {@code subscribeSequence}
+     */
+    public AutoCloseable subscribeSubscriptionEvents(String segment, String subscription,
+                                                     Consumer<String> listener)
+            throws MetadataStoreException {
+        String pk = TxnPaths.ackIndexKey(segment, subscription);
+        return store.subscribeSequence(TxnPaths.subscriptionEventsParent(segment, subscription),
+                listener, Set.of(new Option.PartitionKey(pk)));
+    }
+
+    // ---- JSON helpers ------------------------------------------------------
+
+    /**
+     * @return UTF-8 JSON bytes for {@code value}. A serialization failure is rethrown as a
+     *     {@link CompletionException} wrapping a {@link MetadataStoreException}.
+     */
+    public static byte[] toJson(Object value) {
+        try {
+            return ObjectMapperFactory.getMapper().writer().writeValueAsBytes(value);
+        } catch (JsonProcessingException e) {
+            throw new CompletionException(new MetadataStoreException(e));
+        }
+    }
+
+    /**
+     * @return the deserialized record. Any read or parse failure is rethrown as a
+     *     {@link CompletionException} wrapping a {@link MetadataStoreException}.
+     */
+    public static <T> T fromJson(byte[] bytes, Class<T> type) {
+        try {
+            return ObjectMapperFactory.getMapper().reader().readValue(bytes, type);
+        } catch (IOException e) {
+            throw new CompletionException(new MetadataStoreException(e));
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
new file mode 100644
index 00000000000..04e4f25807f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Per-write or per-ack operation record. One {@code TxnOp} is appended at
+ * {@code /txn-op/<txnId>-<seq>} (with {@code partitionKey = txnId}) every time a
+ * participant applies a transactional operation on a segment.
+ *
+ * <p>{@link #kind} discriminates writes from acks. {@link #subscription} is set only for acks.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TxnOp {
+
+    /** Whether this is a write or an ack; also selects which secondary index the op is filed under. */
+    private TxnOpKind kind;
+
+    /**
+     * The segment topic name (segment://...) the operation applies to. Stored as-is in the record;
+     * only the index and partition keys derived from it are URL-encoded.
+     */
+    private String segment;
+
+    /** The subscription FQN — only set on {@link TxnOpKind#ACK} entries. */
+    private String subscription;
+
+    /** Managed-ledger ledger id of the entry this op refers to. */
+    private long ledgerId;
+
+    /** Managed-ledger entry id of the entry this op refers to. */
+    private long entryId;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOpKind.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOpKind.java
new file mode 100644
index 00000000000..97160c164fe
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOpKind.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * The kind of operation a {@link TxnOp} record represents. Serialized by Jackson as the constant's
+ * name to keep the wire format human-readable. Renaming a constant therefore changes the persisted
+ * format of existing records.
+ */
+public enum TxnOpKind {
+
+    /** A transactional message append (TB). */
+    WRITE,
+
+    /** A transactional acknowledgement (PendingAckStore). */
+    ACK
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
new file mode 100644
index 00000000000..c8952af1048
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import java.util.Locale;
+import org.apache.pulsar.common.util.Codec;
+
+/**
+ * Path templates and secondary-index names for PIP-473 transaction metadata.
+ *
+ * <p>Layout (all under the metadata-store root). Sequence-key appends use
+ * {@link org.apache.pulsar.metadata.api.Option.SequenceKeysDeltas} with delta {@code [1]} —
+ * the store generates a 20-digit zero-padded suffix appended to the prefix with a {@code -}:
+ * <pre>
+ *   /txn/&lt;txnId&gt;                                            partitionKey = txnId
+ *   /txn-op/&lt;txnId&gt;-&lt;seq&gt;                                   partitionKey = txnId
+ *   /txn-segment-events/&lt;segment&gt;-&lt;seq&gt;                      partitionKey = segment
+ *   /txn-subscription-events/&lt;segment&gt;:&lt;sub&gt;-&lt;seq&gt;           partitionKey = segment:sub
+ * </pre>
+ * Segment and subscription names are URL-encoded wherever they appear in a path, partition key or
+ * index key (see {@link #segmentKey} and {@link #ackIndexKey}).
+ *
+ * <p>Secondary indexes (populated through {@code Option.SecondaryIndex} on each write; there is no
+ * separate registration step):
+ * <pre>
+ *   idx:writes-by-segment             key = segment
+ *   idx:acks-by-segment-subscription  key = segment:sub
+ *   idx:txn-by-deadline               key = %020d(deadlineMs)
+ *   idx:txn-by-final-state            key = &lt;state&gt;:%020d(finalizedMs)
+ * </pre>
+ *
+ * <p>Numeric index keys are zero-padded to 20 digits so lexicographic ordering matches numeric
+ * ordering for non-negative values — long max is 19 digits, the extra digit leaves headroom.
+ * Negative values keep a leading {@code -} and do not sort correctly, so they should not be used.
+ */
+public final class TxnPaths {
+
+    /** Path prefix for transaction headers. {@code /txn/<txnId>}. */
+    public static final String TXN_HEADER_PREFIX = "/txn";
+
+    /** Path prefix for transaction op log entries. {@code /txn-op/<txnId>-<seq>}. */
+    public static final String TXN_OP_PREFIX = "/txn-op";
+
+    /** Path prefix for per-segment notification events. {@code /txn-segment-events/<segment>-<seq>}. */
+    public static final String TXN_SEGMENT_EVENTS_PREFIX = "/txn-segment-events";
+
+    /**
+     * Path prefix for per-(segment, subscription) notification events.
+     * {@code /txn-subscription-events/<segment>:<sub>-<seq>}.
+     */
+    public static final String TXN_SUBSCRIPTION_EVENTS_PREFIX = "/txn-subscription-events";
+
+    /** Index: list write ops by segment. Key = segment. */
+    public static final String IDX_WRITES_BY_SEGMENT = "idx:writes-by-segment";
+
+    /** Index: list ack ops by (segment, subscription). Key = {@code segment:sub}. */
+    public static final String IDX_ACKS_BY_SEGMENT_SUBSCRIPTION = "idx:acks-by-segment-subscription";
+
+    /** Index: list open transactions by deadline. Key = zero-padded deadlineMs. */
+    public static final String IDX_TXN_BY_DEADLINE = "idx:txn-by-deadline";
+
+    /**
+     * Index: list terminal transactions by final state and finalization time. Key =
+     * {@code <state>:padded(finalizedMs)}.
+     */
+    public static final String IDX_TXN_BY_FINAL_STATE = "idx:txn-by-final-state";
+
+    /** Width used when formatting long values into lexicographically-orderable index keys. */
+    public static final int LONG_KEY_WIDTH = 20;
+
+    /**
+     * The maximum {@link #LONG_KEY_WIDTH}-wide decimal — useful as the upper bound of a
+     * single-state range scan on the final-state index.
+     */
+    public static final String MAX_LONG_KEY = "99999999999999999999";
+
+    /** @return {@code /txn/<txnId>} — the header path for {@code txnId}. */
+    public static String header(String txnId) {
+        return TXN_HEADER_PREFIX + "/" + txnId;
+    }
+
+    /** @return {@code /txn-op/<txnId>} — the parent path under which op-log entries are appended. */
+    public static String opParent(String txnId) {
+        return TXN_OP_PREFIX + "/" + txnId;
+    }
+
+    /**
+     * @return URL-encoded form of {@code segment} suitable for use as a metadata-store path
+     *     component, partition key, or single-field index key. Encoding is required because segment
+     *     topic names contain {@code ://} and {@code /} ({@code segment://tenant/ns/topic/...}) —
+     *     ZooKeeper rejects those in paths, and using them raw also makes composite keys (see
+     *     {@link #ackIndexKey}) ambiguous. Mirrors the convention used elsewhere in the transaction
+     *     code (e.g. {@code MLPendingAckStore}, {@code TopicTransactionBuffer}).
+     */
+    public static String segmentKey(String segment) {
+        return Codec.encode(segment);
+    }
+
+    /** @return {@code /txn-segment-events/<encoded-segment>} — parent path for {@code segment}'s event stream. */
+    public static String segmentEventsParent(String segment) {
+        return TXN_SEGMENT_EVENTS_PREFIX + "/" + segmentKey(segment);
+    }
+
+    /**
+     * @return {@code /txn-subscription-events/<encoded-segment>:<encoded-sub>} — parent for
+     *     (segment, sub) events. Both components are URL-encoded so the {@code :} separator is
+     *     unambiguous (segment names contain {@code :} in their URI scheme).
+     */
+    public static String subscriptionEventsParent(String segment, String subscription) {
+        return TXN_SUBSCRIPTION_EVENTS_PREFIX + "/" + ackIndexKey(segment, subscription);
+    }
+
+    /**
+     * @return the composite ack-index key {@code <encoded-segment>:<encoded-sub>}. Used as both
+     *     the secondary-index value and the partition key for ack notifications. Components are
+     *     URL-encoded so the separator stays unambiguous.
+     */
+    public static String ackIndexKey(String segment, String subscription) {
+        return segmentKey(segment) + ":" + Codec.encode(subscription);
+    }
+
+    /**
+     * @return {@code value} formatted as a zero-padded, {@link #LONG_KEY_WIDTH}-wide decimal for use
+     *     as a range-scan index key. Only non-negative values order correctly. Formatting is pinned to
+     *     {@link Locale#ROOT}: with the JVM default locale, {@code String.format} may substitute
+     *     locale-specific digits (e.g. Arabic-Indic or Thai), which would break lexicographic ordering
+     *     and make keys written by brokers with different locales incomparable.
+     */
+    public static String longKey(long value) {
+        return String.format(Locale.ROOT, "%0" + LONG_KEY_WIDTH + "d", value);
+    }
+
+    /** @return the composite final-state index key {@code <state>:padded(finalizedMs)}. */
+    public static String finalStateKey(TxnState state, long finalizedMs) {
+        return state.name() + ":" + longKey(finalizedMs);
+    }
+
+    private TxnPaths() {}
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnState.java
new file mode 100644
index 00000000000..e7a2cf5c088
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnState.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
/**
 * Lifecycle state recorded in a transaction header. Jackson serializes each constant by its name,
 * which keeps the stored wire format human-readable.
 *
 * <p>The v5 TC applies transitions as version-conditional puts. The only valid transitions are:
 * <pre>
 *   OPEN -&gt; COMMITTED   (terminal)
 *   OPEN -&gt; ABORTED     (terminal)
 * </pre>
 */
public enum TxnState {

    /** Non-terminal: the transaction may still move to {@link #COMMITTED} or {@link #ABORTED}. */
    OPEN,
    /** Terminal: reached from {@link #OPEN} by committing. */
    COMMITTED,
    /** Terminal: reached from {@link #OPEN} by aborting. */
    ABORTED;

    /**
     * Reports whether this state admits no further transition.
     *
     * @return {@code false} for {@link #OPEN}; {@code true} for {@link #COMMITTED} and
     *     {@link #ABORTED}
     */
    public boolean isTerminal() {
        return switch (this) {
            case OPEN -> false;
            case COMMITTED, ABORTED -> true;
        };
    }
}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/Versioned.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/Versioned.java
new file mode 100644
index 00000000000..cbdada3a052
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/Versioned.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
/**
 * Pairs a deserialized record with the metadata-store version it was read at.
 *
 * <p>To perform an atomic compare-and-set, hand {@link #version()} back as the
 * {@code IfVersionEquals} option of a later {@code put}.
 *
 * @param value   the record as deserialized from the store
 * @param version the store-reported version observed at read time
 * @param <T>     type of the wrapped record
 */
public record Versioned<T>(T value, long version) {
}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/package-info.java
new file mode 100644
index 00000000000..e6da65e52ad
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Metadata-store data layer for PIP-473 transactions: schema-versioned JSON 
records
+ * ({@link org.apache.pulsar.broker.transaction.metadata.TxnHeader},
+ * {@link org.apache.pulsar.broker.transaction.metadata.TxnOp},
+ * {@link org.apache.pulsar.broker.transaction.metadata.TxnEvent}), path/index 
constants
+ * ({@link org.apache.pulsar.broker.transaction.metadata.TxnPaths}), and a 
typed façade over
+ * {@link org.apache.pulsar.metadata.api.MetadataStore}
+ * ({@link org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore}).
+ */
+package org.apache.pulsar.broker.transaction.metadata;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
new file mode 100644
index 00000000000..39d2e812431
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link TxnMetadataStore} on the in-memory backend. The 
façade is a thin layer
+ * over {@link MetadataStore}; coverage of native-index vs fallback behaviour 
for the underlying
+ * {@code scanByIndex}/{@code subscribeSequence} primitives lives in {@code 
pulsar-metadata}'s
+ * cross-backend tests.
+ */
+public class TxnMetadataStoreTest {
+
+    private static MetadataStore newMemoryStore() throws Exception {
+        return MetadataStoreFactory.create("memory:local",
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+    }
+
+    private static TxnHeader open(long createdMs, long timeoutMs) {
+        return new TxnHeader(TxnState.OPEN, Duration.ofMillis(timeoutMs),
+                Instant.ofEpochMilli(createdMs), null);
+    }
+
+    private static TxnHeader finalized(TxnState state, long createdMs, long 
timeoutMs, long finalizedMs) {
+        return new TxnHeader(state, Duration.ofMillis(timeoutMs),
+                Instant.ofEpochMilli(createdMs), 
Instant.ofEpochMilli(finalizedMs));
+    }
+
+    @Test
+    public void headerLifecycle() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String txnId = "tx-1";
+        TxnHeader h = open(1000L, 5000L);
+        Stat created = txn.createHeader(txnId, h).get();
+        assertThat(created.getVersion()).isZero();
+
+        // Get sees the right value + version.
+        Versioned<TxnHeader> v = txn.getHeader(txnId).get().orElseThrow();
+        assertThat(v.value()).isEqualTo(h);
+        assertThat(v.version()).isEqualTo(created.getVersion());
+
+        // CAS update with the right version succeeds; finalize the txn.
+        TxnHeader committed = finalized(TxnState.COMMITTED, 1000L, 5000L, 
1500L);
+        Stat updated = txn.updateHeader(txnId, committed, v.version()).get();
+        assertThat(updated.getVersion()).isGreaterThan(v.version());
+
+        // CAS update with stale version fails.
+        assertThatThrownBy(() -> txn.updateHeader(txnId, committed, 
v.version()).get())
+                
.hasCauseInstanceOf(MetadataStoreException.BadVersionException.class);
+
+        // Delete then re-read returns empty.
+        txn.deleteHeader(txnId, updated.getVersion()).get();
+        assertThat(txn.getHeader(txnId).get()).isEmpty();
+    }
+
+    @Test
+    public void appendAndScanWriteOps() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String txnId = "tx-w";
+        // Use realistic segment URIs with the segment:// scheme so we 
exercise URL encoding of
+        // path components — segment names contain "://" and "/" which would 
otherwise break ZK.
+        String segA = "segment://public/default/topic/0000-7fff-0";
+        String segB = "segment://public/default/topic/8000-ffff-0";
+        TxnOp w1 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 1L);
+        TxnOp w2 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 2L);
+        TxnOp wOther = new TxnOp(TxnOpKind.WRITE, segB, null, 2L, 1L);
+        Stat s1 = txn.appendOp(txnId, w1).get();
+        Stat s2 = txn.appendOp(txnId, w2).get();
+        txn.appendOp(txnId, wOther).get();
+        assertThat(s2.getPath()).isGreaterThan(s1.getPath());
+
+        List<TxnOp> hits = new ArrayList<>();
+        txn.listWritesBySegment(segA, collectOps(hits)).get();
+        assertThat(hits).containsExactlyInAnyOrder(w1, w2);
+    }
+
+    @Test
+    public void appendAndScanAckOps() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String txnId = "tx-a";
+        String segA = "segment://public/default/topic/0000-7fff-0";
+        TxnOp a1 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 5L);
+        TxnOp a2 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 6L);
+        TxnOp aOther = new TxnOp(TxnOpKind.ACK, segA, "sub/y", 1L, 7L);
+        txn.appendOp(txnId, a1).get();
+        txn.appendOp(txnId, a2).get();
+        txn.appendOp(txnId, aOther).get();
+
+        List<TxnOp> hits = new ArrayList<>();
+        txn.listAcksBySegmentSubscription(segA, "sub/x", 
collectOps(hits)).get();
+        assertThat(hits).containsExactlyInAnyOrder(a1, a2);
+    }
+
+    @Test
+    public void deadlineRangeScanFiltersTerminalAndOutOfRange() throws 
Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        // Three open txns with deadlines 1100, 1200, 1300 (created+timeout).
+        txn.createHeader("t1", open(100L, 1000L)).get();
+        TxnHeader open2 = open(200L, 1000L);
+        txn.createHeader("t2", open2).get();
+        txn.createHeader("t3", open(300L, 1000L)).get();
+
+        // One terminal txn — must be excluded by the deadline scan.
+        txn.createHeader("t-term", finalized(TxnState.COMMITTED, 50L, 1000L, 
1150L)).get();
+
+        List<TxnHeader> hits = new ArrayList<>();
+        txn.listOpenByDeadlineRange(1150L, 1250L, collectHeaders(hits)).get();
+        assertThat(hits).containsExactly(open2);
+    }
+
+    @Test
+    public void finalStateRangeScan() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        TxnHeader c1 = finalized(TxnState.COMMITTED, 1000L, 1000L, 1100L);
+        TxnHeader c2 = finalized(TxnState.COMMITTED, 1000L, 1000L, 1200L);
+        TxnHeader a1 = finalized(TxnState.ABORTED, 1000L, 1000L, 1150L);
+        txn.createHeader("c1", c1).get();
+        txn.createHeader("c2", c2).get();
+        txn.createHeader("a1", a1).get();
+
+        List<TxnHeader> committed = new ArrayList<>();
+        txn.listFinalizedByStateAndTimeRange(TxnState.COMMITTED, null, null, 
collectHeaders(committed)).get();
+        assertThat(committed).containsExactlyInAnyOrder(c1, c2);
+
+        List<TxnHeader> recent = new ArrayList<>();
+        txn.listFinalizedByStateAndTimeRange(TxnState.COMMITTED, 1150L, 1300L, 
collectHeaders(recent)).get();
+        assertThat(recent).containsExactly(c2);
+
+        List<TxnHeader> aborted = new ArrayList<>();
+        txn.listFinalizedByStateAndTimeRange(TxnState.ABORTED, null, null, 
collectHeaders(aborted)).get();
+        assertThat(aborted).containsExactly(a1);
+    }
+
+    @Test
+    public void publishAndSubscribeSegmentEvents() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String segment = "segment://public/default/topic/0000-7fff-0";
+        ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();
+        @Cleanup AutoCloseable handle = txn.subscribeSegmentEvents(segment, 
received::add);
+
+        TxnEvent e1 = new TxnEvent("tx-1", TxnState.COMMITTED);
+        TxnEvent e2 = new TxnEvent("tx-2", TxnState.ABORTED);
+        Stat s1 = txn.publishSegmentEvent(segment, e1).get();
+        Stat s2 = txn.publishSegmentEvent(segment, e2).get();
+
+        // Sequence-key suffixed under the segment parent.
+        assertThat(s1.getPath()).matches("\\Q" + 
TxnPaths.segmentEventsParent(segment) + "\\E-\\d{20}");
+        assertThat(s2.getPath()).isGreaterThan(s1.getPath());
+
+        // Subscription receives the latest sequence key (intermediate updates 
may be collapsed).
+        Awaitility.await().untilAsserted(() ->
+                
assertThat(received).isNotEmpty().last().asString().isEqualTo(s2.getPath()));
+
+        // Fetched event round-trips through fromJson.
+        GetResult gr = store.get(s2.getPath()).get().orElseThrow();
+        TxnEvent fetched = TxnMetadataStore.fromJson(gr.getValue(), 
TxnEvent.class);
+        assertThat(fetched).isEqualTo(e2);
+    }
+
+    @Test
+    public void publishAndSubscribeSubscriptionEvents() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String segment = "segment://public/default/topic/0000-7fff-0";
+        String sub = "sub/x";
+        ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();
+        @Cleanup AutoCloseable handle = 
txn.subscribeSubscriptionEvents(segment, sub, received::add);
+
+        TxnEvent e = new TxnEvent("tx-1", TxnState.COMMITTED);
+        Stat s = txn.publishSubscriptionEvent(segment, sub, e).get();
+
+        Awaitility.await().untilAsserted(() ->
+                
assertThat(received).isNotEmpty().last().asString().isEqualTo(s.getPath()));
+    }
+
+    // ---- helpers -----------------------------------------------------------
+
+    private static ScanConsumer collectHeaders(List<TxnHeader> out) {
+        return new ScanConsumer() {
+            @Override
+            public void onNext(GetResult r) {
+                out.add(TxnMetadataStore.fromJson(r.getValue(), 
TxnHeader.class));
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        };
+    }
+
+    private static ScanConsumer collectOps(List<TxnOp> out) {
+        return new ScanConsumer() {
+            @Override
+            public void onNext(GetResult r) {
+                out.add(TxnMetadataStore.fromJson(r.getValue(), TxnOp.class));
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        };
+    }
+
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 1112a4e0975..a8e11fc6180 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -597,6 +597,10 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         getChildrenFromStore(parentPath, opts).thenCompose(children -> {
             CompletableFuture<Void> chain = 
CompletableFuture.completedFuture(null);
             for (String child : children) {
+                if (isSequenceCounterChild(child)) {
+                    // Sidecar bookkeeping for SequenceKeysDeltas — not a user 
record.
+                    continue;
+                }
                 String childPath = parentPath.equals("/") ? "/" + child : 
parentPath + "/" + child;
                 chain = chain.thenCompose(__ -> storeGet(childPath, opts))
                         .thenAccept(opt -> opt.ifPresent(consumer::onNext));
@@ -636,6 +640,10 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         getChildrenFromStore(scanPathPrefix, opts).thenCompose(children -> {
             CompletableFuture<Void> chain = 
CompletableFuture.completedFuture(null);
             for (String child : children) {
+                if (isSequenceCounterChild(child)) {
+                    // Sidecar bookkeeping for SequenceKeysDeltas — not a user 
record.
+                    continue;
+                }
                 String childPath = scanPathPrefix.equals("/") ? "/" + child : 
scanPathPrefix + "/" + child;
                 chain = chain.thenCompose(__ -> storeGet(childPath, opts))
                         .thenAccept(opt -> 
opt.filter(fallbackFilter).ifPresent(consumer::onNext));
@@ -759,9 +767,28 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     /** Counter-document path for a sequence prefix. Sibling of the prefix at 
the parent level. */
     static String sequenceCounterPath(String prefix) {
-        return prefix + "__seq_counter__";
+        return prefix + SEQUENCE_COUNTER_SUFFIX;
+    }
+
+    /**
+     * @return {@code true} when {@code childName} is a synthesized 
sequence-counter sidecar — i.e.
+     *     bookkeeping written by {@link #atomicIncrementSequenceCounter}, not 
a user record. Scan
+     *     primitives use this to filter counters out of their output on 
non-native backends.
+     *
+     * <p>The match is a literal-suffix check. A user record whose final path 
segment happens to
+     *     end with {@value #SEQUENCE_COUNTER_SUFFIX} would also be filtered. 
We accept that as
+     *     acceptable: callers don't get to pick paths ending in the {@code 
__seq_counter__} marker
+     *     accidentally (the suffix is 16 characters of internal-only marker), 
and a strict check
+     *     would require either tracking active prefixes or reserving a 
delimiter that the path
+     *     backends forbid. The cost of a false positive is silently dropping 
that record from
+     *     {@code scanChildren}/{@code scanByIndex}; no data loss.
+     */
+    static boolean isSequenceCounterChild(String childName) {
+        return childName != null && 
childName.endsWith(SEQUENCE_COUNTER_SUFFIX);
     }
 
+    private static final String SEQUENCE_COUNTER_SUFFIX = "__seq_counter__";
+
     /** Format a synthesized sequence key matching Oxia's native format: 
{@code prefix-{seq:%020d}-...}. */
     static String formatSequenceKey(String prefix, long[] seqs) {
         StringBuilder sb = new StringBuilder(prefix);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index c1f00c081c5..f612f2ea790 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -137,6 +137,10 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
                 if (key.indexOf('/', relStart) >= 0) {
                     return;
                 }
+                if (isSequenceCounterChild(key.substring(relStart))) {
+                    // Sidecar bookkeeping for SequenceKeysDeltas — not a user 
record.
+                    return;
+                }
                 snapshot.add(new GetResult(
                         value.data,
                         new Stat(key, value.version, value.createdTimestamp, 
value.modifiedTimestamp,
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 5e9024a3104..cda329aa6a4 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -438,6 +438,10 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
                     if (currentPath.indexOf('/', firstKey.length()) >= 0) {
                         continue;
                     }
+                    if 
(isSequenceCounterChild(currentPath.substring(firstKey.length()))) {
+                        // Sidecar bookkeeping for SequenceKeysDeltas — not a 
user record.
+                        continue;
+                    }
                     byte[] value = iterator.value();
                     if (value == null) {
                         continue;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
index 37eea03713c..f8f1e6f1573 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
@@ -36,6 +36,7 @@ import lombok.Cleanup;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Option;
 import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.testng.annotations.Test;
@@ -134,6 +135,35 @@ public class MetadataStoreScanChildrenTest extends 
BaseMetadataStoreTest {
         assertEquals(consumer.records.get(0).getStat().getPath(), child);
     }
 
+    @Test(dataProvider = "impl")
+    public void skipsSequenceCounterSidecars(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        // When SequenceKeysDeltas is used on a non-native backend, the 
abstract layer writes a
+        // sidecar counter document under the same parent. scanChildren must 
not surface it as a
+        // user record (its value is the binary counter encoding, not the 
caller's payload).
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(
+                urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String parent = newKey();
+        String prefix = parent + "/op";
+        Set<Option> putOpts = "Oxia".equals(provider)
+                ? Set.of(new Option.SequenceKeysDeltas(List.of(1L)), new 
Option.PartitionKey(prefix))
+                : Set.of(new Option.SequenceKeysDeltas(List.of(1L)));
+        store.put(prefix, "a".getBytes(StandardCharsets.UTF_8), 
Optional.empty(), putOpts).join();
+        store.put(prefix, "b".getBytes(StandardCharsets.UTF_8), 
Optional.empty(), putOpts).join();
+
+        CollectingConsumer consumer = new CollectingConsumer();
+        store.scanChildren(parent, consumer).join();
+        consumer.awaitDone();
+
+        // Only the two appended records; the counter sidecar (if any) is 
filtered out.
+        assertEquals(consumer.records.size(), 2);
+        for (GetResult r : consumer.records) {
+            assertTrue(!r.getStat().getPath().endsWith("__seq_counter__"),
+                    "counter sidecar leaked through scanChildren: " + 
r.getStat().getPath());
+        }
+    }
+
     @Test(dataProvider = "impl")
     public void closedStoreRejectsScan(String provider, Supplier<String> 
urlSupplier) throws Exception {
         MetadataStoreExtended store = MetadataStoreExtended.create(

Reply via email to