This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e90a7c56ade [improve][broker] PIP-473: transaction metadata data layer
(#25754)
e90a7c56ade is described below
commit e90a7c56ade8c9034bc6d43d2549f15000166dc4
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 12 16:49:19 2026 -0700
[improve][broker] PIP-473: transaction metadata data layer (#25754)
---
.../broker/transaction/metadata/TxnEvent.java | 48 ++++
.../broker/transaction/metadata/TxnHeader.java | 52 ++++
.../transaction/metadata/TxnMetadataStore.java | 264 +++++++++++++++++++++
.../pulsar/broker/transaction/metadata/TxnOp.java | 51 ++++
.../broker/transaction/metadata/TxnOpKind.java | 32 +++
.../broker/transaction/metadata/TxnPaths.java | 144 +++++++++++
.../broker/transaction/metadata/TxnState.java | 41 ++++
.../broker/transaction/metadata/Versioned.java | 29 +++
.../broker/transaction/metadata/package-info.java | 29 +++
.../transaction/metadata/TxnMetadataStoreTest.java | 258 ++++++++++++++++++++
.../metadata/impl/AbstractMetadataStore.java | 29 ++-
.../metadata/impl/LocalMemoryMetadataStore.java | 4 +
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 4 +
.../metadata/MetadataStoreScanChildrenTest.java | 30 +++
14 files changed, 1014 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
new file mode 100644
index 00000000000..6e2224e5342
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Per-participant notification record published by the TC after the end-txn
CAS lands. One
+ * {@code TxnEvent} is appended per affected (segment) for writes, and per
(segment, subscription)
+ * for acks, under the participant's notification prefix:
+ *
+ * <pre>
+ * /txn-segment-events/<segment>-<seq>
partitionKey=<segment>
+ * /txn-subscription-events/<segment>:<sub>-<seq>
partitionKey=<segment>:<sub>
+ * </pre>
+ *
+ * <p>Each participant subscribes to its own ordered stream via {@code
subscribeSequence} and
+ * applies the decision locally — see PIP-473's notification mechanism for the
protocol.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TxnEvent {
+
+ /** The transaction this event refers to. */
+ private String txnId;
+
+ /** Final decision — {@link TxnState#COMMITTED} or {@link
TxnState#ABORTED}. */
+ private TxnState decision;
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
new file mode 100644
index 00000000000..381b6bbfbbc
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import java.time.Duration;
+import java.time.Instant;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Header record stored at {@code /txn/<txnId>}. Linearization point for
the transaction
+ * lifecycle — the v5 TC's {@code endTxn} is a single CAS on this record's
{@link #state}.
+ *
+ * <p>Serialized as JSON via {@link
org.apache.pulsar.common.util.ObjectMapperFactory}.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TxnHeader {
+
+ /** Current transaction lifecycle state. */
+ private TxnState state;
+
+ /** Relative timeout — duration from {@link #createdAt} after which an
OPEN txn is swept. */
+ private Duration timeout;
+
+ /** When the transaction was created. */
+ private Instant createdAt;
+
+ /**
+ * When the transaction was finalized (committed or aborted). Set by the
TC immediately after
+ * the CAS that flips {@link #state}. {@code null} while OPEN.
+ */
+ private Instant finalizedAt;
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
new file mode 100644
index 00000000000..5c2087e3fba
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.Stat;
+
+/**
+ * Typed façade over a {@link MetadataStore} that implements the PIP-473
transaction data layout.
+ *
+ * <p>Responsibilities:
+ * <ul>
+ * <li>JSON serde for {@link TxnHeader}, {@link TxnOp}, {@link
TxnEvent}.</li>
+ * <li>Attach {@link Option.PartitionKey} so all records for one txnId /
segment / (segment, sub)
+ * co-locate on sharded backends (Oxia).</li>
+ * <li>Attach {@link Option.SecondaryIndex} entries so the runtime queries
hit native indexes when
+ * available, with a deserializing fallback predicate for stores that
don't have them.</li>
+ * <li>Append op-log and event records via {@link
Option.SequenceKeysDeltas}.</li>
+ * </ul>
+ *
+ * <p>The façade is stateless apart from holding the store reference — index
population happens via
+ * options on writes, so there is no explicit registration step.
+ */
+public class TxnMetadataStore {
+
+ /** Sequence-keys delta used by all append-only streams in this layout. */
+ private static final Option.SequenceKeysDeltas APPEND_DELTAS =
+ new Option.SequenceKeysDeltas(List.of(1L));
+
+ private final MetadataStore store;
+
+ public TxnMetadataStore(MetadataStore store) {
+ this.store = store;
+ }
+
+ // ---- Header CRUD -------------------------------------------------------
+
+ /** @return the header at {@code /txn/<txnId>} with its version, or empty
if not present. */
+ public CompletableFuture<Optional<Versioned<TxnHeader>>> getHeader(String
txnId) {
+ return store.get(TxnPaths.header(txnId), Set.of(new
Option.PartitionKey(txnId)))
+ .thenApply(opt -> opt.map(gr ->
+ new Versioned<>(fromJson(gr.getValue(),
TxnHeader.class), gr.getStat().getVersion())));
+ }
+
+ /**
+ * Create the txn header at version -1 (must not exist). Adds the deadline
secondary-index entry
+ * so the timeout sweeper can range-scan open transactions.
+ */
+ public CompletableFuture<Stat> createHeader(String txnId, TxnHeader
header) {
+ return store.put(TxnPaths.header(txnId), toJson(header),
Optional.of(-1L),
+ headerOptions(txnId, header));
+ }
+
+ /**
+ * CAS-update the txn header. Pass the {@code version} returned by a
previous {@link #getHeader}.
+ * Index entries are recomputed by the store based on the options on this
write.
+ */
+ public CompletableFuture<Stat> updateHeader(String txnId, TxnHeader
header, long expectedVersion) {
+ return store.put(TxnPaths.header(txnId), toJson(header),
Optional.of(expectedVersion),
+ headerOptions(txnId, header));
+ }
+
+ /** Delete the txn header with CAS on the expected version. Tolerates a
NotFound result. */
+ public CompletableFuture<Void> deleteHeader(String txnId, long
expectedVersion) {
+ return store.deleteIfExists(TxnPaths.header(txnId),
Optional.of(expectedVersion),
+ Set.of(new Option.PartitionKey(txnId)));
+ }
+
+ private static Set<Option> headerOptions(String txnId, TxnHeader header) {
+ Option.SecondaryIndex idx;
+ if (header.getState().isTerminal()) {
+ long finalizedMs = header.getFinalizedAt() == null ? 0L :
header.getFinalizedAt().toEpochMilli();
+ idx = new Option.SecondaryIndex(TxnPaths.IDX_TXN_BY_FINAL_STATE,
+ TxnPaths.finalStateKey(header.getState(), finalizedMs));
+ } else {
+ long deadlineMs = header.getCreatedAt().toEpochMilli() +
header.getTimeout().toMillis();
+ idx = new Option.SecondaryIndex(TxnPaths.IDX_TXN_BY_DEADLINE,
TxnPaths.longKey(deadlineMs));
+ }
+ return Set.of(new Option.PartitionKey(txnId), idx);
+ }
+
+ // ---- Op-log append -----------------------------------------------------
+
+ /**
+ * Append a {@link TxnOp} under {@code /txn-op/<txnId>-<seq>}. Adds the
per-kind secondary-index
+ * entry — {@link TxnPaths#IDX_WRITES_BY_SEGMENT} for writes,
+ * {@link TxnPaths#IDX_ACKS_BY_SEGMENT_SUBSCRIPTION} for acks. Returns the
{@link Stat} whose
+ * {@code path} carries the generated sequence key.
+ */
+ public CompletableFuture<Stat> appendOp(String txnId, TxnOp op) {
+ Option.SecondaryIndex idx = switch (op.getKind()) {
+ case WRITE -> new
Option.SecondaryIndex(TxnPaths.IDX_WRITES_BY_SEGMENT,
+ TxnPaths.segmentKey(op.getSegment()));
+ case ACK -> new
Option.SecondaryIndex(TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION,
+ TxnPaths.ackIndexKey(op.getSegment(),
op.getSubscription()));
+ };
+ Set<Option> opts = Set.of(new Option.PartitionKey(txnId), idx,
APPEND_DELTAS);
+ return store.put(TxnPaths.opParent(txnId), toJson(op),
Optional.empty(), opts);
+ }
+
+ // ---- Index queries -----------------------------------------------------
+
+ /** Stream all write ops targeting {@code segment}. */
+ public CompletableFuture<Void> listWritesBySegment(String segment,
ScanConsumer consumer) {
+ String key = TxnPaths.segmentKey(segment);
+ return store.scanByIndex(TxnPaths.TXN_OP_PREFIX,
TxnPaths.IDX_WRITES_BY_SEGMENT,
+ key, key,
+ gr -> {
+ TxnOp op = fromJson(gr.getValue(), TxnOp.class);
+ return op.getKind() == TxnOpKind.WRITE &&
segment.equals(op.getSegment());
+ },
+ consumer);
+ }
+
+ /** Stream all ack ops targeting {@code (segment, subscription)}. */
+ public CompletableFuture<Void> listAcksBySegmentSubscription(String
segment, String subscription,
+ ScanConsumer
consumer) {
+ String key = TxnPaths.ackIndexKey(segment, subscription);
+ return store.scanByIndex(TxnPaths.TXN_OP_PREFIX,
TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION,
+ key, key,
+ gr -> {
+ TxnOp op = fromJson(gr.getValue(), TxnOp.class);
+ return op.getKind() == TxnOpKind.ACK
+ && segment.equals(op.getSegment())
+ && subscription.equals(op.getSubscription());
+ },
+ consumer);
+ }
+
+ /**
+ * Stream open transactions whose deadline falls in {@code
[fromMsInclusive, toMsInclusive]}.
+ * Pass {@code null} on either bound for an unbounded range.
+ */
+ public CompletableFuture<Void> listOpenByDeadlineRange(Long
fromMsInclusive, Long toMsInclusive,
+ ScanConsumer
consumer) {
+ String from = fromMsInclusive == null ? null :
TxnPaths.longKey(fromMsInclusive);
+ String to = toMsInclusive == null ? null :
TxnPaths.longKey(toMsInclusive);
+ return store.scanByIndex(TxnPaths.TXN_HEADER_PREFIX,
TxnPaths.IDX_TXN_BY_DEADLINE,
+ from, to,
+ gr -> {
+ TxnHeader h = fromJson(gr.getValue(), TxnHeader.class);
+ if (h.getState().isTerminal()) {
+ return false;
+ }
+ long deadline = h.getCreatedAt().toEpochMilli() +
h.getTimeout().toMillis();
+ return (fromMsInclusive == null || deadline >=
fromMsInclusive)
+ && (toMsInclusive == null || deadline <=
toMsInclusive);
+ },
+ consumer);
+ }
+
+ /**
+ * Stream terminal transactions in {@code state} whose finalization time
falls in
+ * {@code [fromMsInclusive, toMsInclusive]}. Pass {@code null} on either
bound for unbounded.
+ */
+ public CompletableFuture<Void> listFinalizedByStateAndTimeRange(TxnState
state,
+ Long
fromMsInclusive, Long toMsInclusive,
+
ScanConsumer consumer) {
+ String from = fromMsInclusive == null ? state.name() + ":" :
TxnPaths.finalStateKey(state, fromMsInclusive);
+ String to = toMsInclusive == null
+ ? state.name() + ":" + TxnPaths.MAX_LONG_KEY
+ : TxnPaths.finalStateKey(state, toMsInclusive);
+ return store.scanByIndex(TxnPaths.TXN_HEADER_PREFIX,
TxnPaths.IDX_TXN_BY_FINAL_STATE,
+ from, to,
+ gr -> {
+ TxnHeader h = fromJson(gr.getValue(), TxnHeader.class);
+ if (h.getState() != state) {
+ return false;
+ }
+ if (h.getFinalizedAt() == null) {
+ return false;
+ }
+ long finalized = h.getFinalizedAt().toEpochMilli();
+ return (fromMsInclusive == null || finalized >=
fromMsInclusive)
+ && (toMsInclusive == null || finalized <=
toMsInclusive);
+ },
+ consumer);
+ }
+
+ // ---- Event publishing & subscription ----------------------------------
+
+ /** Append a per-segment notification event. */
+ public CompletableFuture<Stat> publishSegmentEvent(String segment,
TxnEvent event) {
+ Set<Option> opts = Set.of(new
Option.PartitionKey(TxnPaths.segmentKey(segment)), APPEND_DELTAS);
+ return store.put(TxnPaths.segmentEventsParent(segment), toJson(event),
Optional.empty(), opts);
+ }
+
+ /** Append a per-(segment, subscription) notification event. */
+ public CompletableFuture<Stat> publishSubscriptionEvent(String segment,
String subscription,
+ TxnEvent event) {
+ String pk = TxnPaths.ackIndexKey(segment, subscription);
+ Set<Option> opts = Set.of(new Option.PartitionKey(pk), APPEND_DELTAS);
+ return store.put(TxnPaths.subscriptionEventsParent(segment,
subscription), toJson(event),
+ Optional.empty(), opts);
+ }
+
+ /**
+ * Subscribe to new {@link TxnEvent} entries for {@code segment}. The
{@code listener} receives
+ * the full generated path of the latest sequence key — fetch the value
with {@link #fromJson} on
+ * the {@code GetResult} from {@link MetadataStore#get(String, Set)}.
+ */
+ public AutoCloseable subscribeSegmentEvents(String segment,
Consumer<String> listener)
+ throws MetadataStoreException {
+ return store.subscribeSequence(TxnPaths.segmentEventsParent(segment),
listener,
+ Set.of(new Option.PartitionKey(TxnPaths.segmentKey(segment))));
+ }
+
+ /** Subscribe to new {@link TxnEvent} entries for {@code (segment,
subscription)}. */
+ public AutoCloseable subscribeSubscriptionEvents(String segment, String
subscription,
+ Consumer<String> listener)
+ throws MetadataStoreException {
+ String pk = TxnPaths.ackIndexKey(segment, subscription);
+ return
store.subscribeSequence(TxnPaths.subscriptionEventsParent(segment,
subscription),
+ listener, Set.of(new Option.PartitionKey(pk)));
+ }
+
+ // ---- JSON helpers ------------------------------------------------------
+
+ /** @return UTF-8 JSON bytes for {@code value}. Wraps any I/O error as
{@link CompletionException}. */
+ public static byte[] toJson(Object value) {
+ try {
+ return
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(value);
+ } catch (JsonProcessingException e) {
+ throw new CompletionException(new MetadataStoreException(e));
+ }
+ }
+
+ /** @return the deserialized record. Wraps any I/O error as {@link
CompletionException}. */
+ public static <T> T fromJson(byte[] bytes, Class<T> type) {
+ try {
+ return ObjectMapperFactory.getMapper().reader().readValue(bytes,
type);
+ } catch (IOException e) {
+ throw new CompletionException(new MetadataStoreException(e));
+ }
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
new file mode 100644
index 00000000000..04e4f25807f
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Per-write or per-ack operation record. One {@code TxnOp} is appended at
+ * {@code /txn-op/<txnId>-<seq>} (with {@code partitionKey =
txnId}) every time a
+ * participant applies a transactional operation on a segment.
+ *
+ * <p>{@link #kind} discriminates writes from acks. {@link #subscription} is
set only for acks.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class TxnOp {
+
+ /** Whether this is a write or an ack. */
+ private TxnOpKind kind;
+
+ /** The segment topic name (segment://...) the operation applies to. */
+ private String segment;
+
+ /** The subscription FQN — only set on {@link TxnOpKind#ACK} entries. */
+ private String subscription;
+
+ /** Managed-ledger ledger id of the entry this op refers to. */
+ private long ledgerId;
+
+ /** Managed-ledger entry id of the entry this op refers to. */
+ private long entryId;
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOpKind.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOpKind.java
new file mode 100644
index 00000000000..97160c164fe
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOpKind.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
/**
 * Discriminator for {@link TxnOp} records. Jackson writes each constant by its name, so the
 * stored JSON stays human-readable.
 */
public enum TxnOpKind {
    /** Transactional message append (handled by the transaction buffer, "TB"). */
    WRITE,
    /** Transactional acknowledgement (handled by the PendingAckStore). */
    ACK;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
new file mode 100644
index 00000000000..c8952af1048
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import org.apache.pulsar.common.util.Codec;
+
+/**
+ * Path templates and secondary-index names for PIP-473 transaction metadata.
+ *
+ * <p>Layout (all under the metadata-store root). Sequence-key appends use
+ * {@link org.apache.pulsar.metadata.api.Option.SequenceKeysDeltas} with delta
{@code [1]} —
+ * the store generates a 20-digit zero-padded suffix appended to the prefix
with a {@code -}:
+ * <pre>
+ * /txn/<txnId> partitionKey =
txnId
+ * /txn-op/<txnId>-<seq>
partitionKey = txnId
+ * /txn-segment-events/<segment>-<seq>
partitionKey = segment
+ * /txn-subscription-events/<segment>:<sub>-<seq>
partitionKey = segment:sub
+ * </pre>
+ *
+ * <p>Secondary indexes (registered on the {@code MetadataStore} at startup):
+ * <pre>
+ * idx:writes-by-segment key = segment
+ * idx:acks-by-segment-subscription key = segment:sub
+ * idx:txn-by-deadline key = %020d(deadlineMs)
+ * idx:txn-by-final-state key = <state>:%020d(finalizedMs)
+ * </pre>
+ *
+ * <p>Numeric index keys are zero-padded to 20 digits so lexicographic
ordering matches numeric
+ * ordering — long max is 19 digits, the extra digit leaves headroom.
+ */
+public final class TxnPaths {
+
+ /** Path prefix for transaction headers. {@code /txn/<txnId>}. */
+ public static final String TXN_HEADER_PREFIX = "/txn";
+
+ /** Path prefix for transaction op log entries. {@code
/txn-op/<txnId>/<seq>}. */
+ public static final String TXN_OP_PREFIX = "/txn-op";
+
+ /** Path prefix for per-segment notification events. {@code
/txn-segment-events/<segment>-<seq>}. */
+ public static final String TXN_SEGMENT_EVENTS_PREFIX =
"/txn-segment-events";
+
+ /**
+ * Path prefix for per-(segment, subscription) notification events.
+ * {@code
/txn-subscription-events/<segment>:<sub>-<seq>}.
+ */
+ public static final String TXN_SUBSCRIPTION_EVENTS_PREFIX =
"/txn-subscription-events";
+
+ /** Index: list write ops by segment. Key = segment. */
+ public static final String IDX_WRITES_BY_SEGMENT = "idx:writes-by-segment";
+
+ /** Index: list ack ops by (segment, subscription). Key = {@code
segment:sub}. */
+ public static final String IDX_ACKS_BY_SEGMENT_SUBSCRIPTION =
"idx:acks-by-segment-subscription";
+
+ /** Index: list open transactions by deadline. Key = zero-padded
deadlineMs. */
+ public static final String IDX_TXN_BY_DEADLINE = "idx:txn-by-deadline";
+
+ /**
+ * Index: list terminal transactions by final state and finalization time.
Key =
+ * {@code <state>:padded(finalizedMs)}.
+ */
+ public static final String IDX_TXN_BY_FINAL_STATE =
"idx:txn-by-final-state";
+
+ /** Width used when formatting long values into
lexicographically-orderable index keys. */
+ public static final int LONG_KEY_WIDTH = 20;
+
+ /**
+ * The maximum {@link #LONG_KEY_WIDTH}-wide decimal — useful as the upper
bound of a
+ * single-state range scan on the final-state index.
+ */
+ public static final String MAX_LONG_KEY = "99999999999999999999";
+
+ /** @return {@code /txn/<txnId>} — the header path for {@code txnId}. */
+ public static String header(String txnId) {
+ return TXN_HEADER_PREFIX + "/" + txnId;
+ }
+
+ /** @return {@code /txn-op/<txnId>} — the parent path under which op-log
entries are appended. */
+ public static String opParent(String txnId) {
+ return TXN_OP_PREFIX + "/" + txnId;
+ }
+
+ /**
+ * @return URL-encoded form of {@code segment} suitable for use as a
metadata-store path
+ * component, partition key, or single-field index key. Encoding is
required because segment
+ * topic names contain {@code ://} and {@code /} ({@code
segment://tenant/ns/topic/...}) —
+ * ZooKeeper rejects those in paths, and using them raw also makes
composite keys (see
+ * {@link #ackIndexKey}) ambiguous. Mirrors the convention used
elsewhere in the transaction
+ * code (e.g. {@code MLPendingAckStore}, {@code
TopicTransactionBuffer}).
+ */
+ public static String segmentKey(String segment) {
+ return Codec.encode(segment);
+ }
+
+ /** @return {@code /txn-segment-events/<encoded-segment>} — parent path
for {@code segment}'s event stream. */
+ public static String segmentEventsParent(String segment) {
+ return TXN_SEGMENT_EVENTS_PREFIX + "/" + segmentKey(segment);
+ }
+
+ /**
+ * @return {@code
/txn-subscription-events/<encoded-segment>:<encoded-sub>} — parent for
+ * (segment, sub) events. Both components are URL-encoded so the
{@code :} separator is
+ * unambiguous (segment names contain {@code :} in their URI scheme).
+ */
+ public static String subscriptionEventsParent(String segment, String
subscription) {
+ return TXN_SUBSCRIPTION_EVENTS_PREFIX + "/" + ackIndexKey(segment,
subscription);
+ }
+
+ /**
+ * @return the composite ack-index key {@code
<encoded-segment>:<encoded-sub>}. Used as both
+ * the secondary-index value and the partition key for ack
notifications. Components are
+ * URL-encoded so the separator stays unambiguous.
+ */
+ public static String ackIndexKey(String segment, String subscription) {
+ return segmentKey(segment) + ":" + Codec.encode(subscription);
+ }
+
+ /** @return {@code value} formatted as a zero-padded fixed-width decimal
for use as a range-scan index key. */
+ public static String longKey(long value) {
+ return String.format("%0" + LONG_KEY_WIDTH + "d", value);
+ }
+
+ /** @return the composite final-state index key {@code
<state>:padded(finalizedMs)}. */
+ public static String finalStateKey(TxnState state, long finalizedMs) {
+ return state.name() + ":" + longKey(finalizedMs);
+ }
+
+ private TxnPaths() {}
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnState.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnState.java
new file mode 100644
index 00000000000..e7a2cf5c088
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnState.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
/**
 * Lifecycle states of a transaction header. Jackson writes each constant by its name, which keeps
 * the stored JSON human-readable.
 *
 * <p>The v5 TC moves between states with version-conditional puts. The only legal transitions are
 * <pre>
 *   OPEN -&gt; COMMITTED   (terminal)
 *   OPEN -&gt; ABORTED     (terminal)
 * </pre>
 */
public enum TxnState {

    OPEN,
    COMMITTED,
    ABORTED;

    /**
     * Reports whether no further transition can leave this state.
     *
     * @return {@code true} for {@link #COMMITTED} and {@link #ABORTED}, {@code false} for {@link #OPEN}
     */
    public boolean isTerminal() {
        return this == COMMITTED || this == ABORTED;
    }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/Versioned.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/Versioned.java
new file mode 100644
index 00000000000..cbdada3a052
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/Versioned.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
/**
 * A record value paired with the metadata-store version it was read at.
 *
 * <p>To perform an atomic compare-and-set, pass {@link #version()} back as the expected version of
 * the next conditional write. For example, use it as the {@code expectedVersion} argument of
 * {@code TxnMetadataStore#updateHeader} or {@code TxnMetadataStore#deleteHeader}. Those methods
 * forward it to the underlying {@code MetadataStore} as {@code Optional.of(expectedVersion)}.
 *
 * @param value the deserialized record value
 * @param version the version reported by the metadata store at read time
 * @param <T> the record type
 */
public record Versioned<T>(T value, long version) {}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/package-info.java
new file mode 100644
index 00000000000..e6da65e52ad
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Metadata-store data layer for PIP-473 transactions: schema-versioned JSON records
+ * ({@link org.apache.pulsar.broker.transaction.metadata.TxnHeader},
+ * {@link org.apache.pulsar.broker.transaction.metadata.TxnOp},
+ * {@link org.apache.pulsar.broker.transaction.metadata.TxnEvent}), path/index constants
+ * ({@link org.apache.pulsar.broker.transaction.metadata.TxnPaths}), and a typed façade over
+ * {@link org.apache.pulsar.metadata.api.MetadataStore}
+ * ({@link org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore}).
+ */
+package org.apache.pulsar.broker.transaction.metadata;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
new file mode 100644
index 00000000000..39d2e812431
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link TxnMetadataStore} on the in-memory backend. The façade is a thin layer
+ * over {@link MetadataStore}; coverage of native-index vs fallback behaviour for the underlying
+ * {@code scanByIndex}/{@code subscribeSequence} primitives lives in {@code pulsar-metadata}'s
+ * cross-backend tests.
+ */
+public class TxnMetadataStoreTest {
+
+    private static MetadataStore newMemoryStore() throws Exception {
+        return MetadataStoreFactory.create("memory:local",
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+    }
+
+    /** Builds an OPEN header; the tests treat its deadline as {@code createdMs + timeoutMs}. */
+    private static TxnHeader open(long createdMs, long timeoutMs) {
+        return new TxnHeader(TxnState.OPEN, Duration.ofMillis(timeoutMs),
+                Instant.ofEpochMilli(createdMs), null);
+    }
+
+    /** Builds a header in the given (terminal) {@code state}, finalized at {@code finalizedMs}. */
+    private static TxnHeader finalized(TxnState state, long createdMs, long timeoutMs, long finalizedMs) {
+        return new TxnHeader(state, Duration.ofMillis(timeoutMs),
+                Instant.ofEpochMilli(createdMs), Instant.ofEpochMilli(finalizedMs));
+    }
+
+    @Test
+    public void headerLifecycle() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String txnId = "tx-1";
+        TxnHeader h = open(1000L, 5000L);
+        Stat created = txn.createHeader(txnId, h).get();
+        assertThat(created.getVersion()).isZero();
+
+        // Get sees the right value + version.
+        Versioned<TxnHeader> v = txn.getHeader(txnId).get().orElseThrow();
+        assertThat(v.value()).isEqualTo(h);
+        assertThat(v.version()).isEqualTo(created.getVersion());
+
+        // CAS update with the right version succeeds; finalize the txn.
+        TxnHeader committed = finalized(TxnState.COMMITTED, 1000L, 5000L, 1500L);
+        Stat updated = txn.updateHeader(txnId, committed, v.version()).get();
+        assertThat(updated.getVersion()).isGreaterThan(v.version());
+
+        // CAS update with stale version fails.
+        assertThatThrownBy(() -> txn.updateHeader(txnId, committed, v.version()).get())
+                .hasCauseInstanceOf(MetadataStoreException.BadVersionException.class);
+
+        // Delete then re-read returns empty.
+        txn.deleteHeader(txnId, updated.getVersion()).get();
+        assertThat(txn.getHeader(txnId).get()).isEmpty();
+    }
+
+    @Test
+    public void appendAndScanWriteOps() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String txnId = "tx-w";
+        // Use realistic segment URIs with the segment:// scheme so we exercise URL encoding of
+        // path components — segment names contain "://" and "/" which would otherwise break ZK.
+        String segA = "segment://public/default/topic/0000-7fff-0";
+        String segB = "segment://public/default/topic/8000-ffff-0";
+        TxnOp w1 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 1L);
+        TxnOp w2 = new TxnOp(TxnOpKind.WRITE, segA, null, 1L, 2L);
+        TxnOp wOther = new TxnOp(TxnOpKind.WRITE, segB, null, 2L, 1L);
+        Stat s1 = txn.appendOp(txnId, w1).get();
+        Stat s2 = txn.appendOp(txnId, w2).get();
+        txn.appendOp(txnId, wOther).get();
+        assertThat(s2.getPath()).isGreaterThan(s1.getPath());
+
+        ScanCollector<TxnOp> hits = new ScanCollector<>(TxnOp.class);
+        txn.listWritesBySegment(segA, hits).get();
+        assertThat(hits.results()).containsExactlyInAnyOrder(w1, w2);
+    }
+
+    @Test
+    public void appendAndScanAckOps() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String txnId = "tx-a";
+        String segA = "segment://public/default/topic/0000-7fff-0";
+        TxnOp a1 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 5L);
+        TxnOp a2 = new TxnOp(TxnOpKind.ACK, segA, "sub/x", 1L, 6L);
+        TxnOp aOther = new TxnOp(TxnOpKind.ACK, segA, "sub/y", 1L, 7L);
+        txn.appendOp(txnId, a1).get();
+        txn.appendOp(txnId, a2).get();
+        txn.appendOp(txnId, aOther).get();
+
+        ScanCollector<TxnOp> hits = new ScanCollector<>(TxnOp.class);
+        txn.listAcksBySegmentSubscription(segA, "sub/x", hits).get();
+        assertThat(hits.results()).containsExactlyInAnyOrder(a1, a2);
+    }
+
+    @Test
+    public void deadlineRangeScanFiltersTerminalAndOutOfRange() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        // Three open txns with deadlines 1100, 1200, 1300 (created+timeout).
+        txn.createHeader("t1", open(100L, 1000L)).get();
+        TxnHeader open2 = open(200L, 1000L);
+        txn.createHeader("t2", open2).get();
+        txn.createHeader("t3", open(300L, 1000L)).get();
+
+        // One terminal txn whose deadline (175 + 1000 = 1175) lies strictly inside the scanned
+        // range, so only its terminal state can exclude it. (A deadline outside the range would
+        // make this check vacuous: the range alone would filter it.)
+        txn.createHeader("t-term", finalized(TxnState.COMMITTED, 175L, 1000L, 1180L)).get();
+
+        ScanCollector<TxnHeader> hits = new ScanCollector<>(TxnHeader.class);
+        txn.listOpenByDeadlineRange(1150L, 1250L, hits).get();
+        assertThat(hits.results()).containsExactly(open2);
+    }
+
+    @Test
+    public void finalStateRangeScan() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        TxnHeader c1 = finalized(TxnState.COMMITTED, 1000L, 1000L, 1100L);
+        TxnHeader c2 = finalized(TxnState.COMMITTED, 1000L, 1000L, 1200L);
+        TxnHeader a1 = finalized(TxnState.ABORTED, 1000L, 1000L, 1150L);
+        txn.createHeader("c1", c1).get();
+        txn.createHeader("c2", c2).get();
+        txn.createHeader("a1", a1).get();
+
+        ScanCollector<TxnHeader> committed = new ScanCollector<>(TxnHeader.class);
+        txn.listFinalizedByStateAndTimeRange(TxnState.COMMITTED, null, null, committed).get();
+        assertThat(committed.results()).containsExactlyInAnyOrder(c1, c2);
+
+        ScanCollector<TxnHeader> recent = new ScanCollector<>(TxnHeader.class);
+        txn.listFinalizedByStateAndTimeRange(TxnState.COMMITTED, 1150L, 1300L, recent).get();
+        assertThat(recent.results()).containsExactly(c2);
+
+        ScanCollector<TxnHeader> aborted = new ScanCollector<>(TxnHeader.class);
+        txn.listFinalizedByStateAndTimeRange(TxnState.ABORTED, null, null, aborted).get();
+        assertThat(aborted.results()).containsExactly(a1);
+    }
+
+    @Test
+    public void publishAndSubscribeSegmentEvents() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String segment = "segment://public/default/topic/0000-7fff-0";
+        ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();
+        @Cleanup AutoCloseable handle = txn.subscribeSegmentEvents(segment, received::add);
+
+        TxnEvent e1 = new TxnEvent("tx-1", TxnState.COMMITTED);
+        TxnEvent e2 = new TxnEvent("tx-2", TxnState.ABORTED);
+        Stat s1 = txn.publishSegmentEvent(segment, e1).get();
+        Stat s2 = txn.publishSegmentEvent(segment, e2).get();
+
+        // Sequence-key suffixed under the segment parent.
+        assertThat(s1.getPath()).matches("\\Q" + TxnPaths.segmentEventsParent(segment) + "\\E-\\d{20}");
+        assertThat(s2.getPath()).isGreaterThan(s1.getPath());
+
+        // Subscription receives the latest sequence key (intermediate updates may be collapsed).
+        Awaitility.await().untilAsserted(() ->
+                assertThat(received).isNotEmpty().last().asString().isEqualTo(s2.getPath()));
+
+        // Fetched event round-trips through fromJson.
+        GetResult gr = store.get(s2.getPath()).get().orElseThrow();
+        TxnEvent fetched = TxnMetadataStore.fromJson(gr.getValue(), TxnEvent.class);
+        assertThat(fetched).isEqualTo(e2);
+    }
+
+    @Test
+    public void publishAndSubscribeSubscriptionEvents() throws Exception {
+        @Cleanup MetadataStore store = newMemoryStore();
+        TxnMetadataStore txn = new TxnMetadataStore(store);
+
+        String segment = "segment://public/default/topic/0000-7fff-0";
+        String sub = "sub/x";
+        ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();
+        @Cleanup AutoCloseable handle = txn.subscribeSubscriptionEvents(segment, sub, received::add);
+
+        TxnEvent e = new TxnEvent("tx-1", TxnState.COMMITTED);
+        Stat s = txn.publishSubscriptionEvent(segment, sub, e).get();
+
+        Awaitility.await().untilAsserted(() ->
+                assertThat(received).isNotEmpty().last().asString().isEqualTo(s.getPath()));
+    }
+
+    // ---- helpers -----------------------------------------------------------
+
+    /**
+     * {@link ScanConsumer} that decodes every record as {@code type} via
+     * {@link TxnMetadataStore#fromJson} and remembers any error the scan reports, so a failed
+     * scan cannot masquerade as an empty or partial result. Read the collected values through
+     * {@link #results()} only after the scan's future has completed.
+     */
+    private static final class ScanCollector<T> implements ScanConsumer {
+        private final Class<T> type;
+        private final List<T> out = new ArrayList<>();
+        private volatile Throwable error;
+
+        ScanCollector(Class<T> type) {
+            this.type = type;
+        }
+
+        @Override
+        public void onNext(GetResult r) {
+            out.add(TxnMetadataStore.fromJson(r.getValue(), type));
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            // Recorded rather than swallowed; results() turns it into a test failure.
+            error = throwable;
+        }
+
+        @Override
+        public void onCompleted() {
+        }
+
+        /** Returns the decoded records, failing the test if the scan reported an error. */
+        List<T> results() {
+            assertThat(error).as("scan reported an error").isNull();
+            return out;
+        }
+    }
+
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 1112a4e0975..a8e11fc6180 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -597,6 +597,10 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
getChildrenFromStore(parentPath, opts).thenCompose(children -> {
CompletableFuture<Void> chain =
CompletableFuture.completedFuture(null);
for (String child : children) {
+ if (isSequenceCounterChild(child)) {
+ // Sidecar bookkeeping for SequenceKeysDeltas — not a user
record.
+ continue;
+ }
String childPath = parentPath.equals("/") ? "/" + child :
parentPath + "/" + child;
chain = chain.thenCompose(__ -> storeGet(childPath, opts))
.thenAccept(opt -> opt.ifPresent(consumer::onNext));
@@ -636,6 +640,10 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
getChildrenFromStore(scanPathPrefix, opts).thenCompose(children -> {
CompletableFuture<Void> chain =
CompletableFuture.completedFuture(null);
for (String child : children) {
+ if (isSequenceCounterChild(child)) {
+ // Sidecar bookkeeping for SequenceKeysDeltas — not a user
record.
+ continue;
+ }
String childPath = scanPathPrefix.equals("/") ? "/" + child :
scanPathPrefix + "/" + child;
chain = chain.thenCompose(__ -> storeGet(childPath, opts))
.thenAccept(opt ->
opt.filter(fallbackFilter).ifPresent(consumer::onNext));
@@ -759,9 +767,28 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
/** Counter-document path for a sequence prefix. Sibling of the prefix at
the parent level. */
static String sequenceCounterPath(String prefix) {
- return prefix + "__seq_counter__";
+ return prefix + SEQUENCE_COUNTER_SUFFIX;
+ }
+
+ /**
+ * Returns whether {@code childName} is a synthesized sequence-counter sidecar, i.e. bookkeeping
+ * written by {@link #atomicIncrementSequenceCounter} rather than a user record. Scan primitives
+ * use this to filter counters out of their output on non-native backends.
+ *
+ * <p>The match is a literal-suffix check, so a user record whose final path segment happens to
+ * end with {@value #SEQUENCE_COUNTER_SUFFIX} is filtered as well. We accept that trade-off: the
+ * 15-character {@code __seq_counter__} marker is internal-only and unlikely to be chosen by
+ * accident, and a strict check would require either tracking active prefixes or reserving a
+ * delimiter that the path backends forbid. The cost of a false positive is that the record is
+ * silently omitted from {@code scanChildren}/{@code scanByIndex} results; the record itself is
+ * not deleted.
+ *
+ * @param childName the child name (final path segment) to test; may be {@code null}
+ * @return {@code true} when {@code childName} is non-null and ends with the sequence-counter
+ *     suffix
+ */
+ static boolean isSequenceCounterChild(String childName) {
+ return childName != null &&
childName.endsWith(SEQUENCE_COUNTER_SUFFIX);
}
+
+ /** Literal suffix appended to a sequence prefix to form its counter-sidecar path. */
+ private static final String SEQUENCE_COUNTER_SUFFIX = "__seq_counter__";
+
/** Format a synthesized sequence key matching Oxia's native format:
{@code prefix-{seq:%020d}-...}. */
static String formatSequenceKey(String prefix, long[] seqs) {
StringBuilder sb = new StringBuilder(prefix);
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index c1f00c081c5..f612f2ea790 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -137,6 +137,10 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
if (key.indexOf('/', relStart) >= 0) {
return;
}
+ if (isSequenceCounterChild(key.substring(relStart))) {
+ // Sidecar bookkeeping for SequenceKeysDeltas — not a user
record.
+ return;
+ }
snapshot.add(new GetResult(
value.data,
new Stat(key, value.version, value.createdTimestamp,
value.modifiedTimestamp,
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 5e9024a3104..cda329aa6a4 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -438,6 +438,10 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
if (currentPath.indexOf('/', firstKey.length()) >= 0) {
continue;
}
+ if
(isSequenceCounterChild(currentPath.substring(firstKey.length()))) {
+ // Sidecar bookkeeping for SequenceKeysDeltas — not a
user record.
+ continue;
+ }
byte[] value = iterator.value();
if (value == null) {
continue;
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
index 37eea03713c..f8f1e6f1573 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
@@ -36,6 +36,7 @@ import lombok.Cleanup;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Option;
import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;
@@ -134,6 +135,35 @@ public class MetadataStoreScanChildrenTest extends
BaseMetadataStoreTest {
assertEquals(consumer.records.get(0).getStat().getPath(), child);
}
+    @Test(dataProvider = "impl")
+    public void skipsSequenceCounterSidecars(String provider, Supplier<String> urlSupplier) throws Exception {
+        // When SequenceKeysDeltas is used on a non-native backend, the abstract layer writes a
+        // sidecar counter document under the same parent. scanChildren must not surface it as a
+        // user record (its value is the binary counter encoding, not the caller's payload).
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(
+                urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String parent = newKey();
+        String prefix = parent + "/op";
+        // The Oxia provider is additionally given an explicit partition key for the prefix.
+        Set<Option> putOpts = "Oxia".equals(provider)
+                ? Set.of(new Option.SequenceKeysDeltas(List.of(1L)), new Option.PartitionKey(prefix))
+                : Set.of(new Option.SequenceKeysDeltas(List.of(1L)));
+        store.put(prefix, "a".getBytes(StandardCharsets.UTF_8), Optional.empty(), putOpts).join();
+        store.put(prefix, "b".getBytes(StandardCharsets.UTF_8), Optional.empty(), putOpts).join();
+
+        CollectingConsumer consumer = new CollectingConsumer();
+        store.scanChildren(parent, consumer).join();
+        consumer.awaitDone();
+
+        // Only the two appended records; the counter sidecar (if any) is filtered out.
+        assertEquals(consumer.records.size(), 2);
+        for (GetResult r : consumer.records) {
+            assertTrue(!r.getStat().getPath().endsWith("__seq_counter__"),
+                    "counter sidecar leaked through scanChildren: " + r.getStat().getPath());
+        }
+        // The surviving records must carry the caller's payloads, not a counter encoding. This
+        // also catches a sidecar leaking under a name the suffix check above would not match.
+        List<String> payloads = consumer.records.stream()
+                .map(rec -> new String(rec.getValue(), StandardCharsets.UTF_8))
+                .sorted()
+                .toList();
+        assertEquals(payloads, List.of("a", "b"));
+    }
+
@Test(dataProvider = "impl")
public void closedStoreRejectsScan(String provider, Supplier<String>
urlSupplier) throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(