merlimat commented on code in PR #25863: URL: https://github.com/apache/pulsar/pull/25863#discussion_r3311102962
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.coordinator.v5; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import lombok.CustomLog; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException; +import org.apache.pulsar.broker.transaction.metadata.TxnEvent; +import org.apache.pulsar.broker.transaction.metadata.TxnHeader; +import org.apache.pulsar.broker.transaction.metadata.TxnIds; +import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore; +import org.apache.pulsar.broker.transaction.metadata.TxnOp; +import org.apache.pulsar.broker.transaction.metadata.TxnOpKind; +import org.apache.pulsar.broker.transaction.metadata.TxnState; +import org.apache.pulsar.broker.transaction.metadata.Versioned; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.TxnAction; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.GetResult; +import org.apache.pulsar.metadata.api.ScanConsumer; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionSubscription; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; + +/** + * PIP-473 v5 transaction coordinator — broker-side service. + * + * <p>Per-partition coordinator. A broker runs the v5 TC for partition {@code N} iff it owns + * partition {@code N} of {@code SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same + * leader-election mechanism the legacy {@code TransactionMetadataStoreService} uses; reusing + * it keeps the client-side discovery surface unchanged. + * + * <p>Wire commands handled (routed by {@code ServerCnx} when + * {@code transactionCoordinatorScalableTopicsEnabled} is on): + * <ul> + * <li>{@code TC_CLIENT_CONNECT} → {@link #handleClientConnect}</li> + * <li>{@code NEW_TXN} → {@link #newTransaction}</li> + * <li>{@code ADD_PARTITION_TO_TXN}, {@code ADD_SUBSCRIPTION_TO_TXN} — no-ops per PIP; the v5 + * participants advertise themselves by writing {@code /txn/op} records, so the TC doesn't + * need a pre-registration step.</li> + * <li>{@code END_TXN} → {@link #endTransaction}</li> + * </ul> + * + * <p>{@code endTransaction} CAS-updates the header to the terminal state, enumerates + * {@code /txn/op/<txnId>-*} via {@link TxnPaths#IDX_OPS_BY_TXN}, and publishes one + * segment-event per affected segment + one subscription-event per affected + * {@code (segment, subscription)} pair. The fan-out is metadata-store writes (not RPCs) and + * is bounded by the txn's participant count. + * + * <p>P5.1 scope: happy-path newTxn / endTxn. No timeout sweep, no GC sweep — those land in + * P5.2. + */ +@CustomLog +public class TransactionCoordinatorV5 { + + private final PulsarService pulsar; + private final TxnMetadataStore txnStore; + + public TransactionCoordinatorV5(PulsarService pulsar) { + this.pulsar = pulsar; + this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore()); + } + + // ---- TC client connect ------------------------------------------------ + + /** + * Verify this broker is the leader for {@code tcId} (owns the corresponding partition of + * {@code transaction_coordinator_assign}). Mirrors the ownership check the legacy + * {@code TransactionMetadataStoreService.handleTcClientConnect} performs — the same + * topic-ownership mechanism serves as our leader-election surface. + */ + public CompletableFuture<Void> handleClientConnect(TransactionCoordinatorID tcId) { + String assignPartition = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN + .getPartition((int) tcId.getId()).toString(); + return pulsar.getBrokerService().checkTopicNsOwnership(assignPartition); + } + + // ---- newTransaction --------------------------------------------------- + + /** + * Create a new transaction header at {@code /txn/id/<tcId>_<seq>}. The {@code leastSigBits} + * is drawn from the per-tcId monotonic sequence counter ({@link TxnMetadataStore#nextTxnSequence}) + * so txnIds are never reused — the participant-side aborted-set is keyed by txnId, and reuse + * would break that. + */ + public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMillis, + String owner) { + return txnStore.nextTxnSequence(tcId.getId()).thenCompose(seq -> { + TxnID txnId = new TxnID(tcId.getId(), seq); + TxnHeader header = new TxnHeader(TxnState.OPEN, + Duration.ofMillis(timeoutInMillis), Instant.now(), null); + return txnStore.createHeader(TxnIds.toKey(txnId), header).thenApply(stat -> txnId); + }); + } + + // ---- addPartition / addSubscription (no-op in v5) --------------------- + + /** + * No-op per PIP-473 — in v5, participants advertise themselves by writing {@code /txn/op} + * records when they actually apply ops. The pre-registration step is unnecessary. + */ + public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) { + return CompletableFuture.completedFuture(null); + } + + /** No-op (see {@link #addProducedPartitionToTxn}). */ + public CompletableFuture<Void> addAckedSubscriptionToTxn(TxnID txnId, + List<TransactionSubscription> subscriptions) { + return CompletableFuture.completedFuture(null); + } + + // ---- endTransaction --------------------------------------------------- + + /** + * Finalise a transaction: CAS the header to {@code COMMITTED}/{@code ABORTED}, enumerate + * the txn's participants via {@link TxnMetadataStore#listOpsByTxn}, and publish one + * segment-event per affected segment and one subscription-event per affected + * {@code (segment, subscription)} pair. Idempotent against retries — a header already in + * the requested terminal state short-circuits without republishing. + */ + public CompletableFuture<Void> endTransaction(TxnID txnId, int txnAction) { + TxnState newState = newStateFor(txnAction); + if (newState == null) { + return FutureUtil.failedFuture( + new TransactionCoordinatorException.UnsupportedTxnActionException(txnId, txnAction)); + } + String txnIdKey = TxnIds.toKey(txnId); + return txnStore.getHeader(txnIdKey).thenCompose(opt -> { + if (opt.isEmpty()) { + return FutureUtil.failedFuture( + new CoordinatorException.TransactionNotFoundException( + "Transaction not found: " + txnId)); + } + Versioned<TxnHeader> v = opt.get(); + TxnHeader current = v.value(); + if (current.getState() == newState) { + // Idempotent retry — already in the requested terminal state. Re-publish events + // is safe but skip for simplicity; participants tolerate missing events via the + // header re-read in their reconcile path. + return CompletableFuture.completedFuture(null); + } + if (current.getState() != TxnState.OPEN) { + return FutureUtil.failedFuture( + new CoordinatorException.InvalidTxnStatusException( + "Transaction " + txnId + " is " + current.getState() + + ", cannot transition to " + newState)); + } + TxnHeader updated = new TxnHeader(newState, current.getTimeout(), + current.getCreatedAt(), Instant.now()); + return txnStore.updateHeader(txnIdKey, updated, v.version()) + .thenCompose(stat -> fanOutEvents(txnId, txnIdKey, newState)); + }); Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
