This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cd0ab9d6ad3 [improve][metadata] Add streaming scanChildren to
MetadataStore (#25701)
cd0ab9d6ad3 is described below
commit cd0ab9d6ad33f9cde9bcf56177a3f6f9deb9f510
Author: Matteo Merli <[email protected]>
AuthorDate: Wed May 6 21:29:23 2026 -0700
[improve][metadata] Add streaming scanChildren to MetadataStore (#25701)
---
.../apache/pulsar/metadata/api/MetadataStore.java | 27 ++++
.../apache/pulsar/metadata/api/ScanConsumer.java | 50 +++++++
.../metadata/impl/AbstractMetadataStore.java | 51 +++++++
.../pulsar/metadata/impl/DualMetadataStore.java | 11 ++
.../metadata/impl/FaultInjectionMetadataStore.java | 13 ++
.../metadata/impl/LocalMemoryMetadataStore.java | 34 +++++
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 80 +++++++++++
.../metadata/impl/oxia/OxiaMetadataStore.java | 36 +++++
.../pulsar/metadata/BaseMetadataStoreTest.java | 1 +
.../metadata/MetadataStoreScanChildrenTest.java | 149 +++++++++++++++++++++
10 files changed, 452 insertions(+)
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
index 19e50ed6998..b2d90800943 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java
@@ -287,6 +287,33 @@ public interface MetadataStore extends AutoCloseable {
new MetadataStoreException("Secondary index queries not
supported by this store"));
}
+    /**
+     * Stream all direct children of {@code parentPath} together with their values.
+     *
+     * <p>This is the value-bearing counterpart to {@link #getChildren} — same semantics for
+     * what counts as a "child" (one hierarchical level below {@code parentPath}, no
+     * descendants), but each record carries the value and {@link Stat} alongside the path.
+     * Results are delivered to {@code consumer} as they become available so callers don't
+     * have to materialize a potentially-large list in memory.
+     *
+     * <p>The consumer's {@link ScanConsumer#onNext} is invoked for each child, then either
+     * {@link ScanConsumer#onCompleted} (success) or {@link ScanConsumer#onError} (failure)
+     * exactly once. The returned future completes when the scan terminates and mirrors the
+     * terminal callback — callers may rely on either.
+     *
+     * <p>This interface-level default performs no scan: it reports "not supported" through
+     * both {@link ScanConsumer#onError} and the returned future, so that callers waiting on
+     * either signal are released. Store implementations are expected to override it, either
+     * with a native range scan or with a {@link #getChildren} + sequential {@link #get}
+     * fallback (one extra round trip per child).
+     *
+     * @param parentPath path whose direct children should be streamed
+     * @param consumer callback that receives records, completion, or an error
+     * @return a future that completes when the scan terminates
+     */
+    default CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
+        MetadataStoreException unsupported =
+                new MetadataStoreException("scanChildren not supported by this store");
+        // Honour the "exactly one terminal callback" contract even on the unsupported path.
+        if (consumer != null) {
+            consumer.onError(unsupported);
+        }
+        return CompletableFuture.failedFuture(unsupported);
+    }
+
+
/**
* Returns the default metadata cache config.
*
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
new file mode 100644
index 00000000000..24ce6c15ac5
--- /dev/null
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/ScanConsumer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.api;
+
+/**
+ * Streaming consumer for {@link MetadataStore#scanChildren} results.
+ *
+ * <p>The store invokes {@link #onNext} once for each child record and then exactly one
+ * terminal callback: {@link #onCompleted} on success or {@link #onError} on failure. No
+ * callback is made after the terminal one. Records arrive in key order when the store
+ * performs a native range scan; a store that falls back to listing children and fetching
+ * them one by one delivers them in the order of that listing.
+ *
+ * <p>Implementations must be safe to invoke from a metadata-store internal thread;
+ * back-pressure is the consumer's responsibility (long blocking work in {@code onNext} can
+ * stall the scan).
+ */
+public interface ScanConsumer {
+
+    /**
+     * Called once per record. The result's {@link Stat#getPath()} carries the full key.
+     *
+     * @param result a child record under the requested parent path
+     */
+    void onNext(GetResult result);
+
+    /**
+     * Called at most once, when the scan fails. This is a terminal callback: after it
+     * neither {@link #onNext} nor {@link #onCompleted} is invoked.
+     *
+     * @param throwable the cause of the failure
+     */
+    void onError(Throwable throwable);
+
+    /**
+     * Called at most once, when the scan finishes without error. This is a terminal
+     * callback: after it neither {@link #onNext} nor {@link #onError} is invoked.
+     */
+    void onCompleted();
+}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 6442ece216e..02086898e9f 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -69,6 +70,7 @@ import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -529,6 +531,55 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
return storeFindByIndex(scanPathPrefix, indexName, secondaryKey,
fallbackFilter);
}
+    /**
+     * Validates the request and hands it to {@link #storeScanChildren}.
+     *
+     * <p>A closed store or a {@code null} parent path is rejected up front; in both cases the
+     * failure is reported to {@code consumer.onError} and through the returned future.
+     */
+    @Override
+    public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
+        if (!isClosed()) {
+            if (parentPath != null) {
+                return storeScanChildren(parentPath, consumer);
+            }
+            MetadataStoreException nullPath =
+                    new MetadataStoreException("parentPath must be non-null");
+            consumer.onError(nullPath);
+            return FutureUtil.failedFuture(nullPath);
+        }
+
+        // Store already closed: surface the same failure on the callback and on the future.
+        CompletableFuture<Void> closedFuture = alreadyClosedFailedFuture();
+        closedFuture.whenComplete((ignored, failure) -> {
+            if (failure != null) {
+                consumer.onError(failure);
+            }
+        });
+        return closedFuture;
+    }
+
+    /**
+     * Backend hook for {@link #scanChildren}. The default implementation lists the parent's
+     * children with {@link #getChildrenFromStore} and fetches each value sequentially with
+     * {@link #storeGet}, skipping children that no longer exist by the time they are read.
+     * Backends with a native range-scan primitive (Oxia, RocksDB, in-memory NavigableMap)
+     * override this method for a single store-side scan.
+     *
+     * <p>The returned future is always completed, even when the consumer's terminal callback
+     * itself throws: a throwing {@code onError} still fails the future with the scan error,
+     * and a throwing {@code onCompleted} fails the future with the consumer's exception
+     * (no second terminal callback is issued).
+     *
+     * @param parentPath non-null path whose direct children are streamed
+     * @param consumer receiver of the records and of exactly one terminal callback
+     * @return a future mirroring the terminal callback
+     */
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        getChildrenFromStore(parentPath).thenCompose(children -> {
+            // Chain the reads so records are delivered one at a time, in listing order.
+            CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
+            for (String child : children) {
+                String childPath = parentPath.equals("/") ? "/" + child : parentPath + "/" + child;
+                chain = chain.thenCompose(__ -> storeGet(childPath))
+                        .thenAccept(opt -> opt.ifPresent(consumer::onNext));
+            }
+            return chain;
+        }).whenComplete((v, ex) -> {
+            if (ex != null) {
+                // Dependent stages wrap failures in CompletionException; report the real cause.
+                Throwable cause = ex instanceof CompletionException && ex.getCause() != null
+                        ? ex.getCause() : ex;
+                try {
+                    consumer.onError(cause);
+                } finally {
+                    // Never leave the caller's future pending, even if onError throws.
+                    result.completeExceptionally(cause);
+                }
+            } else {
+                try {
+                    consumer.onCompleted();
+                    result.complete(null);
+                } catch (Throwable t) {
+                    // onCompleted already ran, so only the future reports the consumer failure.
+                    result.completeExceptionally(t);
+                }
+            }
+        });
+        return result;
+    }
+
+
protected CompletableFuture<List<GetResult>> storeFindByIndex(
String scanPathPrefix, String indexName, String secondaryKey,
Predicate<GetResult> fallbackFilter) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 70965887012..bcc6b7226da 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -309,6 +310,16 @@ public class DualMetadataStore implements
MetadataStoreExtended {
};
}
+ @Override
+ public CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer) {
+ return switch (migrationState.getPhase()) {
+ case NOT_STARTED, PREPARATION, COPYING, FAILED ->
+ sourceStore.scanChildren(parentPath, consumer);
+ case COMPLETED ->
+ targetStore.scanChildren(parentPath, consumer);
+ };
+ }
+
@Override
public CompletableFuture<Stat> put(String path, byte[] value,
Optional<Long> expectedVersion,
EnumSet<CreateOption> options,
Map<String, String> secondaryIndexes) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index f409115ed9c..de988b6b1c8 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -60,6 +61,7 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
EXISTS,
PUT,
DELETE,
+ SCAN_CHILDREN,
}
@Data
@@ -155,6 +157,17 @@ public class FaultInjectionMetadataStore implements
MetadataStoreExtended {
return store.deleteRecursive(path);
}
+ @Override
+ public CompletableFuture<Void> scanChildren(String parentPath,
ScanConsumer consumer) {
+ Optional<MetadataStoreException> ex =
programmedFailure(OperationType.SCAN_CHILDREN, parentPath);
+ if (ex.isPresent()) {
+ consumer.onError(ex.get());
+ return FutureUtil.failedFuture(ex.get());
+ }
+
+ return store.scanChildren(parentPath, consumer);
+ }
+
@Override
public void registerListener(Consumer<Notification> listener) {
store.registerListener(listener);
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 4679d410edd..7c7bd31e29b 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -45,6 +45,7 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -121,6 +122,39 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
}
}
+    /**
+     * Scans the direct children of {@code parentPath} from the in-memory sorted map.
+     *
+     * <p>The matching entries are copied while holding the map lock and delivered to the
+     * consumer after it is released, so a slow consumer can't stall other store operations.
+     * Exactly one terminal callback is issued: a failure in {@code onNext} ends the scan with
+     * {@code onError}; a failure inside {@code onCompleted} only fails the returned future.
+     *
+     * @param parentPath non-null path whose direct children are streamed
+     * @param consumer receiver of the records and of the terminal callback
+     * @return an already-completed future mirroring the outcome
+     */
+    @Override
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
+        boolean isRoot = parentPath.equals("/");
+        String firstKey = isRoot ? "/" : parentPath + "/";
+        // '0' is the character immediately after '/', so (firstKey, lastKey) covers the subtree.
+        String lastKey = isRoot ? "0" : parentPath + "0";
+        int relStart = firstKey.length();
+
+        List<GetResult> snapshot = new ArrayList<>();
+        synchronized (map) {
+            map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
+                // Direct children only — no further "/" beyond the parent's level. Same scoping
+                // `getChildrenFromStore` applies.
+                if (key.indexOf('/', relStart) < 0) {
+                    snapshot.add(new GetResult(
+                            value.data,
+                            new Stat(key, value.version, value.createdTimestamp,
+                                    value.modifiedTimestamp, value.isEphemeral(), true)));
+                }
+            });
+        }
+
+        try {
+            for (GetResult r : snapshot) {
+                consumer.onNext(r);
+            }
+        } catch (Throwable t) {
+            consumer.onError(t);
+            return FutureUtil.failedFuture(t);
+        }
+        try {
+            consumer.onCompleted();
+        } catch (Throwable t) {
+            // onCompleted already ran: don't issue a second terminal callback.
+            return FutureUtil.failedFuture(t);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+
@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
if (!isValidPath(path)) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 4ad7e3de2fd..3a7d7abe5d2 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.rocksdb.ColumnFamilyDescriptor;
@@ -406,6 +407,85 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
}
}
+    /**
+     * Native iterator-based scan over the parent's key range, with the same direct-child
+     * filter {@code getChildrenFromStore} applies.
+     *
+     * <p>The matching records are collected while holding the DB read lock; every consumer
+     * callback (records, completion, or error) is dispatched after the lock is released.
+     * Ephemeral records owned by a different instance are skipped. Exactly one terminal
+     * callback is issued: a failure inside {@code onCompleted} only fails the returned future.
+     *
+     * @param parentPath non-null path whose direct children are streamed
+     * @param consumer receiver of the records and of the terminal callback
+     * @return an already-completed future mirroring the outcome
+     */
+    @Override
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
+        List<GetResult> snapshot = new ArrayList<>();
+        boolean closed = false;
+        MetadataStoreException scanFailure = null;
+
+        // Acquire before the try block so `finally` never unlocks a lock that was not taken.
+        dbStateLock.readLock().lock();
+        try {
+            closed = isClosed();
+            if (!closed) {
+                String firstKey = parentPath.equals("/") ? "/" : parentPath + "/";
+                // '0' is the character immediately after '/', so [firstKey, lastKey) covers
+                // the whole subtree below parentPath.
+                String lastKey = parentPath.equals("/") ? "0" : parentPath + "0";
+                byte[] endBytes = toBytes(lastKey);
+                try (RocksIterator iterator = db.newIterator(optionDontCache)) {
+                    for (iterator.seek(toBytes(firstKey)); iterator.isValid(); iterator.next()) {
+                        byte[] keyBytes = iterator.key();
+                        if (compareUnsigned(keyBytes, endBytes) >= 0) {
+                            break;
+                        }
+                        String currentPath = toString(keyBytes);
+                        // seek() is inclusive: the prefix key itself is not a child.
+                        if (currentPath.length() == firstKey.length()) {
+                            continue;
+                        }
+                        // Direct children only.
+                        if (currentPath.indexOf('/', firstKey.length()) >= 0) {
+                            continue;
+                        }
+                        byte[] value = iterator.value();
+                        if (value == null) {
+                            continue;
+                        }
+                        MetaValue metaValue = MetaValue.parse(value);
+                        if (metaValue.ephemeral && metaValue.owner != instanceId) {
+                            // Ephemeral record left behind by a different session; skip.
+                            continue;
+                        }
+                        snapshot.add(new GetResult(metaValue.getData(),
+                                new Stat(currentPath,
+                                        metaValue.getVersion(),
+                                        metaValue.getCreatedTimestamp(),
+                                        metaValue.getModifiedTimestamp(),
+                                        metaValue.ephemeral,
+                                        metaValue.getOwner() == instanceId)));
+                    }
+                }
+            }
+        } catch (Throwable e) {
+            scanFailure = MetadataStoreException.wrap(e);
+        } finally {
+            dbStateLock.readLock().unlock();
+        }
+
+        // From here on the lock is released; consumer code never runs under it.
+        if (closed) {
+            CompletableFuture<Void> failed = alreadyClosedFailedFuture();
+            failed.whenComplete((__, ex) -> {
+                if (ex != null) {
+                    consumer.onError(ex);
+                }
+            });
+            return failed;
+        }
+        if (scanFailure != null) {
+            consumer.onError(scanFailure);
+            return FutureUtil.failedFuture(scanFailure);
+        }
+
+        try {
+            for (GetResult r : snapshot) {
+                consumer.onNext(r);
+            }
+        } catch (Throwable t) {
+            consumer.onError(t);
+            return FutureUtil.failedFuture(t);
+        }
+        try {
+            consumer.onCompleted();
+        } catch (Throwable t) {
+            // onCompleted already ran: don't issue a second terminal callback.
+            return FutureUtil.failedFuture(t);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+
+    /**
+     * Compares two byte arrays lexicographically, treating each byte as an unsigned value
+     * (0..255); when one array is a prefix of the other, the shorter one sorts first.
+     *
+     * @param a first key
+     * @param b second key
+     * @return a negative value, zero, or a positive value when {@code a} sorts before,
+     *         equal to, or after {@code b}
+     */
+    private static int compareUnsigned(byte[] a, byte[] b) {
+        // Fully-qualified so the JDK implementation can be used without touching the imports.
+        return java.util.Arrays.compareUnsigned(a, b);
+    }
+
+
@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
log.debug().attr("path", path).attr("instanceId",
instanceId).log("getChildrenFromStore");
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 13cefa12623..21bbba615d4 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -51,6 +51,7 @@ import
org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.ScanConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@@ -216,6 +217,41 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
return doStorePut(path, data, optExpectedVersion, options,
secondaryIndexes);
}
+    /**
+     * Streams the direct children of {@code parentPath} with a single Oxia range scan.
+     *
+     * <p>Oxia's hierarchical sort makes [parentPath + "/", parentPath + "//") the canonical
+     * range covering exactly the immediate children — same convention getChildrenFromStore
+     * uses with {@code client.list(...)}.
+     *
+     * <p>The returned future is completed on every terminal path, including when the
+     * consumer's own terminal callback throws, and at most one terminal callback is issued.
+     *
+     * @param parentPath non-null path whose direct children are streamed
+     * @param consumer receiver of the records and of the terminal callback
+     * @return a future mirroring the terminal callback
+     */
+    @Override
+    protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
+        String firstKey = parentPath.endsWith("/") ? parentPath : parentPath + "/";
+        String lastKey = firstKey + "/";
+        CompletableFuture<Void> done = new CompletableFuture<>();
+        try {
+            client.rangeScan(firstKey, lastKey, new io.oxia.client.api.RangeScanConsumer() {
+                @Override
+                public void onNext(io.oxia.client.api.GetResult result) {
+                    consumer.onNext(new GetResult(result.value(),
+                            convertStat(result.key(), result.version())));
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    try {
+                        consumer.onError(throwable);
+                    } finally {
+                        // Never leave the caller's future pending, even if onError throws.
+                        done.completeExceptionally(throwable);
+                    }
+                }
+
+                @Override
+                public void onCompleted() {
+                    try {
+                        consumer.onCompleted();
+                        done.complete(null);
+                    } catch (Throwable t) {
+                        // onCompleted already ran, so only the future reports the failure.
+                        done.completeExceptionally(t);
+                    }
+                }
+            });
+        } catch (Throwable t) {
+            // Only notify when no terminal callback has been delivered yet; a failure thrown
+            // out of a terminal callback has already completed `done` above.
+            if (!done.isDone()) {
+                try {
+                    consumer.onError(t);
+                } finally {
+                    done.completeExceptionally(t);
+                }
+            }
+        }
+        return done;
+    }
+
+
@Override
protected CompletableFuture<List<GetResult>> storeFindByIndex(
String scanPathPrefix, String indexName, String secondaryKey,
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index d55ad072dff..fee0955744c 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -146,6 +146,7 @@ public abstract class BaseMetadataStoreTest extends
TestRetrySupport {
return filterImplementations("ZooKeeper", "MockZooKeeper");
}
+
protected Object[][] filterImplementations(String... providers) {
Set<String> providersSet = Set.of(providers);
return Arrays.stream(allImplementations())
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
new file mode 100644
index 00000000000..37eea03713c
--- /dev/null
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreScanChildrenTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import lombok.Cleanup;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.testng.annotations.Test;
+
+/**
+ * Contract tests for {@code MetadataStore#scanChildren}, run against every store
+ * implementation supplied by the {@code "impl"} data provider.
+ */
+public class MetadataStoreScanChildrenTest extends BaseMetadataStoreTest {
+
+    /**
+     * Records all callbacks for assertion in tests.
+     *
+     * <p>{@code records} is a plain list: tests read it only after {@link #awaitDone()}, and
+     * the latch count-down in the terminal callback publishes the writes to the test thread.
+     */
+    private static final class CollectingConsumer implements ScanConsumer {
+        final List<GetResult> records = new ArrayList<>();
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        final CountDownLatch done = new CountDownLatch(1);
+
+        @Override
+        public void onNext(GetResult result) {
+            records.add(result);
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            error.set(throwable);
+            done.countDown();
+        }
+
+        @Override
+        public void onCompleted() {
+            done.countDown();
+        }
+
+        /** Blocks until a terminal callback has been received, failing after 30 seconds. */
+        void awaitDone() throws InterruptedException {
+            assertTrue(done.await(30, TimeUnit.SECONDS), "scan did not terminate within 30s");
+        }
+    }
+
+    /** Every direct child is delivered once, with its full path and stored value. */
+    @Test(dataProvider = "impl")
+    public void streamsAllChildrenWithValues(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(
+                urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String parent = newKey();
+        Set<String> expectedNames = Set.of("a", "b", "c");
+        for (String name : expectedNames) {
+            store.put(parent + "/" + name, name.getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join();
+        }
+
+        CollectingConsumer consumer = new CollectingConsumer();
+        store.scanChildren(parent, consumer).join();
+        consumer.awaitDone();
+
+        assertEquals(consumer.records.size(), 3);
+        Set<String> seenPaths = new HashSet<>();
+        Set<String> seenValues = new HashSet<>();
+        for (GetResult r : consumer.records) {
+            seenPaths.add(r.getStat().getPath());
+            seenValues.add(new String(r.getValue(), StandardCharsets.UTF_8));
+        }
+        assertEquals(seenPaths, Set.of(parent + "/a", parent + "/b", parent + "/c"));
+        assertEquals(seenValues, expectedNames);
+    }
+
+    /** Scanning a parent without children completes successfully with no records. */
+    @Test(dataProvider = "impl")
+    public void parentWithNoChildrenCompletesEmpty(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(
+                urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String parent = newKey();
+
+        CollectingConsumer consumer = new CollectingConsumer();
+        store.scanChildren(parent, consumer).join();
+        consumer.awaitDone();
+
+        assertEquals(consumer.records.size(), 0);
+    }
+
+    /** scanChildren is hierarchy-aware: deeper paths (children of children) are NOT emitted. */
+    @Test(dataProvider = "impl")
+    public void doesNotEmitDescendantsBeyondImmediateChildren(String provider, Supplier<String> urlSupplier)
+            throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = MetadataStoreExtended.create(
+                urlSupplier.get(), MetadataStoreConfig.builder().build());
+
+        String parent = newKey();
+        String child = parent + "/child";
+        String grandchild = child + "/inner";
+
+        store.put(child, "C".getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join();
+        store.put(grandchild, "G".getBytes(StandardCharsets.UTF_8), Optional.of(-1L)).join();
+
+        CollectingConsumer consumer = new CollectingConsumer();
+        store.scanChildren(parent, consumer).join();
+        consumer.awaitDone();
+
+        assertEquals(consumer.records.size(), 1);
+        assertEquals(consumer.records.get(0).getStat().getPath(), child);
+    }
+
+    /**
+     * A closed store rejects the scan on both channels: the returned future fails with
+     * {@code AlreadyClosedException} and the consumer receives the matching {@code onError}
+     * without any record.
+     */
+    @Test(dataProvider = "impl")
+    public void closedStoreRejectsScan(String provider, Supplier<String> urlSupplier) throws Exception {
+        MetadataStoreExtended store = MetadataStoreExtended.create(
+                urlSupplier.get(), MetadataStoreConfig.builder().build());
+        store.close();
+
+        CollectingConsumer consumer = new CollectingConsumer();
+        CompletionException ex = expectThrows(CompletionException.class,
+                () -> store.scanChildren("/anything", consumer).join());
+        assertTrue(ex.getCause() instanceof MetadataStoreException.AlreadyClosedException,
+                "expected AlreadyClosedException, got: " + ex.getCause());
+
+        // The terminal callback must mirror the future, so callers may rely on either.
+        consumer.awaitDone();
+        Throwable reported = consumer.error.get();
+        assertTrue(reported != null, "consumer.onError was not invoked for a closed store");
+        Throwable root = reported instanceof CompletionException && reported.getCause() != null
+                ? reported.getCause() : reported;
+        assertTrue(root instanceof MetadataStoreException.AlreadyClosedException,
+                "expected AlreadyClosedException in onError, got: " + root);
+        assertEquals(consumer.records.size(), 0);
+    }
+}