This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cd0ab9d6ad3 [improve][metadata] Add streaming scanChildren to MetadataStore (#25701)
cd0ab9d6ad3 is described below

commit cd0ab9d6ad33f9cde9bcf56177a3f6f9deb9f510
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 21:29:23 2026 -0700

    [improve][metadata] Add streaming scanChildren to MetadataStore (#25701)
---
 .../apache/pulsar/metadata/api/MetadataStore.java  |  27 ++++
 .../apache/pulsar/metadata/api/ScanConsumer.java   |  50 +++++++
 .../metadata/impl/AbstractMetadataStore.java       |  51 +++++++
 .../pulsar/metadata/impl/DualMetadataStore.java    |  11 ++
 .../metadata/impl/FaultInjectionMetadataStore.java |  13 ++
 .../metadata/impl/LocalMemoryMetadataStore.java    |  34 +++++
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |  80 +++++++++++
 .../metadata/impl/oxia/OxiaMetadataStore.java      |  36 +++++
 .../pulsar/metadata/BaseMetadataStoreTest.java     |   1 +
 .../metadata/MetadataStoreScanChildrenTest.java    | 149 +++++++++++++++++++++
 10 files changed, 452 insertions(+)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 19e50ed6998..b2d90800943 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -287,6 +287,33 @@ public interface MetadataStore extends AutoCloseable {
                 new MetadataStoreException("Secondary index queries not 
supported by this store"));
     }
 
+    /**
+     * Stream all direct children of {@code parentPath} together with their values.
+     *
+     * <p>This is the value-bearing counterpart to {@link #getChildren} — same semantics for
+     * what counts as a "child" (one hierarchical level below {@code parentPath}, no
+     * descendants), but each record carries the value and {@link Stat} alongside the path.
+     * Results are delivered to {@code consumer} as they become available so callers don't
+     * have to materialize a potentially-large list in memory.
+     *
+     * <p>The consumer's {@link ScanConsumer#onNext} is invoked for each child, then either
+     * {@link ScanConsumer#onCompleted} (success) or {@link ScanConsumer#onError} (failure)
+     * exactly once. The returned future completes when the scan terminates and mirrors the
+     * terminal callback — callers may rely on either.
+     *
+     * <p>Backends with a native range-scan primitive (Oxia, RocksDB, in-memory NavigableMap)
+     * issue a single store-side scan. Other backends built on {@code AbstractMetadataStore}
+     * fall back to {@link #getChildren} + sequential {@link #get}, at the cost of one extra
+     * round trip per child.
+     *
+     * <p>This default implementation performs no scan. It reports an "unsupported"
+     * {@link MetadataStoreException} through {@link ScanConsumer#onError} and through the
+     * returned future, so that callers waiting on either signal are released.
+     *
+     * @param parentPath path whose direct children should be streamed
+     * @param consumer   callback that receives records, completion, or an error
+     * @return a future that completes when the scan terminates
+     */
+    default CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
+        MetadataStoreException unsupported =
+                new MetadataStoreException("scanChildren not supported by this store");
+        // Deliver the terminal callback as well as failing the future: the contract above
+        // lets callers rely on either one.
+        consumer.onError(unsupported);
+        return CompletableFuture.failedFuture(unsupported);
+    }
+
     /**
      * Returns the default metadata cache config.
      *
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
new file mode 100644
index 00000000000..24ce6c15ac5
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+/**
+ * Streaming consumer for {@link MetadataStore#scanChildren} results.
+ *
+ * <p>The store invokes {@link #onNext} for each child record (in key order), and then either
+ * {@link #onCompleted} (success) or {@link #onError} (failure) exactly once. Implementations must
+ * be safe to invoke from a metadata-store internal thread; back-pressure is the consumer's
+ * responsibility (long blocking work in {@code onNext} can stall the scan).
+ *
+ * <p>Callbacks are not guaranteed to be asynchronous: a store may deliver every record and the
+ * terminal callback on the calling thread before {@code scanChildren} returns.
+ */
+public interface ScanConsumer {
+
+    /**
+     * Called once per record. The result's {@link Stat#getPath()} carries the full key.
+     *
+     * <p>NOTE(review): an exception thrown from this method appears to abort the scan and be
+     * reported through {@link #onError}; confirm for every backend before relying on it.
+     *
+     * @param result a child record under the requested parent path
+     */
+    void onNext(GetResult result);
+
+    /**
+     * Called at most once when the scan fails, including when it cannot start at all (for
+     * example because the store is already closed). After this call no further callbacks are
+     * made.
+     *
+     * @param throwable the cause of the failure
+     */
+    void onError(Throwable throwable);
+
+    /**
+     * Called at most once when the scan finishes without error, after the last
+     * {@link #onNext} invocation. After this call no further callbacks are made.
+     */
+    void onCompleted();
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 6442ece216e..02086898e9f 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -41,6 +41,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -69,6 +70,7 @@ import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -529,6 +531,55 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         return storeFindByIndex(scanPathPrefix, indexName, secondaryKey, 
fallbackFilter);
     }
 
+    /**
+     * Validates the request and then hands it to {@link #storeScanChildren}.
+     *
+     * <p>A closed store or a {@code null} parent path is rejected up front; in both cases the
+     * failure is reported through {@code consumer.onError} and through the returned future.
+     */
+    @Override
+    public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
+        if (isClosed()) {
+            CompletableFuture<Void> closedFuture = alreadyClosedFailedFuture();
+            // Forward the "already closed" failure to the consumer.
+            closedFuture.whenComplete((ignored, failure) -> {
+                if (failure == null) {
+                    return;
+                }
+                consumer.onError(failure);
+            });
+            return closedFuture;
+        }
+        if (parentPath != null) {
+            return storeScanChildren(parentPath, consumer);
+        }
+        MetadataStoreException nullPath = new MetadataStoreException("parentPath must be non-null");
+        consumer.onError(nullPath);
+        return FutureUtil.failedFuture(nullPath);
+    }
+
+    /**
+     * Backend hook for {@link #scanChildren}. The default implementation lists the parent's
+     * children with {@link #getChildrenFromStore} and fetches each value sequentially with
+     * {@link #storeGet}. Backends with a native range-scan primitive (Oxia, RocksDB,
+     * in-memory NavigableMap) override this method for a single store-side scan.
+     *
+     * <p>Children that disappear between the listing and the fetch are skipped. Exactly one
+     * terminal callback is delivered to {@code consumer}, and the returned future always
+     * completes, even when that terminal callback itself throws.
+     *
+     * @param parentPath non-null path whose direct children are scanned
+     * @param consumer   receiver of the records and of the terminal callback
+     * @return a future mirroring the terminal callback
+     */
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        getChildrenFromStore(parentPath).thenCompose(children -> {
+            // Chain the fetches so records are delivered one at a time, in listing order.
+            CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
+            for (String child : children) {
+                String childPath = parentPath.equals("/") ? "/" + child : parentPath + "/" + child;
+                chain = chain.thenCompose(__ -> storeGet(childPath))
+                        .thenAccept(opt -> opt.ifPresent(consumer::onNext));
+            }
+            return chain;
+        }).whenComplete((v, ex) -> {
+            // Unwrap the CompletionException added by the composed stages.
+            Throwable failure = ex instanceof CompletionException && ex.getCause() != null
+                    ? ex.getCause() : ex;
+            try {
+                if (failure != null) {
+                    consumer.onError(failure);
+                } else {
+                    consumer.onCompleted();
+                }
+            } catch (Throwable callbackFailure) {
+                // A throwing terminal callback must not leave `result` incomplete, otherwise
+                // callers waiting on the future would hang forever.
+                if (failure == null) {
+                    failure = callbackFailure;
+                } else if (failure != callbackFailure) {
+                    failure.addSuppressed(callbackFailure);
+                }
+            }
+            if (failure != null) {
+                result.completeExceptionally(failure);
+            } else {
+                result.complete(null);
+            }
+        });
+        return result;
+    }
+
     protected CompletableFuture<List<GetResult>> storeFindByIndex(
             String scanPathPrefix, String indexName, String secondaryKey,
             Predicate<GetResult> fallbackFilter) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 70965887012..bcc6b7226da 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -309,6 +310,16 @@ public class DualMetadataStore implements 
MetadataStoreExtended {
         };
     }
 
+    /**
+     * Routes the scan to the target store once migration has completed, and to the source
+     * store in every other migration phase.
+     */
+    @Override
+    public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
+        CompletableFuture<Void> scan = switch (migrationState.getPhase()) {
+            case COMPLETED ->
+                    targetStore.scanChildren(parentPath, consumer);
+            case NOT_STARTED, PREPARATION, COPYING, FAILED ->
+                    sourceStore.scanChildren(parentPath, consumer);
+        };
+        return scan;
+    }
+
     @Override
     public CompletableFuture<Stat> put(String path, byte[] value, 
Optional<Long> expectedVersion,
                                        EnumSet<CreateOption> options, 
Map<String, String> secondaryIndexes) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index f409115ed9c..de988b6b1c8 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.metadata.api.MetadataEvent;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -60,6 +61,7 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
         EXISTS,
         PUT,
         DELETE,
+        SCAN_CHILDREN,
     }
 
     @Data
@@ -155,6 +157,17 @@ public class FaultInjectionMetadataStore implements 
MetadataStoreExtended {
         return store.deleteRecursive(path);
     }
 
+    /**
+     * Delegates to the wrapped store unless a failure has been programmed for
+     * {@link OperationType#SCAN_CHILDREN} on this path; in that case the programmed exception
+     * is reported to the consumer and returned as a failed future.
+     */
+    @Override
+    public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
+        MetadataStoreException injected =
+                programmedFailure(OperationType.SCAN_CHILDREN, parentPath).orElse(null);
+        if (injected == null) {
+            return store.scanChildren(parentPath, consumer);
+        }
+
+        consumer.onError(injected);
+        return FutureUtil.failedFuture(injected);
+    }
+
     @Override
     public void registerListener(Consumer<Notification> listener) {
         store.registerListener(listener);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 4679d410edd..7c7bd31e29b 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -45,6 +45,7 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.MetadataStoreProvider;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -121,6 +122,39 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
         }
     }
 
+    /**
+     * Scans the direct children of {@code parentPath} from the in-memory map.
+     *
+     * <p>The matching entries are copied while holding the map lock and dispatched to the
+     * consumer afterwards, on the calling thread. Exactly one terminal callback is delivered:
+     * {@code onError} if an {@code onNext} call threw, {@code onCompleted} otherwise. If the
+     * terminal callback itself throws, the returned future fails but no second terminal
+     * callback is made.
+     *
+     * @param parentPath non-null path whose direct children are scanned
+     * @param consumer   receiver of the records and of the terminal callback
+     * @return an already-completed future mirroring the outcome
+     */
+    @Override
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
+        // Snapshot the immediate children under the lock, then dispatch outside it so a slow
+        // consumer can't stall other store operations.
+        List<GetResult> snapshot = new ArrayList<>();
+        synchronized (map) {
+            // '0' is the character right after '/', so (firstKey, lastKey) spans every key
+            // that starts with the parent's "/"-terminated prefix.
+            String firstKey = parentPath.equals("/") ? "/" : parentPath + "/";
+            String lastKey = parentPath.equals("/") ? "0" : parentPath + "0";
+            map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+                // Filter to direct children only — paths with no further "/" beyond the
+                // parent's level. Same scoping `getChildrenFromStore` applies.
+                int relStart = firstKey.length();
+                if (key.indexOf('/', relStart) >= 0) {
+                    return;
+                }
+                snapshot.add(new GetResult(
+                        value.data,
+                        new Stat(key, value.version, value.createdTimestamp, value.modifiedTimestamp,
+                                value.isEphemeral(), true)));
+            });
+        }
+
+        Throwable failure = null;
+        try {
+            for (GetResult r : snapshot) {
+                consumer.onNext(r);
+            }
+        } catch (Throwable t) {
+            failure = t;
+        }
+        // Deliver the terminal callback separately from the records, so that a throwing
+        // onCompleted() is never followed by onError().
+        try {
+            if (failure == null) {
+                consumer.onCompleted();
+            } else {
+                consumer.onError(failure);
+            }
+        } catch (Throwable t) {
+            if (failure == null) {
+                failure = t;
+            } else if (failure != t) {
+                failure.addSuppressed(t);
+            }
+        }
+        if (failure == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return FutureUtil.failedFuture(failure);
+    }
+
     @Override
     public CompletableFuture<List<String>> getChildrenFromStore(String path) {
         if (!isValidPath(path)) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 4ad7e3de2fd..3a7d7abe5d2 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreProvider;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.rocksdb.ColumnFamilyDescriptor;
@@ -406,6 +407,85 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
         }
     }
 
+    /**
+     * Scans the direct children of {@code parentPath} with a RocksDB iterator.
+     *
+     * <p>The key range is read while holding the DB state read lock; every consumer callback
+     * (records, completion, errors) is invoked after the lock has been released. Ephemeral
+     * records owned by a different instance are skipped. Exactly one terminal callback is
+     * delivered to the consumer.
+     *
+     * @param parentPath non-null path whose direct children are scanned
+     * @param consumer   receiver of the records and of the terminal callback
+     * @return a future mirroring the outcome of the scan
+     */
+    @Override
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
+        // Native iterator-based scan over the parent's key range, with the same direct-child
+        // filter getChildrenFromStore applies. Snapshot under the read lock then dispatch
+        // outside it.
+        List<GetResult> snapshot = new ArrayList<>();
+        boolean closed = false;
+        MetadataStoreException scanFailure = null;
+        // Acquire before the try block so the finally never unlocks a lock that was not taken.
+        dbStateLock.readLock().lock();
+        try {
+            if (isClosed()) {
+                closed = true;
+            } else {
+                // '0' is the character right after '/', so [firstKey, lastKey) spans every
+                // key that starts with the parent's "/"-terminated prefix.
+                String firstKey = parentPath.equals("/") ? "/" : parentPath + "/";
+                String lastKey = parentPath.equals("/") ? "0" : parentPath + "0";
+                byte[] endBytes = toBytes(lastKey);
+                try (RocksIterator iterator = db.newIterator(optionDontCache)) {
+                    for (iterator.seek(toBytes(firstKey)); iterator.isValid(); iterator.next()) {
+                        byte[] keyBytes = iterator.key();
+                        if (compareUnsigned(keyBytes, endBytes) >= 0) {
+                            break;
+                        }
+                        String currentPath = toString(keyBytes);
+                        // Direct children only.
+                        if (currentPath.indexOf('/', firstKey.length()) >= 0) {
+                            continue;
+                        }
+                        byte[] value = iterator.value();
+                        if (value == null) {
+                            continue;
+                        }
+                        MetaValue metaValue = MetaValue.parse(value);
+                        if (metaValue.ephemeral && metaValue.owner != instanceId) {
+                            // Ephemeral record left behind by a different session; skip.
+                            continue;
+                        }
+                        snapshot.add(new GetResult(metaValue.getData(),
+                                new Stat(currentPath,
+                                        metaValue.getVersion(),
+                                        metaValue.getCreatedTimestamp(),
+                                        metaValue.getModifiedTimestamp(),
+                                        metaValue.ephemeral,
+                                        metaValue.getOwner() == instanceId)));
+                    }
+                }
+            }
+        } catch (Throwable e) {
+            scanFailure = MetadataStoreException.wrap(e);
+        } finally {
+            dbStateLock.readLock().unlock();
+        }
+
+        // From here on the lock is released: consumer code never runs under it.
+        if (closed) {
+            CompletableFuture<Void> failed = alreadyClosedFailedFuture();
+            failed.whenComplete((__, ex) -> {
+                if (ex != null) {
+                    consumer.onError(ex);
+                }
+            });
+            return failed;
+        }
+        if (scanFailure != null) {
+            consumer.onError(scanFailure);
+            return FutureUtil.failedFuture(scanFailure);
+        }
+
+        Throwable failure = null;
+        try {
+            for (GetResult r : snapshot) {
+                consumer.onNext(r);
+            }
+        } catch (Throwable t) {
+            failure = t;
+        }
+        // Deliver the terminal callback separately from the records, so that a throwing
+        // onCompleted() is never followed by onError().
+        try {
+            if (failure == null) {
+                consumer.onCompleted();
+            } else {
+                consumer.onError(failure);
+            }
+        } catch (Throwable t) {
+            if (failure == null) {
+                failure = t;
+            } else if (failure != t) {
+                failure.addSuppressed(t);
+            }
+        }
+        if (failure == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return FutureUtil.failedFuture(failure);
+    }
+
+    /**
+     * Compares two byte arrays lexicographically, treating every byte as an unsigned value
+     * in the range 0-255. When one array is a prefix of the other, the shorter one sorts
+     * first.
+     *
+     * @param a first array
+     * @param b second array
+     * @return a negative value, zero, or a positive value when {@code a} sorts before, equal
+     *         to, or after {@code b}
+     */
+    private static int compareUnsigned(byte[] a, byte[] b) {
+        // Fully qualified so that no change to the file's import block is required.
+        return java.util.Arrays.compareUnsigned(a, b);
+    }
+
     @Override
     public CompletableFuture<List<String>> getChildrenFromStore(String path) {
         log.debug().attr("path", path).attr("instanceId", 
instanceId).log("getChildrenFromStore");
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 13cefa12623..21bbba615d4 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -51,6 +51,7 @@ import 
org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.ScanConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@@ -216,6 +217,41 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
         return doStorePut(path, data, optExpectedVersion, options, 
secondaryIndexes);
     }
 
+    /**
+     * Scans the direct children of {@code parentPath} with a single Oxia range scan.
+     *
+     * <p>Records are forwarded to {@code consumer} as the Oxia client delivers them. Exactly
+     * one terminal callback is delivered, and the returned future always completes, even when
+     * the consumer's terminal callback throws.
+     *
+     * @param parentPath non-null path whose direct children are scanned
+     * @param consumer   receiver of the records and of the terminal callback
+     * @return a future mirroring the terminal callback
+     */
+    @Override
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
+        // Oxia's hierarchical sort makes [parentPath + "/", parentPath + "//") the canonical
+        // range covering exactly the immediate children — same convention getChildrenFromStore
+        // uses with `client.list(...)`.
+        String firstKey = parentPath.endsWith("/") ? parentPath : parentPath + "/";
+        String lastKey = firstKey + "/";
+        CompletableFuture<Void> done = new CompletableFuture<>();
+        try {
+            client.rangeScan(firstKey, lastKey, new io.oxia.client.api.RangeScanConsumer() {
+                @Override
+                public void onNext(io.oxia.client.api.GetResult result) {
+                    consumer.onNext(new GetResult(result.value(),
+                            convertStat(result.key(), result.version())));
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    try {
+                        consumer.onError(throwable);
+                    } catch (Throwable callbackFailure) {
+                        if (callbackFailure != throwable) {
+                            throwable.addSuppressed(callbackFailure);
+                        }
+                    } finally {
+                        // Always release callers waiting on the future.
+                        done.completeExceptionally(throwable);
+                    }
+                }
+
+                @Override
+                public void onCompleted() {
+                    try {
+                        consumer.onCompleted();
+                        done.complete(null);
+                    } catch (Throwable callbackFailure) {
+                        // No onError() here: onCompleted() was already delivered.
+                        done.completeExceptionally(callbackFailure);
+                    }
+                }
+            });
+        } catch (Throwable t) {
+            // `done` is only completed after a terminal callback, so when it is already
+            // done the consumer must not receive a second one.
+            if (!done.isDone()) {
+                try {
+                    consumer.onError(t);
+                } catch (Throwable callbackFailure) {
+                    if (callbackFailure != t) {
+                        t.addSuppressed(callbackFailure);
+                    }
+                }
+                done.completeExceptionally(t);
+            }
+        }
+        return done;
+    }
+
     @Override
     protected CompletableFuture<List<GetResult>> storeFindByIndex(
             String scanPathPrefix, String indexName, String secondaryKey,
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index d55ad072dff..fee0955744c 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -146,6 +146,7 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
         return filterImplementations("ZooKeeper", "MockZooKeeper");
     }
 
+
     protected Object[][] filterImplementations(String... providers) {
         Set<String> providersSet = Set.of(providers);
         return Arrays.stream(allImplementations())
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
new file mode 100644
index 00000000000..37eea03713c
--- /dev/null
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+/**
+ * Exercises {@code MetadataStore#scanChildren} against every store implementation supplied
+ * by the {@code "impl"} data provider.
+ */
+public class MetadataStoreScanChildrenTest extends BaseMetadataStoreTest {
+
+    /** Captures every callback so that tests can assert on what the store delivered. */
+    private static final class CollectingConsumer implements ScanConsumer {
+        final List<GetResult> received = new ArrayList<>();
+        final AtomicReference<Throwable> failure = new AtomicReference<>();
+        final CountDownLatch terminated = new CountDownLatch(1);
+
+        @Override
+        public void onNext(GetResult result) {
+            received.add(result);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            failure.set(throwable);
+            terminated.countDown();
+        }
+
+        @Override
+        public void onCompleted() {
+            terminated.countDown();
+        }
+
+        /** Waits for the terminal callback, failing the test after 30 seconds. */
+        void awaitDone() throws InterruptedException {
+            boolean finished = terminated.await(30, TimeUnit.SECONDS);
+            assertTrue(finished, "scan did not terminate within 30s");
+        }
+    }
+
+    /** Opens a store with the default configuration on the URL provided for this run. */
+    private static MetadataStoreExtended openStore(Supplier<String> urlSupplier) throws Exception {
+        return MetadataStoreExtended.create(
+                urlSupplier.get(), MetadataStoreConfig.builder().build());
+    }
+
+    /** Creates {@code path} with the UTF-8 bytes of {@code value}, expecting it not to exist yet. */
+    private static void putNew(MetadataStoreExtended store, String path, String value) {
+        store.put(path, value.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join();
+    }
+
+    /** Runs a scan to completion and returns the consumer holding everything it received. */
+    private static CollectingConsumer scanAndAwait(MetadataStoreExtended store, String parent)
+            throws InterruptedException {
+        CollectingConsumer collector = new CollectingConsumer();
+        store.scanChildren(parent, collector).join();
+        collector.awaitDone();
+        return collector;
+    }
+
+    @Test(dataProvider = "impl")
+    public void streamsAllChildrenWithValues(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = openStore(urlSupplier);
+
+        String parent = newKey();
+        Set<String> expectedNames = Set.of("a", "b", "c");
+        for (String name : expectedNames) {
+            putNew(store, parent + "/" + name, name);
+        }
+
+        CollectingConsumer collector = scanAndAwait(store, parent);
+
+        assertEquals(collector.received.size(), 3);
+        Set<String> seenPaths = new HashSet<>();
+        Set<String> seenValues = new HashSet<>();
+        for (GetResult record : collector.received) {
+            seenPaths.add(record.getStat().getPath());
+            seenValues.add(new String(record.getValue(), StandardCharsets.UTF_8));
+        }
+        assertEquals(seenPaths, Set.of(parent + "/a", parent + "/b", parent + "/c"));
+        assertEquals(seenValues, expectedNames);
+    }
+
+    @Test(dataProvider = "impl")
+    public void parentWithNoChildrenCompletesEmpty(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = openStore(urlSupplier);
+
+        String parent = newKey();
+
+        CollectingConsumer collector = scanAndAwait(store, parent);
+
+        assertEquals(collector.received.size(), 0);
+    }
+
+    @Test(dataProvider = "impl")
+    public void doesNotEmitDescendantsBeyondImmediateChildren(String provider, Supplier<String> urlSupplier)
+            throws Exception {
+        // scanChildren is hierarchy-aware: deeper paths (children of children) are NOT emitted.
+        @Cleanup
+        MetadataStoreExtended store = openStore(urlSupplier);
+
+        String parent = newKey();
+        String child = parent + "/child";
+        String grandchild = child + "/inner";
+
+        putNew(store, child, "C");
+        putNew(store, grandchild, "G");
+
+        CollectingConsumer collector = scanAndAwait(store, parent);
+
+        assertEquals(collector.received.size(), 1);
+        assertEquals(collector.received.get(0).getStat().getPath(), child);
+    }
+
+    @Test(dataProvider = "impl")
+    public void closedStoreRejectsScan(String provider, Supplier<String> urlSupplier) throws Exception {
+        MetadataStoreExtended store = openStore(urlSupplier);
+        store.close();
+
+        CollectingConsumer collector = new CollectingConsumer();
+        CompletionException thrown = expectThrows(CompletionException.class,
+                () -> store.scanChildren("/anything", collector).join());
+        Throwable cause = thrown.getCause();
+        assertTrue(cause instanceof MetadataStoreException.AlreadyClosedException,
+                "expected AlreadyClosedException, got: " + cause);
+    }
+}

Reply via email to