+1 (binding)

Enrico

Il giorno sab 9 mag 2026 alle ore 09:36 Tao Jiuming <[email protected]> ha
scritto:

> +1 nonbinding
>
> Matteo Merli <[email protected]> 于2026年5月9日周六 06:26写道:
>
> > https://github.com/apache/pulsar/pull/25693
> >
> > ------
> >
> >
> > # PIP-471: Metadata-Driven Transactions for Scalable Topics
> >
> > *Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
> >
> > ## Background
> >
> > ### Pulsar's existing transaction model
> >
> > Pulsar transactions today are realized through three components:
> >
> > - **Transaction Coordinator (TC)** — a per-broker service backed by a
> > system topic (`__transaction_log_*` in `pulsar/system`) that tracks
> > the lifecycle of every transaction (`OPEN`, `COMMITTING`, `COMMITTED`,
> > `ABORTING`, `ABORTED`, `TIME_OUT`) and orchestrates two-phase commit
> > across the topics that participate in each transaction.
> > - **TransactionBuffer (TB)** — a per-`PersistentTopic` component that
> > buffers transactional writes in the topic's data stream, tracks
> > aborted transaction IDs, and gates the dispatcher's read horizon
> > (`maxReadPosition`) so that uncommitted entries are not delivered. The
> > TB persists its state in a per-namespace system topic
> > (`__transaction_buffer_snapshot`).
> > - **PendingAckStore** — a per-(topic, subscription) component that
> > records transactional acknowledgments in a sibling persistent topic
> > (`<topic>-<sub>__transaction_pending_ack`), applying them to the
> > cursor only when the transaction commits.
> >
> > When a transaction ends, the TC sends `END_TXN_ON_PARTITION` (and
> > `END_TXN_ON_SUBSCRIPTION` for acks) to every participant. The TB then
> > writes a **commit or abort marker** as a regular entry in the topic's
> > managed ledger. The dispatcher discovers committed/aborted state by
> > replaying these markers and consulting the in-memory aborted-txn set.
> >
> > ### Scalable topics
> >
> > [PIP-460](pip-460.md) introduces scalable topics: a logical topic
> > backed by a DAG of range segments (`segment://...`) that can be split
> > or merged at runtime. Each segment is a regular `PersistentTopic` from
> > the broker's perspective, but the segment's lifetime is controlled by
> > the [scalable topic controller](pip-468.md) — segments get **sealed**
> > when split or merged, after which the segment's managed ledger no
> > longer accepts writes.
> >
> > ### How the two interact
> >
> > The current transaction implementation composes per-`PersistentTopic`.
> > With scalable topics, every segment carries its own TB. This
> > composition fails in two ways:
> >
> > 1. **End-of-transaction stalls on sealed segments.** The TC sends
> > `END_TXN_ON_PARTITION` to each segment that received writes. The
> > segment's TB tries to append a commit/abort marker — which is a write
> > — and the now-sealed segment rejects it. The end-txn RPC times out
> > (~30s).
> > 2. **Pending-ack topic naming collides with the segment-domain
> > parser.** The convention `<topic>-<sub>__transaction_pending_ack` is
> > unparseable when `<topic>` is a `segment://...` URI. (Worked around in
> > #25631 with a flat persistent name; see "Out of Scope" below.)
> >
> > The first issue is structural, not just a routing bug. As long as
> > commit/abort decisions need to be persisted **inside the topic's data
> > stream**, sealing the topic terminates any in-flight transaction.
> >
> > ---
> >
> > ## Motivation
> >
> > We need transactions that:
> >
> > 1. Provide atomicity across multiple writes and acknowledgments,
> > possibly spanning multiple topics across multiple namespaces.
> > 2. Compose correctly with the scalable-topic lifecycle — including
> > splits, merges, and segments sealed mid-transaction.
> > 3. Do not require duplicating data (each `producer.send` produces a
> > single managed-ledger append).
> > 4. Reuse as much of the existing transaction surface as possible —
> > interfaces, dispatcher integration, client API — so that we are not
> > re-litigating well-understood concerns.
> > 5. Coexist with v4 transactions on `persistent://` topics with no
> > behavior change for those topics.
> >
> > The structural mismatch between in-stream markers and a mutable
> > segment DAG cannot be papered over at the routing or the topic-naming
> > layer. It needs a transaction representation that does not put the
> > decision record inside the data stream.
> >
> > ---
> >
> > ## Goals
> >
> > ### In Scope
> >
> > - Atomic transactions over `segment://` topics (writes and acks),
> > including transactions whose lifetime spans split/merge.
> > - Multi-topic, multi-namespace, multi-segment transactions with the
> > same atomicity guarantees as today.
> > - Reuse of the existing `Transaction`, `TransactionCoordinator`,
> > `TransactionBuffer`, `PendingAckStore`, dispatcher, and client APIs.
> > New behavior arrives as alternative implementations behind the
> > existing interfaces.
> > - Coexistence with the legacy in-stream-marker implementation for
> > `persistent://` topics.
> >
> > ### Out of Scope
> >
> > - Replacing the legacy implementation for non-scalable topics. The new
> > implementation is opt-in per topic; `persistent://` topics keep their
> > current behavior, including the existing TC.
> > - Replacing the segment-aware pending-ack topic name introduced in
> > #25631 — that workaround becomes unnecessary as a side effect of this
> > PIP and is removed in the same change.
> > - Cross-cluster (geo-replicated) transactional semantics.
> >
> > ---
> >
> > ## High Level Design
> >
> > The proposal is one sentence:
> >
> > > **Move transactional state out of the data stream and into the metadata
> > store.**
> >
> > Concretely: keep all existing components and interfaces, and add a
> > parallel implementation of `TransactionBuffer`, `PendingAckStore`,
> > **and Transaction Coordinator** that writes nothing to any data
> > stream. Their state lives entirely in the metadata store. The legacy
> > in-stream-marker components remain, unchanged, for `persistent://`
> > topics; the new metadata-driven components handle `segment://` topics.
> > The dispatcher's contract is unchanged.
> >
> > Why introduce a v5 TC rather than reuse the legacy one: the legacy TC
> > stores its log in a system topic (`__transaction_log_*`), which
> > carries the operational concerns of any system topic — compaction can
> > lead to long recovery times, leadership has to be maintained, and
> > recovery is on the data path. With the metadata store available we can
> > have a TC whose state is just a few key-value records, no log, no
> > system topic, no per-broker in-memory replay. Running both TC
> > implementations in parallel keeps v4 transactions byte-for-byte
> > unchanged while the v5 path uses the simpler design.
> >
> > ### Why this works for scalable topics
> >
> > - **Sealing a segment is irrelevant.** Commit/abort no longer require
> > any append to the segment. End-txn becomes a metadata-store CAS on a
> > single record. Sealed segments materialize the decision (advance
> > cursors, evict cache entries) without writing anything.
> > - **The dispatcher does not change.** It already asks the topic's TB
> > for `maxReadPosition` and `isTxnAborted`. We swap the source.
> > - **Splits/merges do not strand transactions.** Sealed parents and
> > live children both consult the same metadata; the decision lives above
> > the segments.
> > - **No data is duplicated.** Each transactional `send` produces
> > exactly one managed-ledger append, same as today.
> >
> > ### Architecture Overview
> >
> > ```
> > ┌──────────────────────────────────────────────────────────────────┐
> > │   Client (V5)  -- producer.send(txn,...)                          │
> > │                -- consumer.acknowledge(id, txn)                   │
> > └─────────────────────────────────┬─────────────────────────────────┘
> >                                   │
> >             ┌─────────────────────┴─────────────────────┐
> >             │                                            │
> > ┌───────────▼────────────────┐         ┌────────────────▼──────────────┐
> > │   Transaction Coordinator   │         │   Transaction Coordinator V5
>  │
> > │   (legacy, BK-backed log)   │         │   (metadata-store records)
>  │
> > │   → v4 / persistent:// txns │         │   → v5 / segment:// txns
>  │
> > └───────────┬────────────────┘         └────────────────┬──────────────┘
> >             │                                            │
> >             │ END_TXN_ON_PARTITION / SUBSCRIPTION        │
> >             ▼                                            ▼
> > ┌─────────────────────────────────────────────────────────────────────┐
> > │   Per-topic broker components                                        │
> > │                                                                      │
> > │   ┌──────────────────────────┐  ┌────────────────────────┐          │
> > │   │ TopicTransactionBuffer    │  │ MLPendingAckStore      │          │
> > │   │ (in-stream markers)       │  │ (sibling topic)        │          │
> > │   │  → persistent:// topics   │  │  → persistent:// topics│          │
> > │   └──────────────────────────┘  └────────────────────────┘          │
> > │                                                                      │
> > │   ┌──────────────────────────┐  ┌────────────────────────┐          │
> > │   │ MetadataTransactionBuffer │  │ MetadataPendingAckStore│          │
> > │   │ (metadata-store records)  │  │ (metadata-store records)          │
> > │   │  → segment:// topics      │  │  → segment:// topics   │          │
> > │   └────────┬─────────────────┘  └─────────┬──────────────┘          │
> > └────────────┼──────────────────────────────┼─────────────────────────┘
> >              │                              │
> >              ▼                              ▼
> >            Metadata Store — txn coordinator state + txn-op records +
> > secondary indexes
> > ```
> >
> > The `TransactionBufferProvider` and
> > `TransactionPendingAckStoreProvider` SPIs already exist. The new TB /
> > PendingAckStore implementations slot in behind them. The v5 TC is a
> > parallel coordinator selected by the client when it is configured for
> > the new path. Selection on the participant side is per-topic, based on
> > the topic's domain.
> >
> > ---
> >
> > ## Detailed Design
> >
> > ### Data Model
> >
> > The metadata store holds two classes of records and four secondary
> > indexes. All records for a given transaction share the same
> > **partition key** (`txnId`) so they are co-located — this makes
> > per-txn scans (e.g. listing all ops to apply at end-txn time) a
> > single-partition operation rather than a fan-out.
> >
> > > **A note on metadata-store backends.** The design is
> > `MetadataStore`-agnostic. It depends on three capabilities —
> partition-key
> > co-location, sequential keys, and secondary indexes with range queries
> and
> > range-watch — that the `MetadataStore` interface does not expose today.
> We
> > extend the interface to surface them; backends that natively support
> these
> > (notably Oxia, the intended default) implement them directly, while
> > backends that don't (e.g. ZooKeeper) can implement them in a less
> efficient
> > way (client-side counters for sequential IDs; client-maintained index
> > records; periodic re-list in lieu of range-watch). Correctness does not
> > depend on backend choice; throughput and recovery latency may.
> >
> > #### Header — one per transaction. Linearization point.
> >
> > ```
> > /txn/<txnId>                        partitionKey = txnId
> >   =  {
> >        state:       OPEN | COMMITTED | ABORTED,
> >        timeout_ms:  <abs epoch ms>,
> >        created_ms:  <abs epoch ms>
> >      }
> > ```
> >
> > State transitions are conditional puts (CAS on version) issued by the
> > v5 TC. `OPEN → COMMITTED` and `OPEN → ABORTED` are the only allowed
> > transitions; `COMMITTED` and `ABORTED` are terminal.
> >
> > #### Operation records — one per transactional write or ack. Unbounded.
> >
> > ```
> > /txn-op/<txnId>/<seq>               partitionKey = txnId,
> >                                     sequential   = true     #
> > server-assigned <seq>
> >   =  {
> >        kind:         "write" | "ack",
> >        segment:      "segment://t/n/x/<descriptor>",  # always present
> >        subscription: "<sub-fqn>",                     # ack only
> >        position:     <ledgerId>:<entryId>
> >      }
> > ```
> >
> > Each operation is its own record, so a transaction has no size limit
> > and concurrent participants do not contend on a single record. With
> > **sequential keys** the server (or, on backends that lack them, a
> > `MetadataStore`-side counter) assigns `<seq>`, eliminating client-side
> > collisions.
> >
> > #### Secondary indexes (auto-maintained by the metadata store)
> >
> > ```
> > idx:writes-by-segment              on /txn-op/* where kind=write
> >                                    key = segment
> >                                    →  range query "writes touching
> segment
> > S"
> >
> > idx:acks-by-segment-subscription   on /txn-op/* where kind=ack
> >                                    key = (segment, subscription)
> >                                    →  range query "acks on (segment S,
> > subscription SU)"
> >
> > idx:txn-by-deadline                on /txn/* where state=OPEN
> >                                    key = timeout_ms
> >                                    →  range query "open txns past
> deadline"
> >                                    →  used by TC for timeout-driven abort
> >
> > idx:txn-by-final-state             on /txn/* where state ∈ {COMMITTED,
> > ABORTED}
> >                                    key = (state, finalized_ms)
> >                                    →  range query "finalized txns ready
> > for GC"
> >                                    →  used by GC sweep to find
> > finalized txns whose op records can be deleted
> > ```
> >
> > #### Garbage collection
> >
> > A finalized transaction (`COMMITTED` or `ABORTED`) is removed in two
> > phases:
> >
> > 1. **Per-participant materialization.** When the TC fans out end-txn,
> > each participant broker materializes the decision (commit: advance
> > subscription cursors for acks, evict header cache; abort: drop ops).
> > Once a participant has finished its materialization for `<txnId>`, it
> > deletes its op records (`/txn-op/<txnId>/<seq>` for ops it owns).
> > 2. **Header GC sweep.** A periodic sweep scans
> > `idx:txn-by-final-state` for entries past a configurable retention
> > window (e.g. 60 s after `finalized_ms`). For each, it verifies no
> > `/txn-op/<txnId>/*` records remain (orphan check from a participant
> > crash), forces deletion of any leftovers, and finally deletes the
> > header `/txn/<txnId>`.
> >
> > Because all of a txn's records share the same partition (`partitionKey
> > = txnId`), the GC sweep's per-txn cleanup stays in one partition: list
> > `/txn-op/<txnId>/`, delete, then delete the header.
> >
> > Indexes update transactionally with the underlying records, so they
> > self-clean.
> >
> > ### Components
> >
> > #### `MetadataTransactionBuffer` (new)
> >
> > Implements the existing `TransactionBuffer` interface. Used for
> > `segment://` topics.
> >
> > | Method | Behavior |
> > |---|---|
> > | `appendBufferToTxn(txnId, buf)` | `ML.asyncAddEntry(buf)`; on
> > success, append a sequential `/txn-op/<txnId>/<seq>`
> > (`partitionKey=txnId`) with `kind="write", segment, position`. The
> > publish ack waits for both. |
> > | `commit(txnId, position)` / `abort(...)` | Not invoked by the v5 TC
> > (which does not RPC participants). The TB's header watch fires when
> > `/txn/<txnId>.state` changes; the TB then materializes locally (evict
> > / mark-aborted) and deletes its owned op records. |
> > | `getMaxReadPosition()` | Read from in-memory cache. Cache is
> > populated by a watch on `idx:writes-by-segment == <my-segment>` joined
> > against the header cache. Result: `min(position over OPEN txns) - 1`,
> > capped at LAC. |
> > | `isTxnAborted(msg)` | Look up `/txn/<txnId>.state` from header cache. |
> > | `recover()` | Open the index watch and the header cache; populate
> > from the current snapshot. No log replay, no snapshot topic. |
> >
> > #### `MetadataPendingAckStore` (new)
> >
> > Implements the existing `PendingAckStore` interface. Used for
> > `segment://` topic subscriptions.
> >
> > | Method | Behavior |
> > |---|---|
> > | `appendIndividualAck(txnId, positions)` | Append sequential
> > `/txn-op/<txnId>/<seq>` records with `kind="ack", segment,
> > subscription, position`. |
> > | `appendCumulativeAck(...)` | Same shape, single op record carrying
> > the cumulative position. |
> > | `commit(txnId)` / `abort(txnId)` | Not invoked by the v5 TC.
> > Triggered locally when the header watch on `/txn/<txnId>.state` fires.
> > Commit: range-query `idx:acks-by-segment-subscription ==
> > (<my-segment>, <my-subscription>)` filtered to `<txnId>`; apply to
> > cursor (`markDelete` or `individualAck`); range-delete the op records.
> > Abort: range-delete the op records, no cursor work. |
> > | `replayAsync()` (recovery) | Range-query
> > `idx:acks-by-segment-subscription == (<my-segment>,
> > <my-subscription>)`, group by `txnId`, hydrate in-memory state. |
> >
> > #### Transaction Coordinator V5 (new)
> >
> > A parallel coordinator selected by the v5 client. Same client-facing
> > wire commands (`NEW_TXN`, `ADD_PARTITION_TO_TXN`,
> > `ADD_SUBSCRIPTION_TO_TXN`, `END_TXN`), but no system-topic log: every
> > operation reads or CAS's a metadata-store record. **The TC does not
> > RPC participants** — see "Notification mechanism" below.
> >
> > | Operation | Behavior |
> > |---|---|
> > | `newTxn(timeoutMs)` | Create `/txn/<txnId>` with `state=OPEN`,
> > `timeout_ms=now+timeoutMs`. |
> > | `addPartitionToTxn` / `addSubscriptionToTxn` | No-op at the
> > coordinator. The participant broker writes its own op records when the
> > actual write/ack arrives; the TC never needs to enumerate
> > participants. |
> > | `endTxn(COMMIT\|ABORT)` | A single CAS on `/txn/<txnId>.state`.
> > After it returns, the TC sets `finalized_ms` on the header and acks
> > the client. No fan-out, no waiting on participants. |
> > | Timeout sweep | Range-query `idx:txn-by-deadline` for entries with
> > `timeout_ms ≤ now`, abort each (same single-CAS flow). |
> > | GC sweep | Range-query `idx:txn-by-final-state` for entries past
> > retention; for each, verify `/txn-op/<txnId>/*` is empty (force-delete
> > leftovers); delete header. |
> >
> > Why parallel rather than reusing the legacy TC: the legacy TC's
> > per-shard system topic (`__transaction_log_*`) requires leadership
> > election, runs compaction over its own log, and pays a recovery cost
> > on every broker restart proportional to the live transaction set. The
> > v5 TC's state is just per-txn KV records — there is no log to compact
> > and no cold-start replay. Running both in parallel keeps v4
> > transactions byte-for-byte unchanged while the v5 path uses the
> > simpler design. A v5 client routes its `NEW_TXN` to the v5 TC; v4
> > clients route to the legacy TC. A single transaction does not span the
> > two.
> >
> > #### Notification mechanism (TC → participants)
> >
> > The legacy TC needs to RPC each participant (`END_TXN_ON_PARTITION`,
> > `END_TXN_ON_SUBSCRIPTION`) because the participants have no other way
> > to learn the decision — the TC's log is the only source of truth, and
> > only the TC reads it.
> >
> > In the v5 design **the metadata store is the source of truth**, and
> > every participant already reads from it. Participants therefore learn
> > about state transitions directly from the store, without any
> > TC-to-broker RPC:
> >
> > - A `MetadataTransactionBuffer` keeps an in-memory header cache for
> > txns it has writes from. The cache entries are populated when a write
> > op record is appended (the broker reads the header to authorize the
> > write) and **kept up to date by point-watches on the headers it has
> > cached**.
> > - A `MetadataPendingAckStore` maintains the same pattern for txns it
> > has acks from.
> > - When the TC CAS's `/txn/<txnId>.state` from OPEN →
> > COMMITTED/ABORTED, every cached watcher fires. Each participant
> > materializes locally:
> >   - **Commit** — TB evicts its cache entry (the txn no longer pins
> > `maxReadPosition` back); PendingAckStore applies the buffered acks to
> > the cursor.
> >   - **Abort** — TB marks the txn aborted in its cache (the
> > dispatcher's `isTxnAborted` will skip those entries); PendingAckStore
> > drops the buffered acks.
> > - After materialization, the participant deletes the op records it
> > owns (`/txn-op/<txnId>/<seq>` for ops on its segment / subscription).
> > - The TC's GC sweep (above) detects when all participants have done
> > their cleanup — the prefix `/txn-op/<txnId>/*` is empty — and deletes
> > the header.
> >
> > Consequences:
> >
> > - **End-txn latency.** From the client's perspective, `commit` returns
> > as soon as the header CAS lands. From a consumer's perspective,
> > freshly-committed entries become visible after the participant's
> > header watch fires + materialization runs. That's typically tens of
> > milliseconds; bounded by metadata-store watch propagation. (If we ever
> > care about a tighter bound — e.g. for a given workload — the TC can
> > issue an optional `nudge` RPC to participants in parallel with the
> > CAS. Not needed for correctness; not in this PIP.)
> > - **No RPC fan-out from TC.** End-txn is `O(1)` work at the TC: one
> > CAS. The cost of fan-out is paid by the metadata store's
> > watch-delivery infrastructure, which already exists for other Pulsar
> > uses.
> > - **Crash idempotence.** A participant that crashes during
> > materialization restarts, observes the (already-final) header state
> > via its watch, and finishes materialization. The TC need not retry
> > anything.
> >
> > #### Dispatcher
> >
> > Unchanged. It already asks
> > `topic.getTransactionBuffer().getMaxReadPosition()` and
> > `topic.getTransactionBuffer().isTxnAborted(...)`. The new TB
> > implements both.
> >
> > ### Flows
> >
> > #### Publish (transactional)
> >
> > ```mermaid
> > sequenceDiagram
> >     participant C as Client
> >     participant B as Segment broker
> >     participant ML as Managed Ledger
> >     participant M as Metadata Store
> >
> >     C->>B: send(txnId, payload)
> >     B->>M: read /txn/<txnId>.state  (cached)
> >     alt state != OPEN
> >         B-->>C: TxnConflict
> >     else state == OPEN
> >         B->>ML: asyncAddEntry(payload)
> >         ML-->>B: position
> >         B->>M: put /txn-op/<txnId>/<seq> {kind=write, segment, position}
> >         M-->>B: ack
> >         B-->>C: send-ack
> >     end
> > ```
> >
> > The header read is cache-first; the cache is invalidated by the same
> > watch the TB already maintains on the header. The op-record put is the
> > only synchronous metadata-store write on the publish path.
> >
> > #### End-txn (commit or abort)
> >
> > ```mermaid
> > sequenceDiagram
> >     participant Cl as Client
> >     participant TC as Transaction Coordinator V5
> >     participant M as Metadata Store
> >     participant P as Participant brokers
> >
> >     Cl->>TC: commit(txnId)
> >     TC->>M: CAS /txn/<txnId>.state OPEN→COMMITTED, set finalized_ms
> >     M-->>TC: ack
> >     TC-->>Cl: ack
> >
> >     Note over M,P: Independently, asynchronously:
> >     M-->>P: header watch fires
> >     P->>P: materialize (cursor advance / cache evict)
> >     P->>M: delete owned /txn-op/<txnId>/<seq> records
> >
> >     Note over TC,M: Later, GC sweep:
> >     TC->>M: list /txn-op/<txnId>/*  (empty? then delete header)
> > ```
> >
> > The CAS on the header is the linearization point — that is when the
> > transaction's outcome is decided. Notification of participants is not
> > part of the linearization; it propagates via the watches every
> > participant already maintains on the headers it has cached. Sealed
> > segments are fine — materialization is metadata + cursor work, no
> > managed-ledger writes.
> >
> > #### Subscribe / dispatch
> >
> > Unchanged. The dispatcher polls `tb.getMaxReadPosition()` and filters
> > by `tb.isTxnAborted(msg)`. The `MetadataTransactionBuffer` answers
> > both from its in-memory caches, fed by metadata-store watches.
> >
> > #### Late-write race
> >
> > The TC is mid-end-txn when the client publishes once more inside the
> > same transaction. The header CAS may have already flipped to
> > `COMMITTED`/`ABORTED`. The publish-path header check on the
> > participant broker rejects with `TxnConflict`. This mirrors today's TC
> > behavior (the TC marks transactions as ENDING and brokers reject new
> > writes); the only difference is that the rejection criterion is now
> > read from the metadata store rather than from a TC RPC.
> >
> > ### Recovery
> >
> > - **Broker startup.** Each `MetadataTransactionBuffer` opens its index
> > watch and header cache. The first watch event delivers the snapshot;
> > the TB is ready as soon as the snapshot has been applied. No log
> > replay, no system-topic reader, no snapshot topic.
> > - **Broker crash mid-publish.** If the broker appended the entry but
> > crashed before writing the op record, the entry exists in the segment
> > but no metadata claims it. On txn timeout the TC aborts the txn; the
> > dispatcher's `isTxnAborted` check (which falls back to "abort" for
> > unknown txnIds at retention horizon) discards the entry.
> > - **Broker crash mid-end-txn.** If the header CAS landed but
> > materialization on a participant did not complete, the participant
> > re-derives state from the header on restart and finishes
> > materialization. End-txn is idempotent.
> > - **TC failover.** The v5 TC has no in-memory log to replay — its
> > state lives in the metadata store. Whichever broker takes over
> > coordinator duty for a TC partition resumes operations directly from
> > the metadata-store records. Cold-start cost is bounded by
> > `idx:txn-by-deadline` and `idx:txn-by-final-state` scans, not by
> > replay of an entire transaction log.
> >
> > ### Concurrency and contention
> >
> > - Each transactional publish writes a unique `/txn-op/<txnId>/<seq>`
> > record (server-assigned sequential key). There is no contention
> > between concurrent participants of the same transaction.
> > - The header is CAS'd at most twice per transaction lifetime (open +
> > finalize), so contention there is bounded.
> > - All records for a given txn share `partitionKey=txnId`, so per-txn
> > operations (list, range-delete) stay on a single partition.
> > - Index updates are managed by the metadata store; their scaling is
> > the store's concern.
> >
> > ---
> >
> > ## Public-facing Changes
> >
> > ### Public API
> >
> > No changes. The client-facing `Transaction` API is unchanged.
> >
> > ### Binary protocol
> >
> > No changes to client-facing wire commands (`NEW_TXN`,
> > `ADD_PARTITION_TO_TXN`, `ADD_SUBSCRIPTION_TO_TXN`, `END_TXN`) — the v5
> > TC accepts them with the same semantics as the legacy TC.
> >
> > The broker-to-broker commands `END_TXN_ON_PARTITION` and
> > `END_TXN_ON_SUBSCRIPTION` are **not used** by the v5 path: participant
> > brokers learn about the decision by watching the metadata-store header
> > rather than by receiving an RPC from the TC. The legacy TC still uses
> > these commands for v4 transactions; they remain in the protocol
> > unchanged.
> >
> > ### Configuration
> >
> > A per-namespace or per-broker setting selects the TB implementation.
> > Default for `segment://` topics: metadata-driven. Default for
> > `persistent://` topics: in-stream markers (unchanged). Override is
> > possible per-namespace for debugging / migration.
> >
> > ### Metrics
> >
> > Existing transaction metrics remain. The metadata-driven implementation
> > adds:
> >
> > - `pulsar_txn_metadata_store_op_writes_total` (counter) — op records
> > written.
> > -
> `pulsar_txn_metadata_store_header_cas_total{result="ok|conflict|reject"}`
> > (counter) — header CAS attempts and outcomes.
> > - `pulsar_txn_metadata_store_index_query_seconds` (histogram) —
> > latency of the index range queries on `idx:writes-by-segment` /
> > `idx:acks-by-segment-subscription`.
> > - `pulsar_txn_metadata_store_outstanding_op_records` (gauge) —
> > uncollected op records (a proxy for txn GC backlog).
> >
> > Existing `pulsar_txn_tb_*` snapshot/replay metrics are not emitted by
> > the new implementation (no snapshots, no replay).
> >
> > ---
> >
> > ## Backward & Forward Compatibility
> >
> > ### Upgrade
> >
> > - Existing `persistent://` topic behavior is unchanged. v4 clients see
> > no difference.
> > - Brokers running this PIP can interoperate with brokers that do not,
> > as long as a given **topic** is consistently served by brokers of one
> > kind. Since topic ownership is bundle-based and migration via
> > load-balancer transfers TB state across, this is satisfied
> > automatically.
> > - Per-segment pending-ack topics created by the workaround in #25631
> >
> >
> (`persistent://t/n/<localName>-<descriptor>-<sub>__transaction_pending_ack`)
> > are no longer used. They are deleted as part of upgrade. Since the
> > workaround was only ever exercised by V5 transactional consumer flows,
> > the upgrade path is safe.
> >
> > ### Downgrade / Rollback
> >
> > Not applicable. Scalable topics are introduced as a new feature in
> > Pulsar 5.0 ([PIP-460](pip-460.md)); this PIP defines transactional
> > support for that feature from the start. There is no prior version to
> > roll back to.
> >
> > ### Pulsar Geo-Replication
> >
> > Out of scope. Transactional geo-replication is not supported in either
> > model.
> >
> > ---
> >
> > ## Alternatives Considered
> >
> > ### A. Move TB to the scalable-topic level (one TB per logical topic)
> >
> > Earlier draft of this design. Architecturally clean — decisions live
> > above segments — but introduces a new broker-side singleton per
> > scalable topic, adds new failover semantics, and complicates the TC's
> > wire protocol (end-txn would need redirection from segment to
> > scalable-topic owner). Replacing the per-topic TB **implementation**
> > with a metadata-driven one achieves the same correctness without any
> > of that surface area.
> >
> > ### B. Per-segment TB but using an off-segment marker stream
> >
> > Keep the per-segment TB; have it write commit/abort markers to a
> > **separate** managed ledger (e.g. a shadow topic) rather than into the
> > segment's own data. Sealed segments would no longer block end-txn.
> > Rejected because: (1) it doubles the data path (every txn needs a
> > write to the segment **and** to the shadow topic), (2) it requires a
> > new system-topic-per-segment, and (3) it does not eliminate the
> > snapshot/replay machinery that the metadata-driven approach removes
> > outright.
> >
> > ### C. Skip transactional support on scalable topics
> >
> > Document scalable topics as non-transactional. Rejected: the
> > transactional consume-and-produce pattern is a primary use case for
> > scalable streaming workloads (Kafka Streams analogue), and PIP-460's
> > roadmap explicitly calls out transactions across range segments as a
> > Phase 4 deliverable.
> >
> > ---
> >
> > ## General Notes
> >
> > The shape of the change is *one new `TransactionBuffer`
> > implementation, one new `PendingAckStore` implementation, one new
> > `TransactionCoordinator` implementation, and the `MetadataStore`
> > extensions to support partition-key co-location, sequential keys, and
> > secondary indexes with range-watch*. The complexity is in the
> > interaction of the metadata schema with the dispatcher's existing
> > assumptions, not in any new system component on the broker.
> >
> > ## Links
> >
> > - [PIP-460: Scalable Topics](pip-460.md)
> > - [PIP-468: Scalable Topic Controller](pip-468.md)
> >
> >
> >
> > --
> > Matteo Merli
> > <[email protected]>
> >
> >
>

Reply via email to