This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d45e405fe50 [improve] Add Option.SequenceKeysDeltas + 
subscribeSequence (#25724)
d45e405fe50 is described below

commit d45e405fe50c5d0d08cf132622ef80703c856935
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 13:51:58 2026 -0700

    [improve] Add Option.SequenceKeysDeltas + subscribeSequence (#25724)
---
 .../apache/pulsar/metadata/api/MetadataStore.java  |  30 ++++
 .../org/apache/pulsar/metadata/api/Option.java     |  38 +++++
 .../apache/pulsar/metadata/api/OptionsHelper.java  |  17 +++
 .../metadata/impl/AbstractMetadataStore.java       | 155 +++++++++++++++++++++
 .../pulsar/metadata/impl/DualMetadataStore.java    |  11 ++
 .../metadata/impl/FaultInjectionMetadataStore.java |   6 +
 .../metadata/impl/oxia/OxiaMetadataStore.java      |  43 +++++-
 .../pulsar/metadata/OxiaSequenceKeysTest.java      | 152 ++++++++++++++++++++
 .../apache/pulsar/metadata/SequenceKeysTest.java   | 136 ++++++++++++++++++
 9 files changed, 585 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index d258934bec9..528c9fe1bbd 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -402,6 +402,36 @@ public interface MetadataStore extends AutoCloseable {
         return scanChildren(parentPath, consumer, Set.of());
     }
 
+    /**
+     * Subscribe to updates on a sequence-key prefix written via {@link 
Option.SequenceKeysDeltas}.
+     *
+     * <p>The {@code listener} receives the latest assigned sequence key (the 
full path with
+     * sequence suffix) as new sequence records are created under {@code 
prefix}. Multiple updates
+     * may be collapsed into a single event with the highest sequence — 
callers must treat the
+     * stream as monotonic but not exhaustive.
+     *
+     * <p>Closing the returned handle unsubscribes. On Oxia this delegates to 
the native
+     * sequence-update channel; other backends synthesize the stream from 
creation notifications
+     * for keys that start with {@code prefix-}.
+     *
+     * @param prefix   the sequence-key prefix (the same string passed as 
{@code path} to a
+     *                 {@code put} with {@link Option.SequenceKeysDeltas})
+     * @param listener callback receiving the full path of the latest sequence 
key
+     * @param opts     the set of {@link Option options} for this subscription
+     * @return a handle whose {@link AutoCloseable#close} cancels the 
subscription
+     * @throws MetadataStoreException if the store doesn't support sequence 
subscriptions
+     */
+    default AutoCloseable subscribeSequence(String prefix, Consumer<String> 
listener, Set<Option> opts)
+            throws MetadataStoreException {
+        throw new MetadataStoreException("Sequence subscriptions not supported 
by this store");
+    }
+
+    /** Like {@link #subscribeSequence(String, Consumer, Set)} with no 
options. */
+    default AutoCloseable subscribeSequence(String prefix, Consumer<String> 
listener)
+            throws MetadataStoreException {
+        return subscribeSequence(prefix, listener, Set.of());
+    }
+
     /**
      * Returns the default metadata cache config.
      *
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
index faee2a3a1e4..c1013b7fc92 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.metadata.api;
 
+import java.util.List;
+
 /**
  * An option attached to a {@link MetadataStore} operation.
  *
@@ -64,4 +66,40 @@ public sealed interface Option {
      * @param key the partition key (treated opaquely; equality-routed)
      */
     record PartitionKey(String key) implements Option {}
+
+    /**
+     * Request server-assigned multi-dimensional sequence keys on {@code put}. 
The {@code path}
+     * argument to {@code put} is treated as a key prefix; the actual stored 
key is
+     * {@code prefix-{seq0}-{seq1}-...} where each sequence is a zero-padded 
20-digit decimal and
+     * each dimension increments atomically by its delta.
+     *
+     * <p>The {@code Stat} returned from the {@code put} carries the actual 
generated path. Pair
+     * with {@link MetadataStore#subscribeSequence} to receive notifications 
as new sequence keys
+     * are created.
+     *
+     * <p>Constraints: {@code deltas} must be non-empty, the first delta must 
be {@code > 0}, and
+     * the rest must be {@code >= 0}. On Oxia a {@link PartitionKey} must also 
be provided.
+     * Backends without native sequence-key support synthesize the same key 
format using a
+     * sidecar counter document and CAS.
+     *
+     * @param deltas per-dimension increments
+     */
+    record SequenceKeysDeltas(List<Long> deltas) implements Option {
+
+        public SequenceKeysDeltas {
+            if (deltas == null || deltas.isEmpty()) {
+                throw new IllegalArgumentException("SequenceKeysDeltas 
requires at least one delta");
+            }
+            if (deltas.get(0) <= 0) {
+                throw new IllegalArgumentException("first delta must be > 0, 
got " + deltas.get(0));
+            }
+            for (int i = 1; i < deltas.size(); i++) {
+                if (deltas.get(i) < 0) {
+                    throw new IllegalArgumentException(
+                            "delta at index " + i + " must be >= 0, got " + 
deltas.get(i));
+                }
+            }
+            deltas = List.copyOf(deltas);
+        }
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
index 13c9ff57b8e..c17ecc3c773 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata.api;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -73,4 +74,20 @@ public final class OptionsHelper {
         }
         return map == null ? Collections.emptyMap() : map;
     }
+
+    /**
+     * @return the per-dimension increments from a {@link 
Option.SequenceKeysDeltas} entry, or
+     *     {@code null} if no such option is present.
+     */
+    public static List<Long> sequenceKeysDeltas(Set<Option> opts) {
+        if (opts == null) {
+            return null;
+        }
+        for (Option o : opts) {
+            if (o instanceof Option.SequenceKeysDeltas sk) {
+                return sk.deltas();
+            }
+        }
+        return null;
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 72e8a26afb8..02454bd5570 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -32,6 +32,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.OpenTelemetry;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.HashSet;
@@ -48,6 +49,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -629,6 +631,13 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
             metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - 
start);
             return FutureUtil.failedFuture(new 
MetadataStoreException.InvalidPathException(path));
         }
+        // Sequence-key compatibility layer: when the backend doesn't have 
native sequence-keys
+        // (everything except Oxia today), synthesize the actual key with a 
CAS-incremented
+        // counter document, then recurse into the regular put path with the 
synthesized key.
+        List<Long> deltas = OptionsHelper.sequenceKeysDeltas(opts);
+        if (deltas != null && !supportsNativeSequenceKeys()) {
+            return putWithSequenceKeysCompat(path, data, optExpectedVersion, 
opts, deltas);
+        }
         if (getMetadataEventSynchronizer().isPresent()) {
             Long version = optExpectedVersion.isPresent() && 
optExpectedVersion.get() < 0 ? null
                     : optExpectedVersion.orElse(null);
@@ -659,6 +668,152 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     }
 
+    /**
+     * Whether this store has a native multi-dimensional atomic sequence-keys 
implementation
+     * (Oxia). Backends that return {@code true} are expected to interpret
+     * {@link Option.SequenceKeysDeltas} themselves in {@code storePut} and 
surface the
+     * server-assigned key in the returned {@link Stat}. Backends that return 
{@code false} fall
+     * back to the synthesized counter+CAS path in this class.
+     */
+    protected boolean supportsNativeSequenceKeys() {
+        return false;
+    }
+
+    private CompletableFuture<Stat> putWithSequenceKeysCompat(
+            String prefix, byte[] data, Optional<Long> version, Set<Option> 
opts, List<Long> deltas) {
+        if (version.isPresent() && version.get() != -1L) {
+            return FutureUtil.failedFuture(new MetadataStoreException(
+                    "Can't have expectedVersion and SequenceKeysDeltas at the 
same time"));
+        }
+        return atomicIncrementSequenceCounter(prefix, deltas).thenCompose(seqs 
-> {
+            String synthesized = formatSequenceKey(prefix, seqs);
+            Set<Option> remainingOpts = stripSequenceKeysDeltas(opts);
+            // Recurse into the regular put path with the synthesized key. 
expectedVersion = -1
+            // guarantees we only succeed on a fresh insert — defensive 
against stale counters.
+            return put(synthesized, data, Optional.of(-1L), remainingOpts);
+        });
+    }
+
+    private CompletableFuture<long[]> atomicIncrementSequenceCounter(String 
prefix, List<Long> deltas) {
+        String counterPath = sequenceCounterPath(prefix);
+        return get(counterPath, Set.of()).thenCompose(currentOpt -> {
+            long[] currentSeqs = currentOpt.isPresent()
+                    ? decodeSequenceCounter(currentOpt.get().getValue())
+                    : new long[0];
+            long[] newSeqs = new long[deltas.size()];
+            for (int i = 0; i < deltas.size(); i++) {
+                long current = i < currentSeqs.length ? currentSeqs[i] : 0L;
+                newSeqs[i] = current + deltas.get(i);
+            }
+            Optional<Long> expectedVersion = currentOpt.isPresent()
+                    ? Optional.of(currentOpt.get().getStat().getVersion())
+                    : Optional.of(-1L);
+            byte[] encoded = encodeSequenceCounter(newSeqs);
+            return put(counterPath, encoded, expectedVersion, Set.of())
+                    .thenApply(s -> newSeqs)
+                    .exceptionallyCompose(ex -> {
+                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof 
MetadataStoreException.BadVersionException) {
+                            // Concurrent writer beat us — read the new 
counter value and retry.
+                            return atomicIncrementSequenceCounter(prefix, 
deltas);
+                        }
+                        return FutureUtil.failedFuture(cause);
+                    });
+        });
+    }
+
+    /** Counter-document path for a sequence prefix. Sibling of the prefix at 
the parent level. */
+    static String sequenceCounterPath(String prefix) {
+        return prefix + "__seq_counter__";
+    }
+
+    /** Format a synthesized sequence key matching Oxia's native format: 
{@code prefix-{seq:%020d}-...}. */
+    static String formatSequenceKey(String prefix, long[] seqs) {
+        StringBuilder sb = new StringBuilder(prefix);
+        for (long s : seqs) {
+            sb.append('-').append(String.format("%020d", s));
+        }
+        return sb.toString();
+    }
+
+    private static byte[] encodeSequenceCounter(long[] seqs) {
+        ByteBuffer buf = ByteBuffer.allocate(seqs.length * Long.BYTES);
+        for (long s : seqs) {
+            buf.putLong(s);
+        }
+        return buf.array();
+    }
+
+    private static long[] decodeSequenceCounter(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+        long[] seqs = new long[bytes.length / Long.BYTES];
+        for (int i = 0; i < seqs.length; i++) {
+            seqs[i] = buf.getLong();
+        }
+        return seqs;
+    }
+
+    private static Set<Option> stripSequenceKeysDeltas(Set<Option> opts) {
+        if (opts == null || opts.isEmpty()) {
+            return Set.of();
+        }
+        Set<Option> result = new HashSet<>();
+        for (Option o : opts) {
+            if (!(o instanceof Option.SequenceKeysDeltas)) {
+                result.add(o);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public AutoCloseable subscribeSequence(String prefix, Consumer<String> 
listener, Set<Option> opts) {
+        SequenceWatcher watcher = new SequenceWatcher(prefix, listener);
+        listeners.add(watcher);
+        return () -> listeners.remove(watcher);
+    }
+
+    /**
+     * Listener-based subscription bridge: filter notifications matching a 
sequence prefix and
+     * deliver only paths that strictly increase the latest seen sequence. 
Multiple updates may
+     * collapse — only the highest path observed so far is delivered, matching 
Oxia's contract.
+     */
+    private static final class SequenceWatcher implements 
Consumer<Notification> {
+        private final String prefixDash;
+        private final Consumer<String> listener;
+        private final AtomicReference<String> latest = new AtomicReference<>();
+
+        SequenceWatcher(String prefix, Consumer<String> listener) {
+            this.prefixDash = prefix + "-";
+            this.listener = listener;
+        }
+
+        @Override
+        public void accept(Notification n) {
+            if (n.getType() != NotificationType.Created) {
+                return;
+            }
+            String path = n.getPath();
+            if (!path.startsWith(prefixDash)) {
+                return;
+            }
+            while (true) {
+                String previous = latest.get();
+                if (previous != null && path.compareTo(previous) <= 0) {
+                    return;
+                }
+                if (latest.compareAndSet(previous, path)) {
+                    try {
+                        listener.accept(path);
+                    } catch (Throwable t) {
+                        log.warn().attr("path", 
path).exception(t).log("Sequence subscription listener failed");
+                    }
+                    return;
+                }
+            }
+        }
+    }
+
     /**
      * Translate {@link Option.Ephemeral}/{@link Option.Sequential} entries 
from {@code opts} into the
      * legacy {@link CreateOption} set carried by {@link MetadataEvent} for 
sync replication. Other
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 38a3bc6013a..04b56883a40 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -324,6 +324,17 @@ public class DualMetadataStore implements 
MetadataStoreExtended {
         };
     }
 
+    @Override
+    public AutoCloseable subscribeSequence(String prefix, Consumer<String> 
listener, Set<Option> opts)
+            throws MetadataStoreException {
+        return switch (migrationState.getPhase()) {
+            case NOT_STARTED, PREPARATION, COPYING, FAILED ->
+                    sourceStore.subscribeSequence(prefix, listener, opts);
+            case COMPLETED ->
+                    targetStore.subscribeSequence(prefix, listener, opts);
+        };
+    }
+
     @Override
     public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion, Set<Option> opts) {
         switch (migrationState.getPhase()) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 552f0e817b5..e52a434d9c3 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -157,6 +157,12 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
         return store.scanChildren(parentPath, consumer, opts);
     }
 
+    @Override
+    public AutoCloseable subscribeSequence(String prefix, Consumer<String> 
listener, Set<Option> opts)
+            throws MetadataStoreException {
+        return store.subscribeSequence(prefix, listener, opts);
+    }
+
     @Override
     public void registerListener(Consumer<Notification> listener) {
         store.registerListener(listener);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 105332bf74e..07a327b3b39 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -28,9 +28,11 @@ import 
io.oxia.client.api.exceptions.KeyAlreadyExistsException;
 import io.oxia.client.api.exceptions.UnexpectedVersionIdException;
 import io.oxia.client.api.options.DeleteOption;
 import io.oxia.client.api.options.GetOption;
+import io.oxia.client.api.options.GetSequenceUpdatesOption;
 import io.oxia.client.api.options.ListOption;
 import io.oxia.client.api.options.PutOption;
 import io.oxia.client.api.options.RangeScanOption;
+import java.io.Closeable;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
@@ -42,6 +44,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import lombok.CustomLog;
 import lombok.NonNull;
@@ -270,15 +273,22 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
             String path, byte[] data, Optional<Long> optExpectedVersion, 
Set<Option> opts) {
         boolean sequential = OptionsHelper.isSequential(opts);
         boolean ephemeral = OptionsHelper.isEphemeral(opts);
+        List<Long> sequenceKeysDeltas = OptionsHelper.sequenceKeysDeltas(opts);
         Map<String, String> secondaryIndexes = 
OptionsHelper.secondaryIndexes(opts);
+        if (sequential && sequenceKeysDeltas != null) {
+            return CompletableFuture.failedFuture(new MetadataStoreException(
+                    "Sequential and SequenceKeysDeltas cannot be combined"));
+        }
         CompletableFuture<Void> parentsCreated = createParents(path);
         return parentsCreated.thenCompose(
                 __ -> {
                     var expectedVersion = optExpectedVersion;
-                    if (expectedVersion.isPresent() && expectedVersion.get() 
!= -1L && sequential) {
+                    if (expectedVersion.isPresent() && expectedVersion.get() 
!= -1L
+                            && (sequential || sequenceKeysDeltas != null)) {
                         return CompletableFuture.failedFuture(
                                 new MetadataStoreException(
-                                        "Can't have expectedVersion and 
Sequential at the same time"));
+                                        "Can't have expectedVersion and 
Sequential/SequenceKeysDeltas at the "
+                                                + "same time"));
                     }
                     CompletableFuture<String> actualPath;
                     if (sequential) {
@@ -312,6 +322,9 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
                     if (partitionKey != null) {
                         putOptions.add(PutOption.PartitionKey(partitionKey));
                     }
+                    if (sequenceKeysDeltas != null) {
+                        
putOptions.add(PutOption.SequenceKeysDeltas(sequenceKeysDeltas));
+                    }
                     var parentPath = parent(path);
                     var parentPrefix = parentPath == null ? "" : parentPath;
                     secondaryIndexes.forEach((indexName, secondaryKey) ->
@@ -324,7 +337,9 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
                                             client
                                                     .put(aPath, data, 
putOptions)
                                                     .thenApply(res -> new 
PathWithPutResult(aPath, res)))
-                            .thenApply(res -> convertStat(res.path(), 
res.result().version()))
+                            // Use the effective key returned by Oxia — for 
SequenceKeysDeltas this is
+                            // the server-assigned key with sequence suffixes 
appended.
+                            .thenApply(res -> convertStat(res.result().key(), 
res.result().version()))
                             .exceptionallyCompose(this::convertException);
                 });
     }
@@ -364,6 +379,28 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
         return partitionKey == null ? Set.of() : 
Set.of(RangeScanOption.PartitionKey(partitionKey));
     }
 
+    /**
+     * Build the Oxia {@link GetSequenceUpdatesOption} set from {@code opts}, 
currently routing the
+     * partition key.
+     */
+    private static Set<GetSequenceUpdatesOption> 
sequenceUpdatesOptions(Set<Option> opts) {
+        String partitionKey = OptionsHelper.partitionKey(opts);
+        return partitionKey == null
+                ? Set.of()
+                : Set.of(GetSequenceUpdatesOption.PartitionKey(partitionKey));
+    }
+
+    @Override
+    public AutoCloseable subscribeSequence(String prefix, Consumer<String> 
listener, Set<Option> opts) {
+        Closeable handle = client.getSequenceUpdates(prefix, listener, 
sequenceUpdatesOptions(opts));
+        return handle::close;
+    }
+
+    @Override
+    protected boolean supportsNativeSequenceKeys() {
+        return true;
+    }
+
     private <T> CompletionStage<T> convertException(Throwable ex) {
         Throwable actEx = FutureUtil.unwrapCompletionException(ex);
         if (actEx instanceof UnexpectedVersionIdException || actEx instanceof 
KeyAlreadyExistsException) {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaSequenceKeysTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaSequenceKeysTest.java
new file mode 100644
index 00000000000..d0fcf694f43
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaSequenceKeysTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertNotEquals;
+import io.oxia.testcontainers.OxiaContainer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Native sequence-key tests against Oxia.
+ *
+ * <p>Oxia must receive {@link Option.PartitionKey} alongside {@link 
Option.SequenceKeysDeltas} —
+ * sequence allocation is shard-local, so all writes that share a sequence 
prefix have to land in
+ * the same shard. The tests use a multi-shard cluster to keep the routing 
path honest.
+ */
+public class OxiaSequenceKeysTest {
+
+    private static final int SHARDS = 3;
+    private OxiaContainer oxiaServer;
+
+    @BeforeClass
+    public void start() {
+        oxiaServer = new 
OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME).withShards(SHARDS);
+        oxiaServer.start();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void stop() {
+        if (oxiaServer != null) {
+            oxiaServer.close();
+            oxiaServer = null;
+        }
+    }
+
+    private MetadataStore newStore() throws Exception {
+        return MetadataStoreFactory.create(
+                "oxia://" + oxiaServer.getServiceAddress(),
+                MetadataStoreConfig.builder().fsyncEnable(false).build());
+    }
+
+    @Test
+    public void singleDimensionSequence() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String prefix = "/seq-single-" + System.nanoTime();
+        Set<Option> opts = Set.of(
+                new Option.PartitionKey("seq-pk"),
+                new Option.SequenceKeysDeltas(List.of(1L)));
+
+        Stat first = store.put(prefix, "a".getBytes(StandardCharsets.UTF_8), 
Optional.empty(), opts).get();
+        Stat second = store.put(prefix, "b".getBytes(StandardCharsets.UTF_8), 
Optional.empty(), opts).get();
+        Stat third = store.put(prefix, "c".getBytes(StandardCharsets.UTF_8), 
Optional.empty(), opts).get();
+
+        // Each call yields a distinct, monotonically-increasing key derived 
from the prefix.
+        assertThat(first.getPath()).startsWith(prefix);
+        assertNotEquals(first.getPath(), prefix, "Stat path should be the 
synthesized key, not the prefix");
+        assertThat(second.getPath()).isGreaterThan(first.getPath());
+        assertThat(third.getPath()).isGreaterThan(second.getPath());
+
+        // The synthesized records actually exist and round-trip with the same 
partition key.
+        Set<Option> readOpts = Set.of(new Option.PartitionKey("seq-pk"));
+        assertThat(store.get(first.getPath(), readOpts).get()).isPresent();
+        assertThat(store.get(third.getPath(), readOpts).get()).isPresent();
+    }
+
+    @Test
+    public void multiDimensionSequence() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String prefix = "/seq-multi-" + System.nanoTime();
+
+        // Two-dimension sequence: dim 0 always increments, dim 1 increments 
only when caller asks
+        // for it (delta=0 means "keep dim 1 unchanged").
+        Set<Option> bumpBoth = Set.of(
+                new Option.PartitionKey("seq-pk"),
+                new Option.SequenceKeysDeltas(List.of(1L, 1L)));
+        Set<Option> bumpFirstOnly = Set.of(
+                new Option.PartitionKey("seq-pk"),
+                new Option.SequenceKeysDeltas(List.of(1L, 0L)));
+
+        Stat r0 = store.put(prefix, new byte[]{0}, Optional.empty(), 
bumpBoth).get();
+        Stat r1 = store.put(prefix, new byte[]{1}, Optional.empty(), 
bumpFirstOnly).get();
+        Stat r2 = store.put(prefix, new byte[]{2}, Optional.empty(), 
bumpBoth).get();
+
+        // All three should be lexicographically increasing — dim 0 ticks 
every call.
+        assertThat(r0.getPath()).startsWith(prefix);
+        assertThat(r1.getPath()).isGreaterThan(r0.getPath());
+        assertThat(r2.getPath()).isGreaterThan(r1.getPath());
+    }
+
+    @Test
+    public void subscribeSequence() throws Exception {
+        @Cleanup
+        MetadataStore store = newStore();
+
+        String prefix = "/seq-watch-" + System.nanoTime();
+        Set<Option> opts = Set.of(
+                new Option.PartitionKey("seq-pk"),
+                new Option.SequenceKeysDeltas(List.of(1L)));
+        Set<Option> subOpts = Set.of(new Option.PartitionKey("seq-pk"));
+
+        ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();
+        @Cleanup
+        AutoCloseable handle = store.subscribeSequence(prefix, received::add, 
subOpts);
+
+        Stat r1 = store.put(prefix, new byte[]{1}, Optional.empty(), 
opts).get();
+        Stat r2 = store.put(prefix, new byte[]{2}, Optional.empty(), 
opts).get();
+        Stat r3 = store.put(prefix, new byte[]{3}, Optional.empty(), 
opts).get();
+
+        // The listener may collapse intermediate updates, but the last value 
it reports must
+        // be the latest sequence key — i.e. r3.getPath().
+        Awaitility.await().untilAsserted(() ->
+                
assertThat(received).isNotEmpty().last().asString().isEqualTo(r3.getPath()));
+        // Earlier emissions, when present, must point to one of the writes we 
performed.
+        for (String s : received) {
+            assertThat(s).isIn(r1.getPath(), r2.getPath(), r3.getPath());
+        }
+    }
+}
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/SequenceKeysTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/SequenceKeysTest.java
new file mode 100644
index 00000000000..dc8abecf080
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/SequenceKeysTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Cross-backend tests for {@link Option.SequenceKeysDeltas} and
+ * {@link MetadataStore#subscribeSequence}.
+ *
+ * <p>For Oxia these are the native primitives; for the other backends the 
same behavior is
+ * synthesized via the CAS counter sidecar + listener bridge in {@code 
AbstractMetadataStore}.
+ * Both paths must produce monotonically increasing keys with the Oxia 
byte-format and deliver
+ * subscription updates on writes.
+ */
+public class SequenceKeysTest extends BaseMetadataStoreTest {
+
+    /**
+     * Writes three records with a single-dimension delta, checks the shape and ordering of the
+     * assigned paths, and reads the first and the last record back.
+     *
+     * @param provider    backend name; forwarded to {@code optsFor} and {@code readOptsFor}
+     * @param urlSupplier supplies the first argument given to {@code MetadataStoreFactory.create}
+     */
+    @Test(dataProvider = "impl")
+    public void singleDimensionSequence(String provider, Supplier<String> urlSupplier) throws Exception {
+        String url = urlSupplier.get();
+        MetadataStoreConfig config = MetadataStoreConfig.builder().fsyncEnable(false).build();
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, config);
+
+        String prefix = newKey();
+        Set<Option> writeOptions = optsFor(provider, new Option.SequenceKeysDeltas(List.of(1L)));
+
+        Stat first = store.put(prefix, "a".getBytes(StandardCharsets.UTF_8), Optional.empty(),
+                writeOptions).get();
+        Stat second = store.put(prefix, "b".getBytes(StandardCharsets.UTF_8), Optional.empty(),
+                writeOptions).get();
+        Stat third = store.put(prefix, "c".getBytes(StandardCharsets.UTF_8), Optional.empty(),
+                writeOptions).get();
+
+        // Expected key shape is <prefix>-<20 digits>, for the native (Oxia) path and the compat
+        // layer alike; the prefix is quoted so it is matched literally.
+        String expectedShape = "\\Q" + prefix + "\\E-\\d{20}";
+        assertThat(first.getPath()).matches(expectedShape);
+        assertThat(second.getPath()).isGreaterThan(first.getPath());
+        assertThat(third.getPath()).isGreaterThan(second.getPath());
+
+        // The first and the last assigned paths must be readable.
+        Set<Option> readOptions = readOptsFor(provider);
+        assertThat(store.get(first.getPath(), readOptions).get()).isPresent();
+        assertThat(store.get(third.getPath(), readOptions).get()).isPresent();
+    }
+
+    /**
+     * Writes three records with two-dimension deltas and checks that the assigned paths carry
+     * two 20-digit suffixes and keep increasing from one write to the next.
+     *
+     * @param provider    backend name; forwarded to {@code optsFor}
+     * @param urlSupplier supplies the first argument given to {@code MetadataStoreFactory.create}
+     */
+    @Test(dataProvider = "impl")
+    public void multiDimensionSequence(String provider, Supplier<String> urlSupplier) throws Exception {
+        String url = urlSupplier.get();
+        MetadataStoreConfig config = MetadataStoreConfig.builder().fsyncEnable(false).build();
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, config);
+
+        String prefix = newKey();
+        // Two dimensions: the first one is incremented on every write, the second one only
+        // when the caller passes a non-zero delta for it.
+        Set<Option> bothDimensions =
+                optsFor(provider, new Option.SequenceKeysDeltas(List.of(1L, 1L)));
+        Set<Option> firstDimensionOnly =
+                optsFor(provider, new Option.SequenceKeysDeltas(List.of(1L, 0L)));
+
+        Stat initial = store.put(prefix, new byte[]{0}, Optional.empty(), bothDimensions).get();
+        Stat middle = store.put(prefix, new byte[]{1}, Optional.empty(), firstDimensionOnly).get();
+        Stat last = store.put(prefix, new byte[]{2}, Optional.empty(), bothDimensions).get();
+
+        String expectedShape = "\\Q" + prefix + "\\E-\\d{20}-\\d{20}";
+        assertThat(initial.getPath()).matches(expectedShape);
+        assertThat(middle.getPath()).isGreaterThan(initial.getPath());
+        assertThat(last.getPath()).isGreaterThan(middle.getPath());
+    }
+
+    /**
+     * Subscribes to a sequence-key prefix, performs three sequence writes and checks that the
+     * subscription ends up reporting the path assigned to the last write.
+     *
+     * @param provider    backend name; forwarded to {@code optsFor} and {@code readOptsFor}
+     * @param urlSupplier supplies the first argument given to {@code MetadataStoreFactory.create}
+     */
+    @Test(dataProvider = "impl")
+    public void subscribeSequence(String provider, Supplier<String> urlSupplier) throws Exception {
+        String url = urlSupplier.get();
+        MetadataStoreConfig config = MetadataStoreConfig.builder().fsyncEnable(false).build();
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(url, config);
+
+        String prefix = newKey();
+        Set<Option> writeOptions = optsFor(provider, new Option.SequenceKeysDeltas(List.of(1L)));
+        Set<Option> subscribeOptions = readOptsFor(provider);
+
+        // Every path handed to the listener is appended here so it can be inspected afterwards.
+        ConcurrentLinkedQueue<String> emitted = new ConcurrentLinkedQueue<>();
+        @Cleanup
+        AutoCloseable subscription = store.subscribeSequence(prefix, emitted::add, subscribeOptions);
+
+        Stat first = store.put(prefix, new byte[]{1}, Optional.empty(), writeOptions).get();
+        Stat second = store.put(prefix, new byte[]{2}, Optional.empty(), writeOptions).get();
+        Stat latest = store.put(prefix, new byte[]{3}, Optional.empty(), writeOptions).get();
+
+        // Updates may be collapsed, so only the last emission is pinned: it has to be the path
+        // of the most recent write.
+        Awaitility.await().untilAsserted(() ->
+                assertThat(emitted).isNotEmpty().last().asString().isEqualTo(latest.getPath()));
+
+        // No spurious paths: each emission must be the path of one of the three writes above.
+        emitted.forEach(path ->
+                assertThat(path).isIn(first.getPath(), second.getPath(), latest.getPath()));
+    }
+
+    /**
+     * Builds the option set used for sequence-key writes.
+     *
+     * <p>Oxia requires a routing-hint partition key for sequence keys, so for the {@code "Oxia"}
+     * provider a {@link Option.PartitionKey} is added next to the deltas. Every other provider
+     * gets the deltas alone.
+     *
+     * @param provider backend name as handed to the test method
+     * @param deltas   sequence deltas to include in the returned set
+     * @return an immutable set holding {@code deltas}, plus the partition key for Oxia
+     */
+    private static Set<Option> optsFor(String provider, Option.SequenceKeysDeltas deltas) {
+        if (!"Oxia".equals(provider)) {
+            return Set.of(deltas);
+        }
+        return Set.of(deltas, new Option.PartitionKey("seq-pk"));
+    }
+
+    /**
+     * Builds the option set passed to {@code get} and {@code subscribeSequence} in these tests.
+     *
+     * @param provider backend name as handed to the test method
+     * @return a set holding only the {@code "seq-pk"} partition key for the {@code "Oxia"}
+     *         provider, an empty set for any other provider
+     */
+    private static Set<Option> readOptsFor(String provider) {
+        return "Oxia".equals(provider)
+                ? Set.of(new Option.PartitionKey("seq-pk"))
+                : Set.of();
+    }
+
+}


Reply via email to