This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dd938e3b17a [improve][broker] PIP-473: group all txn metadata paths 
under /txn/ (#25792)
dd938e3b17a is described below

commit dd938e3b17a5444fda605bf1b044b19c1fd3e760
Author: Matteo Merli <[email protected]>
AuthorDate: Tue May 19 03:22:57 2026 -0700

    [improve][broker] PIP-473: group all txn metadata paths under /txn/ (#25792)
---
 .../buffer/impl/MetadataTransactionBuffer.java     |  8 ++--
 .../broker/transaction/metadata/TxnEvent.java      |  4 +-
 .../transaction/metadata/TxnMetadataStore.java     |  6 +--
 .../pulsar/broker/transaction/metadata/TxnOp.java  |  2 +-
 .../broker/transaction/metadata/TxnPaths.java      | 46 ++++++++++++----------
 .../pendingack/impl/MetadataPendingAckStore.java   |  6 +--
 .../impl/MetadataPendingAckStoreProvider.java      |  2 +-
 7 files changed, 39 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index b6b8d0328ab..46326b91afd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -65,8 +65,8 @@ import org.apache.pulsar.metadata.api.ScanConsumer;
  * <ul>
  *   <li><b>Publish</b> — {@link #appendBufferToTxn} reads the txn header 
(cache-first), appends the
  *       entry to the managed ledger, then appends a {@link TxnOp} record under
- *       {@code /txn-op/<txnId>-<seq>}. Both must succeed before we ack the 
publisher.</li>
- *   <li><b>State transitions</b> — driven by {@code 
/txn-segment-events/<segment>-*} sequence
+ *       {@code /txn/op/<txnId>-<seq>}. Both must succeed before we ack the 
publisher.</li>
+ *   <li><b>State transitions</b> — driven by {@code 
/txn/segment-events/<segment>-*} sequence
  *       events. The events are wake-ups; the truth is the header. On each 
notification we
  *       re-read headers for every currently-open txn and apply the resulting 
state changes.</li>
  *   <li><b>Recovery</b> (Option C) — scan {@code idx:writes-by-segment} for 
this segment, group by
@@ -136,7 +136,7 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
         }
         subscription = handle;
 
-        // Scan all /txn-op records for this segment, group by txnId.
+        // Scan all /txn/op records for this segment, group by txnId.
         Map<String, List<Position>> opsByTxn = new ConcurrentHashMap<>();
         txnStore.listWritesBySegment(segmentName, new ScanConsumer() {
             @Override
@@ -335,7 +335,7 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
     }
 
     /**
-     * Delete every {@code /txn-op} record for {@code (this segment, 
txnIdKey)}. Best-effort —
+     * Delete every {@code /txn/op} record for {@code (this segment, 
txnIdKey)}. Best-effort —
      * failures are logged and retried by the next reconcile.
      */
     private void cleanupOpRecords(String txnIdKey) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
index 6e2224e5342..deea3faae89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnEvent.java
@@ -28,8 +28,8 @@ import lombok.NoArgsConstructor;
  * for acks, under the participant's notification prefix:
  *
  * <pre>
- *   /txn-segment-events/&lt;segment&gt;-&lt;seq&gt;            
partitionKey=&lt;segment&gt;
- *   /txn-subscription-events/&lt;segment&gt;:&lt;sub&gt;-&lt;seq&gt;  
partitionKey=&lt;segment&gt;:&lt;sub&gt;
+ *   /txn/segment-events/&lt;segment&gt;-&lt;seq&gt;            
partitionKey=&lt;segment&gt;
+ *   /txn/subscription-events/&lt;segment&gt;:&lt;sub&gt;-&lt;seq&gt;  
partitionKey=&lt;segment&gt;:&lt;sub&gt;
  * </pre>
  *
  * <p>Each participant subscribes to its own ordered stream via {@code 
subscribeSequence} and
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index bdc0bda5fd6..7f95a9be907 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -112,7 +112,7 @@ public class TxnMetadataStore {
     // ---- Op-log append -----------------------------------------------------
 
     /**
-     * Append a {@link TxnOp} under {@code /txn-op/<txnId>-<seq>}. Adds the 
per-kind secondary-index
+     * Append a {@link TxnOp} under {@code /txn/op/<txnId>-<seq>}. Adds the 
per-kind secondary-index
      * entry — {@link TxnPaths#IDX_WRITES_BY_SEGMENT} for writes,
      * {@link TxnPaths#IDX_ACKS_BY_SEGMENT_SUBSCRIPTION} for acks. Returns the 
{@link Stat} whose
      * {@code path} carries the generated sequence key.
@@ -158,7 +158,7 @@ public class TxnMetadataStore {
     }
 
     /**
-     * Delete every {@code /txn-op} write record for {@code (segment, txnId)} 
— used by the TB once
+     * Delete every {@code /txn/op} write record for {@code (segment, txnId)} 
— used by the TB once
      * an event tells it the txn is terminal. Path extraction follows the 
layout in
      * {@link TxnPaths#txnIdFromOpPath}. Best-effort: tolerates concurrent 
deletions via
      * {@link MetadataStore#deleteIfExists}.
@@ -168,7 +168,7 @@ public class TxnMetadataStore {
     }
 
     /**
-     * Delete every {@code /txn-op} ack record for {@code (segment, 
subscription, txnId)} — used by
+     * Delete every {@code /txn/op} ack record for {@code (segment, 
subscription, txnId)} — used by
      * the PendingAckStore once an event tells it the txn is terminal. Same 
path-extraction +
      * best-effort semantics as {@link #deleteWriteOpsForSegmentAndTxn}.
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
index 8941a098e82..0b5b9d2644b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnOp.java
@@ -24,7 +24,7 @@ import lombok.NoArgsConstructor;
 
 /**
  * Per-write or per-ack operation record. One {@code TxnOp} is appended at
- * {@code /txn-op/&lt;txnId&gt;-&lt;seq&gt;} (with {@code partitionKey = 
txnId}) every time a
+ * {@code /txn/op/&lt;txnId&gt;-&lt;seq&gt;} (with {@code partitionKey = 
txnId}) every time a
  * participant applies a transactional operation on a segment.
  *
  * <p>{@link #kind} discriminates writes from acks. {@link #subscription} and 
{@link #cumulative}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index d90c7a470cf..d9dbbb6b79a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -23,14 +23,15 @@ import org.apache.pulsar.common.util.Codec;
 /**
  * Path templates and secondary-index names for PIP-473 transaction metadata.
  *
- * <p>Layout (all under the metadata-store root). Sequence-key appends use
- * {@link org.apache.pulsar.metadata.api.Option.SequenceKeysDeltas} with delta 
{@code [1]} —
- * the store generates a 20-digit zero-padded suffix appended to the prefix 
with a {@code -}:
+ * <p>All transaction-related metadata lives under the {@code /txn} root, 
grouped by purpose.
+ * Sequence-key appends use {@link 
org.apache.pulsar.metadata.api.Option.SequenceKeysDeltas} with
+ * delta {@code [1]} — the store generates a 20-digit zero-padded suffix 
appended to the prefix
+ * with a {@code -}:
  * <pre>
- *   /txn/&lt;txnId&gt;                                       partitionKey = 
txnId
- *   /txn-op/&lt;txnId&gt;-&lt;seq&gt;                              
partitionKey = txnId
- *   /txn-segment-events/&lt;segment&gt;-&lt;seq&gt;                 
partitionKey = segment
- *   /txn-subscription-events/&lt;segment&gt;:&lt;sub&gt;-&lt;seq&gt;      
partitionKey = segment:sub
+ *   /txn/id/&lt;txnId&gt;                                       partitionKey 
= txnId
+ *   /txn/op/&lt;txnId&gt;-&lt;seq&gt;                                     
partitionKey = txnId
+ *   /txn/segment-events/&lt;segment&gt;-&lt;seq&gt;                        
partitionKey = segment
+ *   /txn/subscription-events/&lt;segment&gt;:&lt;sub&gt;-&lt;seq&gt;          
   partitionKey = segment:sub
  * </pre>
  *
  * <p>Secondary indexes (registered on the {@code MetadataStore} at startup):
@@ -46,20 +47,20 @@ import org.apache.pulsar.common.util.Codec;
  */
 public final class TxnPaths {
 
-    /** Path prefix for transaction headers. {@code /txn/<txnId>}. */
-    public static final String TXN_HEADER_PREFIX = "/txn";
+    /** Path prefix for transaction headers. {@code /txn/id/<txnId>}. */
+    public static final String TXN_HEADER_PREFIX = "/txn/id";
 
-    /** Path prefix for transaction op log entries. {@code 
/txn-op/<txnId>/<seq>}. */
-    public static final String TXN_OP_PREFIX = "/txn-op";
+    /** Path prefix for transaction op log entries. {@code 
/txn/op/<txnId>-<seq>}. */
+    public static final String TXN_OP_PREFIX = "/txn/op";
 
-    /** Path prefix for per-segment notification events. {@code 
/txn-segment-events/&lt;segment&gt;-&lt;seq&gt;}. */
-    public static final String TXN_SEGMENT_EVENTS_PREFIX = 
"/txn-segment-events";
+    /** Path prefix for per-segment notification events. {@code 
/txn/segment-events/&lt;segment&gt;-&lt;seq&gt;}. */
+    public static final String TXN_SEGMENT_EVENTS_PREFIX = 
"/txn/segment-events";
 
     /**
      * Path prefix for per-(segment, subscription) notification events.
-     * {@code 
/txn-subscription-events/&lt;segment&gt;:&lt;sub&gt;-&lt;seq&gt;}.
+     * {@code 
/txn/subscription-events/&lt;segment&gt;:&lt;sub&gt;-&lt;seq&gt;}.
      */
-    public static final String TXN_SUBSCRIPTION_EVENTS_PREFIX = 
"/txn-subscription-events";
+    public static final String TXN_SUBSCRIPTION_EVENTS_PREFIX = 
"/txn/subscription-events";
 
     /** Index: list write ops by segment. Key = segment. */
     public static final String IDX_WRITES_BY_SEGMENT = "idx:writes-by-segment";
@@ -85,12 +86,12 @@ public final class TxnPaths {
      */
     public static final String MAX_LONG_KEY = "99999999999999999999";
 
-    /** @return {@code /txn/<txnId>} — the header path for {@code txnId}. */
+    /** @return {@code /txn/id/<txnId>} — the header path for {@code txnId}. */
     public static String header(String txnId) {
         return TXN_HEADER_PREFIX + "/" + txnId;
     }
 
-    /** @return {@code /txn-op/<txnId>} — the parent path under which op-log 
entries are appended. */
+    /** @return {@code /txn/op/<txnId>} — the parent path under which op-log 
entries are appended. */
     public static String opParent(String txnId) {
         return TXN_OP_PREFIX + "/" + txnId;
     }
@@ -107,13 +108,16 @@ public final class TxnPaths {
         return Codec.encode(segment);
     }
 
-    /** @return {@code /txn-segment-events/<encoded-segment>} — parent path 
for {@code segment}'s event stream. */
+    /**
+     * @return {@code /txn/segment-events/<encoded-segment>} — parent path for 
{@code segment}'s
+     *     event stream.
+     */
     public static String segmentEventsParent(String segment) {
         return TXN_SEGMENT_EVENTS_PREFIX + "/" + segmentKey(segment);
     }
 
     /**
-     * @return {@code 
/txn-subscription-events/<encoded-segment>:<encoded-sub>} — parent for
+     * @return {@code 
/txn/subscription-events/<encoded-segment>:<encoded-sub>} — parent for
      *     (segment, sub) events. Both components are URL-encoded so the 
{@code :} separator is
      *     unambiguous (segment names contain {@code :} in their URI scheme).
      */
@@ -142,8 +146,8 @@ public final class TxnPaths {
 
     /**
      * Extract the {@code txnId} key from a path under {@link #TXN_OP_PREFIX}. 
The path layout is
-     * {@code /txn-op/<txnId>-<paddedSeq>}; txnId itself is {@code 
<most>-<least>} (one dash), so
-     * the sequence dash is always the last one and the substring before it is 
the txnId key.
+     * {@code /txn/op/<txnId>-<paddedSeq>}; txnId itself is {@code 
<most>_<least>}, so the sequence
+     * dash is always the last one and the substring before it is the txnId 
key.
      *
      * @return the txnId key, or {@code null} if {@code opPath} doesn't have 
the expected shape
      */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
index 546209c8276..fba4ec44361 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
@@ -51,17 +51,17 @@ import org.apache.pulsar.metadata.api.ScanConsumer;
  * <p>Lifecycle:
  * <ul>
  *   <li><b>Ack</b> — {@link #appendIndividualAck} / {@link 
#appendCumulativeAck} write a
- *       {@link TxnOp} record under {@code /txn-op/<txnId>-<seq>} with
+ *       {@link TxnOp} record under {@code /txn/op/<txnId>-<seq>} with
  *       {@code kind=ACK, segment, subscription, ledgerId, entryId, 
cumulative}. The associated
  *       {@code PendingAckHandle} keeps the in-memory state via the legacy 
interface.</li>
  *   <li><b>Commit / Abort marks</b> — {@link #appendCommitMark} / {@link 
#appendAbortMark} are
  *       <b>no-ops</b>. In v5 the TC owns the lifecycle: it CAS-updates the 
header and publishes
  *       subscription events; this store consumes those events.</li>
- *   <li><b>State transitions</b> — driven by {@code 
/txn-subscription-events/&lt;seg&gt;:&lt;sub&gt;-*}
+ *   <li><b>State transitions</b> — driven by {@code 
/txn/subscription-events/&lt;seg&gt;:&lt;sub&gt;-*}
  *       sequence events. The events are wake-ups; the truth is the header. On 
each notification
  *       we re-read headers for every currently-open txn this subscription is 
involved in and
  *       call {@code PendingAckHandleImpl.commitTxn} / {@code abortTxn} for 
those that have gone
- *       terminal — then delete the corresponding {@code /txn-op} ack 
records.</li>
+ *       terminal — then delete the corresponding {@code /txn/op} ack 
records.</li>
  *   <li><b>Recovery</b> (Option C) — on {@link #replayAsync}, subscribe to 
the event stream,
  *       scan {@code idx:acks-by-segment-subscription} for this {@code 
(segment, sub)}, group by
  *       {@code txnId}, fetch each header, and seed the in-memory open-txn 
set; then mark the
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
index 2687c5d981a..b20fa6a5229 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreProvider.java
@@ -42,7 +42,7 @@ public class MetadataPendingAckStoreProvider implements 
TransactionPendingAckSto
     public CompletableFuture<Boolean> 
checkInitializedBefore(PersistentSubscription subscription) {
         // The metadata layout is global — there is no per-subscription 
"initialized" log to
         // check. State (open txns, leftover op records) is rebuilt on demand 
from the
-        // /txn-op + /txn records at replay time.
+        // /txn/op + /txn records at replay time.
         return CompletableFuture.completedFuture(true);
     }
 }

Reply via email to