This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d45e405fe50 [improve] Add Option.SequenceKeysDeltas +
subscribeSequence (#25724)
d45e405fe50 is described below
commit d45e405fe50c5d0d08cf132622ef80703c856935
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 13:51:58 2026 -0700
[improve] Add Option.SequenceKeysDeltas + subscribeSequence (#25724)
---
.../apache/pulsar/metadata/api/MetadataStore.java | 30 ++++
.../org/apache/pulsar/metadata/api/Option.java | 38 +++++
.../apache/pulsar/metadata/api/OptionsHelper.java | 17 +++
.../metadata/impl/AbstractMetadataStore.java | 155 +++++++++++++++++++++
.../pulsar/metadata/impl/DualMetadataStore.java | 11 ++
.../metadata/impl/FaultInjectionMetadataStore.java | 6 +
.../metadata/impl/oxia/OxiaMetadataStore.java | 43 +++++-
.../pulsar/metadata/OxiaSequenceKeysTest.java | 152 ++++++++++++++++++++
.../apache/pulsar/metadata/SequenceKeysTest.java | 136 ++++++++++++++++++
9 files changed, 585 insertions(+), 3 deletions(-)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index d258934bec9..528c9fe1bbd 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -402,6 +402,36 @@ public interface MetadataStore extends AutoCloseable {
return scanChildren(parentPath, consumer, Set.of());
}
+ /**
+ * Subscribe to updates on a sequence-key prefix written via {@link
Option.SequenceKeysDeltas}.
+ *
+ * <p>The {@code listener} receives the latest assigned sequence key (the
full path with
+ * sequence suffix) as new sequence records are created under {@code
prefix}. Multiple updates
+ * may be collapsed into a single event with the highest sequence —
callers must treat the
+ * stream as monotonic but not exhaustive.
+ *
+ * <p>Closing the returned handle unsubscribes. On Oxia this delegates to
the native
+ * sequence-update channel; other backends synthesize the stream from
change notifications on
+ * the prefix's parent path.
+ *
+ * @param prefix the sequence-key prefix (the same string passed as
{@code path} to a
+ * {@code put} with {@link Option.SequenceKeysDeltas})
+ * @param listener callback receiving the full path of the latest sequence
key
+ * @param opts the set of {@link Option options} for this subscription
+ * @return a handle whose {@link AutoCloseable#close} cancels the
subscription
+ * @throws MetadataStoreException if the store doesn't support sequence
subscriptions
+ */
+ default AutoCloseable subscribeSequence(String prefix, Consumer<String>
listener, Set<Option> opts)
+ throws MetadataStoreException {
+ throw new MetadataStoreException("Sequence subscriptions not supported
by this store");
+ }
+
+ /** Like {@link #subscribeSequence(String, Consumer, Set)} with no
options. */
+ default AutoCloseable subscribeSequence(String prefix, Consumer<String>
listener)
+ throws MetadataStoreException {
+ return subscribeSequence(prefix, listener, Set.of());
+ }
+
/**
* Returns the default metadata cache config.
*
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
index faee2a3a1e4..c1013b7fc92 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/Option.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.metadata.api;
+import java.util.List;
+
/**
* An option attached to a {@link MetadataStore} operation.
*
@@ -64,4 +66,40 @@ public sealed interface Option {
* @param key the partition key (treated opaquely; equality-routed)
*/
record PartitionKey(String key) implements Option {}
+
+ /**
+ * Request server-assigned multi-dimensional sequence keys on {@code put}.
The {@code path}
+ * argument to {@code put} is treated as a key prefix; the actual stored
key is
+ * {@code prefix-{seq0}-{seq1}-...} where each sequence is zero-padded
20-digit decimal and
+ * each dimension increments atomically by its delta.
+ *
+ * <p>The {@code Stat} returned from the {@code put} carries the actual
generated path. Pair
+ * with {@link MetadataStore#subscribeSequence} to receive notifications
as new sequence keys
+ * are created.
+ *
+ * <p>Constraints: {@code deltas} must be non-empty, the first delta must
be {@code > 0}, and
+ * the rest must be {@code >= 0}. On Oxia a {@link PartitionKey} must also
be provided.
+ * Backends without native sequence-key support synthesize the same key
format using a
+ * sidecar counter document and CAS.
+ *
+ * @param deltas per-dimension increments
+ */
+ record SequenceKeysDeltas(List<Long> deltas) implements Option {
+
+ public SequenceKeysDeltas {
+ if (deltas == null || deltas.isEmpty()) {
+ throw new IllegalArgumentException("SequenceKeysDeltas
requires at least one delta");
+ }
+ if (deltas.get(0) <= 0) {
+ throw new IllegalArgumentException("first delta must be > 0,
got " + deltas.get(0));
+ }
+ for (int i = 1; i < deltas.size(); i++) {
+ if (deltas.get(i) < 0) {
+ throw new IllegalArgumentException(
+ "delta at index " + i + " must be >= 0, got " +
deltas.get(i));
+ }
+ }
+ deltas = List.copyOf(deltas);
+ }
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
index 13c9ff57b8e..c17ecc3c773 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/OptionsHelper.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata.api;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -73,4 +74,20 @@ public final class OptionsHelper {
}
return map == null ? Collections.emptyMap() : map;
}
+
+ /**
+ * @return the per-dimension increments from a {@link
Option.SequenceKeysDeltas} entry, or
+ * {@code null} if no such option is present.
+ */
+ public static List<Long> sequenceKeysDeltas(Set<Option> opts) {
+ if (opts == null) {
+ return null;
+ }
+ for (Option o : opts) {
+ if (o instanceof Option.SequenceKeysDeltas sk) {
+ return sk.deltas();
+ }
+ }
+ return null;
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 72e8a26afb8..02454bd5570 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -32,6 +32,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.OpenTelemetry;
+import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
@@ -48,6 +49,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -629,6 +631,13 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() -
start);
return FutureUtil.failedFuture(new
MetadataStoreException.InvalidPathException(path));
}
+ // Sequence-key compatibility layer: when the backend doesn't have
native sequence-keys
+ // (everything except Oxia today), synthesize the actual key with a
CAS-incremented
+ // counter document, then recurse into the regular put path with the
synthesized key.
+ List<Long> deltas = OptionsHelper.sequenceKeysDeltas(opts);
+ if (deltas != null && !supportsNativeSequenceKeys()) {
+ return putWithSequenceKeysCompat(path, data, optExpectedVersion,
opts, deltas);
+ }
if (getMetadataEventSynchronizer().isPresent()) {
Long version = optExpectedVersion.isPresent() &&
optExpectedVersion.get() < 0 ? null
: optExpectedVersion.orElse(null);
@@ -659,6 +668,152 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
}
+ /**
+ * Whether this store has a native multi-dimensional atomic sequence-keys
implementation
+ * (Oxia). Backends that return {@code true} are expected to interpret
+ * {@link Option.SequenceKeysDeltas} themselves in {@code storePut} and
surface the
+ * server-assigned key in the returned {@link Stat}. Backends that return
{@code false} fall
+ * back to the synthesized counter+CAS path in this class.
+ */
+ protected boolean supportsNativeSequenceKeys() {
+ return false;
+ }
+
+ private CompletableFuture<Stat> putWithSequenceKeysCompat(
+ String prefix, byte[] data, Optional<Long> version, Set<Option>
opts, List<Long> deltas) {
+ if (version.isPresent() && version.get() != -1L) {
+ return FutureUtil.failedFuture(new MetadataStoreException(
+ "Can't have expectedVersion and SequenceKeysDeltas at the
same time"));
+ }
+ return atomicIncrementSequenceCounter(prefix, deltas).thenCompose(seqs
-> {
+ String synthesized = formatSequenceKey(prefix, seqs);
+ Set<Option> remainingOpts = stripSequenceKeysDeltas(opts);
+ // Recurse into the regular put path with the synthesized key.
expectedVersion = -1
+ // guarantees we only succeed on a fresh insert — defensive
against stale counters.
+ return put(synthesized, data, Optional.of(-1L), remainingOpts);
+ });
+ }
+
+ private CompletableFuture<long[]> atomicIncrementSequenceCounter(String
prefix, List<Long> deltas) {
+ String counterPath = sequenceCounterPath(prefix);
+ return get(counterPath, Set.of()).thenCompose(currentOpt -> {
+ long[] currentSeqs = currentOpt.isPresent()
+ ? decodeSequenceCounter(currentOpt.get().getValue())
+ : new long[0];
+ long[] newSeqs = new long[deltas.size()];
+ for (int i = 0; i < deltas.size(); i++) {
+ long current = i < currentSeqs.length ? currentSeqs[i] : 0L;
+ newSeqs[i] = current + deltas.get(i);
+ }
+ Optional<Long> expectedVersion = currentOpt.isPresent()
+ ? Optional.of(currentOpt.get().getStat().getVersion())
+ : Optional.of(-1L);
+ byte[] encoded = encodeSequenceCounter(newSeqs);
+ return put(counterPath, encoded, expectedVersion, Set.of())
+ .thenApply(s -> newSeqs)
+ .exceptionallyCompose(ex -> {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof
MetadataStoreException.BadVersionException) {
+ // Concurrent writer beat us — read the new
counter value and retry.
+ return atomicIncrementSequenceCounter(prefix,
deltas);
+ }
+ return FutureUtil.failedFuture(cause);
+ });
+ });
+ }
+
+ /** Counter-document path for a sequence prefix. Sibling of the prefix at
the parent level. */
+ static String sequenceCounterPath(String prefix) {
+ return prefix + "__seq_counter__";
+ }
+
+ /** Format a synthesized sequence key matching Oxia's native format:
{@code prefix-{seq:%020d}-...}. */
+ static String formatSequenceKey(String prefix, long[] seqs) {
+ StringBuilder sb = new StringBuilder(prefix);
+ for (long s : seqs) {
+ sb.append('-').append(String.format("%020d", s));
+ }
+ return sb.toString();
+ }
+
+ private static byte[] encodeSequenceCounter(long[] seqs) {
+ ByteBuffer buf = ByteBuffer.allocate(seqs.length * Long.BYTES);
+ for (long s : seqs) {
+ buf.putLong(s);
+ }
+ return buf.array();
+ }
+
+ private static long[] decodeSequenceCounter(byte[] bytes) {
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+ long[] seqs = new long[bytes.length / Long.BYTES];
+ for (int i = 0; i < seqs.length; i++) {
+ seqs[i] = buf.getLong();
+ }
+ return seqs;
+ }
+
+ private static Set<Option> stripSequenceKeysDeltas(Set<Option> opts) {
+ if (opts == null || opts.isEmpty()) {
+ return Set.of();
+ }
+ Set<Option> result = new HashSet<>();
+ for (Option o : opts) {
+ if (!(o instanceof Option.SequenceKeysDeltas)) {
+ result.add(o);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public AutoCloseable subscribeSequence(String prefix, Consumer<String>
listener, Set<Option> opts) {
+ SequenceWatcher watcher = new SequenceWatcher(prefix, listener);
+ listeners.add(watcher);
+ return () -> listeners.remove(watcher);
+ }
+
+ /**
+ * Listener-based subscription bridge: filter notifications matching a
sequence prefix and
+ * deliver only paths that strictly increase the latest seen sequence.
Multiple updates may
+ * collapse — only the highest path observed so far is delivered, matching
Oxia's contract.
+ */
+ private static final class SequenceWatcher implements
Consumer<Notification> {
+ private final String prefixDash;
+ private final Consumer<String> listener;
+ private final AtomicReference<String> latest = new AtomicReference<>();
+
+ SequenceWatcher(String prefix, Consumer<String> listener) {
+ this.prefixDash = prefix + "-";
+ this.listener = listener;
+ }
+
+ @Override
+ public void accept(Notification n) {
+ if (n.getType() != NotificationType.Created) {
+ return;
+ }
+ String path = n.getPath();
+ if (!path.startsWith(prefixDash)) {
+ return;
+ }
+ while (true) {
+ String previous = latest.get();
+ if (previous != null && path.compareTo(previous) <= 0) {
+ return;
+ }
+ if (latest.compareAndSet(previous, path)) {
+ try {
+ listener.accept(path);
+ } catch (Throwable t) {
+ log.warn().attr("path",
path).exception(t).log("Sequence subscription listener failed");
+ }
+ return;
+ }
+ }
+ }
+ }
+
/**
* Translate {@link Option.Ephemeral}/{@link Option.Sequential} entries
from {@code opts} into the
* legacy {@link CreateOption} set carried by {@link MetadataEvent} for
sync replication. Other
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 38a3bc6013a..04b56883a40 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -324,6 +324,17 @@ public class DualMetadataStore implements
MetadataStoreExtended {
};
}
+ @Override
+ public AutoCloseable subscribeSequence(String prefix, Consumer<String>
listener, Set<Option> opts)
+ throws MetadataStoreException {
+ return switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED ->
+ sourceStore.subscribeSequence(prefix, listener, opts);
+ case COMPLETED ->
+ targetStore.subscribeSequence(prefix, listener, opts);
+ };
+ }
+
@Override
public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion, Set<Option> opts) {
switch (migrationState.getPhase()) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 552f0e817b5..e52a434d9c3 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -157,6 +157,12 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
return store.scanChildren(parentPath, consumer, opts);
}
+ @Override
+ public AutoCloseable subscribeSequence(String prefix, Consumer<String>
listener, Set<Option> opts)
+ throws MetadataStoreException {
+ return store.subscribeSequence(prefix, listener, opts);
+ }
+
@Override
public void registerListener(Consumer<Notification> listener) {
store.registerListener(listener);
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 105332bf74e..07a327b3b39 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -28,9 +28,11 @@ import
io.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.oxia.client.api.exceptions.UnexpectedVersionIdException;
import io.oxia.client.api.options.DeleteOption;
import io.oxia.client.api.options.GetOption;
+import io.oxia.client.api.options.GetSequenceUpdatesOption;
import io.oxia.client.api.options.ListOption;
import io.oxia.client.api.options.PutOption;
import io.oxia.client.api.options.RangeScanOption;
+import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
@@ -42,6 +44,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import lombok.CustomLog;
import lombok.NonNull;
@@ -270,15 +273,22 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
String path, byte[] data, Optional<Long> optExpectedVersion,
Set<Option> opts) {
boolean sequential = OptionsHelper.isSequential(opts);
boolean ephemeral = OptionsHelper.isEphemeral(opts);
+ List<Long> sequenceKeysDeltas = OptionsHelper.sequenceKeysDeltas(opts);
Map<String, String> secondaryIndexes =
OptionsHelper.secondaryIndexes(opts);
+ if (sequential && sequenceKeysDeltas != null) {
+ return CompletableFuture.failedFuture(new MetadataStoreException(
+ "Sequential and SequenceKeysDeltas cannot be combined"));
+ }
CompletableFuture<Void> parentsCreated = createParents(path);
return parentsCreated.thenCompose(
__ -> {
var expectedVersion = optExpectedVersion;
- if (expectedVersion.isPresent() && expectedVersion.get()
!= -1L && sequential) {
+ if (expectedVersion.isPresent() && expectedVersion.get()
!= -1L
+ && (sequential || sequenceKeysDeltas != null)) {
return CompletableFuture.failedFuture(
new MetadataStoreException(
- "Can't have expectedVersion and
Sequential at the same time"));
+ "Can't have expectedVersion and
Sequential/SequenceKeysDeltas at the "
+ + "same time"));
}
CompletableFuture<String> actualPath;
if (sequential) {
@@ -312,6 +322,9 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
if (partitionKey != null) {
putOptions.add(PutOption.PartitionKey(partitionKey));
}
+ if (sequenceKeysDeltas != null) {
+
putOptions.add(PutOption.SequenceKeysDeltas(sequenceKeysDeltas));
+ }
var parentPath = parent(path);
var parentPrefix = parentPath == null ? "" : parentPath;
secondaryIndexes.forEach((indexName, secondaryKey) ->
@@ -324,7 +337,9 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
client
.put(aPath, data,
putOptions)
.thenApply(res -> new
PathWithPutResult(aPath, res)))
- .thenApply(res -> convertStat(res.path(),
res.result().version()))
+ // Use the effective key returned by Oxia — for
SequenceKeysDeltas this is
+ // the server-assigned key with sequence suffixes
appended.
+ .thenApply(res -> convertStat(res.result().key(),
res.result().version()))
.exceptionallyCompose(this::convertException);
});
}
@@ -364,6 +379,28 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
return partitionKey == null ? Set.of() :
Set.of(RangeScanOption.PartitionKey(partitionKey));
}
+ /**
+ * Build the Oxia {@link GetSequenceUpdatesOption} set from {@code opts},
currently routing the
+ * partition key.
+ */
+ private static Set<GetSequenceUpdatesOption>
sequenceUpdatesOptions(Set<Option> opts) {
+ String partitionKey = OptionsHelper.partitionKey(opts);
+ return partitionKey == null
+ ? Set.of()
+ : Set.of(GetSequenceUpdatesOption.PartitionKey(partitionKey));
+ }
+
+ @Override
+ public AutoCloseable subscribeSequence(String prefix, Consumer<String>
listener, Set<Option> opts) {
+ Closeable handle = client.getSequenceUpdates(prefix, listener,
sequenceUpdatesOptions(opts));
+ return handle::close;
+ }
+
+ @Override
+ protected boolean supportsNativeSequenceKeys() {
+ return true;
+ }
+
private <T> CompletionStage<T> convertException(Throwable ex) {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof UnexpectedVersionIdException || actEx instanceof
KeyAlreadyExistsException) {
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaSequenceKeysTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaSequenceKeysTest.java
new file mode 100644
index 00000000000..d0fcf694f43
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/OxiaSequenceKeysTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertNotEquals;
+import io.oxia.testcontainers.OxiaContainer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Native sequence-key tests against Oxia.
+ *
+ * <p>Oxia must receive {@link Option.PartitionKey} alongside {@link
Option.SequenceKeysDeltas} —
+ * sequence allocation is shard-local, so all writes that share a sequence
prefix have to land in
+ * the same shard. The tests use a multi-shard cluster to keep the routing
path honest.
+ *
+ * <p>One {@link OxiaContainer} with {@code SHARDS} shards is started before the
+ * first test and closed after the last one; each test opens its own store
+ * against it and works under a prefix made unique with {@code System.nanoTime()}.
+ */
+public class OxiaSequenceKeysTest {
+
+ private static final int SHARDS = 3;
+ private OxiaContainer oxiaServer;
+
+ @BeforeClass
+ public void start() {
+ oxiaServer = new
OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME).withShards(SHARDS);
+ oxiaServer.start();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void stop() {
+ if (oxiaServer != null) {
+ oxiaServer.close();
+ oxiaServer = null;
+ }
+ }
+
+ private MetadataStore newStore() throws Exception {
+ return MetadataStoreFactory.create(
+ "oxia://" + oxiaServer.getServiceAddress(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+ }
+
+ @Test
+ public void singleDimensionSequence() throws Exception {
+ @Cleanup
+ MetadataStore store = newStore();
+
+ String prefix = "/seq-single-" + System.nanoTime();
+ Set<Option> opts = Set.of(
+ new Option.PartitionKey("seq-pk"),
+ new Option.SequenceKeysDeltas(List.of(1L)));
+
+ Stat first = store.put(prefix, "a".getBytes(StandardCharsets.UTF_8),
Optional.empty(), opts).get();
+ Stat second = store.put(prefix, "b".getBytes(StandardCharsets.UTF_8),
Optional.empty(), opts).get();
+ Stat third = store.put(prefix, "c".getBytes(StandardCharsets.UTF_8),
Optional.empty(), opts).get();
+
+ // Each call yields a distinct, monotonically-increasing key derived
from the prefix.
+ assertThat(first.getPath()).startsWith(prefix);
+ assertNotEquals(first.getPath(), prefix, "Stat path should be the
synthesized key, not the prefix");
+ assertThat(second.getPath()).isGreaterThan(first.getPath());
+ assertThat(third.getPath()).isGreaterThan(second.getPath());
+
+ // The synthesized records actually exist and round-trip with the same
partition key.
+ Set<Option> readOpts = Set.of(new Option.PartitionKey("seq-pk"));
+ assertThat(store.get(first.getPath(), readOpts).get()).isPresent();
+ assertThat(store.get(third.getPath(), readOpts).get()).isPresent();
+ }
+
+ @Test
+ public void multiDimensionSequence() throws Exception {
+ @Cleanup
+ MetadataStore store = newStore();
+
+ String prefix = "/seq-multi-" + System.nanoTime();
+
+ // Two-dimension sequence: dim 0 always increments, dim 1 increments
only when caller asks
+ // for it (delta=0 means "keep dim 1 unchanged").
+ Set<Option> bumpBoth = Set.of(
+ new Option.PartitionKey("seq-pk"),
+ new Option.SequenceKeysDeltas(List.of(1L, 1L)));
+ Set<Option> bumpFirstOnly = Set.of(
+ new Option.PartitionKey("seq-pk"),
+ new Option.SequenceKeysDeltas(List.of(1L, 0L)));
+
+ Stat r0 = store.put(prefix, new byte[]{0}, Optional.empty(),
bumpBoth).get();
+ Stat r1 = store.put(prefix, new byte[]{1}, Optional.empty(),
bumpFirstOnly).get();
+ Stat r2 = store.put(prefix, new byte[]{2}, Optional.empty(),
bumpBoth).get();
+
+ // All three should be lexicographically increasing — dim 0 ticks
every call.
+ assertThat(r0.getPath()).startsWith(prefix);
+ assertThat(r1.getPath()).isGreaterThan(r0.getPath());
+ assertThat(r2.getPath()).isGreaterThan(r1.getPath());
+ }
+
+ @Test
+ public void subscribeSequence() throws Exception {
+ @Cleanup
+ MetadataStore store = newStore();
+
+ String prefix = "/seq-watch-" + System.nanoTime();
+ Set<Option> opts = Set.of(
+ new Option.PartitionKey("seq-pk"),
+ new Option.SequenceKeysDeltas(List.of(1L)));
+ Set<Option> subOpts = Set.of(new Option.PartitionKey("seq-pk"));
+
+ ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();
+ @Cleanup
+ AutoCloseable handle = store.subscribeSequence(prefix, received::add,
subOpts);
+
+ Stat r1 = store.put(prefix, new byte[]{1}, Optional.empty(),
opts).get();
+ Stat r2 = store.put(prefix, new byte[]{2}, Optional.empty(),
opts).get();
+ Stat r3 = store.put(prefix, new byte[]{3}, Optional.empty(),
opts).get();
+
+ // The listener may collapse intermediate updates, but the final value
it ever reports must
+ // be the latest sequence key — i.e. r3.getPath().
+ Awaitility.await().untilAsserted(() ->
+
assertThat(received).isNotEmpty().last().asString().isEqualTo(r3.getPath()));
+ // Earlier emissions, when present, must point to one of the writes we
performed.
+ for (String s : received) {
+ assertThat(s).isIn(r1.getPath(), r2.getPath(), r3.getPath());
+ }
+ }
+}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/SequenceKeysTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/SequenceKeysTest.java
new file mode 100644
index 00000000000..dc8abecf080
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/SequenceKeysTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.Option;
+import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Cross-backend tests for {@link Option.SequenceKeysDeltas} and
+ * {@link MetadataStore#subscribeSequence}.
+ *
+ * <p>For Oxia these are the native primitives; for the other backends the
same behavior is
+ * synthesized via the CAS counter sidecar + listener bridge in {@code
AbstractMetadataStore}.
+ * Both paths must produce monotonically increasing keys with the Oxia
byte-format and deliver
+ * subscription updates on writes.
+ *
+ * <p>Every test takes its backend from the {@code impl} data provider. For the
+ * {@code Oxia} provider the put and read/subscribe option sets additionally
+ * carry a fixed {@link Option.PartitionKey}; other providers get none.
+ */
+public class SequenceKeysTest extends BaseMetadataStoreTest {
+
+ @Test(dataProvider = "impl")
+ public void singleDimensionSequence(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String prefix = newKey();
+ Set<Option> opts = optsFor(provider, new
Option.SequenceKeysDeltas(List.of(1L)));
+
+ Stat r1 = store.put(prefix, "a".getBytes(StandardCharsets.UTF_8),
Optional.empty(), opts).get();
+ Stat r2 = store.put(prefix, "b".getBytes(StandardCharsets.UTF_8),
Optional.empty(), opts).get();
+ Stat r3 = store.put(prefix, "c".getBytes(StandardCharsets.UTF_8),
Optional.empty(), opts).get();
+
+ // Key shape: <prefix>-<seq:%020d> for both native (Oxia) and the
compat layer.
+ assertThat(r1.getPath()).matches("\\Q" + prefix + "\\E-\\d{20}");
+ assertThat(r2.getPath()).isGreaterThan(r1.getPath());
+ assertThat(r3.getPath()).isGreaterThan(r2.getPath());
+
+ Set<Option> readOpts = readOptsFor(provider);
+ assertThat(store.get(r1.getPath(), readOpts).get()).isPresent();
+ assertThat(store.get(r3.getPath(), readOpts).get()).isPresent();
+ }
+
+ @Test(dataProvider = "impl")
+ public void multiDimensionSequence(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String prefix = newKey();
+ // Two dimensions: dim 0 always ticks, dim 1 ticks only when caller
asks.
+ Set<Option> bumpBoth = optsFor(provider, new
Option.SequenceKeysDeltas(List.of(1L, 1L)));
+ Set<Option> bumpFirstOnly = optsFor(provider, new
Option.SequenceKeysDeltas(List.of(1L, 0L)));
+
+ Stat r0 = store.put(prefix, new byte[]{0}, Optional.empty(),
bumpBoth).get();
+ Stat r1 = store.put(prefix, new byte[]{1}, Optional.empty(),
bumpFirstOnly).get();
+ Stat r2 = store.put(prefix, new byte[]{2}, Optional.empty(),
bumpBoth).get();
+
+ assertThat(r0.getPath()).matches("\\Q" + prefix +
"\\E-\\d{20}-\\d{20}");
+ assertThat(r1.getPath()).isGreaterThan(r0.getPath());
+ assertThat(r2.getPath()).isGreaterThan(r1.getPath());
+ }
+
+ @Test(dataProvider = "impl")
+ public void subscribeSequence(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().fsyncEnable(false).build());
+
+ String prefix = newKey();
+ Set<Option> putOpts = optsFor(provider, new
Option.SequenceKeysDeltas(List.of(1L)));
+ Set<Option> subOpts = readOptsFor(provider);
+
+ ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<>();
+ @Cleanup
+ AutoCloseable handle = store.subscribeSequence(prefix, received::add,
subOpts);
+
+ Stat r1 = store.put(prefix, new byte[]{1}, Optional.empty(),
putOpts).get();
+ Stat r2 = store.put(prefix, new byte[]{2}, Optional.empty(),
putOpts).get();
+ Stat r3 = store.put(prefix, new byte[]{3}, Optional.empty(),
putOpts).get();
+
+ // Updates may be collapsed; the final emission must be the latest
sequence key.
+ Awaitility.await().untilAsserted(() ->
+
assertThat(received).isNotEmpty().last().asString().isEqualTo(r3.getPath()));
+ // Every emission must point to one of the writes we performed (no
spurious paths).
+ for (String s : received) {
+ assertThat(s).isIn(r1.getPath(), r2.getPath(), r3.getPath());
+ }
+ }
+
+ /**
+ * Add the routing-hint partition key for Oxia (which requires it for
sequence-keys); other
+ * backends ignore it.
+ */
+ private static Set<Option> optsFor(String provider,
Option.SequenceKeysDeltas deltas) {
+ if ("Oxia".equals(provider)) {
+ return Set.of(deltas, new Option.PartitionKey("seq-pk"));
+ }
+ return Set.of(deltas);
+ }
+
+ private static Set<Option> readOptsFor(String provider) {
+ if ("Oxia".equals(provider)) {
+ return Set.of(new Option.PartitionKey("seq-pk"));
+ }
+ return Set.of();
+ }
+
+}