This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c5ab38dd68 [improve][metadata] Replace findByIndex with streaming 
scanByIndex (point + range) (#25731)
9c5ab38dd68 is described below

commit 9c5ab38dd68317dd1e8cba3135ddc3a59319b1fa
Author: Matteo Merli <[email protected]>
AuthorDate: Mon May 11 00:33:56 2026 -0700

    [improve][metadata] Replace findByIndex with streaming scanByIndex (point + 
range) (#25731)
---
 .../broker/resources/ScalableTopicResources.java   | 36 +++++----
 .../apache/pulsar/metadata/api/MetadataStore.java  | 66 ++++++++++------
 .../apache/pulsar/metadata/api/ScanConsumer.java   | 35 ++++++++-
 .../metadata/impl/AbstractMetadataStore.java       | 79 +++++++++++++------
 .../pulsar/metadata/impl/DualMetadataStore.java    | 14 ++--
 .../metadata/impl/oxia/OxiaMetadataStore.java      | 56 ++++++++++----
 .../metadata/MetadataCacheSecondaryIndexTest.java  | 34 ++++++---
 .../metadata/MetadataStoreSecondaryIndexTest.java  | 89 ++++++++++++++++++++--
 8 files changed, 312 insertions(+), 97 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 7867fb90509..5647feef52a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.broker.resources;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import lombok.CustomLog;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -33,10 +35,12 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 /**
@@ -297,7 +301,7 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
         // know index cardinalities up front). The predicate then enforces AND 
across
         // every filter on the loaded record.
         Map.Entry<String, String> indexFilter = 
propertyFilters.entrySet().iterator().next();
-        java.util.function.Predicate<org.apache.pulsar.metadata.api.GetResult> 
matchesAll = result -> {
+        Predicate<GetResult> matchesAll = result -> {
             try {
                 ScalableTopicMetadata md =
                         mapper.readValue(result.getValue(), 
ScalableTopicMetadata.class);
@@ -315,19 +319,23 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
                 return false;
             }
         };
-        return getStore().findByIndex(scanPathPrefix,
-                        indexFilter.getKey(), indexFilter.getValue(), 
matchesAll)
-                // Native-index implementations don't apply the fallback 
predicate, so
-                // re-check here. On the fallback path this is a no-op 
(predicate already
-                // applied) but cheap.
-                .thenApply(results -> results.stream()
-                        .filter(matchesAll)
-                        .map(r -> {
-                            String path = r.getStat().getPath();
-                            String encoded = 
path.substring(path.lastIndexOf('/') + 1);
-                            return TopicName.get("topic", ns, 
Codec.decode(encoded)).toString();
-                        })
-                        .collect(Collectors.toList()));
+        List<GetResult> results = new ArrayList<>();
+        return getStore().scanByIndex(scanPathPrefix,
+                        indexFilter.getKey(), indexFilter.getValue(), 
indexFilter.getValue(),
+                        matchesAll,
+                        ScanConsumer.collectInto(results))
+                .thenApply(__ ->
+                        // Native-index implementations don't apply the 
fallback predicate, so
+                        // re-check here. On the fallback path this is a no-op 
(predicate already
+                        // applied) but cheap.
+                        results.stream()
+                                .filter(matchesAll)
+                                .map(r -> {
+                                    String path = r.getStat().getPath();
+                                    String encoded = 
path.substring(path.lastIndexOf('/') + 1);
+                                    return TopicName.get("topic", ns, 
Codec.decode(encoded)).toString();
+                                })
+                                .collect(Collectors.toList()));
     }
 
     // --- Subscriptions ---
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 528c9fe1bbd..00c8e205e0e 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -338,33 +338,55 @@ public interface MetadataStore extends AutoCloseable {
     }
 
     /**
-     * Find all records matching a secondary index.
-     *
-     * <p>On stores that support secondary indexes natively (e.g. Oxia), this 
uses the index
-     * efficiently. On stores that don't (e.g. ZooKeeper), it falls back to 
listing all children
-     * under {@code scanPathPrefix}, fetching each record, and applying {@code 
fallbackFilter}.
-     *
-     * @param scanPathPrefix path prefix for fallback scan (used by stores 
without native index support)
-     * @param indexName      the secondary index name
-     * @param secondaryKey   the secondary key to look up
-     * @param fallbackFilter predicate to filter results during fallback scan; 
ignored by native implementations
-     * @param opts           the set of {@link Option options} for this 
operation
-     * @return list of matching {@link GetResult} entries
+     * Stream records matching a secondary-index value or range.
+     *
+     * <p>One method serves both point lookup and range queries:
+     * <ul>
+     *   <li>Point lookup: {@code fromKey == toKey == key}.</li>
+     *   <li>Range: pass {@code null} on either side for an unbounded bound, 
or specific values for
+     *       a closed range. Both bounds are <b>inclusive</b>.</li>
+     * </ul>
+     *
+     * <p>On stores that support secondary indexes natively (e.g. Oxia), this 
is a single store-side
+     * range scan over the index. On stores that don't (e.g. ZooKeeper), it 
falls back to listing
+     * all children under {@code scanPathPrefix}, fetching each record, and 
applying
+     * {@code fallbackFilter}; the fallback also enforces the {@code [fromKey, 
toKey]} bounds.
+     *
+     * <p>Results are streamed to {@code consumer} as they become available — 
{@link ScanConsumer#onNext}
+     * once per match, then exactly one of {@link ScanConsumer#onCompleted} or
+     * {@link ScanConsumer#onError}. The returned future completes when the 
scan terminates and
+     * mirrors the terminal callback.
+     *
+     * @param scanPathPrefix    path prefix scoping the scan (and used by the 
fallback list)
+     * @param indexName         the secondary-index name
+     * @param fromKeyInclusive  lower bound on the secondary-key value, or 
{@code null} for unbounded
+     * @param toKeyInclusive    upper bound on the secondary-key value, or 
{@code null} for unbounded
+     * @param fallbackFilter    additional predicate applied during fallback 
scan; ignored by native implementations
+     * @param consumer          callback receiving records, completion, or an 
error
+     * @param opts              the set of {@link Option options} for this 
operation
+     * @return a future that completes when the scan terminates
      */
-    default CompletableFuture<List<GetResult>> findByIndex(
-            String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
-        return CompletableFuture.failedFuture(
-                new MetadataStoreException("Secondary index queries not 
supported by this store"));
+    default CompletableFuture<Void> scanByIndex(
+            String scanPathPrefix, String indexName,
+            String fromKeyInclusive, String toKeyInclusive,
+            Predicate<GetResult> fallbackFilter,
+            ScanConsumer consumer, Set<Option> opts) {
+        MetadataStoreException ex =
+                new MetadataStoreException("Secondary index queries not 
supported by this store");
+        consumer.onError(ex);
+        return CompletableFuture.failedFuture(ex);
     }
 
     /**
-     * Like {@link #findByIndex(String, String, String, Predicate, Set)} with 
no options.
+     * Like {@link #scanByIndex(String, String, String, String, Predicate, 
ScanConsumer, Set)} with no options.
      */
-    default CompletableFuture<List<GetResult>> findByIndex(
-            String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter) {
-        return findByIndex(scanPathPrefix, indexName, secondaryKey, 
fallbackFilter, Set.of());
+    default CompletableFuture<Void> scanByIndex(
+            String scanPathPrefix, String indexName,
+            String fromKeyInclusive, String toKeyInclusive,
+            Predicate<GetResult> fallbackFilter,
+            ScanConsumer consumer) {
+        return scanByIndex(scanPathPrefix, indexName, fromKeyInclusive, 
toKeyInclusive,
+                fallbackFilter, consumer, Set.of());
     }
 
     /**
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
index 24ce6c15ac5..ddb8475c39a 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
@@ -19,9 +19,10 @@
 package org.apache.pulsar.metadata.api;
 
 /**
- * Streaming consumer for {@link MetadataStore#scanChildren} results.
+ * Streaming consumer for {@link MetadataStore#scanChildren} and {@link 
MetadataStore#scanByIndex}
+ * results.
  *
- * <p>The store invokes {@link #onNext} for each child record (in key order), 
and then either
+ * <p>The store invokes {@link #onNext} for each record (in key order), and 
then either
  * {@link #onCompleted} (success) or {@link #onError} (failure) exactly once. 
Implementations must
  * be safe to invoke from a metadata-store internal thread; back-pressure is 
the consumer's
  * responsibility (long blocking work in {@code onNext} can stall the scan).
@@ -47,4 +48,34 @@ public interface ScanConsumer {
      * callbacks are made.
      */
     void onCompleted();
+
+    /**
+     * A {@link ScanConsumer} that accumulates emitted records into the given 
list. Useful when a
+     * caller wants the convenience of a {@code 
CompletableFuture<List<GetResult>>} on top of the
+     * streaming API — failure / completion are signaled by the future 
returned from
+     * {@code scanByIndex} / {@code scanChildren}.
+     *
+     * <pre>{@code
+     * List<GetResult> out = new ArrayList<>();
+     * store.scanByIndex(prefix, indexName, key, key, filter, 
ScanConsumer.collectInto(out)).join();
+     * }</pre>
+     */
+    static ScanConsumer collectInto(java.util.List<GetResult> out) {
+        return new ScanConsumer() {
+            @Override
+            public void onNext(GetResult result) {
+                out.add(result);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // Caller observes failure via the scan's CompletableFuture.
+            }
+
+            @Override
+            public void onCompleted() {
+                // No-op.
+            }
+        };
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 02454bd5570..1112a4e0975 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -543,13 +543,28 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                                                         Set<Option> opts);
 
     @Override
-    public CompletableFuture<List<GetResult>> findByIndex(
-            String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
+    public CompletableFuture<Void> scanByIndex(
+            String scanPathPrefix, String indexName,
+            String fromKeyInclusive, String toKeyInclusive,
+            Predicate<GetResult> fallbackFilter,
+            ScanConsumer consumer, Set<Option> opts) {
         if (isClosed()) {
-            return alreadyClosedFailedFuture();
+            CompletableFuture<Void> failed = alreadyClosedFailedFuture();
+            failed.whenComplete((__, ex) -> {
+                if (ex != null) {
+                    consumer.onError(ex);
+                }
+            });
+            return failed;
         }
-        return storeFindByIndex(scanPathPrefix, indexName, secondaryKey, 
fallbackFilter, opts);
+        if (scanPathPrefix == null || indexName == null || consumer == null) {
+            MetadataStoreException ex = new MetadataStoreException(
+                    "scanPathPrefix, indexName, and consumer must be 
non-null");
+            consumer.onError(ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        return storeScanByIndex(scanPathPrefix, indexName, fromKeyInclusive, 
toKeyInclusive,
+                fallbackFilter, consumer, opts);
     }
 
     @Override
@@ -601,23 +616,43 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         return result;
     }
 
-    protected CompletableFuture<List<GetResult>> storeFindByIndex(
-            String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
-        // Default fallback: full scan under scanPathPrefix, applying 
fallbackFilter to each result.
-        return getChildrenFromStore(scanPathPrefix, opts)
-                .thenCompose(children -> {
-                    List<CompletableFuture<Optional<GetResult>>> futures = 
children.stream()
-                            .map(child -> storeGet(scanPathPrefix + "/" + 
child, opts))
-                            .toList();
-                    return FutureUtil.waitForAll(futures)
-                            .thenApply(__ -> futures.stream()
-                                    .map(CompletableFuture::join)
-                                    .filter(Optional::isPresent)
-                                    .map(Optional::get)
-                                    .filter(fallbackFilter)
-                                    .toList());
-                });
+    /**
+     * Backend hook for {@link #scanByIndex}. Default fallback: list children 
under
+     * {@code scanPathPrefix}, fetch each, and stream those passing {@code 
fallbackFilter} to
+     * {@code consumer}. Backends with a native indexed range-scan primitive 
(Oxia) override this
+     * to issue one store-side scan against the index.
+     *
+     * <p>The default impl ignores {@code indexName}, {@code 
fromKeyInclusive}, and
+     * {@code toKeyInclusive} — the caller's {@code fallbackFilter} is the 
only criterion.
+     * Callers that want range bounds enforced on non-native backends should 
encode the bounds in
+     * {@code fallbackFilter}.
+     */
+    protected CompletableFuture<Void> storeScanByIndex(
+            String scanPathPrefix, String indexName,
+            String fromKeyInclusive, String toKeyInclusive,
+            Predicate<GetResult> fallbackFilter,
+            ScanConsumer consumer, Set<Option> opts) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        getChildrenFromStore(scanPathPrefix, opts).thenCompose(children -> {
+            CompletableFuture<Void> chain = 
CompletableFuture.completedFuture(null);
+            for (String child : children) {
+                String childPath = scanPathPrefix.equals("/") ? "/" + child : 
scanPathPrefix + "/" + child;
+                chain = chain.thenCompose(__ -> storeGet(childPath, opts))
+                        .thenAccept(opt -> 
opt.filter(fallbackFilter).ifPresent(consumer::onNext));
+            }
+            return chain;
+        }).whenComplete((v, ex) -> {
+            if (ex != null) {
+                Throwable cause = ex instanceof CompletionException && 
ex.getCause() != null
+                        ? ex.getCause() : ex;
+                consumer.onError(cause);
+                result.completeExceptionally(cause);
+            } else {
+                consumer.onCompleted();
+                result.complete(null);
+            }
+        });
+        return result;
     }
 
     @Override
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 04b56883a40..16daffc81db 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -303,14 +303,18 @@ public class DualMetadataStore implements 
MetadataStoreExtended {
     }
 
     @Override
-    public CompletableFuture<List<GetResult>> findByIndex(
-            String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
+    public CompletableFuture<Void> scanByIndex(
+            String scanPathPrefix, String indexName,
+            String fromKeyInclusive, String toKeyInclusive,
+            Predicate<GetResult> fallbackFilter,
+            ScanConsumer consumer, Set<Option> opts) {
         return switch (migrationState.getPhase()) {
             case NOT_STARTED, PREPARATION, COPYING, FAILED ->
-                    sourceStore.findByIndex(scanPathPrefix, indexName, 
secondaryKey, fallbackFilter, opts);
+                    sourceStore.scanByIndex(scanPathPrefix, indexName, 
fromKeyInclusive, toKeyInclusive,
+                            fallbackFilter, consumer, opts);
             case COMPLETED ->
-                    targetStore.findByIndex(scanPathPrefix, indexName, 
secondaryKey, fallbackFilter, opts);
+                    targetStore.scanByIndex(scanPathPrefix, indexName, 
fromKeyInclusive, toKeyInclusive,
+                            fallbackFilter, consumer, opts);
         };
     }
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 07a327b3b39..c8f52e468ab 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -248,25 +248,51 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
     }
 
     @Override
-    protected CompletableFuture<List<GetResult>> storeFindByIndex(
-            String scanPathPrefix, String indexName, String secondaryKey,
-            Predicate<GetResult> fallbackFilter, Set<Option> opts) {
-        String scopedKey = scanPathPrefix + "/" + secondaryKey;
+    protected CompletableFuture<Void> storeScanByIndex(
+            String scanPathPrefix, String indexName,
+            String fromKeyInclusive, String toKeyInclusive,
+            Predicate<GetResult> fallbackFilter,
+            ScanConsumer consumer, Set<Option> opts) {
+        // Index entries are stored at "<parentPath>/<secondaryKey>" by 
doStorePut, where
+        // parentPath == scanPathPrefix for records directly under the prefix. 
The API uses
+        // inclusive-on-both-sides bounds; Oxia's list(start, end, ...) is 
half-open, so we
+        // append a sentinel character to the upper bound to widen the 
half-open range to
+        // include `toKeyInclusive`. We use '~' (0x7E) — it lex-sorts after 
every printable
+        // ASCII byte, which is a safe choice for the secondary keys callers 
actually use today
+        // (numeric-encoded timestamps, fixed-tag enums, etc.). Callers that 
need full byte-range
+        // coverage can pass `toKeyInclusive=null` or use a different scheme.
+        String scopedFrom = fromKeyInclusive == null
+                ? scanPathPrefix + "/"
+                : scanPathPrefix + "/" + fromKeyInclusive;
+        String scopedTo = toKeyInclusive == null
+                ? scanPathPrefix + "0"   // '0' (0x30) is the lex successor of 
'/' (0x2F)
+                : scanPathPrefix + "/" + toKeyInclusive + "~";
         Set<ListOption> listOpts = new HashSet<>(listOptions(opts));
         listOpts.add(ListOption.UseIndex(indexName));
-        return client.list(scopedKey, scopedKey + "~", listOpts)
+        CompletableFuture<Void> done = new CompletableFuture<>();
+        client.list(scopedFrom, scopedTo, listOpts)
                 .thenCompose(primaryKeys -> {
-                    List<CompletableFuture<Optional<GetResult>>> futures = 
primaryKeys.stream()
-                            .map(p -> storeGet(p, opts))
-                            .toList();
-                    return FutureUtil.waitForAll(futures)
-                            .thenApply(__ -> futures.stream()
-                                    .map(CompletableFuture::join)
-                                    .filter(Optional::isPresent)
-                                    .map(Optional::get)
-                                    .toList());
+                    // Native indexes already enforced the range; 
fallbackFilter is unused here
+                    // (it's the scan-and-filter compat-path predicate).
+                    CompletableFuture<Void> chain = 
CompletableFuture.completedFuture(null);
+                    for (String key : primaryKeys) {
+                        chain = chain
+                                .thenCompose(__ -> storeGet(key, opts))
+                                .thenAccept(opt -> 
opt.ifPresent(consumer::onNext));
+                    }
+                    return chain;
                 })
-                .exceptionallyCompose(this::convertException);
+                .whenComplete((v, ex) -> {
+                    if (ex != null) {
+                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                        consumer.onError(cause);
+                        done.completeExceptionally(cause);
+                    } else {
+                        consumer.onCompleted();
+                        done.complete(null);
+                    }
+                });
+        return done;
     }
 
     private CompletableFuture<Stat> doStorePut(
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
index 1469dddee8c..48634f5e636 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheSecondaryIndexTest.java
@@ -20,10 +20,12 @@ package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import lombok.AllArgsConstructor;
 import lombok.Cleanup;
@@ -31,7 +33,9 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.testng.annotations.Test;
 
@@ -49,6 +53,14 @@ import org.testng.annotations.Test;
  */
 public class MetadataCacheSecondaryIndexTest extends BaseMetadataStoreTest {
 
+    /** Convenience: drive the streaming {@link MetadataStore#scanByIndex} as 
a point lookup. */
+    private static List<GetResult> findByIndex(MetadataStore store, String 
prefix, String indexName,
+                                                String key, 
Predicate<GetResult> filter) {
+        List<GetResult> out = new ArrayList<>();
+        store.scanByIndex(prefix, indexName, key, key, filter, 
ScanConsumer.collectInto(out)).join();
+        return out;
+    }
+
     @Data
     @AllArgsConstructor
     @NoArgsConstructor
@@ -105,14 +117,14 @@ public class MetadataCacheSecondaryIndexTest extends 
BaseMetadataStoreTest {
                 v -> Map.of("by-owner", v.getOwner(), "by-team", 
v.getTeam())).join();
 
         // Owner=alice should return r1 + r2.
-        List<GetResult> aliceOwned = store.findByIndex(basePath, "by-owner", 
"alice",
-                matchOwner("alice")).join();
+        List<GetResult> aliceOwned = findByIndex(store, basePath, "by-owner", 
"alice",
+                matchOwner("alice"));
         assertEquals(aliceOwned.size(), 2);
 
         // Team=platform should return r1 + r3.
         Set<String> platformPaths = new HashSet<>();
-        for (GetResult r : store.findByIndex(basePath, "by-team", "platform",
-                matchTeam("platform")).join()) {
+        for (GetResult r : findByIndex(store, basePath, "by-team", "platform",
+                matchTeam("platform"))) {
             platformPaths.add(r.getStat().getPath());
         }
         assertEquals(platformPaths, Set.of(basePath + "/r1", basePath + 
"/r3"));
@@ -131,18 +143,18 @@ public class MetadataCacheSecondaryIndexTest extends 
BaseMetadataStoreTest {
                 v -> Map.of("by-owner", v.getOwner());
 
         cache.create(basePath + "/r1", new IndexedValue("alice", "platform"), 
extractor).join();
-        assertEquals(store.findByIndex(basePath, "by-owner", "alice", 
matchOwner("alice"))
-                .join().size(), 1);
+        assertEquals(findByIndex(store, basePath, "by-owner", "alice", 
matchOwner("alice"))
+                .size(), 1);
 
         // Reassign owner via update — the new owner becomes the queryable one 
and the
         // old owner's lookup must no longer surface this record.
         cache.readModifyUpdate(basePath + "/r1", current -> new 
IndexedValue("bob", current.getTeam()),
                 extractor).join();
 
-        assertEquals(store.findByIndex(basePath, "by-owner", "bob", 
matchOwner("bob"))
-                .join().size(), 1);
-        assertEquals(store.findByIndex(basePath, "by-owner", "alice", 
matchOwner("alice"))
-                .join().size(), 0);
+        assertEquals(findByIndex(store, basePath, "by-owner", "bob", 
matchOwner("bob"))
+                .size(), 1);
+        assertEquals(findByIndex(store, basePath, "by-owner", "alice", 
matchOwner("alice"))
+                .size(), 0);
     }
 
     @Test(dataProvider = "impl")
@@ -161,6 +173,6 @@ public class MetadataCacheSecondaryIndexTest extends 
BaseMetadataStoreTest {
 
         assertTrue(cache.get(path).join().isPresent());
         // No index registered, so a lookup with the would-be index name 
returns nothing.
-        assertEquals(store.findByIndex(path, "by-owner", "alice", r -> 
false).join().size(), 0);
+        assertEquals(findByIndex(store, path, "by-owner", "alice", r -> 
false).size(), 0);
     }
 }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
index 8dd00543a8f..b842b5c45d2 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreSecondaryIndexTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.metadata;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -30,13 +31,23 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.testng.annotations.Test;
 
 public class MetadataStoreSecondaryIndexTest extends BaseMetadataStoreTest {
 
+    /** Convenience: drive the streaming {@link MetadataStore#scanByIndex} as 
a point lookup. */
+    private static List<GetResult> findByIndex(MetadataStore store, String 
prefix, String indexName,
+                                                String key, 
java.util.function.Predicate<GetResult> filter) {
+        List<GetResult> out = new ArrayList<>();
+        store.scanByIndex(prefix, indexName, key, key, filter, 
ScanConsumer.collectInto(out)).join();
+        return out;
+    }
+
     @Test(dataProvider = "impl")
     public void putWithSecondaryIndexesPreservesValue(String provider, 
Supplier<String> urlSupplier) throws Exception {
         @Cleanup
@@ -106,8 +117,8 @@ public class MetadataStoreSecondaryIndexTest extends 
BaseMetadataStoreTest {
                 Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
                 Map.of("by-owner", "broker-1")).join();
 
-        List<GetResult> results = store.findByIndex(basePath, "by-owner", 
"broker-1",
-                r -> new String(r.getValue(), 
StandardCharsets.UTF_8).contains("broker-1")).join();
+        List<GetResult> results = findByIndex(store, basePath, "by-owner", 
"broker-1",
+                r -> new String(r.getValue(), 
StandardCharsets.UTF_8).contains("broker-1"));
 
         assertEquals(results.size(), 2);
         Set<String> values = results.stream()
@@ -127,8 +138,8 @@ public class MetadataStoreSecondaryIndexTest extends 
BaseMetadataStoreTest {
         store.put(basePath + "/topic-1", 
"value-1".getBytes(StandardCharsets.UTF_8),
                 Optional.of(-1L), EnumSet.noneOf(CreateOption.class)).join();
 
-        List<GetResult> results = store.findByIndex(basePath, "by-owner", 
"nonexistent",
-                r -> false).join();
+        List<GetResult> results = findByIndex(store, basePath, "by-owner", 
"nonexistent",
+                r -> false);
 
         assertEquals(results.size(), 0);
     }
@@ -141,8 +152,8 @@ public class MetadataStoreSecondaryIndexTest extends 
BaseMetadataStoreTest {
 
         String basePath = newKey();
 
-        List<GetResult> results = store.findByIndex(basePath, "by-owner", 
"broker-1",
-                r -> true).join();
+        List<GetResult> results = findByIndex(store, basePath, "by-owner", 
"broker-1",
+                r -> true);
 
         assertEquals(results.size(), 0);
     }
@@ -167,4 +178,70 @@ public class MetadataStoreSecondaryIndexTest extends 
BaseMetadataStoreTest {
         assertTrue(result.isPresent());
         assertEquals(result.get().getValue(), 
"v2".getBytes(StandardCharsets.UTF_8));
     }
+
+    @Test(dataProvider = "impl")
+    public void scanByIndexInclusiveRange(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String basePath = newKey();
+        // Three records keyed by zero-padded numeric secondary key (matches 
the txn-by-deadline
+        // shape PIP-473 needs). The fallback predicate also enforces the 
range so the
+        // scan-and-filter path works correctly on backends without native 
indexes.
+        for (long t : new long[]{100L, 200L, 300L}) {
+            String key = String.format("%020d", t);
+            store.put(basePath + "/r-" + t, ("v-" + 
t).getBytes(StandardCharsets.UTF_8),
+                    Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+                    Map.of("by-time", key)).join();
+        }
+
+        // Predicate that mirrors the index range — needed by the fallback 
scan path.
+        java.util.function.Predicate<GetResult> inRange = r -> {
+            String v = new String(r.getValue(), StandardCharsets.UTF_8);
+            long t = Long.parseLong(v.substring(2));
+            return t >= 100L && t <= 200L;
+        };
+
+        List<GetResult> results = new java.util.ArrayList<>();
+        store.scanByIndex(basePath, "by-time",
+                String.format("%020d", 100L), String.format("%020d", 200L),
+                inRange, ScanConsumer.collectInto(results)).join();
+
+        // Both 100 and 200 fall in [100, 200] inclusive; 300 doesn't.
+        Set<String> values = results.stream()
+                .map(r -> new String(r.getValue(), StandardCharsets.UTF_8))
+                .collect(Collectors.toSet());
+        assertEquals(values, Set.of("v-100", "v-200"));
+    }
+
+    @Test(dataProvider = "impl")
+    public void scanByIndexUnboundedFromKey(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        String basePath = newKey();
+        for (long t : new long[]{100L, 200L, 300L}) {
+            String key = String.format("%020d", t);
+            store.put(basePath + "/r-" + t, ("v-" + 
t).getBytes(StandardCharsets.UTF_8),
+                    Optional.of(-1L), EnumSet.noneOf(CreateOption.class),
+                    Map.of("by-time", key)).join();
+        }
+
+        // "All entries with by-time <= 200" — the timeout-sweep shape from 
PIP-473.
+        java.util.function.Predicate<GetResult> upTo200 = r -> {
+            String v = new String(r.getValue(), StandardCharsets.UTF_8);
+            return Long.parseLong(v.substring(2)) <= 200L;
+        };
+
+        List<GetResult> results = new java.util.ArrayList<>();
+        store.scanByIndex(basePath, "by-time", null, String.format("%020d", 
200L),
+                upTo200, ScanConsumer.collectInto(results)).join();
+
+        Set<String> values = results.stream()
+                .map(r -> new String(r.getValue(), StandardCharsets.UTF_8))
+                .collect(Collectors.toSet());
+        assertEquals(values, Set.of("v-100", "v-200"));
+    }
 }


Reply via email to