This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5991f6e8cea [improve][broker] PIP-473 P5.1: metadata-driven
transaction coordinator (NEW_TXN / END_TXN) (#25863)
5991f6e8cea is described below
commit 5991f6e8ceadc1ae2024966e11c44f57cfa912dc
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 27 13:05:03 2026 -0700
[improve][broker] PIP-473 P5.1: metadata-driven transaction coordinator
(NEW_TXN / END_TXN) (#25863)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 11 +
.../org/apache/pulsar/broker/PulsarService.java | 6 +
.../apache/pulsar/broker/service/ServerCnx.java | 105 ++++++++-
.../coordinator/v5/TransactionCoordinatorV5.java | 251 +++++++++++++++++++++
.../transaction/coordinator/v5/package-info.java | 27 +++
.../broker/transaction/metadata/TcSequence.java | 30 +++
.../broker/transaction/metadata/TxnHeader.java | 8 +
.../transaction/metadata/TxnMetadataStore.java | 51 ++++-
.../broker/transaction/metadata/TxnPaths.java | 15 ++
.../buffer/impl/MetadataTransactionBufferTest.java | 8 +-
.../v5/TransactionCoordinatorV5Test.java | 243 ++++++++++++++++++++
.../transaction/metadata/TxnMetadataStoreTest.java | 4 +-
.../impl/MetadataPendingAckStoreTest.java | 8 +-
13 files changed, 753 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 96f2908a74d..e5ecea2c44b 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3795,6 +3795,17 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean transactionCoordinatorEnabled = false;
+ @FieldContext(
+ category = CATEGORY_TRANSACTION,
+ doc = "Enable the metadata-driven transaction coordinator used by
scalable topics."
+ + " When true, wire commands (NEW_TXN / END_TXN / etc.)
are served by the"
+ + " metadata-store-backed coordinator instead of the
legacy"
+ + " TransactionMetadataStoreService. Requires
transactionCoordinatorEnabled"
+ + " = true, and must be enabled together with the
scalable-topic transaction"
+ + " buffer and pending-ack store providers."
+ )
+ private boolean transactionCoordinatorScalableTopicsEnabled = false;
+
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Class name for transaction metadata store provider"
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6a46fb3760d..1af1d3a39f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -134,6 +134,7 @@ import
org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import
org.apache.pulsar.broker.transaction.coordinator.v5.TransactionCoordinatorV5;
import
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.validator.BindAddressValidator;
@@ -287,6 +288,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
private OpenTelemetryTransactionPendingAckStoreStats
openTelemetryTransactionPendingAckStoreStats;
private TransactionMetadataStoreService transactionMetadataStoreService;
+ private TransactionCoordinatorV5 transactionCoordinatorV5;
private TransactionBufferProvider transactionBufferProvider;
private TransactionBufferClient transactionBufferClient;
private HashedWheelTimer transactionTimer;
@@ -1043,6 +1045,10 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
transactionBufferClient, transactionTimer);
+ if (config.isTransactionCoordinatorScalableTopicsEnabled()) {
+ transactionCoordinatorV5 = new
TransactionCoordinatorV5(this);
+ }
+
transactionBufferProvider = TransactionBufferProvider
.newProvider(config.getTransactionBufferProviderClassName());
transactionPendingAckStoreProvider =
TransactionPendingAckStoreProvider
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fa7e919880f..11ce47e06db 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3348,6 +3348,21 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
+ if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+
service.pulsar().getTransactionCoordinatorV5().handleClientConnect(tcId)
+ .whenComplete((__, e) -> {
+ if (e == null) {
+
commandSender.sendTcClientConnectResponse(requestId);
+ } else {
+ log.error().attr("requestId",
requestId).attr("tcId", tcId).exception(e)
+ .log("v5 TC client connect failed");
+
commandSender.sendTcClientConnectResponse(requestId,
+
BrokerServiceException.getClientErrorCode(e), e.getMessage());
+ }
+ });
+ return;
+ }
+
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
@@ -3414,6 +3429,22 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
+ if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+ final String v5Owner = getPrincipal();
+ service.pulsar().getTransactionCoordinatorV5()
+ .newTransaction(tcId, command.getTxnTtlSeconds() * 1000L,
v5Owner)
+ .whenComplete((txnId, e) -> {
+ if (e == null) {
+ commandSender.sendNewTxnResponse(requestId, txnId,
tcId.getId());
+ } else {
+ Throwable cause = handleTxnException(e,
BaseCommand.Type.NEW_TXN.name(), requestId);
+ commandSender.sendNewTxnErrorResponse(requestId,
tcId.getId(),
+
BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
+ }
+ });
+ return;
+ }
+
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
final String owner = getPrincipal();
@@ -3465,6 +3496,28 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
+ if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+ // v5: TC doesn't need pre-registration — participants advertise
themselves by writing
+ // /txn/op records when they actually apply ops. Still verify
ownership before acking,
+ // matching the legacy authorization surface.
+ verifyTxnOwnership(txnID)
+ .thenCompose(isOwner -> isOwner ?
CompletableFuture.<Void>completedFuture(null)
+ : failedFutureTxnNotOwned(txnID))
+ .whenComplete((v, ex) -> {
+ if (ex == null) {
+
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
+ } else {
+ Throwable cause = handleTxnException(ex,
+
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
+
writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
+ txnID.getLeastSigBits(),
txnID.getMostSigBits(),
+
BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
+ }
+ });
+ return;
+ }
+
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
verifyTxnOwnership(txnID)
@@ -3525,6 +3578,27 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
+ if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+ verifyTxnOwnership(txnID)
+ .thenCompose(isOwner -> {
+ if (!isOwner) {
+ return failedFutureTxnNotOwned(txnID);
+ }
+ return service.pulsar().getTransactionCoordinatorV5()
+ .endTransaction(txnID, txnAction);
+ })
+ .whenComplete((__, e) -> {
+ if (e == null) {
+ commandSender.sendEndTxnResponse(requestId, txnID,
txnAction);
+ } else {
+ Throwable cause = handleTxnException(e,
BaseCommand.Type.END_TXN.name(), requestId);
+ commandSender.sendEndTxnErrorResponse(requestId,
txnID,
+
BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
+ }
+ });
+ return;
+ }
+
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
@@ -3569,8 +3643,13 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
assert ctx.executor().inEventLoop();
- return service.pulsar().getTransactionMetadataStoreService()
- .verifyTxnOwnership(txnID, getPrincipal())
+ CompletableFuture<Boolean> ownerCheck =
+
service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()
+ ? service.pulsar().getTransactionCoordinatorV5()
+ .verifyTxnOwnership(txnID, getPrincipal())
+ : service.pulsar().getTransactionMetadataStoreService()
+ .verifyTxnOwnership(txnID, getPrincipal());
+ return ownerCheck
.thenComposeAsync(isOwner -> {
if (isOwner) {
return CompletableFuture.completedFuture(true);
@@ -3839,6 +3918,28 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
return;
}
+ if
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
{
+ // v5: TC doesn't need pre-registration — participants advertise
themselves by writing
+ // /txn/op records when they actually apply ops. Still verify
ownership before acking,
+ // matching the legacy authorization surface.
+ verifyTxnOwnership(txnID)
+ .thenCompose(isOwner -> isOwner ?
CompletableFuture.<Void>completedFuture(null)
+ : failedFutureTxnNotOwned(txnID))
+ .whenComplete((v, ex) -> {
+ if (ex == null) {
+
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
+ } else {
+ Throwable cause = handleTxnException(ex,
+
BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
+
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+ txnID.getLeastSigBits(),
txnID.getMostSigBits(),
+
BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
+ }
+ });
+ return;
+ }
+
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
new file mode 100644
index 00000000000..1d3df7071ec
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.coordinator.v5;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import
org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
+import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
+import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.broker.transaction.metadata.Versioned;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+
+/**
+ * PIP-473 v5 transaction coordinator — broker-side service.
+ *
+ * <p>Per-partition coordinator. A broker runs the v5 TC for partition {@code
N} iff it owns
+ * partition {@code N} of {@code
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same
+ * leader-election mechanism the legacy {@code
TransactionMetadataStoreService} uses; reusing
+ * it keeps the client-side discovery surface unchanged.
+ *
+ * <p>Wire commands handled (routed by {@code ServerCnx} when
+ * {@code transactionCoordinatorScalableTopicsEnabled} is on):
+ * <ul>
+ * <li>{@code TC_CLIENT_CONNECT} → {@link #handleClientConnect}</li>
+ * <li>{@code NEW_TXN} → {@link #newTransaction}</li>
+ * <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} —
no-ops per PIP; the v5
+ * participants advertise themselves by writing {@code /txn/op} records,
so the TC doesn't
+ * need a pre-registration step.</li>
+ * <li>{@code END_TXN} → {@link #endTransaction}</li>
+ * </ul>
+ *
+ * <p>{@code endTransaction} CAS-updates the header to the terminal state, enumerates
+ * {@code /txn/op/<txnId>-*} via
+ * {@link org.apache.pulsar.broker.transaction.metadata.TxnPaths#IDX_OPS_BY_TXN}, and publishes
+ * one segment-event per affected segment plus one subscription-event per affected
+ * {@code (segment, subscription)} pair. The fan-out is metadata-store writes (not RPCs) and
+ * is bounded by the txn's participant count.
+ *
+ * <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep —
those land in
+ * P5.2.
+ */
+@CustomLog
+public class TransactionCoordinatorV5 {
+
+ private final PulsarService pulsar;
+ private final TxnMetadataStore txnStore;
+
+    /**
+     * Creates the coordinator for the given broker.
+     *
+     * <p>The transaction store is built on top of the broker's local metadata store.
+     *
+     * @param pulsar the owning broker service
+     */
+    public TransactionCoordinatorV5(PulsarService pulsar) {
+        TxnMetadataStore store = new TxnMetadataStore(pulsar.getLocalMetadataStore());
+        this.pulsar = pulsar;
+        this.txnStore = store;
+    }
+
+ // ---- TC client connect ------------------------------------------------
+
+ /**
+ * Verify this broker is the leader for {@code tcId} (owns the
corresponding partition of
+ * {@code transaction_coordinator_assign}). Mirrors the ownership check
the legacy
+ * {@code TransactionMetadataStoreService.handleTcClientConnect} performs
— the same
+ * topic-ownership mechanism serves as our leader-election surface.
+ */
+ public CompletableFuture<Void>
handleClientConnect(TransactionCoordinatorID tcId) {
+ String assignPartition =
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
+ .getPartition((int) tcId.getId()).toString();
+ return
pulsar.getBrokerService().checkTopicNsOwnership(assignPartition);
+ }
+
+ // ---- newTransaction ---------------------------------------------------
+
+ /**
+ * Create a new transaction header at {@code /txn/id/<tcId>_<seq>}. The
{@code leastSigBits}
+ * is drawn from the per-tcId monotonic sequence counter ({@link
TxnMetadataStore#nextTxnSequence})
+ * so txnIds are never reused — the participant-side aborted-set is keyed
by txnId, and reuse
+ * would break that.
+ */
+ public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID
tcId, long timeoutInMillis,
+ String owner) {
+ return txnStore.nextTxnSequence(tcId.getId()).thenCompose(seq -> {
+ TxnID txnId = new TxnID(tcId.getId(), seq);
+ TxnHeader header = new TxnHeader(TxnState.OPEN,
+ Duration.ofMillis(timeoutInMillis), Instant.now(), null,
owner);
+ return txnStore.createHeader(TxnIds.toKey(txnId),
header).thenApply(stat -> txnId);
+ });
+ }
+
+    /**
+     * Checks whether {@code principal} may operate on {@code txnId}.
+     *
+     * <p>Resolves to {@code false} when no header exists for the transaction. When a header
+     * exists, a {@code null} stored owner (authentication disabled, or a legacy txn) is accepted
+     * for any principal; otherwise the stored owner must equal {@code principal}. This mirrors the
+     * legacy {@code TransactionMetadataStoreService.verifyTxnOwnership}; any superuser fallback is
+     * applied by the caller ({@code ServerCnx#verifyTxnOwnership}), not here.
+     *
+     * @param txnId the transaction being accessed
+     * @param principal the caller's principal; may be {@code null}
+     * @return a future holding {@code true} if access is allowed by the rules above
+     */
+    public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnId, String principal) {
+        return txnStore.getHeader(TxnIds.toKey(txnId)).thenApply(maybeHeader -> {
+            if (maybeHeader.isEmpty()) {
+                return false;
+            }
+            String storedOwner = maybeHeader.get().value().getOwner();
+            return storedOwner == null || storedOwner.equals(principal);
+        });
+    }
+
+ // ---- addPartition / addSubscription (no-op in v5) ---------------------
+
+    /**
+     * Accepts a produced-partition registration without recording anything.
+     *
+     * <p>Per PIP-473, v5 participants announce themselves through the {@code /txn/op} records
+     * they write when they actually apply ops, so no pre-registration step is needed.
+     *
+     * @param txnId the transaction the partitions would belong to (unused)
+     * @param partitions the partitions being registered (unused)
+     * @return an already-completed future
+     */
+    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
+        CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);
+        return alreadyDone;
+    }
+
+    /**
+     * Accepts an acked-subscription registration without recording anything; the v5 coordinator
+     * needs no pre-registration (see {@link #addProducedPartitionToTxn}).
+     *
+     * @param txnId the transaction the subscriptions would belong to (unused)
+     * @param subscriptions the subscriptions being registered (unused)
+     * @return an already-completed future
+     */
+    public CompletableFuture<Void> addAckedSubscriptionToTxn(TxnID txnId,
+                                                             List<TransactionSubscription> subscriptions) {
+        CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);
+        return alreadyDone;
+    }
+
+ // ---- endTransaction ---------------------------------------------------
+
+ /**
+ * Finalise a transaction: CAS the header to {@code COMMITTED}/{@code ABORTED}, enumerate
+ * the txn's participants via {@link TxnMetadataStore#listOpsByTxn}, and publish one
+ * segment-event per affected segment and one subscription-event per affected
+ * {@code (segment, subscription)} pair.
+ *
+ * <p>Idempotent against retries: when the header is already in the requested terminal state
+ * the CAS is skipped but the events are published again, so a retry can complete a fan-out
+ * that an earlier attempt left unfinished. If the header update fails, no events are
+ * published.
+ *
+ * <p>The returned future fails with {@code UnsupportedTxnActionException} when
+ * {@code txnAction} is neither the commit nor the abort value, with
+ * {@code TransactionNotFoundException} when no header exists, and with
+ * {@code InvalidTxnStatusException} when the header is neither {@code OPEN} nor already in
+ * the requested terminal state.
+ *
+ * @param txnId the transaction to finalise
+ * @param txnAction the numeric {@code TxnAction} value from the {@code END_TXN} command
+ * @return a future that completes once every participant event has been published
+ */
+ public CompletableFuture<Void> endTransaction(TxnID txnId, int txnAction) {
+ TxnState newState = newStateFor(txnAction);
+ if (newState == null) {
+ return FutureUtil.failedFuture(
+ new
TransactionCoordinatorException.UnsupportedTxnActionException(txnId,
txnAction));
+ }
+ String txnIdKey = TxnIds.toKey(txnId);
+ return txnStore.getHeader(txnIdKey).thenCompose(opt -> {
+ if (opt.isEmpty()) {
+ return FutureUtil.failedFuture(
+ new CoordinatorException.TransactionNotFoundException(
+ "Transaction not found: " + txnId));
+ }
+ Versioned<TxnHeader> v = opt.get();
+ TxnHeader current = v.value();
+ if (current.getState() == newState) {
+ // Idempotent retry — header already terminal-and-matching.
Re-drive the fan-out
+ // rather than short-circuiting: if a previous attempt CAS'd
the header but failed
+ // (partially or fully) before publishing, the only way a
participant ever learns
+ // the decision is the event, and a terminal header gives the
reconcile path nothing
+ // to act on. Re-publishing is safe — participants key on
(txnId, decision) and the
+ // decision can't change once terminal.
+ return fanOutEvents(txnId, txnIdKey, newState);
+ }
+ if (current.getState() != TxnState.OPEN) {
+ return FutureUtil.failedFuture(
+ new CoordinatorException.InvalidTxnStatusException(
+ "Transaction " + txnId + " is " +
current.getState()
+ + ", cannot transition to " +
newState));
+ }
+ TxnHeader updated = new TxnHeader(newState, current.getTimeout(),
+ current.getCreatedAt(), Instant.now(), current.getOwner());
+ return txnStore.updateHeader(txnIdKey, updated, v.version())
+ .thenCompose(stat -> fanOutEvents(txnId, txnIdKey,
newState));
+ });
+ }
+
+    /**
+     * Maps a wire-level {@code TxnAction} value to the terminal state it requests.
+     *
+     * @param txnAction numeric action value
+     * @return {@code COMMITTED} for the commit value, {@code ABORTED} for the abort value, or
+     *         {@code null} for any other value
+     */
+    private static TxnState newStateFor(int txnAction) {
+        boolean commit = txnAction == TxnAction.COMMIT_VALUE;
+        boolean abort = txnAction == TxnAction.ABORT_VALUE;
+        if (!commit && !abort) {
+            return null;
+        }
+        return commit ? TxnState.COMMITTED : TxnState.ABORTED;
+    }
+
+ /**
+ * Enumerate {@code /txn/op} via {@link TxnMetadataStore#listOpsByTxn},
group by participant,
+ * and publish one event per participant. Writes are independent so we
fire them in parallel.
+ */
+ private CompletableFuture<Void> fanOutEvents(TxnID txnId, String txnIdKey,
TxnState decision) {
+ Set<String> writeSegments = ConcurrentHashMap.newKeySet();
+ Set<AckParticipant> ackParticipants = ConcurrentHashMap.newKeySet();
+ return txnStore.listOpsByTxn(txnIdKey, new ScanConsumer() {
+ @Override
+ public void onNext(GetResult r) {
+ TxnOp op = TxnMetadataStore.fromJson(r.getValue(),
TxnOp.class);
+ if (op.getKind() == TxnOpKind.WRITE) {
+ writeSegments.add(op.getSegment());
+ } else if (op.getKind() == TxnOpKind.ACK &&
op.getSubscription() != null) {
+ ackParticipants.add(new AckParticipant(op.getSegment(),
op.getSubscription()));
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn().attr("txnId", txnId).exception(throwable)
+ .log("endTxn participant enumeration encountered an
error");
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ }).thenCompose(__ -> {
+ TxnEvent event = new TxnEvent(txnIdKey, decision);
+ CompletableFuture<?>[] publishes = new CompletableFuture<?>[
+ writeSegments.size() + ackParticipants.size()];
+ int i = 0;
+ for (String segment : writeSegments) {
+ publishes[i++] = txnStore.publishSegmentEvent(segment, event);
+ }
+ for (AckParticipant p : ackParticipants) {
+ publishes[i++] =
txnStore.publishSubscriptionEvent(p.segment(), p.subscription(), event);
+ }
+ return CompletableFuture.allOf(publishes);
+ });
+ }
+
+ /** A {@code (segment, subscription)} ack participant; keys the ack
fan-out de-dup set. */
+ private record AckParticipant(String segment, String subscription) {
+ }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java
new file mode 100644
index 00000000000..77f0b4c5164
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * PIP-473 v5 transaction coordinator: broker-side service that serves {@code
NEW_TXN} /
+ * {@code END_TXN} wire commands against the metadata-store-backed
+ * {@link org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore}.
Replaces the
+ * legacy {@code TransactionMetadataStoreService} when
+ * {@code transactionCoordinatorScalableTopicsEnabled} is on.
+ */
+package org.apache.pulsar.broker.transaction.coordinator.v5;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java
new file mode 100644
index 00000000000..f3fb1ea159f
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcSequence.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * Per-tcId txnId-sequence counter. Stored at {@code /txn/tc-seq/<tcId>} and CAS-incremented
+ * by the v5 TC for each {@code newTxn}. The yielded value becomes a transaction's
+ * {@code leastSigBits} — monotonic per tcId so {@link org.apache.pulsar.client.api.transaction.TxnID}
+ * is never reused (avoiding aborted-set key collisions in the participant-side visibility state).
+ *
+ * <p>While no counter document exists yet for a tcId, the first value issued is {@code 0}.
+ *
+ * @param next the next {@code leastSigBits} to assign (the most recently issued value + 1)
+ */
+public record TcSequence(long next) {
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
index 381b6bbfbbc..11043eafe6c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnHeader.java
@@ -49,4 +49,12 @@ public class TxnHeader {
* the CAS that flips {@link #state}. {@code null} while OPEN.
*/
private Instant finalizedAt;
+
+ /**
+ * Principal that opened the transaction (from {@code NEW_TXN}). Used to
authorize subsequent
+ * {@code END_TXN} / add-participant commands — only the owner (or a
superuser) may operate on
+ * the txn. {@code null} when authentication is disabled, mirroring the
legacy coordinator's
+ * "null owner ⟹ allowed" semantics.
+ */
+ private String owner;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
index 323473ac8b1..ebf300b7725 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import lombok.CustomLog;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -118,13 +119,17 @@ public class TxnMetadataStore {
* {@code path} carries the generated sequence key.
*/
public CompletableFuture<Stat> appendOp(String txnId, TxnOp op) {
- Option.SecondaryIndex idx = switch (op.getKind()) {
+ Option.SecondaryIndex participantIdx = switch (op.getKind()) {
case WRITE -> new
Option.SecondaryIndex(TxnPaths.IDX_WRITES_BY_SEGMENT,
TxnPaths.segmentKey(op.getSegment()));
case ACK -> new
Option.SecondaryIndex(TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION,
TxnPaths.ackIndexKey(op.getSegment(),
op.getSubscription()));
};
- Set<Option> opts = Set.of(new Option.PartitionKey(txnId), idx,
APPEND_DELTAS);
+ // Also index by txnId so the TC's endTxn can enumerate this txn's ops
without scanning
+ // the whole /txn/op namespace.
+ Option.SecondaryIndex byTxnIdx = new
Option.SecondaryIndex(TxnPaths.IDX_OPS_BY_TXN, txnId);
+ Set<Option> opts = Set.of(new Option.PartitionKey(txnId),
participantIdx, byTxnIdx,
+ APPEND_DELTAS);
return store.put(TxnPaths.opParent(txnId), toJson(op),
Optional.empty(), opts);
}
@@ -157,6 +162,18 @@ public class TxnMetadataStore {
consumer);
}
+ /**
+ * Stream all {@code /txn/op} records for {@code txnId} via the {@link TxnPaths#IDX_OPS_BY_TXN}
+ * index. Used by the v5 TC at end-txn time to enumerate participants — distinct segments for
+ * writes, distinct {@code (segment, subscription)} pairs for acks.
+ *
+ * <p>{@code txnId} is passed as both index-key arguments of {@code scanByIndex} (presumably
+ * the lower and upper bound of the scanned key range — confirm against the metadata-store
+ * API). Results are additionally filtered so that only records whose path yields the same
+ * txn id through {@link TxnPaths#txnIdFromOpPath} reach the consumer.
+ *
+ * @param txnId the transaction key whose op records should be streamed
+ * @param consumer callback that receives the matching records
+ * @return the future returned by the underlying index scan
+ */
+ public CompletableFuture<Void> listOpsByTxn(String txnId, ScanConsumer
consumer) {
+ return store.scanByIndex(TxnPaths.TXN_OP_PREFIX,
TxnPaths.IDX_OPS_BY_TXN,
+ txnId, txnId,
+ gr ->
txnId.equals(TxnPaths.txnIdFromOpPath(gr.getStat().getPath())),
+ consumer);
+ }
+
/**
* Delete every {@code /txn/op} write record for {@code (segment, txnId)}
— used by the TB once
* an event tells it the txn is terminal. Path extraction follows the
layout in
@@ -381,6 +398,36 @@ public class TxnMetadataStore {
return store.deleteIfExists(TxnPaths.segmentWatermarkPath(segment),
Optional.empty(), opts);
}
+ // ---- TC sequence counter ----------------------------------------------
+
+    /**
+     * Reserves the next txnId sequence number for {@code tcId}.
+     *
+     * <p>Reads the counter document, then writes it back advanced by one, conditioned on the
+     * version that was read (expected version {@code -1} when no document was read). When the
+     * conditional write fails with {@link MetadataStoreException.BadVersionException} the whole
+     * read-and-write is attempted again, which serialises concurrent callers. The reserved value
+     * becomes a txn's {@code leastSigBits}; the first value issued for a {@code tcId} is
+     * {@code 0}.
+     *
+     * @param tcId the transaction coordinator id whose counter to advance
+     * @return the reserved sequence value; on any error other than a version conflict the future
+     *         fails with the unwrapped cause
+     */
+    public CompletableFuture<Long> nextTxnSequence(long tcId) {
+        String counterPath = TxnPaths.tcSequencePath(tcId);
+        return store.get(counterPath).thenCompose(existing -> {
+            // The stored document already holds the next value to hand out.
+            long assigned = existing
+                    .map(gr -> fromJson(gr.getValue(), TcSequence.class).next())
+                    .orElse(0L);
+            Optional<Long> expectedVersion =
+                    Optional.of(existing.map(gr -> gr.getStat().getVersion()).orElse(-1L));
+            return store.put(counterPath, toJson(new TcSequence(assigned + 1)), expectedVersion, Set.of())
+                    .thenApply(ignoredStat -> assigned)
+                    .exceptionallyCompose(failure -> {
+                        Throwable rootCause = failure;
+                        if (failure instanceof CompletionException && failure.getCause() != null) {
+                            rootCause = failure.getCause();
+                        }
+                        if (!(rootCause instanceof MetadataStoreException.BadVersionException)) {
+                            return FutureUtil.failedFuture(rootCause);
+                        }
+                        // Concurrent write — retry.
+                        return nextTxnSequence(tcId);
+                    });
+        });
+    }
+
// ---- JSON helpers ------------------------------------------------------
/** @return UTF-8 JSON bytes for {@code value}. Wraps any I/O error as
{@link CompletionException}. */
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index f68d8d73bd1..7e25ebb33ba 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -92,6 +92,21 @@ public final class TxnPaths {
*/
public static final String IDX_TXN_ABORTED_BY_POSITION =
"idx:txn-aborted-by-position";
+ /**
+ * Index: list all {@code /txn/op} records for a txn. Key = {@code txnId}.
Used by the v5 TC
+ * at end-txn time to enumerate the txn's participants without scanning
the whole
+ * {@code /txn/op} namespace.
+ */
+ public static final String IDX_OPS_BY_TXN = "idx:ops-by-txn";
+
+ /** Path prefix for per-tcId txnId-sequence counter documents. */
+ public static final String TXN_TC_SEQ_PREFIX = "/txn/tc-seq";
+
+    /**
+     * Builds the metadata-store path of the txnId-sequence counter document for one coordinator.
+     *
+     * @param tcId the transaction coordinator id
+     * @return {@code /txn/tc-seq/<tcId>}
+     */
+    public static String tcSequencePath(long tcId) {
+        return String.join("/", TXN_TC_SEQ_PREFIX, Long.toString(tcId));
+    }
+
/** Width used when formatting long values into
lexicographically-orderable index keys. */
public static final int LONG_KEY_WIDTH = 20;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
index 804e12d1bd5..8234eea3832 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBufferTest.java
@@ -163,7 +163,7 @@ public class MetadataTransactionBufferTest {
// Pre-set header to COMMITTED — txn is terminal before any append.
Stat created = txnStore.createHeader(TxnIds.toKey(txnId),
new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
- Instant.ofEpochMilli(1000),
Instant.ofEpochMilli(2000))).get();
+ Instant.ofEpochMilli(1000),
Instant.ofEpochMilli(2000), null)).get();
assertThat(created.getVersion()).isZero();
MetadataTransactionBuffer tb = new MetadataTransactionBuffer(topic,
txnStore);
@@ -393,14 +393,14 @@ public class MetadataTransactionBufferTest {
private void createOpenHeader(TxnID txnId) throws Exception {
txnStore.createHeader(TxnIds.toKey(txnId),
new TxnHeader(TxnState.OPEN, Duration.ofMillis(60_000),
- Instant.ofEpochMilli(1000), null)).get();
+ Instant.ofEpochMilli(1000), null, null)).get();
}
private void commitTxn(TxnID txnId) throws Exception {
var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
var h = v.value();
txnStore.updateHeader(TxnIds.toKey(txnId),
- new TxnHeader(TxnState.COMMITTED, h.getTimeout(),
h.getCreatedAt(), Instant.now()),
+ new TxnHeader(TxnState.COMMITTED, h.getTimeout(),
h.getCreatedAt(), Instant.now(), null),
v.version()).get();
}
@@ -408,7 +408,7 @@ public class MetadataTransactionBufferTest {
var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
var h = v.value();
txnStore.updateHeader(TxnIds.toKey(txnId),
- new TxnHeader(TxnState.ABORTED, h.getTimeout(),
h.getCreatedAt(), Instant.now()),
+ new TxnHeader(TxnState.ABORTED, h.getTimeout(),
h.getCreatedAt(), Instant.now(), null),
v.version()).get();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
new file mode 100644
index 00000000000..38150874db9
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.coordinator.v5;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
+import org.apache.pulsar.broker.transaction.metadata.TxnIds;
+import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
+import org.apache.pulsar.broker.transaction.metadata.TxnOp;
+import org.apache.pulsar.broker.transaction.metadata.TxnOpKind;
+import org.apache.pulsar.broker.transaction.metadata.TxnState;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link TransactionCoordinatorV5} against the in-memory metadata store. The
+ * {@link PulsarService} / {@link BrokerService} dependencies are mocked just enough for the TC
+ * to construct and to satisfy {@code handleClientConnect}'s ownership check.
+ */
+public class TransactionCoordinatorV5Test {
+
+ private static final TransactionCoordinatorID TC_ID =
TransactionCoordinatorID.get(0L);
+
+ private MetadataStoreExtended store;
+ private TxnMetadataStore txnStore;
+ private PulsarService pulsar;
+ private TransactionCoordinatorV5 tc;
+
+    /**
+     * Builds a new in-memory metadata store, a {@link TxnMetadataStore} view over it, and a
+     * coordinator wired to mocked broker services before every test method.
+     */
+    @BeforeMethod
+    public void setUp() throws Exception {
+        MetadataStoreConfig storeConfig = MetadataStoreConfig.builder().fsyncEnable(false).build();
+        store = MetadataStoreExtended.create("memory:local", storeConfig);
+        txnStore = new TxnMetadataStore(store);
+
+        // Ownership check succeeds by default; a test that needs the not-owned path re-stubs it.
+        BrokerService ownedBrokerService = mock(BrokerService.class);
+        when(ownedBrokerService.checkTopicNsOwnership(any()))
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        pulsar = mock(PulsarService.class);
+        when(pulsar.getBrokerService()).thenReturn(ownedBrokerService);
+        when(pulsar.getLocalMetadataStore()).thenReturn(store);
+
+        tc = new TransactionCoordinatorV5(pulsar);
+    }
+
+    /** Closes the metadata store created by {@code setUp}, if setup got far enough to create it. */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (store == null) {
+            return;
+        }
+        store.close();
+    }
+
+    /** Three consecutive txns on one tcId get increasing ids, and the first has an OPEN header. */
+    @Test
+    public void newTransaction_createsHeader_withSequentialLeastBits() throws Exception {
+        TxnID first = tc.newTransaction(TC_ID, 60_000L, "owner-a").get();
+        TxnID second = tc.newTransaction(TC_ID, 60_000L, "owner-a").get();
+        TxnID third = tc.newTransaction(TC_ID, 60_000L, "owner-a").get();
+
+        // The tcId is carried in mostSigBits.
+        assertThat(first.getMostSigBits()).isEqualTo(TC_ID.getId());
+        // leastSigBits grows strictly with every allocation on the same tcId.
+        assertThat(second.getLeastSigBits()).isGreaterThan(first.getLeastSigBits());
+        assertThat(third.getLeastSigBits()).isGreaterThan(second.getLeastSigBits());
+
+        // The header stored under TxnIds.toKey(txnId) is OPEN and records the opening principal.
+        var storedHeader = txnStore.getHeader(TxnIds.toKey(first)).get().orElseThrow().value();
+        assertThat(storedHeader.getState()).isEqualTo(TxnState.OPEN);
+        assertThat(storedHeader.getOwner()).isEqualTo("owner-a");
+    }
+
+    /**
+     * COMMIT moves the header to {@link TxnState#COMMITTED} and publishes an event to the
+     * segment that has a recorded op for the txn.
+     */
+    @Test
+    public void endTransaction_commit_casesHeaderAndFansOutSegmentEvent() throws Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+
+        // Simulate a participant appending an op for the segment.
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+
+        // Subscribe to segment events before endTxn — receive the published event.
+        // The sink is synchronized because the callback is presumably invoked off the test thread
+        // (the assertion below has to poll with Awaitility), and a bare ArrayList gives no
+        // visibility guarantee between the callback's add() and this thread's reads.
+        List<String> received = java.util.Collections.synchronizedList(new ArrayList<>());
+        try (var sub = txnStore.subscribeSegmentEvents(segment, received::add)) {
+            tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+            // Header is now COMMITTED.
+            var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
+            assertThat(header.value().getState()).isEqualTo(TxnState.COMMITTED);
+            // A segment-event for this segment was published.
+            Awaitility.await().untilAsserted(() -> assertThat(received).isNotEmpty());
+        }
+    }
+
+    /**
+     * ABORT moves the header to {@link TxnState#ABORTED} and publishes an event to both the
+     * segment stream and the (segment, subscription) stream recorded in the txn's ops.
+     */
+    @Test
+    public void endTransaction_abort_casesHeaderAndFansOutEvents() throws Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+        String subscription = "sub-x";
+
+        // Two participants: a WRITE on the segment and an ACK on (segment, sub).
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.ACK, segment, subscription, 5L, 2L, false)).get();
+
+        // Synchronized sinks: the callbacks are presumably invoked off the test thread (the
+        // assertions below poll with Awaitility), so a bare ArrayList would be read and written
+        // concurrently with no visibility guarantee.
+        List<String> segReceived = java.util.Collections.synchronizedList(new ArrayList<>());
+        List<String> subReceived = java.util.Collections.synchronizedList(new ArrayList<>());
+        try (var s1 = txnStore.subscribeSegmentEvents(segment, segReceived::add);
+                var s2 = txnStore.subscribeSubscriptionEvents(segment, subscription, subReceived::add)) {
+            tc.endTransaction(txnId, TxnAction.ABORT_VALUE).get();
+
+            // Header is ABORTED.
+            var header = txnStore.getHeader(txnIdKey).get().orElseThrow();
+            assertThat(header.value().getState()).isEqualTo(TxnState.ABORTED);
+
+            // Both participant streams received an event.
+            Awaitility.await().untilAsserted(() -> assertThat(segReceived).isNotEmpty());
+            Awaitility.await().untilAsserted(() -> assertThat(subReceived).isNotEmpty());
+
+            // The published event's decision matches. Each received string is used as the
+            // metadata path of the event document.
+            byte[] bytes = store.get(segReceived.get(segReceived.size() - 1)).get().orElseThrow().getValue();
+            TxnEvent event = TxnMetadataStore.fromJson(bytes, TxnEvent.class);
+            assertThat(event.getDecision()).isEqualTo(TxnState.ABORTED);
+            assertThat(event.getTxnId()).isEqualTo(txnIdKey);
+        }
+    }
+
+    /** Repeating END_TXN with the same action succeeds and leaves the header COMMITTED. */
+    @Test
+    public void endTransaction_idempotent_onRetryWithSameAction() throws Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+
+        // Attempt 0 finalizes the txn; attempt 1 is the retry against the already-terminal header.
+        for (int attempt = 0; attempt < 2; attempt++) {
+            tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+        }
+
+        TxnState finalState = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow().value().getState();
+        assertThat(finalState).isEqualTo(TxnState.COMMITTED);
+    }
+
+    /**
+     * A retry of a terminal-and-matching txn re-drives the fan-out, so a participant that
+     * missed the first event (e.g. a partial publish on the first attempt) still gets it.
+     */
+    @Test
+    public void endTransaction_idempotentRetry_republishesEvents() throws Exception {
+        TxnID txnId = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        String txnIdKey = TxnIds.toKey(txnId);
+        String segment = "segment://public/default/topic/0000-ffff-0";
+        txnStore.appendOp(txnIdKey,
+                new TxnOp(TxnOpKind.WRITE, segment, null, 5L, 1L, null)).get();
+
+        // Synchronized sink: exact sizes are asserted below while the callback is presumably
+        // adding from another thread (hence the Awaitility polling), so the list must publish
+        // its writes safely to the test thread.
+        List<String> received = java.util.Collections.synchronizedList(new ArrayList<>());
+        try (var sub = txnStore.subscribeSegmentEvents(segment, received::add)) {
+            tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+            Awaitility.await().untilAsserted(() -> assertThat(received).hasSize(1));
+            // Retry re-publishes — a second event lands for the same segment.
+            tc.endTransaction(txnId, TxnAction.COMMIT_VALUE).get();
+            Awaitility.await().untilAsserted(() -> assertThat(received).hasSize(2));
+        }
+    }
+
+    /** The principal that opened the txn passes the ownership check; a different one does not. */
+    @Test
+    public void verifyTxnOwnership_matchesOwnerAndRejectsOthers() throws Exception {
+        TxnID aliceTxn = tc.newTransaction(TC_ID, 60_000L, "alice").get();
+
+        Boolean ownerAccepted = tc.verifyTxnOwnership(aliceTxn, "alice").get();
+        assertThat(ownerAccepted).isTrue();
+
+        Boolean strangerAccepted = tc.verifyTxnOwnership(aliceTxn, "bob").get();
+        assertThat(strangerAccepted).isFalse();
+    }
+
+    /** A txn opened with a {@code null} owner accepts every caller, including a null one. */
+    @Test
+    public void verifyTxnOwnership_nullOwnerAlwaysAllowed() throws Exception {
+        // Authentication disabled — owner stored as null, mirroring the legacy "null ⟹ allowed".
+        TxnID unownedTxn = tc.newTransaction(TC_ID, 60_000L, null).get();
+
+        Boolean namedCallerAccepted = tc.verifyTxnOwnership(unownedTxn, "anyone").get();
+        assertThat(namedCallerAccepted).isTrue();
+
+        Boolean nullCallerAccepted = tc.verifyTxnOwnership(unownedTxn, null).get();
+        assertThat(nullCallerAccepted).isTrue();
+    }
+
+ @Test
+ public void verifyTxnOwnership_unknownTxnReturnsFalse() throws Exception {
+ assertThat(tc.verifyTxnOwnership(new TxnID(0L, 9999L),
"alice").get()).isFalse();
+ }
+
+    /** Ending an already-COMMITTED txn with ABORT fails with {@code InvalidTxnStatusException}. */
+    @Test
+    public void endTransaction_failsOnMismatchedAction() throws Exception {
+        TxnID committedTxn = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+        tc.endTransaction(committedTxn, TxnAction.COMMIT_VALUE).get();
+
+        // The opposite action on the terminal txn must be rejected; the failure surfaces as the
+        // cause of the exception thrown by get().
+        assertThatThrownBy(() -> tc.endTransaction(committedTxn, TxnAction.ABORT_VALUE).get())
+                .hasCauseInstanceOf(CoordinatorException.InvalidTxnStatusException.class);
+    }
+
+ @Test
+ public void endTransaction_failsForUnknownTxn() {
+ assertThatThrownBy(() -> tc.endTransaction(new TxnID(0L, 9999L),
TxnAction.COMMIT_VALUE).get())
+
.hasCauseInstanceOf(CoordinatorException.TransactionNotFoundException.class);
+ }
+
+    /**
+     * ADD_PARTITION / ADD_SUBSCRIPTION complete normally on an open txn. In v5, participants
+     * advertise themselves via /txn/op writes, so these calls are wire-level no-ops in the TC.
+     */
+    @Test
+    public void addPartitionAndAddSubscription_areNoOps() throws Exception {
+        TxnID openTxn = tc.newTransaction(TC_ID, 60_000L, "owner").get();
+
+        // Only successful completion is checked: get() would throw if either future failed.
+        tc.addProducedPartitionToTxn(openTxn, List.of("persistent://public/default/topic1")).get();
+        tc.addAckedSubscriptionToTxn(openTxn, List.of()).get();
+    }
+
+    /**
+     * Txn ids are scoped by tcId: mostSigBits carries the tcId, and each tcId draws leastSigBits
+     * from its own counter rather than from one shared sequence.
+     */
+    @Test
+    public void newTransaction_collectionsAreScopedByTcId() throws Exception {
+        TransactionCoordinatorID tc1 = TransactionCoordinatorID.get(1L);
+        TransactionCoordinatorID tc2 = TransactionCoordinatorID.get(2L);
+        // Two allocations on tc1 advance its counter before tc2 is touched for the first time.
+        TxnID a1 = tc.newTransaction(tc1, 60_000L, "owner").get();
+        TxnID a2 = tc.newTransaction(tc1, 60_000L, "owner").get();
+        TxnID b1 = tc.newTransaction(tc2, 60_000L, "owner").get();
+
+        // Different tcIds → different mostSigBits.
+        assertThat(a1.getMostSigBits()).isEqualTo(1L);
+        assertThat(a2.getMostSigBits()).isEqualTo(1L);
+        assertThat(b1.getMostSigBits()).isEqualTo(2L);
+
+        // Independent leastSigBits sequences: tc1's counter moved forward, yet tc2's first id
+        // equals tc1's first id. With a single shared counter b1 would come after a2 instead.
+        // NOTE(review): assumes every fresh per-tcId counter starts from the same value — confirm
+        // against nextTxnSequence.
+        assertThat(a2.getLeastSigBits()).isGreaterThan(a1.getLeastSigBits());
+        assertThat(b1.getLeastSigBits()).isEqualTo(a1.getLeastSigBits());
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
index 5a9a9849349..40c04ec0a8f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStoreTest.java
@@ -51,12 +51,12 @@ public class TxnMetadataStoreTest {
private static TxnHeader open(long createdMs, long timeoutMs) {
return new TxnHeader(TxnState.OPEN, Duration.ofMillis(timeoutMs),
- Instant.ofEpochMilli(createdMs), null);
+ Instant.ofEpochMilli(createdMs), null, null);
}
private static TxnHeader finalized(TxnState state, long createdMs, long
timeoutMs, long finalizedMs) {
return new TxnHeader(state, Duration.ofMillis(timeoutMs),
- Instant.ofEpochMilli(createdMs),
Instant.ofEpochMilli(finalizedMs));
+ Instant.ofEpochMilli(createdMs),
Instant.ofEpochMilli(finalizedMs), null);
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
index bb86ede033b..e9994fd3314 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStoreTest.java
@@ -210,7 +210,7 @@ public class MetadataPendingAckStoreTest {
txnStore.createHeader(TxnIds.toKey(committedTxn),
new TxnHeader(TxnState.COMMITTED, Duration.ofMillis(5000),
- Instant.ofEpochMilli(1000),
Instant.ofEpochMilli(2000))).get();
+ Instant.ofEpochMilli(1000),
Instant.ofEpochMilli(2000), null)).get();
txnStore.appendOp(TxnIds.toKey(committedTxn),
new TxnOp(TxnOpKind.ACK, SEGMENT, SUB, 7L, 3L, false)).get();
@@ -253,7 +253,7 @@ public class MetadataPendingAckStoreTest {
private void createOpenHeader(TxnID txnId) throws Exception {
txnStore.createHeader(TxnIds.toKey(txnId),
new TxnHeader(TxnState.OPEN, Duration.ofMillis(60_000),
- Instant.ofEpochMilli(1000), null)).get();
+ Instant.ofEpochMilli(1000), null, null)).get();
}
private void commitHeader(TxnID txnId) throws Exception {
@@ -261,7 +261,7 @@ public class MetadataPendingAckStoreTest {
var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
var h = v.value();
created = txnStore.updateHeader(TxnIds.toKey(txnId),
- new TxnHeader(TxnState.COMMITTED, h.getTimeout(),
h.getCreatedAt(), Instant.now()),
+ new TxnHeader(TxnState.COMMITTED, h.getTimeout(),
h.getCreatedAt(), Instant.now(), null),
v.version()).get();
assertThat(created.getVersion()).isPositive();
}
@@ -270,7 +270,7 @@ public class MetadataPendingAckStoreTest {
var v = txnStore.getHeader(TxnIds.toKey(txnId)).get().orElseThrow();
var h = v.value();
txnStore.updateHeader(TxnIds.toKey(txnId),
- new TxnHeader(TxnState.ABORTED, h.getTimeout(),
h.getCreatedAt(), Instant.now()),
+ new TxnHeader(TxnState.ABORTED, h.getTimeout(),
h.getCreatedAt(), Instant.now(), null),
v.version()).get();
}
}