This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5991f6e8cea [improve][broker] PIP-473 P5.1: metadata-driven 
transaction coordinator (NEW_TXN / END_TXN) (#25863)
5991f6e8cea is described below

commit 5991f6e8ceadc1ae2024966e11c44f57cfa912dc
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 13:05:03 2026 -0700

    [improve][broker] PIP-473 P5.1: metadata-driven transaction coordinator 
(NEW_TXN / END_TXN) (#25863)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  11 +
 .../org/apache/pulsar/broker/PulsarService.java    |   6 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 105 ++++++++-
 .../coordinator/v5/TransactionCoordinatorV5.java   | 251 +++++++++++++++++++++
 .../transaction/coordinator/v5/package-info.java   |  27 +++
 .../broker/transaction/metadata/TcSequence.java    |  30 +++
 .../broker/transaction/metadata/TxnHeader.java     |   8 +
 .../transaction/metadata/TxnMetadataStore.java     |  51 ++++-
 .../broker/transaction/metadata/TxnPaths.java      |  15 ++
 .../buffer/impl/MetadataTransactionBufferTest.java |   8 +-
 .../v5/TransactionCoordinatorV5Test.java           | 243 ++++++++++++++++++++
 .../transaction/metadata/TxnMetadataStoreTest.java |   4 +-
 .../impl/MetadataPendingAckStoreTest.java          |   8 +-
 13 files changed, 753 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 96f2908a74d..e5ecea2c44b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3795,6 +3795,17 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean transactionCoordinatorEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "Enable the metadata-driven transaction coordinator used by 
scalable topics."
+                    + " When true, wire commands (NEW_TXN / END_TXN / etc.) 
are served by the"
+                    + " metadata-store-backed coordinator instead of the 
legacy"
+                    + " TransactionMetadataStoreService. Requires 
transactionCoordinatorEnabled"
+                    + " = true, and must be enabled together with the 
scalable-topic transaction"
+                    + " buffer and pending-ack store providers."
+    )
+    private boolean transactionCoordinatorScalableTopicsEnabled = false;
+
     @FieldContext(
         category = CATEGORY_TRANSACTION,
             doc = "Class name for transaction metadata store provider"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6a46fb3760d..1af1d3a39f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -134,6 +134,7 @@ import 
org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import 
org.apache.pulsar.broker.transaction.coordinator.v5.TransactionCoordinatorV5;
 import 
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
 import org.apache.pulsar.broker.validator.BindAddressValidator;
@@ -287,6 +288,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
     private OpenTelemetryTransactionPendingAckStoreStats 
openTelemetryTransactionPendingAckStoreStats;
 
     private TransactionMetadataStoreService transactionMetadataStoreService;
+    private TransactionCoordinatorV5 transactionCoordinatorV5;
     private TransactionBufferProvider transactionBufferProvider;
     private TransactionBufferClient transactionBufferClient;
     private HashedWheelTimer transactionTimer;
@@ -1043,6 +1045,10 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
                         transactionBufferClient, transactionTimer);
 
+                if (config.isTransactionCoordinatorScalableTopicsEnabled()) {
+                    transactionCoordinatorV5 = new 
TransactionCoordinatorV5(this);
+                }
+
                 transactionBufferProvider = TransactionBufferProvider
                         
.newProvider(config.getTransactionBufferProviderClassName());
                 transactionPendingAckStoreProvider = 
TransactionPendingAckStoreProvider
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fa7e919880f..11ce47e06db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3348,6 +3348,21 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
+        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+            
service.pulsar().getTransactionCoordinatorV5().handleClientConnect(tcId)
+                    .whenComplete((__, e) -> {
+                        if (e == null) {
+                            
commandSender.sendTcClientConnectResponse(requestId);
+                        } else {
+                            log.error().attr("requestId", 
requestId).attr("tcId", tcId).exception(e)
+                                    .log("v5 TC client connect failed");
+                            
commandSender.sendTcClientConnectResponse(requestId,
+                                    
BrokerServiceException.getClientErrorCode(e), e.getMessage());
+                        }
+                    });
+            return;
+        }
+
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
@@ -3414,6 +3429,22 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
+        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+            final String v5Owner = getPrincipal();
+            service.pulsar().getTransactionCoordinatorV5()
+                    .newTransaction(tcId, command.getTxnTtlSeconds() * 1000L, 
v5Owner)
+                    .whenComplete((txnId, e) -> {
+                        if (e == null) {
+                            commandSender.sendNewTxnResponse(requestId, txnId, 
tcId.getId());
+                        } else {
+                            Throwable cause = handleTxnException(e, 
BaseCommand.Type.NEW_TXN.name(), requestId);
+                            commandSender.sendNewTxnErrorResponse(requestId, 
tcId.getId(),
+                                    
BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
+                        }
+                    });
+            return;
+        }
+
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
         final String owner = getPrincipal();
@@ -3465,6 +3496,28 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
+        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+            // v5: TC doesn't need pre-registration — participants advertise 
themselves by writing
+            // /txn/op records when they actually apply ops. Still verify 
ownership before acking,
+            // matching the legacy authorization surface.
+            verifyTxnOwnership(txnID)
+                    .thenCompose(isOwner -> isOwner ? 
CompletableFuture.<Void>completedFuture(null)
+                            : failedFutureTxnNotOwned(txnID))
+                    .whenComplete((v, ex) -> {
+                        if (ex == null) {
+                            
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+                                    txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                        } else {
+                            Throwable cause = handleTxnException(ex,
+                                    
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
+                            
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+                                    txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
+                                    
BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
+                        }
+                    });
+            return;
+        }
+
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
         verifyTxnOwnership(txnID)
@@ -3525,6 +3578,27 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
+        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+            verifyTxnOwnership(txnID)
+                    .thenCompose(isOwner -> {
+                        if (!isOwner) {
+                            return failedFutureTxnNotOwned(txnID);
+                        }
+                        return service.pulsar().getTransactionCoordinatorV5()
+                                .endTransaction(txnID, txnAction);
+                    })
+                    .whenComplete((__, e) -> {
+                        if (e == null) {
+                            commandSender.sendEndTxnResponse(requestId, txnID, 
txnAction);
+                        } else {
+                            Throwable cause = handleTxnException(e, 
BaseCommand.Type.END_TXN.name(), requestId);
+                            commandSender.sendEndTxnErrorResponse(requestId, 
txnID,
+                                    
BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
+                        }
+                    });
+            return;
+        }
+
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
@@ -3569,8 +3643,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
         assert ctx.executor().inEventLoop();
-        return service.pulsar().getTransactionMetadataStoreService()
-                .verifyTxnOwnership(txnID, getPrincipal())
+        CompletableFuture<Boolean> ownerCheck =
+                
service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()
+                        ? service.pulsar().getTransactionCoordinatorV5()
+                                .verifyTxnOwnership(txnID, getPrincipal())
+                        : service.pulsar().getTransactionMetadataStoreService()
+                                .verifyTxnOwnership(txnID, getPrincipal());
+        return ownerCheck
                 .thenComposeAsync(isOwner -> {
                     if (isOwner) {
                         return CompletableFuture.completedFuture(true);
@@ -3839,6 +3918,28 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
+        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+            // v5: TC doesn't need pre-registration — participants advertise 
themselves by writing
+            // /txn/op records when they actually apply ops. Still verify 
ownership before acking,
+            // matching the legacy authorization surface.
+            verifyTxnOwnership(txnID)
+                    .thenCompose(isOwner -> isOwner ? 
CompletableFuture.<Void>completedFuture(null)
+                            : failedFutureTxnNotOwned(txnID))
+                    .whenComplete((v, ex) -> {
+                        if (ex == null) {
+                            
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                                    txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                        } else {
+                            Throwable cause = handleTxnException(ex,
+                                    
BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
+                            
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                                    txnID.getLeastSigBits(), 
txnID.getMostSigBits(),
+                                    
BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
+                        }
+                    });
+            return;
+        }
+
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
new file mode 100644
index 00000000000..1d3df7071ec
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.coordinator.v5;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import 
org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
+import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.broker.transaction.metadata.Versioned;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+
+/**
+ * PIP-473 v5 transaction coordinator — broker-side service.
+ *
+ * <p>Per-partition coordinator. A broker runs the v5 TC for partition {@code 
N} iff it owns
+ * partition {@code N} of {@code 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same
+ * leader-election mechanism the legacy {@code 
TransactionMetadataStoreService} uses; reusing
+ * it keeps the client-side discovery surface unchanged.
+ *
+ * <p>Wire commands handled (routed by {@code ServerCnx} when
+ * {@code transactionCoordinatorScalableTopicsEnabled} is on):
+ * <ul>
+ *   <li>{@code TC_CLIENT_CONNECT} → {@link #handleClientConnect}</li>
+ *   <li>{@code NEW_TXN} → {@link #newTransaction}</li>
+ *   <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} — 
no-ops per PIP; the v5
+ *       participants advertise themselves by writing {@code /txn/op} records, 
so the TC doesn't
+ *       need a pre-registration step.</li>
+ *   <li>{@code END_TXN} → {@link #endTransaction}</li>
+ * </ul>
+ *
+ * <p>{@code endTransaction} CAS-updates the header to the terminal state, 
enumerates
+ * {@code /txn/op/<txnId>-*} via {@link TxnMetadataStore#listOpsByTxn}, and 
publishes one
+ * segment-event per affected segment + one subscription-event per affected
+ * {@code (segment, subscription)} pair. The fan-out is metadata-store writes 
(not RPCs) and
+ * is bounded by the txn's participant count.
+ *
+ * <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep — 
those land in
+ * P5.2.
+ */
+@CustomLog
+public class TransactionCoordinatorV5 {
+
+    private final PulsarService pulsar;
+    private final TxnMetadataStore txnStore;
+
+    public TransactionCoordinatorV5(PulsarService pulsar) {
+        this.pulsar = pulsar;
+        this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
+    }
+
+    // ---- TC client connect ------------------------------------------------
+
+    /**
+     * Verify this broker is the leader for {@code tcId} (owns the 
corresponding partition of
+     * {@code transaction_coordinator_assign}). Mirrors the ownership check 
the legacy
+     * {@code TransactionMetadataStoreService.handleTcClientConnect} performs 
— the same
+     * topic-ownership mechanism serves as our leader-election surface.
+     */
+    public CompletableFuture<Void> 
handleClientConnect(TransactionCoordinatorID tcId) {
+        String assignPartition = 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
+                .getPartition((int) tcId.getId()).toString();
+        return 
pulsar.getBrokerService().checkTopicNsOwnership(assignPartition);
+    }
+
+    // ---- newTransaction ---------------------------------------------------
+
+    /**
+     * Create a new transaction header at {@code /txn/id/<tcId>_<seq>}. The 
{@code leastSigBits}
+     * is drawn from the per-tcId monotonic sequence counter ({@link 
TxnMetadataStore#nextTxnSequence})
+     * so txnIds are never reused — the participant-side aborted-set is keyed 
by txnId, and reuse
+     * would break that.
+     */
+    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID 
tcId, long timeoutInMillis,
+                                                   String owner) {
+        return txnStore.nextTxnSequence(tcId.getId()).thenCompose(seq -> {
+            TxnID txnId = new TxnID(tcId.getId(), seq);
+            TxnHeader header = new TxnHeader(TxnState.OPEN,
+                    Duration.ofMillis(timeoutInMillis), Instant.now(), null, 
owner);
+            return txnStore.createHeader(TxnIds.toKey(txnId), 
header).thenApply(stat -> txnId);
+        });
+    }
+
+    /**
+     * Verify {@code principal} owns {@code txnId}. Mirrors the legacy 
coordinator's
+     * {@code TransactionMetadataStoreService.verifyTxnOwnership} semantics: a 
{@code null} stored
+     * owner (authentication disabled, or a legacy txn) is always allowed; 
otherwise the principal
+     * must match. The superuser fallback lives in {@code 
ServerCnx#verifyTxnOwnership}, same as the
+     * legacy path. A missing header resolves to {@code false} (not owned).
+     */
+    public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnId, String 
principal) {
+        return txnStore.getHeader(TxnIds.toKey(txnId)).thenApply(opt -> opt
+                .map(v -> {
+                    String owner = v.value().getOwner();
+                    return owner == null || owner.equals(principal);
+                })
+                .orElse(false));
+    }
+
+    // ---- addPartition / addSubscription (no-op in v5) ---------------------
+
+    /**
+     * No-op per PIP-473 — in v5, participants advertise themselves by writing 
{@code /txn/op}
+     * records when they actually apply ops. The pre-registration step is 
unnecessary.
+     */
+    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, 
List<String> partitions) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** No-op (see {@link #addProducedPartitionToTxn}). */
+    public CompletableFuture<Void> addAckedSubscriptionToTxn(TxnID txnId,
+                                                             
List<TransactionSubscription> subscriptions) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    // ---- endTransaction ---------------------------------------------------
+
+    /**
+     * Finalise a transaction: CAS the header to {@code COMMITTED}/{@code 
ABORTED}, enumerate
+     * the txn's participants via {@link TxnMetadataStore#listOpsByTxn}, and 
publish one
+     * segment-event per affected segment and one subscription-event per 
affected
+     * {@code (segment, subscription)} pair. Idempotent against retries — a 
header already in
+     * the requested terminal state skips the CAS and re-drives the event fan-out.
+     */
+    public CompletableFuture<Void> endTransaction(TxnID txnId, int txnAction) {
+        TxnState newState = newStateFor(txnAction);
+        if (newState == null) {
+            return FutureUtil.failedFuture(
+                    new 
TransactionCoordinatorException.UnsupportedTxnActionException(txnId, 
txnAction));
+        }
+        String txnIdKey = TxnIds.toKey(txnId);
+        return txnStore.getHeader(txnIdKey).thenCompose(opt -> {
+            if (opt.isEmpty()) {
+                return FutureUtil.failedFuture(
+                        new CoordinatorException.TransactionNotFoundException(
+                                "Transaction not found: " + txnId));
+            }
+            Versioned<TxnHeader> v = opt.get();
+            TxnHeader current = v.value();
+            if (current.getState() == newState) {
+                // Idempotent retry — header already terminal-and-matching. 
Re-drive the fan-out
+                // rather than short-circuiting: if a previous attempt CAS'd 
the header but failed
+                // (partially or fully) before publishing, the only way a 
participant ever learns
+                // the decision is the event, and a terminal header gives the 
reconcile path nothing
+                // to act on. Re-publishing is safe — participants key on 
(txnId, decision) and the
+                // decision can't change once terminal.
+                return fanOutEvents(txnId, txnIdKey, newState);
+            }
+            if (current.getState() != TxnState.OPEN) {
+                return FutureUtil.failedFuture(
+                        new CoordinatorException.InvalidTxnStatusException(
+                                "Transaction " + txnId + " is " + 
current.getState()
+                                        + ", cannot transition to " + 
newState));
+            }
+            TxnHeader updated = new TxnHeader(newState, current.getTimeout(),
+                    current.getCreatedAt(), Instant.now(), current.getOwner());
+            return txnStore.updateHeader(txnIdKey, updated, v.version())
+                    .thenCompose(stat -> fanOutEvents(txnId, txnIdKey, 
newState));
+        });
+    }
+
+    private static TxnState newStateFor(int txnAction) {
+        if (txnAction == TxnAction.COMMIT_VALUE) {
+            return TxnState.COMMITTED;
+        } else if (txnAction == TxnAction.ABORT_VALUE) {
+            return TxnState.ABORTED;
+        }
+        return null;
+    }
+
+    /**
+     * Enumerate {@code /txn/op} via {@link TxnMetadataStore#listOpsByTxn}, 
group by participant,
+     * and publish one event per participant. Writes are independent so we 
fire them in parallel.
+     */
+    private CompletableFuture<Void> fanOutEvents(TxnID txnId, String txnIdKey, 
TxnState decision) {
+        Set<String> writeSegments = ConcurrentHashMap.newKeySet();
+        Set<AckParticipant> ackParticipants = ConcurrentHashMap.newKeySet();
+        return txnStore.listOpsByTxn(txnIdKey, new ScanConsumer() {
+            @Override
+            public void onNext(GetResult r) {
+                TxnOp op = TxnMetadataStore.fromJson(r.getValue(), 
TxnOp.class);
+                if (op.getKind() == TxnOpKind.WRITE) {
+                    writeSegments.add(op.getSegment());
+                } else if (op.getKind() == TxnOpKind.ACK && 
op.getSubscription() != null) {
+                    ackParticipants.add(new AckParticipant(op.getSegment(), 
op.getSubscription()));
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                log.warn().attr("txnId", txnId).exception(throwable)
+                        .log("endTxn participant enumeration encountered an 
error");
+            }
+
+            @Override
+            public void onCompleted() {
+            }
+        }).thenCompose(__ -> {
+            TxnEvent event = new TxnEvent(txnIdKey, decision);
+            CompletableFuture<?>[] publishes = new CompletableFuture<?>[
+                    writeSegments.size() + ackParticipants.size()];
+            int i = 0;
+            for (String segment : writeSegments) {
+                publishes[i++] = txnStore.publishSegmentEvent(segment, event);
+            }
+            for (AckParticipant p : ackParticipants) {
+                publishes[i++] = 
txnStore.publishSubscriptionEvent(p.segment(), p.subscription(), event);
+            }
+            return CompletableFuture.allOf(publishes);
+        });
+    }
+
+    /** A {@code (segment, subscription)} ack participant; keys the ack 
fan-out de-dup set. */
+    private record AckParticipant(String segment, String subscription) {
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java
new file mode 100644
index 00000000000..77f0b4c5164
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * PIP-473 v5 transaction coordinator: broker-side service that serves {@code 
NEW_TXN} /
+ * {@code END_TXN} wire commands against the metadata-store-backed
+ * {@link org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore}. 
Replaces the
+ * legacy {@code TransactionMetadataStoreService} when
+ * {@code transactionCoordinatorScalableTopicsEnabled} is on.
+ */
+package org.apache.pulsar.broker.transaction.coordinator.v5;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java
new file mode 100644
index 00000000000..f3fb1ea159f
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * Per-tcId txnId-sequence counter. Stored at {@code /txn/tc-seq/<tcId>} and 
CAS-incremented
+ * by the v5 TC for each {@code newTxn}. The yielded value becomes a 
transaction's
+ * {@code leastSigBits} — monotonic per tcId so {@link 
org.apache.pulsar.client.api.transaction.TxnID}
+ * is never reused (avoiding aborted-set key collisions in the 
participant-side visibility state).
+ *
+ * @param next the next {@code leastSigBits} to assign (the most recently 
issued value + 1)
+ */
+public record TcSequence(long next) {
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
index 381b6bbfbbc..11043eafe6c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
@@ -49,4 +49,12 @@ public class TxnHeader {
      * the CAS that flips {@link #state}. {@code null} while OPEN.
      */
     private Instant finalizedAt;
+
+    /**
+     * Principal that opened the transaction (from {@code NEW_TXN}). Used to 
authorize subsequent
+     * {@code END_TXN} / add-participant commands — only the owner (or a 
superuser) may operate on
+     * the txn. {@code null} when authentication is disabled, mirroring the 
legacy coordinator's
+     * "null owner ⟹ allowed" semantics.
+     */
+    private String owner;
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 323473ac8b1..ebf300b7725 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.function.Consumer;
 import lombok.CustomLog;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -118,13 +119,17 @@ public class TxnMetadataStore {
      * {@code path} carries the generated sequence key.
      */
     public CompletableFuture<Stat> appendOp(String txnId, TxnOp op) {
-        Option.SecondaryIndex idx = switch (op.getKind()) {
+        Option.SecondaryIndex participantIdx = switch (op.getKind()) {
             case WRITE -> new Option.SecondaryIndex(TxnPaths.IDX_WRITES_BY_SEGMENT,
                     TxnPaths.segmentKey(op.getSegment()));
             case ACK -> new Option.SecondaryIndex(TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION,
                     TxnPaths.ackIndexKey(op.getSegment(), op.getSubscription()));
         };
-        Set<Option> opts = Set.of(new Option.PartitionKey(txnId), idx, APPEND_DELTAS);
+        // Also index by txnId so the TC's endTxn can enumerate this txn's ops without scanning
+        // the whole /txn/op namespace.
+        Option.SecondaryIndex byTxnIdx = new Option.SecondaryIndex(TxnPaths.IDX_OPS_BY_TXN, txnId);
+        Set<Option> opts = Set.of(new Option.PartitionKey(txnId), participantIdx, byTxnIdx,
+                APPEND_DELTAS);
         return store.put(TxnPaths.opParent(txnId), toJson(op), Optional.empty(), opts);
     }
 
@@ -157,6 +162,18 @@ public class TxnMetadataStore {
                 consumer);
     }
 
+    /**
+     * Stream all {@code /txn/op} records for {@code txnId} via the {@link TxnPaths#IDX_OPS_BY_TXN}
+     * index. Used by the v5 TC at end-txn time to enumerate participants — distinct segments for
+     * writes, distinct {@code (segment, subscription)} pairs for acks.
+     */
+    public CompletableFuture<Void> listOpsByTxn(String txnId, ScanConsumer consumer) {
+        return store.scanByIndex(TxnPaths.TXN_OP_PREFIX, TxnPaths.IDX_OPS_BY_TXN,
+                txnId, txnId,
+                gr -> txnId.equals(TxnPaths.txnIdFromOpPath(gr.getStat().getPath())),
+                consumer);
+    }
+
     /**
      * Delete every {@code /txn/op} write record for {@code (segment, txnId)} 
— used by the TB once
      * an event tells it the txn is terminal. Path extraction follows the 
layout in
@@ -381,6 +398,36 @@ public class TxnMetadataStore {
         return store.deleteIfExists(TxnPaths.segmentWatermarkPath(segment), 
Optional.empty(), opts);
     }
 
+    // ---- TC sequence counter ----------------------------------------------
+
+    /**
+     * Atomically increment the per-tc txnId sequence counter and return the assigned value.
+     * Retries on {@link MetadataStoreException.BadVersionException} so concurrent callers
+     * (within a TC partition's broker) serialise correctly. The returned value becomes a
+     * txn's {@code leastSigBits}; monotonic per {@code tcId} ⟹ no txnId reuse.
+     */
+    public CompletableFuture<Long> nextTxnSequence(long tcId) {
+        String path = TxnPaths.tcSequencePath(tcId);
+        return store.get(path).thenCompose(opt -> {
+            long current = opt.map(gr -> fromJson(gr.getValue(), TcSequence.class).next() - 1).orElse(-1L);
+            long assigned = current + 1;
+            TcSequence updated = new TcSequence(assigned + 1);
+            Optional<Long> expectedVersion = opt.map(gr -> gr.getStat().getVersion())
+                    .map(Optional::of).orElse(Optional.of(-1L));
+            return store.put(path, toJson(updated), expectedVersion, Set.of())
+                    .thenApply(stat -> assigned)
+                    .exceptionallyCompose(ex -> {
+                        Throwable cause = ex instanceof CompletionException && ex.getCause() != null
+                                ? ex.getCause() : ex;
+                        if (cause instanceof MetadataStoreException.BadVersionException) {
+                            // Concurrent write — retry.
+                            return nextTxnSequence(tcId);
+                        }
+                        return FutureUtil.failedFuture(cause);
+                    });
+        });
+    }
+
     // ---- JSON helpers ------------------------------------------------------
 
     /** @return UTF-8 JSON bytes for {@code value}. Wraps any I/O error as 
{@link CompletionException}. */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index f68d8d73bd1..7e25ebb33ba 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -92,6 +92,21 @@ public final class TxnPaths {
      */
     public static final String IDX_TXN_ABORTED_BY_POSITION = 
"idx:txn-aborted-by-position";
 
+    /**
+     * Index: list all {@code /txn/op} records for a txn. Key = {@code txnId}. Used by the v5 TC
+     * at end-txn time to enumerate the txn's participants without scanning the whole
+     * {@code /txn/op} namespace.
+     */
+    public static final String IDX_OPS_BY_TXN = "idx:ops-by-txn";
+
+    /** Path prefix for per-tcId txnId-sequence counter documents. */
+    public static final String TXN_TC_SEQ_PREFIX = "/txn/tc-seq";
+
+    /** @return {@code /txn/tc-seq/<tcId>} — the txnId-sequence counter doc for {@code tcId}. */
+    public static String tcSequencePath(long tcId) {
+        return TXN_TC_SEQ_PREFIX + "/" + tcId;
+    }
+
     /** Width used when formatting long values into 
lexicographically-orderable index keys. */
     public static final int LONG_KEY_WIDTH = 20;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index 804e12d1bd5..8234eea3832 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -163,7 +163,7 @@ public class MetadataTransactionBufferTest {
         // Pre-set header to COMMITTED — txn is terminal before any append.
         Stat created = txnStore.createHeader(TxnIds.toKey(txnId),
                 new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
-                        Instant.ofEpochMilli(1000), 
Instant.ofEpochMilli(2000))).get();
+                        Instant.ofEpochMilli(1000), 
Instant.ofEpochMilli(2000), null)).get();
         assertThat(created.getVersion()).isZero();
 
         MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic, 
txnStore);
@@ -393,14 +393,14 @@ public class MetadataTransactionBufferTest {
     private void createOpenHeader(TxnID txnId) throws Exception {
         txnStore.createHeader(TxnIds.toKey(txnId),
                 new TxnHeader(TxnState.OPEN, Duration.ofMillis(60_000),
-                        Instant.ofEpochMilli(1000), null)).get();
+                        Instant.ofEpochMilli(1000), null, null)).get();
     }
 
     private void commitTxn(TxnID txnId) throws Exception {
         var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
         var h = v.value();
         txnStore.updateHeader(TxnIds.toKey(txnId),
-                new TxnHeader(TxnState.COMMITTED, h.getTimeout(), 
h.getCreatedAt(), Instant.now()),
+                new TxnHeader(TxnState.COMMITTED, h.getTimeout(), 
h.getCreatedAt(), Instant.now(), null),
                 v.version()).get();
     }
 
@@ -408,7 +408,7 @@ public class MetadataTransactionBufferTest {
         var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
         var h = v.value();
         txnStore.updateHeader(TxnIds.toKey(txnId),
-                new TxnHeader(TxnState.ABORTED, h.getTimeout(), 
h.getCreatedAt(), Instant.now()),
+                new TxnHeader(TxnState.ABORTED, h.getTimeout(), 
h.getCreatedAt(), Instant.now(), null),
                 v.version()).get();
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
new file mode 100644
index 00000000000..38150874db9
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.coordinator.v5;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link TransactionCoordinatorV5} against the in-memory metadata store. The
+ * {@link PulsarService} / {@link BrokerService} dependencies are mocked just enough for the TC
+ * to construct and to satisfy {@code handleClientConnect}'s ownership check.
+ */
+public class TransactionCoordinatorV5Test {
+
+    private static final TransactionCoordinatorID TC_ID = 
TransactionCoordinatorID.get(0L);
+
+    private MetadataStoreExtended store;
+    private TxnMetadataStore txnStore;
+    private PulsarService pulsar;
+    private TransactionCoordinatorV5 tc;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = MetadataStoreExtended.create("memory:local",
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+        txnStore = new TxnMetadataStore(store);
+        pulsar = mock(PulsarService.class);
+        when(pulsar.getLocalMetadataStore()).thenReturn(store);
+        BrokerService brokerService = mock(BrokerService.class);
+        when(pulsar.getBrokerService()).thenReturn(brokerService);
+        // Default: owned. Tests that want to assert the not-owned path can override.
+        when(brokerService.checkTopicNsOwnership(any())).thenReturn(CompletableFuture.completedFuture(null));
+        tc = new TransactionCoordinatorV5(pulsar);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    @Test
+    public void newTransaction_createsHeader_withSequentialLeastBits() throws Exception {
+        TxnID t1 = tc.newTransaction(TC_ID, 60_000L, "owner-a").get();
+        TxnID t2 = tc.newTransaction(TC_ID, 60_000L, "owner-a").get();
+        TxnID t3 = tc.newTransaction(TC_ID, 60_000L, "owner-a").get();
+
+        // mostSigBits is the tcId.
+        assertThat(t1.getMostSigBits()).isEqualTo(TC_ID.getId());
+        // leastSigBits is monotonic per tcId.
+        assertThat(t1.getLeastSigBits()).isLessThan(t2.getLeastSigBits());
+        assertThat(t2.getLeastSigBits()).isLessThan(t3.getLeastSigBits());
+
+        // Header lives at /txn/id/<TxnIds.toKey> with state = OPEN and the opening principal.
+        var header = txnStore.getHeader(TxnIds.toKey(t1)).get().orElseThrow();
+        assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+        assertThat(header.value().getOwner()).isEqualTo("owner-a");
+    }
+
+    @Test
+    public void endTransaction_commit_casesHeaderAndFansOutSegmentEvent() 
throws Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+
+        // Simulate a participant appending an op for the segment.
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+
+        // Subscribe to segment events before endTxn — receive the published 
event.
+        List<String> received = new ArrayList<>();
+        try (var sub = txnStore.subscribeSegmentEvents(segment, 
received::add)) {
+            tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+            // Header is now COMMITTED.
+            var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
+            
assertThat(header.value().getState()).isEqualTo(TxnState.COMMITTED);
+            // A segment-event for this segment was published.
+            Awaitility.await().untilAsserted(() -> 
assertThat(received).isNotEmpty());
+        }
+    }
+
+    @Test
+    public void endTransaction_abort_casesHeaderAndFansOutEvents() throws 
Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+        String subscription = "sub-x";
+
+        // Two participants: a WRITE on the segment and an ACK on (segment, 
sub).
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.ACK, segment, subscription, 5L, 2L, 
false)).get();
+
+        List<String> segReceived = new ArrayList<>();
+        List<String> subReceived = new ArrayList<>();
+        try (var s1 = txnStore.subscribeSegmentEvents(segment, 
segReceived::add);
+             var s2 = txnStore.subscribeSubscriptionEvents(segment, 
subscription, subReceived::add)) {
+            tc.endTransaction(txnId, TxnAction.ABORT_VALUE).get();
+
+            // Header is ABORTED.
+            var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
+            assertThat(header.value().getState()).isEqualTo(TxnState.ABORTED);
+
+            // Both participant streams received an event.
+            Awaitility.await().untilAsserted(() -> 
assertThat(segReceived).isNotEmpty());
+            Awaitility.await().untilAsserted(() -> 
assertThat(subReceived).isNotEmpty());
+
+            // The published event's decision matches.
+            byte[] bytes = store.get(segReceived.get(segReceived.size() - 
1)).get().orElseThrow().getValue();
+            TxnEvent event = TxnMetadataStore.fromJson(bytes, TxnEvent.class);
+            assertThat(event.getDecision()).isEqualTo(TxnState.ABORTED);
+            assertThat(event.getTxnId()).isEqualTo(txnIdKey);
+        }
+    }
+
+    @Test
+    public void endTransaction_idempotent_onRetryWithSameAction() throws 
Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+        // Second call with the same action succeeds and leaves the header 
terminal-and-matching.
+        tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+
+        var header = 
txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
+        assertThat(header.value().getState()).isEqualTo(TxnState.COMMITTED);
+    }
+
+    @Test
+    public void endTransaction_idempotentRetry_republishesEvents() throws 
Exception {
+        // A retry of a terminal-and-matching txn re-drives the fan-out, so a 
participant that
+        // missed the first event (e.g. a partial publish on the first 
attempt) still gets it.
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+
+        List<String> received = new ArrayList<>();
+        try (var sub = txnStore.subscribeSegmentEvents(segment, 
received::add)) {
+            tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+            Awaitility.await().untilAsserted(() -> 
assertThat(received).hasSize(1));
+            // Retry re-publishes — a second event lands for the same segment.
+            tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+            Awaitility.await().untilAsserted(() -> 
assertThat(received).hasSize(2));
+        }
+    }
+
+    @Test
+    public void verifyTxnOwnership_matchesOwnerAndRejectsOthers() throws 
Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "alice").get();
+        assertThat(tc.verifyTxnOwnership(txnId, "alice").get()).isTrue();
+        assertThat(tc.verifyTxnOwnership(txnId, "bob").get()).isFalse();
+    }
+
+    @Test
+    public void verifyTxnOwnership_nullOwnerAlwaysAllowed() throws Exception {
+        // Authentication disabled — owner stored as null, mirroring the 
legacy "null ⟹ allowed".
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, null).get();
+        assertThat(tc.verifyTxnOwnership(txnId, "anyone").get()).isTrue();
+        assertThat(tc.verifyTxnOwnership(txnId, null).get()).isTrue();
+    }
+
+    @Test
+    public void verifyTxnOwnership_unknownTxnReturnsFalse() throws Exception {
+        assertThat(tc.verifyTxnOwnership(new TxnID(0L, 9999L), 
"alice").get()).isFalse();
+    }
+
+    @Test
+    public void endTransaction_failsOnMismatchedAction() throws Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+        // Trying to ABORT a COMMITTED txn → InvalidTxnStatusException.
+        assertThatThrownBy(() -> tc.endTransaction(txnId, 
TxnAction.ABORT_VALUE).get())
+                
.hasCauseInstanceOf(CoordinatorException.InvalidTxnStatusException.class);
+    }
+
+    @Test
+    public void endTransaction_failsForUnknownTxn() {
+        assertThatThrownBy(() -> tc.endTransaction(new TxnID(0L, 9999L), 
TxnAction.COMMIT_VALUE).get())
+                
.hasCauseInstanceOf(CoordinatorException.TransactionNotFoundException.class);
+    }
+
+    @Test
+    public void addPartitionAndAddSubscription_areNoOps() throws Exception {
+        // v5: participants advertise themselves via /txn/op writes; 
ADD_PARTITION /
+        // ADD_SUBSCRIPTION are wire-level no-ops in the TC.
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        tc.addProducedPartitionToTxn(txnId, 
List.of("persistent://public/default/topic1")).get();
+        tc.addAckedSubscriptionToTxn(txnId, List.of()).get();
+    }
+
+    @Test
+    public void newTransaction_collectionsAreScopedByTcId() throws Exception {
+        TransactionCoordinatorID tc1 = TransactionCoordinatorID.get(1L);
+        TransactionCoordinatorID tc2 = TransactionCoordinatorID.get(2L);
+        TxnID a = tc.newTransaction(tc1, 60_000L, "owner").get();
+        TxnID b = tc.newTransaction(tc2, 60_000L, "owner").get();
+        // Different tcIds → different mostSigBits, independent leastSigBits 
sequences.
+        assertThat(a.getMostSigBits()).isEqualTo(1L);
+        assertThat(b.getMostSigBits()).isEqualTo(2L);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
index 5a9a9849349..40c04ec0a8f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
@@ -51,12 +51,12 @@ public class TxnMetadataStoreTest {
 
     private static TxnHeader open(long createdMs, long timeoutMs) {
         return new TxnHeader(TxnState.OPEN, Duration.ofMillis(timeoutMs),
-                Instant.ofEpochMilli(createdMs), null);
+                Instant.ofEpochMilli(createdMs), null, null);
     }
 
     private static TxnHeader finalized(TxnState state, long createdMs, long 
timeoutMs, long finalizedMs) {
         return new TxnHeader(state, Duration.ofMillis(timeoutMs),
-                Instant.ofEpochMilli(createdMs), 
Instant.ofEpochMilli(finalizedMs));
+                Instant.ofEpochMilli(createdMs), 
Instant.ofEpochMilli(finalizedMs), null);
     }
 
     @Test
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
index bb86ede033b..e9994fd3314 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
@@ -210,7 +210,7 @@ public class MetadataPendingAckStoreTest {
 
         txnStore.createHeader(TxnIds.toKey(committedTxn),
                 new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
-                        Instant.ofEpochMilli(1000), 
Instant.ofEpochMilli(2000))).get();
+                        Instant.ofEpochMilli(1000), 
Instant.ofEpochMilli(2000), null)).get();
         txnStore.appendOp(TxnIds.toKey(committedTxn),
                 new TxnOp(TxnOpKind.ACK, SEGMENT, SUB, 7L, 3L, false)).get();
 
@@ -253,7 +253,7 @@ public class MetadataPendingAckStoreTest {
     private void createOpenHeader(TxnID txnId) throws Exception {
         txnStore.createHeader(TxnIds.toKey(txnId),
                 new TxnHeader(TxnState.OPEN, Duration.ofMillis(60_000),
-                        Instant.ofEpochMilli(1000), null)).get();
+                        Instant.ofEpochMilli(1000), null, null)).get();
     }
 
     private void commitHeader(TxnID txnId) throws Exception {
@@ -261,7 +261,7 @@ public class MetadataPendingAckStoreTest {
         var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
         var h = v.value();
         created = txnStore.updateHeader(TxnIds.toKey(txnId),
-                new TxnHeader(TxnState.COMMITTED, h.getTimeout(), 
h.getCreatedAt(), Instant.now()),
+                new TxnHeader(TxnState.COMMITTED, h.getTimeout(), 
h.getCreatedAt(), Instant.now(), null),
                 v.version()).get();
         assertThat(created.getVersion()).isPositive();
     }
@@ -270,7 +270,7 @@ public class MetadataPendingAckStoreTest {
         var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
         var h = v.value();
         txnStore.updateHeader(TxnIds.toKey(txnId),
-                new TxnHeader(TxnState.ABORTED, h.getTimeout(), 
h.getCreatedAt(), Instant.now()),
+                new TxnHeader(TxnState.ABORTED, h.getTimeout(), 
h.getCreatedAt(), Instant.now(), null),
                 v.version()).get();
     }
 }

Reply via email to