lhotari commented on code in PR #25754: URL: https://github.com/apache/pulsar/pull/25754#discussion_r3229847960
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java: ########## @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.metadata; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Consumer; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Option; +import org.apache.pulsar.metadata.api.ScanConsumer; +import org.apache.pulsar.metadata.api.Stat; + +/** + * Typed façade over a {@link MetadataStore} that implements the PIP-473 transaction data layout. + * + * <p>Responsibilities: + * <ul> + * <li>JSON serde for {@link TxnHeader}, {@link TxnOp}, {@link TxnEvent}.</li> + * <li>Attach {@link Option.PartitionKey} so all records for one txnId / segment / (segment, sub) + * co-locate on sharded backends (Oxia).</li> + * <li>Attach {@link Option.SecondaryIndex} entries so the runtime queries hit native indexes when + * available, with a deserializing fallback predicate for stores that don't have them.</li> + * <li>Append op-log and event records via {@link Option.SequenceKeysDeltas}.</li> + * </ul> + * + * <p>The façade is stateless apart from holding the store reference — index population happens via + * options on writes, so there is no explicit registration step. + */ +public class TxnMetadataStore { + + /** Sequence-keys delta used by all append-only streams in this layout. */ + private static final Option.SequenceKeysDeltas APPEND_DELTAS = + new Option.SequenceKeysDeltas(List.of(1L)); + + private final MetadataStore store; + + public TxnMetadataStore(MetadataStore store) { + this.store = store; + } + + // ---- Header CRUD ------------------------------------------------------- + + /** @return the header at {@code /txn/<txnId>} with its version, or empty if not present. */ + public CompletableFuture<Optional<Versioned<TxnHeader>>> getHeader(String txnId) { + return store.get(TxnPaths.header(txnId), Set.of(new Option.PartitionKey(txnId))) + .thenApply(opt -> opt.map(gr -> + new Versioned<>(fromJson(gr.getValue(), TxnHeader.class), gr.getStat().getVersion()))); + } + + /** + * Create the txn header at version -1 (must not exist). Adds the deadline secondary-index entry + * so the timeout sweeper can range-scan open transactions. + */ + public CompletableFuture<Stat> createHeader(String txnId, TxnHeader header) { + return store.put(TxnPaths.header(txnId), toJson(header), Optional.of(-1L), + headerOptions(txnId, header)); + } + + /** + * CAS-update the txn header. Pass the {@code version} returned by a previous {@link #getHeader}. + * Index entries are recomputed by the store based on the options on this write. + */ + public CompletableFuture<Stat> updateHeader(String txnId, TxnHeader header, long expectedVersion) { + return store.put(TxnPaths.header(txnId), toJson(header), Optional.of(expectedVersion), + headerOptions(txnId, header)); + } + + /** Delete the txn header with CAS on the expected version. Tolerates a NotFound result. */ + public CompletableFuture<Void> deleteHeader(String txnId, long expectedVersion) { + return store.deleteIfExists(TxnPaths.header(txnId), Optional.of(expectedVersion), + Set.of(new Option.PartitionKey(txnId))); + } + + private static Set<Option> headerOptions(String txnId, TxnHeader header) { + Option.SecondaryIndex idx; + if (header.getState().isTerminal()) { + long finalizedMs = header.getFinalizedAt() == null ? 0L : header.getFinalizedAt().toEpochMilli(); + idx = new Option.SecondaryIndex(TxnPaths.IDX_TXN_BY_FINAL_STATE, + TxnPaths.finalStateKey(header.getState(), finalizedMs)); + } else { + long deadlineMs = header.getCreatedAt().toEpochMilli() + header.getTimeout().toMillis(); + idx = new Option.SecondaryIndex(TxnPaths.IDX_TXN_BY_DEADLINE, TxnPaths.longKey(deadlineMs)); + } + return Set.of(new Option.PartitionKey(txnId), idx); + } + + // ---- Op-log append ----------------------------------------------------- + + /** + * Append a {@link TxnOp} under {@code /txn-op/<txnId>-<seq>}. Adds the per-kind secondary-index + * entry — {@link TxnPaths#IDX_WRITES_BY_SEGMENT} for writes, + * {@link TxnPaths#IDX_ACKS_BY_SEGMENT_SUBSCRIPTION} for acks. Returns the {@link Stat} whose + * {@code path} carries the generated sequence key. + */ + public CompletableFuture<Stat> appendOp(String txnId, TxnOp op) { + Option.SecondaryIndex idx = switch (op.getKind()) { + case WRITE -> new Option.SecondaryIndex(TxnPaths.IDX_WRITES_BY_SEGMENT, op.getSegment()); + case ACK -> new Option.SecondaryIndex(TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION, + TxnPaths.ackIndexKey(op.getSegment(), op.getSubscription())); + }; + Set<Option> opts = Set.of(new Option.PartitionKey(txnId), idx, APPEND_DELTAS); + return store.put(TxnPaths.opParent(txnId), toJson(op), Optional.empty(), opts); + } + + // ---- Index queries ----------------------------------------------------- + + /** Stream all write ops targeting {@code segment}. */ + public CompletableFuture<Void> listWritesBySegment(String segment, ScanConsumer consumer) { + return store.scanByIndex(TxnPaths.TXN_OP_PREFIX, TxnPaths.IDX_WRITES_BY_SEGMENT, + segment, segment, + gr -> { + TxnOp op = fromJson(gr.getValue(), TxnOp.class); + return op.getKind() == TxnOpKind.WRITE && segment.equals(op.getSegment()); + }, + consumer); + } + + /** Stream all ack ops targeting {@code (segment, subscription)}. */ + public CompletableFuture<Void> listAcksBySegmentSubscription(String segment, String subscription, + ScanConsumer consumer) { + String key = TxnPaths.ackIndexKey(segment, subscription); + return store.scanByIndex(TxnPaths.TXN_OP_PREFIX, TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION, + key, key, + gr -> { + TxnOp op = fromJson(gr.getValue(), TxnOp.class); + return op.getKind() == TxnOpKind.ACK + && segment.equals(op.getSegment()) + && subscription.equals(op.getSubscription()); + }, + consumer); + } + + /** + * Stream open transactions whose deadline falls in {@code [fromMsInclusive, toMsInclusive]}. + * Pass {@code null} on either bound for an unbounded range. + */ + public CompletableFuture<Void> listOpenByDeadlineRange(Long fromMsInclusive, Long toMsInclusive, + ScanConsumer consumer) { + String from = fromMsInclusive == null ? null : TxnPaths.longKey(fromMsInclusive); + String to = toMsInclusive == null ? null : TxnPaths.longKey(toMsInclusive); + return store.scanByIndex(TxnPaths.TXN_HEADER_PREFIX, TxnPaths.IDX_TXN_BY_DEADLINE, + from, to, + gr -> { + TxnHeader h = fromJson(gr.getValue(), TxnHeader.class); + if (h.getState().isTerminal()) { + return false; + } + long deadline = h.getCreatedAt().toEpochMilli() + h.getTimeout().toMillis(); + return (fromMsInclusive == null || deadline >= fromMsInclusive) + && (toMsInclusive == null || deadline <= toMsInclusive); + }, + consumer); + } + + /** + * Stream terminal transactions in {@code state} whose finalization time falls in + * {@code [fromMsInclusive, toMsInclusive]}. Pass {@code null} on either bound for unbounded. + */ + public CompletableFuture<Void> listFinalizedByStateAndTimeRange(TxnState state, + Long fromMsInclusive, Long toMsInclusive, + ScanConsumer consumer) { + String from = fromMsInclusive == null ? state.name() + ":" : TxnPaths.finalStateKey(state, fromMsInclusive); + String to = toMsInclusive == null + ? state.name() + ":" + "9".repeat(TxnPaths.LONG_KEY_WIDTH) Review Comment: use constant? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
