lhotari commented on code in PR #25754:
URL: https://github.com/apache/pulsar/pull/25754#discussion_r3229847960


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnMetadataStore.java:
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.Stat;
+
+/**
+ * Typed façade over a {@link MetadataStore} that implements the PIP-473 transaction data layout.
+ *
+ * <p>Responsibilities:
+ * <ul>
+ *   <li>JSON serde for {@link TxnHeader}, {@link TxnOp}, {@link TxnEvent}.</li>
+ *   <li>Attach {@link Option.PartitionKey} so all records for one txnId / segment / (segment, sub)
+ *       co-locate on sharded backends (Oxia).</li>
+ *   <li>Attach {@link Option.SecondaryIndex} entries so the runtime queries hit native indexes when
+ *       available, with a deserializing fallback predicate for stores that don't have them.</li>
+ *   <li>Append op-log and event records via {@link Option.SequenceKeysDeltas}.</li>
+ * </ul>
+ *
+ * <p>The façade is stateless apart from holding the store reference — index population happens via
+ * options on writes, so there is no explicit registration step.
+ */
+public class TxnMetadataStore {
+
+    /** Sequence-keys delta used by all append-only streams in this layout. */
+    private static final Option.SequenceKeysDeltas APPEND_DELTAS =
+            new Option.SequenceKeysDeltas(List.of(1L));
+
+    private final MetadataStore store;
+
+    public TxnMetadataStore(MetadataStore store) {
+        this.store = store;
+    }
+
+    // ---- Header CRUD -------------------------------------------------------
+
+    /** @return the header at {@code /txn/<txnId>} with its version, or empty 
if not present. */
+    public CompletableFuture<Optional<Versioned<TxnHeader>>> getHeader(String 
txnId) {
+        return store.get(TxnPaths.header(txnId), Set.of(new 
Option.PartitionKey(txnId)))
+                .thenApply(opt -> opt.map(gr ->
+                        new Versioned<>(fromJson(gr.getValue(), 
TxnHeader.class), gr.getStat().getVersion())));
+    }
+
+    /**
+     * Create the txn header at version -1 (must not exist). Adds the deadline 
secondary-index entry
+     * so the timeout sweeper can range-scan open transactions.
+     */
+    public CompletableFuture<Stat> createHeader(String txnId, TxnHeader 
header) {
+        return store.put(TxnPaths.header(txnId), toJson(header), 
Optional.of(-1L),
+                headerOptions(txnId, header));
+    }
+
+    /**
+     * CAS-update the txn header. Pass the {@code version} returned by a 
previous {@link #getHeader}.
+     * Index entries are recomputed by the store based on the options on this 
write.
+     */
+    public CompletableFuture<Stat> updateHeader(String txnId, TxnHeader 
header, long expectedVersion) {
+        return store.put(TxnPaths.header(txnId), toJson(header), 
Optional.of(expectedVersion),
+                headerOptions(txnId, header));
+    }
+
+    /** Delete the txn header with CAS on the expected version. Tolerates a 
NotFound result. */
+    public CompletableFuture<Void> deleteHeader(String txnId, long 
expectedVersion) {
+        return store.deleteIfExists(TxnPaths.header(txnId), 
Optional.of(expectedVersion),
+                Set.of(new Option.PartitionKey(txnId)));
+    }
+
+    private static Set<Option> headerOptions(String txnId, TxnHeader header) {
+        Option.SecondaryIndex idx;
+        if (header.getState().isTerminal()) {
+            long finalizedMs = header.getFinalizedAt() == null ? 0L : 
header.getFinalizedAt().toEpochMilli();
+            idx = new Option.SecondaryIndex(TxnPaths.IDX_TXN_BY_FINAL_STATE,
+                    TxnPaths.finalStateKey(header.getState(), finalizedMs));
+        } else {
+            long deadlineMs = header.getCreatedAt().toEpochMilli() + 
header.getTimeout().toMillis();
+            idx = new Option.SecondaryIndex(TxnPaths.IDX_TXN_BY_DEADLINE, 
TxnPaths.longKey(deadlineMs));
+        }
+        return Set.of(new Option.PartitionKey(txnId), idx);
+    }
+
+    // ---- Op-log append -----------------------------------------------------
+
+    /**
+     * Append a {@link TxnOp} under {@code /txn-op/<txnId>-<seq>}. Adds the 
per-kind secondary-index
+     * entry — {@link TxnPaths#IDX_WRITES_BY_SEGMENT} for writes,
+     * {@link TxnPaths#IDX_ACKS_BY_SEGMENT_SUBSCRIPTION} for acks. Returns the 
{@link Stat} whose
+     * {@code path} carries the generated sequence key.
+     */
+    public CompletableFuture<Stat> appendOp(String txnId, TxnOp op) {
+        Option.SecondaryIndex idx = switch (op.getKind()) {
+            case WRITE -> new 
Option.SecondaryIndex(TxnPaths.IDX_WRITES_BY_SEGMENT, op.getSegment());
+            case ACK -> new 
Option.SecondaryIndex(TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION,
+                    TxnPaths.ackIndexKey(op.getSegment(), 
op.getSubscription()));
+        };
+        Set<Option> opts = Set.of(new Option.PartitionKey(txnId), idx, 
APPEND_DELTAS);
+        return store.put(TxnPaths.opParent(txnId), toJson(op), 
Optional.empty(), opts);
+    }
+
+    // ---- Index queries -----------------------------------------------------
+
+    /** Stream all write ops targeting {@code segment}. */
+    public CompletableFuture<Void> listWritesBySegment(String segment, 
ScanConsumer consumer) {
+        return store.scanByIndex(TxnPaths.TXN_OP_PREFIX, 
TxnPaths.IDX_WRITES_BY_SEGMENT,
+                segment, segment,
+                gr -> {
+                    TxnOp op = fromJson(gr.getValue(), TxnOp.class);
+                    return op.getKind() == TxnOpKind.WRITE && 
segment.equals(op.getSegment());
+                },
+                consumer);
+    }
+
+    /** Stream all ack ops targeting {@code (segment, subscription)}. */
+    public CompletableFuture<Void> listAcksBySegmentSubscription(String 
segment, String subscription,
+                                                                 ScanConsumer 
consumer) {
+        String key = TxnPaths.ackIndexKey(segment, subscription);
+        return store.scanByIndex(TxnPaths.TXN_OP_PREFIX, 
TxnPaths.IDX_ACKS_BY_SEGMENT_SUBSCRIPTION,
+                key, key,
+                gr -> {
+                    TxnOp op = fromJson(gr.getValue(), TxnOp.class);
+                    return op.getKind() == TxnOpKind.ACK
+                            && segment.equals(op.getSegment())
+                            && subscription.equals(op.getSubscription());
+                },
+                consumer);
+    }
+
+    /**
+     * Stream open transactions whose deadline falls in {@code 
[fromMsInclusive, toMsInclusive]}.
+     * Pass {@code null} on either bound for an unbounded range.
+     */
+    public CompletableFuture<Void> listOpenByDeadlineRange(Long 
fromMsInclusive, Long toMsInclusive,
+                                                           ScanConsumer 
consumer) {
+        String from = fromMsInclusive == null ? null : 
TxnPaths.longKey(fromMsInclusive);
+        String to = toMsInclusive == null ? null : 
TxnPaths.longKey(toMsInclusive);
+        return store.scanByIndex(TxnPaths.TXN_HEADER_PREFIX, 
TxnPaths.IDX_TXN_BY_DEADLINE,
+                from, to,
+                gr -> {
+                    TxnHeader h = fromJson(gr.getValue(), TxnHeader.class);
+                    if (h.getState().isTerminal()) {
+                        return false;
+                    }
+                    long deadline = h.getCreatedAt().toEpochMilli() + 
h.getTimeout().toMillis();
+                    return (fromMsInclusive == null || deadline >= 
fromMsInclusive)
+                            && (toMsInclusive == null || deadline <= 
toMsInclusive);
+                },
+                consumer);
+    }
+
+    /**
+     * Stream terminal transactions in {@code state} whose finalization time 
falls in
+     * {@code [fromMsInclusive, toMsInclusive]}. Pass {@code null} on either 
bound for unbounded.
+     */
+    public CompletableFuture<Void> listFinalizedByStateAndTimeRange(TxnState 
state,
+                                                                    Long 
fromMsInclusive, Long toMsInclusive,
+                                                                    
ScanConsumer consumer) {
+        String from = fromMsInclusive == null ? state.name() + ":" : 
TxnPaths.finalStateKey(state, fromMsInclusive);
+        String to = toMsInclusive == null
+                ? state.name() + ":" + "9".repeat(TxnPaths.LONG_KEY_WIDTH)

Review Comment:
   Could a named constant be used here instead of building this key inline?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to