This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9c5ab38dd68 [improve][metadata] Replace findByIndex with streaming
scanByIndex (point + range) (#25731)
9c5ab38dd68 is described below
commit 9c5ab38dd68317dd1e8cba3135ddc3a59319b1fa
Author: Matteo Merli <[email protected]>
AuthorDate: Mon May 11 00:33:56 2026 -0700
[improve][metadata] Replace findByIndex with streaming scanByIndex (point +
range) (#25731)
---
.../broker/resources/ScalableTopicResources.java | 36 +++++----
.../apache/pulsar/metadata/api/MetadataStore.java | 66 ++++++++++------
.../apache/pulsar/metadata/api/ScanConsumer.java | 35 ++++++++-
.../metadata/impl/AbstractMetadataStore.java | 79 +++++++++++++------
.../pulsar/metadata/impl/DualMetadataStore.java | 14 ++--
.../metadata/impl/oxia/OxiaMetadataStore.java | 56 ++++++++++----
.../metadata/MetadataCacheSecondaryIndexTest.java | 34 ++++++---
.../metadata/MetadataStoreSecondaryIndexTest.java | 89 ++++++++++++++++++++--
8 files changed, 312 insertions(+), 97 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 7867fb90509..5647feef52a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.broker.resources;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import lombok.CustomLog;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -33,10 +35,12 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
/**
@@ -297,7 +301,7 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
// know index cardinalities up front). The predicate then enforces AND
across
// every filter on the loaded record.
Map.Entry<String, String> indexFilter =
propertyFilters.entrySet().iterator().next();
- java.util.function.Predicate<org.apache.pulsar.metadata.api.GetResult>
matchesAll = result -> {
+ Predicate<GetResult> matchesAll = result -> {
try {
ScalableTopicMetadata md =
mapper.readValue(result.getValue(),
ScalableTopicMetadata.class);
@@ -315,19 +319,23 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
return false;
}
};
- return getStore().findByIndex(scanPathPrefix,
- indexFilter.getKey(), indexFilter.getValue(),
matchesAll)
- // Native-index implementations don't apply the fallback
predicate, so
- // re-check here. On the fallback path this is a no-op
(predicate already
- // applied) but cheap.
- .thenApply(results -> results.stream()
- .filter(matchesAll)
- .map(r -> {
- String path = r.getStat().getPath();
- String encoded =
path.substring(path.lastIndexOf('/') + 1);
- return TopicName.get("topic", ns,
Codec.decode(encoded)).toString();
- })
- .collect(Collectors.toList()));
+ List<GetResult> results = new ArrayList<>();
+ return getStore().scanByIndex(scanPathPrefix,
+ indexFilter.getKey(), indexFilter.getValue(),
indexFilter.getValue(),
+ matchesAll,
+ ScanConsumer.collectInto(results))
+ .thenApply(__ ->
+ // Native-index implementations don't apply the
fallback predicate, so
+ // re-check here. On the fallback path this is a no-op
(predicate already
+ // applied) but cheap.
+ results.stream()
+ .filter(matchesAll)
+ .map(r -> {
+ String path = r.getStat().getPath();
+ String encoded =
path.substring(path.lastIndexOf('/') + 1);
+ return TopicName.get("topic", ns,
Codec.decode(encoded)).toString();
+ })
+ .collect(Collectors.toList()));
}
// --- Subscriptions ---
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 528c9fe1bbd..00c8e205e0e 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -338,33 +338,55 @@ public interface MetadataStore extends AutoCloseable {
}
/**
- * Find all records matching a secondary index.
- *
- * <p>On stores that support secondary indexes natively (e.g. Oxia), this
uses the index
- * efficiently. On stores that don't (e.g. ZooKeeper), it falls back to
listing all children
- * under {@code scanPathPrefix}, fetching each record, and applying {@code
fallbackFilter}.
- *
- * @param scanPathPrefix path prefix for fallback scan (used by stores
without native index support)
- * @param indexName the secondary index name
- * @param secondaryKey the secondary key to look up
- * @param fallbackFilter predicate to filter results during fallback scan;
ignored by native implementations
- * @param opts the set of {@link Option options} for this
operation
- * @return list of matching {@link GetResult} entries
+ * Stream records matching a secondary-index value or range.
+ *
+ * <p>One method serves both point lookup and range queries:
+ * <ul>
+ * <li>Point lookup: {@code fromKey == toKey == key}.</li>
+ * <li>Range: pass {@code null} on either side for an unbounded bound,
or specific values for
+ * a closed range. Both bounds are <b>inclusive</b>.</li>
+ * </ul>
+ *
+ * <p>On stores that support secondary indexes natively (e.g. Oxia), this
is a single store-side
+ * range scan over the index. On stores that don't (e.g. ZooKeeper), it
falls back to listing
+ * all children under {@code scanPathPrefix}, fetching each record, and
applying
+ * {@code fallbackFilter}; the fallback also enforces the {@code [fromKey,
toKey]} bounds.
+ *
+ * <p>Results are streamed to {@code consumer} as they become available —
{@link ScanConsumer#onNext}
+ * once per match, then exactly one of {@link ScanConsumer#onCompleted} or
+ * {@link ScanConsumer#onError}. The returned future completes when the
scan terminates and
+ * mirrors the terminal callback.
+ *
+ * @param scanPathPrefix path prefix scoping the scan (and used by the
fallback list)
+ * @param indexName the secondary-index name
+ * @param fromKeyInclusive lower bound on the secondary-key value, or
{@code null} for unbounded
+ * @param toKeyInclusive upper bound on the secondary-key value, or
{@code null} for unbounded
+ * @param fallbackFilter additional predicate applied during fallback
scan; ignored by native implementations
+ * @param consumer callback receiving records, completion, or an
error
+ * @param opts the set of {@link Option options} for this
operation
+ * @return a future that completes when the scan terminates
*/
- default CompletableFuture<List<GetResult>> findByIndex(
- String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter, Set<Option> opts) {
- return CompletableFuture.failedFuture(
- new MetadataStoreException("Secondary index queries not
supported by this store"));
+ default CompletableFuture<Void> scanByIndex(
+ String scanPathPrefix, String indexName,
+ String fromKeyInclusive, String toKeyInclusive,
+ Predicate<GetResult> fallbackFilter,
+ ScanConsumer consumer, Set<Option> opts) {
+ MetadataStoreException ex =
+ new MetadataStoreException("Secondary index queries not
supported by this store");
+ consumer.onError(ex);
+ return CompletableFuture.failedFuture(ex);
}
/**
- * Like {@link #findByIndex(String, String, String, Predicate, Set)} with
no options.
+ * Like {@link #scanByIndex(String, String, String, String, Predicate,
ScanConsumer, Set)} with no options.
*/
- default CompletableFuture<List<GetResult>> findByIndex(
- String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter) {
- return findByIndex(scanPathPrefix, indexName, secondaryKey,
fallbackFilter, Set.of());
+ default CompletableFuture<Void> scanByIndex(
+ String scanPathPrefix, String indexName,
+ String fromKeyInclusive, String toKeyInclusive,
+ Predicate<GetResult> fallbackFilter,
+ ScanConsumer consumer) {
+ return scanByIndex(scanPathPrefix, indexName, fromKeyInclusive,
toKeyInclusive,
+ fallbackFilter, consumer, Set.of());
}
/**
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
index 24ce6c15ac5..ddb8475c39a 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
@@ -19,9 +19,10 @@
package org.apache.pulsar.metadata.api;
/**
- * Streaming consumer for {@link MetadataStore#scanChildren} results.
+ * Streaming consumer for {@link MetadataStore#scanChildren} and {@link
MetadataStore#scanByIndex}
+ * results.
*
- * <p>The store invokes {@link #onNext} for each child record (in key order),
and then either
+ * <p>The store invokes {@link #onNext} for each record (in key order), and
then either
* {@link #onCompleted} (success) or {@link #onError} (failure) exactly once.
Implementations must
* be safe to invoke from a metadata-store internal thread; back-pressure is
the consumer's
* responsibility (long blocking work in {@code onNext} can stall the scan).
@@ -47,4 +48,34 @@ public interface ScanConsumer {
* callbacks are made.
*/
void onCompleted();
+
+ /**
+ * A {@link ScanConsumer} that accumulates emitted records into the given
list. Useful when a
+ * caller wants the convenience of a {@code
CompletableFuture<List<GetResult>>} on top of the
+ * streaming API — failure / completion are signaled by the future
returned from
+ * {@code scanByIndex} / {@code scanChildren}.
+ *
+ * <pre>{@code
+ * List<GetResult> out = new ArrayList<>();
+ * store.scanByIndex(prefix, indexName, key, key, filter,
ScanConsumer.collectInto(out)).join();
+ * }</pre>
+ */
+ static ScanConsumer collectInto(java.util.List<GetResult> out) {
+ return new ScanConsumer() {
+ @Override
+ public void onNext(GetResult result) {
+ out.add(result);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // Caller observes failure via the scan's CompletableFuture.
+ }
+
+ @Override
+ public void onCompleted() {
+ // No-op.
+ }
+ };
+ }
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 02454bd5570..1112a4e0975 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -543,13 +543,28 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
Set<Option> opts);
@Override
- public CompletableFuture<List<GetResult>> findByIndex(
- String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter, Set<Option> opts) {
+ public CompletableFuture<Void> scanByIndex(
+ String scanPathPrefix, String indexName,
+ String fromKeyInclusive, String toKeyInclusive,
+ Predicate<GetResult> fallbackFilter,
+ ScanConsumer consumer, Set<Option> opts) {
if (isClosed()) {
- return alreadyClosedFailedFuture();
+ CompletableFuture<Void> failed = alreadyClosedFailedFuture();
+ failed.whenComplete((__, ex) -> {
+ if (ex != null) {
+ consumer.onError(ex);
+ }
+ });
+ return failed;
}
- return storeFindByIndex(scanPathPrefix, indexName, secondaryKey,
fallbackFilter, opts);
+ if (scanPathPrefix == null || indexName == null || consumer == null) {
+ MetadataStoreException ex = new MetadataStoreException(
+ "scanPathPrefix, indexName, and consumer must be
non-null");
+ consumer.onError(ex);
+ return FutureUtil.failedFuture(ex);
+ }
+ return storeScanByIndex(scanPathPrefix, indexName, fromKeyInclusive,
toKeyInclusive,
+ fallbackFilter, consumer, opts);
}
@Override
@@ -601,23 +616,43 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
return result;
}
- protected CompletableFuture<List<GetResult>> storeFindByIndex(
- String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter, Set<Option> opts) {
- // Default fallback: full scan under scanPathPrefix, applying
fallbackFilter to each result.
- return getChildrenFromStore(scanPathPrefix, opts)
- .thenCompose(children -> {
- List<CompletableFuture<Optional<GetResult>>> futures =
children.stream()
- .map(child -> storeGet(scanPathPrefix + "/" +
child, opts))
- .toList();
- return FutureUtil.waitForAll(futures)
- .thenApply(__ -> futures.stream()
- .map(CompletableFuture::join)
- .filter(Optional::isPresent)
- .map(Optional::get)
- .filter(fallbackFilter)
- .toList());
- });
+ /**
+ * Backend hook for {@link #scanByIndex}. Default fallback: list children
under
+ * {@code scanPathPrefix}, fetch each, and stream those passing {@code
fallbackFilter} to
+ * {@code consumer}. Backends with a native indexed range-scan primitive
(Oxia) override this
+ * to issue one store-side scan against the index.
+ *
+ * <p>The default impl ignores {@code indexName}, {@code
fromKeyInclusive}, and
+ * {@code toKeyInclusive} — the caller's {@code fallbackFilter} is the
only criterion.
+ * Callers that want range bounds enforced on non-native backends should
encode the bounds in
+ * {@code fallbackFilter}.
+ */
+ protected CompletableFuture<Void> storeScanByIndex(
+ String scanPathPrefix, String indexName,
+ String fromKeyInclusive, String toKeyInclusive,
+ Predicate<GetResult> fallbackFilter,
+ ScanConsumer consumer, Set<Option> opts) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ getChildrenFromStore(scanPathPrefix, opts).thenCompose(children -> {
+ CompletableFuture<Void> chain =
CompletableFuture.completedFuture(null);
+ for (String child : children) {
+ String childPath = scanPathPrefix.equals("/") ? "/" + child :
scanPathPrefix + "/" + child;
+ chain = chain.thenCompose(__ -> storeGet(childPath, opts))
+ .thenAccept(opt ->
opt.filter(fallbackFilter).ifPresent(consumer::onNext));
+ }
+ return chain;
+ }).whenComplete((v, ex) -> {
+ if (ex != null) {
+ Throwable cause = ex instanceof CompletionException &&
ex.getCause() != null
+ ? ex.getCause() : ex;
+ consumer.onError(cause);
+ result.completeExceptionally(cause);
+ } else {
+ consumer.onCompleted();
+ result.complete(null);
+ }
+ });
+ return result;
}
@Override
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 04b56883a40..16daffc81db 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -303,14 +303,18 @@ public class DualMetadataStore implements
MetadataStoreExtended {
}
@Override
- public CompletableFuture<List<GetResult>> findByIndex(
- String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter, Set<Option> opts) {
+ public CompletableFuture<Void> scanByIndex(
+ String scanPathPrefix, String indexName,
+ String fromKeyInclusive, String toKeyInclusive,
+ Predicate<GetResult> fallbackFilter,
+ ScanConsumer consumer, Set<Option> opts) {
return switch (migrationState.getPhase()) {
case NOT_STARTED, PREPARATION, COPYING, FAILED ->
- sourceStore.findByIndex(scanPathPrefix, indexName,
secondaryKey, fallbackFilter, opts);
+ sourceStore.scanByIndex(scanPathPrefix, indexName,
fromKeyInclusive, toKeyInclusive,
+ fallbackFilter, consumer, opts);
case COMPLETED ->
- targetStore.findByIndex(scanPathPrefix, indexName,
secondaryKey, fallbackFilter, opts);
+ targetStore.scanByIndex(scanPathPrefix, indexName,
fromKeyInclusive, toKeyInclusive,
+ fallbackFilter, consumer, opts);
};
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 07a327b3b39..c8f52e468ab 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -248,25 +248,51 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
}
@Override
- protected CompletableFuture<List<GetResult>> storeFindByIndex(
- String scanPathPrefix, String indexName, String secondaryKey,
- Predicate<GetResult> fallbackFilter, Set<Option> opts) {
- String scopedKey = scanPathPrefix + "/" + secondaryKey;
+ protected CompletableFuture<Void> storeScanByIndex(
+ String scanPathPrefix, String indexName,
+ String fromKeyInclusive, String toKeyInclusive,
+ Predicate<GetResult> fallbackFilter,
+ ScanConsumer consumer, Set<Option> opts) {
+ // Index entries are stored at "<parentPath>/<secondaryKey>" by
doStorePut, where
+ // parentPath == scanPathPrefix for records directly under the prefix.
The API uses
+ // inclusive-on-both-sides bounds; Oxia's list(start, end, ...) is
half-open, so we
+ // append a sentinel character to the upper bound to widen the
half-open range to
+ // include `toKeyInclusive`. We use '~' (0x7E) — it lex-sorts after
every printable
+ // ASCII byte, which is a safe choice for the secondary keys callers
actually use today
+ // (numeric-encoded timestamps, fixed-tag enums, etc.). Callers that
need full byte-range
+ // coverage can pass `toKeyInclusive=null` or use a different scheme.
+ String scopedFrom = fromKeyInclusive == null
+ ? scanPathPrefix + "/"
+ : scanPathPrefix + "/" + fromKeyInclusive;
+ String scopedTo = toKeyInclusive == null
+ ? scanPathPrefix + "0" // '0' (0x30) is the lex successor of
'/' (0x2F)
+ : scanPathPrefix + "/" + toKeyInclusive + "~";
Set<ListOption> listOpts = new HashSet<>(listOptions(opts));
listOpts.add(ListOption.UseIndex(indexName));
- return client.list(scopedKey, scopedKey + "~", listOpts)
+ CompletableFuture<Void> done = new CompletableFuture<>();
+ client.list(scopedFrom, scopedTo, listOpts)
.thenCompose(primaryKeys -> {
- List<CompletableFuture<Optional<GetResult>>> futures =
primaryKeys.stream()
- .map(p -> storeGet(p, opts))
- .toList();
- return FutureUtil.waitForAll(futures)
- .thenApply(__ -> futures.stream()
- .map(CompletableFuture::join)
- .filter(Optional::isPresent)
- .map(Optional::get)
- .toList());
+ // Native indexes already enforced the range;
fallbackFilter is unused here
+ // (it's the scan-and-filter compat-path predicate).
+ CompletableFuture<Void> chain =
CompletableFuture.completedFuture(null);
+ for (String key : primaryKeys) {
+ chain = chain
+ .thenCompose(__ -> storeGet(key, opts))
+ .thenAccept(opt ->
opt.ifPresent(consumer::onNext));
+ }
+ return chain;
})
- .exceptionallyCompose(this::convertException);
+ .whenComplete((v, ex) -> {
+ if (ex != null) {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ consumer.onError(cause);
+ done.completeExceptionally(cause);
+ } else {
+ consumer.onCompleted();
+ done.complete(null);
+ }
+ });
+ return done;
}
private CompletableFuture<Stat> doStorePut(
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
index 1469dddee8c..48634f5e636 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
@@ -20,10 +20,12 @@ package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
@@ -31,7 +33,9 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;
@@ -49,6 +53,14 @@ import org.testng.annotations.Test;
*/
public class MetadataCacheSecondaryIndexTest extends BaseMetadataStoreTest {
+ /** Convenience: drive the streaming {@link MetadataStore#scanByIndex} as
a point lookup. */
+ private static List<GetResult> findByIndex(MetadataStore store, String
prefix, String indexName,
+ String key,
Predicate<GetResult> filter) {
+ List<GetResult> out = new ArrayList<>();
+ store.scanByIndex(prefix, indexName, key, key, filter,
ScanConsumer.collectInto(out)).join();
+ return out;
+ }
+
@Data
@AllArgsConstructor
@NoArgsConstructor
@@ -105,14 +117,14 @@ public class MetadataCacheSecondaryIndexTest extends
BaseMetadataStoreTest {
v -> Map.of("by-owner", v.getOwner(), "by-team",
v.getTeam())).join();
// Owner=alice should return r1 + r2.
- List<GetResult> aliceOwned = store.findByIndex(basePath, "by-owner",
"alice",
- matchOwner("alice")).join();
+ List<GetResult> aliceOwned = findByIndex(store, basePath, "by-owner",
"alice",
+ matchOwner("alice"));
assertEquals(aliceOwned.size(), 2);
// Team=platform should return r1 + r3.
Set<String> platformPaths = new HashSet<>();
- for (GetResult r : store.findByIndex(basePath, "by-team", "platform",
- matchTeam("platform")).join()) {
+ for (GetResult r : findByIndex(store, basePath, "by-team", "platform",
+ matchTeam("platform"))) {
platformPaths.add(r.getStat().getPath());
}
assertEquals(platformPaths, Set.of(basePath + "/r1", basePath +
"/r3"));
@@ -131,18 +143,18 @@ public class MetadataCacheSecondaryIndexTest extends
BaseMetadataStoreTest {
v -> Map.of("by-owner", v.getOwner());
cache.create(basePath + "/r1", new IndexedValue("alice", "platform"),
extractor).join();
- assertEquals(store.findByIndex(basePath, "by-owner", "alice",
matchOwner("alice"))
- .join().size(), 1);
+ assertEquals(findByIndex(store, basePath, "by-owner", "alice",
matchOwner("alice"))
+ .size(), 1);
// Reassign owner via update — the new owner becomes the queryable one
and the
// old owner's lookup must no longer surface this record.
cache.readModifyUpdate(basePath + "/r1", current -> new
IndexedValue("bob", current.getTeam()),
extractor).join();
- assertEquals(store.findByIndex(basePath, "by-owner", "bob",
matchOwner("bob"))
- .join().size(), 1);
- assertEquals(store.findByIndex(basePath, "by-owner", "alice",
matchOwner("alice"))
- .join().size(), 0);
+ assertEquals(findByIndex(store, basePath, "by-owner", "bob",
matchOwner("bob"))
+ .size(), 1);
+ assertEquals(findByIndex(store, basePath, "by-owner", "alice",
matchOwner("alice"))
+ .size(), 0);
}
@Test(dataProvider = "impl")
@@ -161,6 +173,6 @@ public class MetadataCacheSecondaryIndexTest extends
BaseMetadataStoreTest {
assertTrue(cache.get(path).join().isPresent());
// No index registered, so a lookup with the would-be index name
returns nothing.
- assertEquals(store.findByIndex(path, "by-owner", "alice", r ->
false).join().size(), 0);
+ assertEquals(findByIndex(store, path, "by-owner", "alice", r ->
false).size(), 0);
}
}
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
index 8dd00543a8f..b842b5c45d2 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -30,13 +31,23 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;
public class MetadataStoreSecondaryIndexTest extends BaseMetadataStoreTest {
+ /** Convenience: drive the streaming {@link MetadataStore#scanByIndex} as
a point lookup. */
+ private static List<GetResult> findByIndex(MetadataStore store, String
prefix, String indexName,
+ String key,
java.util.function.Predicate<GetResult> filter) {
+ List<GetResult> out = new ArrayList<>();
+ store.scanByIndex(prefix, indexName, key, key, filter,
ScanConsumer.collectInto(out)).join();
+ return out;
+ }
+
@Test(dataProvider = "impl")
public void putWithSecondaryIndexesPreservesValue(String provider,
Supplier<String> urlSupplier) throws Exception {
@Cleanup
@@ -106,8 +117,8 @@ public class MetadataStoreSecondaryIndexTest extends
BaseMetadataStoreTest {
Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
Map.of("by-owner", "broker-1")).join();
- List<GetResult> results = store.findByIndex(basePath, "by-owner",
"broker-1",
- r -> new String(r.getValue(),
StandardCharsets.UTF_8).contains("broker-1")).join();
+ List<GetResult> results = findByIndex(store, basePath, "by-owner",
"broker-1",
+ r -> new String(r.getValue(),
StandardCharsets.UTF_8).contains("broker-1"));
assertEquals(results.size(), 2);
Set<String> values = results.stream()
@@ -127,8 +138,8 @@ public class MetadataStoreSecondaryIndexTest extends
BaseMetadataStoreTest {
store.put(basePath + "/topic-1",
"value-1".getBytes(StandardCharsets.UTF_8),
Optional.of(-1L), EnumSet.noneOf(CreateOption.class)).join();
- List<GetResult> results = store.findByIndex(basePath, "by-owner",
"nonexistent",
- r -> false).join();
+ List<GetResult> results = findByIndex(store, basePath, "by-owner",
"nonexistent",
+ r -> false);
assertEquals(results.size(), 0);
}
@@ -141,8 +152,8 @@ public class MetadataStoreSecondaryIndexTest extends
BaseMetadataStoreTest {
String basePath = newKey();
- List<GetResult> results = store.findByIndex(basePath, "by-owner",
"broker-1",
- r -> true).join();
+ List<GetResult> results = findByIndex(store, basePath, "by-owner",
"broker-1",
+ r -> true);
assertEquals(results.size(), 0);
}
@@ -167,4 +178,70 @@ public class MetadataStoreSecondaryIndexTest extends
BaseMetadataStoreTest {
assertTrue(result.isPresent());
assertEquals(result.get().getValue(),
"v2".getBytes(StandardCharsets.UTF_8));
}
+
+ @Test(dataProvider = "impl")
+ public void scanByIndexInclusiveRange(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String basePath = newKey();
+ // Three records keyed by zero-padded numeric secondary key (matches
the txn-by-deadline
+ // shape PIP-473 needs). The fallback predicate also enforces the
range so the
+ // scan-and-filter path works correctly on backends without native
indexes.
+ for (long t : new long[]{100L, 200L, 300L}) {
+ String key = String.format("%020d", t);
+ store.put(basePath + "/r-" + t, ("v-" +
t).getBytes(StandardCharsets.UTF_8),
+ Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+ Map.of("by-time", key)).join();
+ }
+
+ // Predicate that mirrors the index range — needed by the fallback
scan path.
+ java.util.function.Predicate<GetResult> inRange = r -> {
+ String v = new String(r.getValue(), StandardCharsets.UTF_8);
+ long t = Long.parseLong(v.substring(2));
+ return t >= 100L && t <= 200L;
+ };
+
+ List<GetResult> results = new java.util.ArrayList<>();
+ store.scanByIndex(basePath, "by-time",
+ String.format("%020d", 100L), String.format("%020d", 200L),
+ inRange, ScanConsumer.collectInto(results)).join();
+
+ // Both 100 and 200 fall in [100, 200] inclusive; 300 doesn't.
+ Set<String> values = results.stream()
+ .map(r -> new String(r.getValue(), StandardCharsets.UTF_8))
+ .collect(Collectors.toSet());
+ assertEquals(values, Set.of("v-100", "v-200"));
+ }
+
+ @Test(dataProvider = "impl")
+ public void scanByIndexUnboundedFromKey(String provider, Supplier<String>
urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStoreExtended store =
MetadataStoreExtended.create(urlSupplier.get(),
+ MetadataStoreConfig.builder().build());
+
+ String basePath = newKey();
+ for (long t : new long[]{100L, 200L, 300L}) {
+ String key = String.format("%020d", t);
+ store.put(basePath + "/r-" + t, ("v-" +
t).getBytes(StandardCharsets.UTF_8),
+ Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+ Map.of("by-time", key)).join();
+ }
+
+ // "All entries with by-time <= 200" — the timeout-sweep shape from
PIP-473.
+ java.util.function.Predicate<GetResult> upTo200 = r -> {
+ String v = new String(r.getValue(), StandardCharsets.UTF_8);
+ return Long.parseLong(v.substring(2)) <= 200L;
+ };
+
+ List<GetResult> results = new java.util.ArrayList<>();
+ store.scanByIndex(basePath, "by-time", null, String.format("%020d",
200L),
+ upTo200, ScanConsumer.collectInto(results)).join();
+
+ Set<String> values = results.stream()
+ .map(r -> new String(r.getValue(), StandardCharsets.UTF_8))
+ .collect(Collectors.toSet());
+ assertEquals(values, Set.of("v-100", "v-200"));
+ }
}