This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dd938e3b17a [improve][broker] PIP-473: group all txn metadata paths
under /txn/ (#25792)
dd938e3b17a is described below
commit dd938e3b17a5444fda605bf1b044b19c1fd3e760
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 19 03:22:57 2026 -0700
[improve][broker] PIP-473: group all txn metadata paths under /txn/ (#25792)
---
.../buffer/impl/MetadataTransactionBuffer.java | 8 ++--
.../broker/transaction/metadata/TxnEvent.java | 4 +-
.../transaction/metadata/TxnMetadataStore.java | 6 +--
.../pulsar/broker/transaction/metadata/TxnOp.java | 2 +-
.../broker/transaction/metadata/TxnPaths.java | 46 ++++++++++++----------
.../pendingack/impl/MetadataPendingAckStore.java | 6 +--
.../impl/MetadataPendingAckStoreProvider.java | 2 +-
7 files changed, 39 insertions(+), 35 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index b6b8d0328ab..46326b91afd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -65,8 +65,8 @@ import org.apache.pulsar.metadata.api.ScanConsumer;
* <ul>
* <li><b>Publish</b> — {@link #appendBufferToTxn} reads the txn header
(cache-first), appends the
* entry to the managed ledger, then appends a {@link TxnOp} record under
- * {@code /txn-op/<txnId>-<seq>}. Both must succeed before we ack the
publisher.</li>
- * <li><b>State transitions</b> — driven by {@code
/txn-segment-events/<segment>-*} sequence
+ * {@code /txn/op/<txnId>-<seq>}. Both must succeed before we ack the
publisher.</li>
+ * <li><b>State transitions</b> — driven by {@code
/txn/segment-events/<segment>-*} sequence
* events. The events are wake-ups; the truth is the header. On each
notification we
* re-read headers for every currently-open txn and apply the resulting
state changes.</li>
* <li><b>Recovery</b> (Option C) — scan {@code idx:writes-by-segment} for
this segment, group by
@@ -136,7 +136,7 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
}
subscription = handle;
- // Scan all /txn-op records for this segment, group by txnId.
+ // Scan all /txn/op records for this segment, group by txnId.
Map<String, List<Position>> opsByTxn = new ConcurrentHashMap<>();
txnStore.listWritesBySegment(segmentName, new ScanConsumer() {
@Override
@@ -335,7 +335,7 @@ public class MetadataTransactionBuffer implements
TransactionBuffer {
}
/**
- * Delete every {@code /txn-op} record for {@code (this segment,
txnIdKey)}. Best-effort —
+ * Delete every {@code /txn/op} record for {@code (this segment,
txnIdKey)}. Best-effort —
* failures are logged and retried by the next reconcile.
*/
private void cleanupOpRecords(String txnIdKey) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
index 6e2224e5342..deea3faae89 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
@@ -28,8 +28,8 @@ import lombok.NoArgsConstructor;
* for acks, under the participant's notification prefix:
*
* <pre>
- * /txn-segment-events/<segment>-<seq>
partitionKey=<segment>
- * /txn-subscription-events/<segment>:<sub>-<seq>
partitionKey=<segment>:<sub>
+ * /txn/segment-events/<segment>-<seq>
partitionKey=<segment>
+ * /txn/subscription-events/<segment>:<sub>-<seq>
partitionKey=<segment>:<sub>
* </pre>
*
* <p>Each participant subscribes to its own ordered stream via {@code
subscribeSequence} and
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index bdc0bda5fd6..7f95a9be907 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -112,7 +112,7 @@ public class TxnMetadataStore {
// ---- Op-log append -----------------------------------------------------
/**
- * Append a {@link TxnOp} under {@code /txn-op/<txnId>-<seq>}. Adds the
per-kind secondary-index
+ * Append a {@link TxnOp} under {@code /txn/op/<txnId>-<seq>}. Adds the
per-kind secondary-index
* entry — {@link TxnPaths#IDX_WRITES_BY_SEGMENT} for writes,
* {@link TxnPaths#IDX_ACKS_BY_SEGMENT_SUBSCRIPTION} for acks. Returns the
{@link Stat} whose
* {@code path} carries the generated sequence key.
@@ -158,7 +158,7 @@ public class TxnMetadataStore {
}
/**
- * Delete every {@code /txn-op} write record for {@code (segment, txnId)}
— used by the TB once
+ * Delete every {@code /txn/op} write record for {@code (segment, txnId)}
— used by the TB once
* an event tells it the txn is terminal. Path extraction follows the
layout in
* {@link TxnPaths#txnIdFromOpPath}. Best-effort: tolerates concurrent
deletions via
* {@link MetadataStore#deleteIfExists}.
@@ -168,7 +168,7 @@ public class TxnMetadataStore {
}
/**
- * Delete every {@code /txn-op} ack record for {@code (segment,
subscription, txnId)} — used by
+ * Delete every {@code /txn/op} ack record for {@code (segment,
subscription, txnId)} — used by
* the PendingAckStore once an event tells it the txn is terminal. Same
path-extraction +
* best-effort semantics as {@link #deleteWriteOpsForSegmentAndTxn}.
*/
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
index 8941a098e82..0b5b9d2644b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
@@ -24,7 +24,7 @@ import lombok.NoArgsConstructor;
/**
* Per-write or per-ack operation record. One {@code TxnOp} is appended at
- * {@code /txn-op/<txnId>-<seq>} (with {@code partitionKey =
txnId}) every time a
+ * {@code /txn/op/<txnId>-<seq>} (with {@code partitionKey =
txnId}) every time a
* participant applies a transactional operation on a segment.
*
* <p>{@link #kind} discriminates writes from acks. {@link #subscription} and
{@link #cumulative}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index d90c7a470cf..d9dbbb6b79a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -23,14 +23,15 @@ import org.apache.pulsar.common.util.Codec;
/**
* Path templates and secondary-index names for PIP-473 transaction metadata.
*
- * <p>Layout (all under the metadata-store root). Sequence-key appends use
- * {@link org.apache.pulsar.metadata.api.Option.SequenceKeysDeltas} with delta
{@code [1]} —
- * the store generates a 20-digit zero-padded suffix appended to the prefix
with a {@code -}:
+ * <p>All transaction-related metadata lives under the {@code /txn} root,
grouped by purpose.
+ * Sequence-key appends use {@link
org.apache.pulsar.metadata.api.Option.SequenceKeysDeltas} with
+ * delta {@code [1]} — the store generates a 20-digit zero-padded suffix
appended to the prefix
+ * with a {@code -}:
* <pre>
- * /txn/<txnId> partitionKey =
txnId
- * /txn-op/<txnId>-<seq>
partitionKey = txnId
- * /txn-segment-events/<segment>-<seq>
partitionKey = segment
- * /txn-subscription-events/<segment>:<sub>-<seq>
partitionKey = segment:sub
+ * /txn/id/<txnId> partitionKey
= txnId
+ * /txn/op/<txnId>-<seq>
partitionKey = txnId
+ * /txn/segment-events/<segment>-<seq>
partitionKey = segment
+ * /txn/subscription-events/<segment>:<sub>-<seq>
partitionKey = segment:sub
* </pre>
*
* <p>Secondary indexes (registered on the {@code MetadataStore} at startup):
@@ -46,20 +47,20 @@ import org.apache.pulsar.common.util.Codec;
*/
public final class TxnPaths {
- /** Path prefix for transaction headers. {@code /txn/<txnId>}. */
- public static final String TXN_HEADER_PREFIX = "/txn";
+ /** Path prefix for transaction headers. {@code /txn/id/<txnId>}. */
+ public static final String TXN_HEADER_PREFIX = "/txn/id";
- /** Path prefix for transaction op log entries. {@code
/txn-op/<txnId>/<seq>}. */
- public static final String TXN_OP_PREFIX = "/txn-op";
+ /** Path prefix for transaction op log entries. {@code
/txn/op/<txnId>-<seq>}. */
+ public static final String TXN_OP_PREFIX = "/txn/op";
- /** Path prefix for per-segment notification events. {@code
/txn-segment-events/<segment>-<seq>}. */
- public static final String TXN_SEGMENT_EVENTS_PREFIX =
"/txn-segment-events";
+ /** Path prefix for per-segment notification events. {@code
/txn/segment-events/<segment>-<seq>}. */
+ public static final String TXN_SEGMENT_EVENTS_PREFIX =
"/txn/segment-events";
/**
* Path prefix for per-(segment, subscription) notification events.
- * {@code
/txn-subscription-events/<segment>:<sub>-<seq>}.
+ * {@code
/txn/subscription-events/<segment>:<sub>-<seq>}.
*/
- public static final String TXN_SUBSCRIPTION_EVENTS_PREFIX =
"/txn-subscription-events";
+ public static final String TXN_SUBSCRIPTION_EVENTS_PREFIX =
"/txn/subscription-events";
/** Index: list write ops by segment. Key = segment. */
public static final String IDX_WRITES_BY_SEGMENT = "idx:writes-by-segment";
@@ -85,12 +86,12 @@ public final class TxnPaths {
*/
public static final String MAX_LONG_KEY = "99999999999999999999";
- /** @return {@code /txn/<txnId>} — the header path for {@code txnId}. */
+ /** @return {@code /txn/id/<txnId>} — the header path for {@code txnId}. */
public static String header(String txnId) {
return TXN_HEADER_PREFIX + "/" + txnId;
}
- /** @return {@code /txn-op/<txnId>} — the parent path under which op-log
entries are appended. */
+ /** @return {@code /txn/op/<txnId>} — the parent path under which op-log
entries are appended. */
public static String opParent(String txnId) {
return TXN_OP_PREFIX + "/" + txnId;
}
@@ -107,13 +108,16 @@ public final class TxnPaths {
return Codec.encode(segment);
}
- /** @return {@code /txn-segment-events/<encoded-segment>} — parent path
for {@code segment}'s event stream. */
+ /**
+ * @return {@code /txn/segment-events/<encoded-segment>} — parent path for
{@code segment}'s
+ * event stream.
+ */
public static String segmentEventsParent(String segment) {
return TXN_SEGMENT_EVENTS_PREFIX + "/" + segmentKey(segment);
}
/**
- * @return {@code
/txn-subscription-events/<encoded-segment>:<encoded-sub>} — parent for
+ * @return {@code
/txn/subscription-events/<encoded-segment>:<encoded-sub>} — parent for
* (segment, sub) events. Both components are URL-encoded so the
{@code :} separator is
* unambiguous (segment names contain {@code :} in their URI scheme).
*/
@@ -142,8 +146,8 @@ public final class TxnPaths {
/**
* Extract the {@code txnId} key from a path under {@link #TXN_OP_PREFIX}.
The path layout is
- * {@code /txn-op/<txnId>-<paddedSeq>}; txnId itself is {@code
<most>-<least>} (one dash), so
- * the sequence dash is always the last one and the substring before it is
the txnId key.
+ * {@code /txn/op/<txnId>-<paddedSeq>}; txnId itself is {@code
<most>_<least>}, so the sequence
+ * dash is always the last one and the substring before it is the txnId
key.
*
* @return the txnId key, or {@code null} if {@code opPath} doesn't have
the expected shape
*/
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
index 546209c8276..fba4ec44361 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
@@ -51,17 +51,17 @@ import org.apache.pulsar.metadata.api.ScanConsumer;
* <p>Lifecycle:
* <ul>
* <li><b>Ack</b> — {@link #appendIndividualAck} / {@link
#appendCumulativeAck} write a
- * {@link TxnOp} record under {@code /txn-op/<txnId>-<seq>} with
+ * {@link TxnOp} record under {@code /txn/op/<txnId>-<seq>} with
* {@code kind=ACK, segment, subscription, ledgerId, entryId,
cumulative}. The associated
* {@code PendingAckHandle} keeps the in-memory state via the legacy
interface.</li>
* <li><b>Commit / Abort marks</b> — {@link #appendCommitMark} / {@link
#appendAbortMark} are
* <b>no-ops</b>. In v5 the TC owns the lifecycle: it CAS-updates the
header and publishes
* subscription events; this store consumes those events.</li>
- * <li><b>State transitions</b> — driven by {@code
/txn-subscription-events/<seg>:<sub>-*}
+ * <li><b>State transitions</b> — driven by {@code
/txn/subscription-events/<seg>:<sub>-*}
* sequence events. The events are wake-ups; the truth is the header. On
each notification
* we re-read headers for every currently-open txn this subscription is
involved in and
* call {@code PendingAckHandleImpl.commitTxn} / {@code abortTxn} for
those that have gone
- * terminal — then delete the corresponding {@code /txn-op} ack
records.</li>
+ * terminal — then delete the corresponding {@code /txn/op} ack
records.</li>
* <li><b>Recovery</b> (Option C) — on {@link #replayAsync}, subscribe to
the event stream,
* scan {@code idx:acks-by-segment-subscription} for this {@code
(segment, sub)}, group by
* {@code txnId}, fetch each header, and seed the in-memory open-txn
set; then mark the
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
index 2687c5d981a..b20fa6a5229 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
@@ -42,7 +42,7 @@ public class MetadataPendingAckStoreProvider implements
TransactionPendingAckSto
public CompletableFuture<Boolean>
checkInitializedBefore(PersistentSubscription subscription) {
// The metadata layout is global — there is no per-subscription
"initialized" log to
// check. State (open txns, leftover op records) is rebuilt on demand
from the
- // /txn-op + /txn records at replay time.
+ // /txn/op + /txn/id records at replay time.
return CompletableFuture.completedFuture(true);
}
}