This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a83e9ed7032 [feat] PIP-335: Add support Oxia as a metadata store 
(#22007)
a83e9ed7032 is described below

commit a83e9ed70323b80319545326716340ce76d690eb
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Feb 7 14:10:33 2024 -0800

    [feat] PIP-335: Add support Oxia as a metadata store (#22007)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +
 pom.xml                                            |  13 +
 pulsar-broker/pom.xml                              |   6 +
 pulsar-metadata/pom.xml                            |  12 +
 .../metadata/impl/MetadataStoreFactoryImpl.java    |   3 +
 .../metadata/impl/oxia/OxiaMetadataStore.java      | 283 +++++++++++++++++++++
 .../impl/oxia/OxiaMetadataStoreProvider.java       |  75 ++++++
 .../pulsar/metadata/impl/oxia/package-info.java    |  19 ++
 .../pulsar/metadata/BaseMetadataStoreTest.java     |  18 ++
 .../apache/pulsar/metadata/MetadataBenchmark.java  |   2 +-
 .../apache/pulsar/metadata/MetadataStoreTest.java  |  30 ++-
 11 files changed, 450 insertions(+), 13 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 0bf0fee823c..e3941c54a74 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -465,6 +465,8 @@ The Apache Software License, Version 2.0
     - io.dropwizard.metrics-metrics-jmx-4.1.12.1.jar
   * Prometheus
     - io.prometheus-simpleclient_httpserver-0.16.0.jar
+  * Oxia
+    - io.streamnative.oxia-oxia-client-0.1.0-shaded.jar
   * Java JSON WebTokens
     - io.jsonwebtoken-jjwt-api-0.11.1.jar
     - io.jsonwebtoken-jjwt-impl-0.11.1.jar
diff --git a/pom.xml b/pom.xml
index 0c216c9dab8..4dfeb30821a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -242,6 +242,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <apache-http-client.version>4.5.13</apache-http-client.version>
     <apache-httpcomponents.version>4.4.15</apache-httpcomponents.version>
     <jetcd.version>0.7.5</jetcd.version>
+    <oxia.version>0.1.0</oxia.version>
     <snakeyaml.version>2.0</snakeyaml.version>
     <ant.version>1.10.12</ant.version>
     <seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
@@ -1152,6 +1153,18 @@ flexible messaging model and an intuitive client 
API.</description>
         <version>${sketches.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>io.streamnative.oxia</groupId>
+        <artifactId>oxia-client</artifactId>
+        <version>${oxia.version}</version>
+        <classifier>shaded</classifier>
+      </dependency>
+      <dependency>
+        <groupId>io.streamnative.oxia</groupId>
+        <artifactId>oxia-testcontainers</artifactId>
+        <version>${oxia.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>com.amazonaws</groupId>
         <artifactId>aws-java-sdk-bom</artifactId>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 9dd319f7911..c39de184b05 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -164,6 +164,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.streamnative.oxia</groupId>
+      <artifactId>oxia-testcontainers</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <!-- zookeeper server -->
     <dependency>
        <groupId>io.dropwizard.metrics</groupId>
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index d232a1f5c00..8600d0ea191 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -62,6 +62,18 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>io.streamnative.oxia</groupId>
+      <artifactId>oxia-client</artifactId>
+      <classifier>shaded</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>io.streamnative.oxia</groupId>
+      <artifactId>oxia-testcontainers</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
index dd4df69fc43..cb7bea718e4 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
@@ -22,6 +22,7 @@ import static 
org.apache.pulsar.metadata.impl.EtcdMetadataStore.ETCD_SCHEME_IDEN
 import static 
org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER;
 import static 
org.apache.pulsar.metadata.impl.RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER;
 import static 
org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER;
+import static 
org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStoreProvider.OXIA_SCHEME_IDENTIFIER;
 import com.google.common.base.Splitter;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,6 +32,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreProvider;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStoreProvider;
 
 @Slf4j
 public class MetadataStoreFactoryImpl {
@@ -66,6 +68,7 @@ public class MetadataStoreFactoryImpl {
         providers.put(MEMORY_SCHEME_IDENTIFIER, new 
MemoryMetadataStoreProvider());
         providers.put(ROCKSDB_SCHEME_IDENTIFIER, new 
RocksdbMetadataStoreProvider());
         providers.put(ETCD_SCHEME_IDENTIFIER, new EtcdMetadataStoreProvider());
+        providers.put(OXIA_SCHEME_IDENTIFIER, new OxiaMetadataStoreProvider());
         providers.put(ZK_SCHEME_IDENTIFIER, new ZkMetadataStoreProvider());
 
         String factoryClasses = 
System.getProperty(METADATASTORE_PROVIDERS_PROPERTY, "");
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
new file mode 100644
index 00000000000..2ab744e2053
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.oxia;
+
+import io.streamnative.oxia.client.OxiaClientBuilder;
+import io.streamnative.oxia.client.api.AsyncOxiaClient;
+import io.streamnative.oxia.client.api.DeleteOption;
+import io.streamnative.oxia.client.api.KeyAlreadyExistsException;
+import io.streamnative.oxia.client.api.Notification;
+import io.streamnative.oxia.client.api.PutOption;
+import io.streamnative.oxia.client.api.PutResult;
+import io.streamnative.oxia.client.api.UnexpectedVersionIdException;
+import io.streamnative.oxia.client.api.Version;
+import java.time.Duration;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
+
+@Slf4j
+public class OxiaMetadataStore extends AbstractMetadataStore {
+
+    private final AsyncOxiaClient client;
+
+    private final String identity;
+    private final Optional<MetadataEventSynchronizer> synchronizer;
+
+    OxiaMetadataStore(
+            @NonNull String serviceAddress,
+            @NonNull String namespace,
+            @NonNull MetadataStoreConfig metadataStoreConfig,
+            boolean enableSessionWatcher)
+            throws Exception {
+        super("oxia-metadata");
+
+        var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
+        if (!metadataStoreConfig.isBatchingEnabled()) {
+            linger = 0;
+        }
+        this.synchronizer = 
Optional.ofNullable(metadataStoreConfig.getSynchronizer());
+        identity = UUID.randomUUID().toString();
+        client =
+                new OxiaClientBuilder(serviceAddress)
+                        .clientIdentifier(identity)
+                        .namespace(namespace)
+                        
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
+                        .batchLinger(Duration.ofMillis(linger))
+                        
.maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations())
+                        .asyncClient()
+                        .get();
+        client.notifications(this::notificationCallback);
+        
super.registerSyncListener(Optional.ofNullable(metadataStoreConfig.getSynchronizer()));
+    }
+
+    private void notificationCallback(Notification notification) {
+        if (notification instanceof Notification.KeyCreated keyCreated) {
+            receivedNotification(
+                    new org.apache.pulsar.metadata.api.Notification(
+                            NotificationType.Created, keyCreated.key()));
+            notifyParentChildrenChanged(keyCreated.key());
+
+        } else if (notification instanceof Notification.KeyModified 
keyModified) {
+            receivedNotification(
+                    new org.apache.pulsar.metadata.api.Notification(
+                            NotificationType.Modified, keyModified.key()));
+        } else if (notification instanceof Notification.KeyDeleted keyDeleted) 
{
+            receivedNotification(
+                    new org.apache.pulsar.metadata.api.Notification(
+                            NotificationType.Deleted, keyDeleted.key()));
+            notifyParentChildrenChanged(keyDeleted.key());
+        } else {
+            log.error("Unknown notification type {}", notification);
+        }
+    }
+
+    Optional<GetResult> convertGetResult(
+            String path, io.streamnative.oxia.client.api.GetResult result) {
+        if (result == null) {
+            return Optional.empty();
+        }
+        return Optional.of(result)
+                .map(
+                        oxiaResult ->
+                                new GetResult(oxiaResult.getValue(), 
convertStat(path, oxiaResult.getVersion())));
+    }
+
+    Stat convertStat(String path, Version version) {
+        return new Stat(
+                path,
+                version.versionId(),
+                version.createdTimestamp(),
+                version.modifiedTimestamp(),
+                version.sessionId().isPresent(),
+                version.clientIdentifier().stream().anyMatch(identity::equals),
+                version.modificationsCount() == 0);
+    }
+
+    @Override
+    protected CompletableFuture<List<String>> getChildrenFromStore(String 
path) {
+        var pathWithSlash = path + "/";
+
+        return client
+                .list(pathWithSlash, pathWithSlash + "/")
+                .thenApply(
+                        children ->
+                                children.stream().map(child -> 
child.substring(pathWithSlash.length())).toList())
+                .exceptionallyCompose(this::convertException);
+    }
+
+    @Override
+    protected CompletableFuture<Boolean> existsFromStore(String path) {
+        return client.get(path).thenApply(Objects::nonNull)
+                .exceptionallyCompose(this::convertException);
+    }
+
+    @Override
+    protected CompletableFuture<Optional<GetResult>> storeGet(String path) {
+        return client.get(path).thenApply(res -> convertGetResult(path, res))
+                .exceptionallyCompose(this::convertException);
+    }
+
+    @Override
+    protected CompletableFuture<Void> storeDelete(String path, Optional<Long> 
expectedVersion) {
+        return getChildrenFromStore(path)
+                .thenCompose(
+                        children -> {
+                            if (children.size() > 0) {
+                                return CompletableFuture.failedFuture(
+                                        new MetadataStoreException("Key '" + 
path + "' has children"));
+                            } else {
+                                var delOption =
+                                        expectedVersion
+                                                
.map(DeleteOption::ifVersionIdEquals)
+                                                
.orElse(DeleteOption.Unconditionally);
+                                CompletableFuture<Boolean> result = 
client.delete(path, delOption);
+                                return result
+                                        .thenCompose(
+                                                exists -> {
+                                                    if (!exists) {
+                                                        return 
CompletableFuture.failedFuture(
+                                                                new 
MetadataStoreException.NotFoundException(
+                                                                        "Key 
'" + path + "' does not exist"));
+                                                    }
+                                                    return 
CompletableFuture.completedFuture((Void) null);
+                                                })
+                                        
.exceptionallyCompose(this::convertException);
+                            }
+                        });
+    }
+
+    @Override
+    protected CompletableFuture<Stat> storePut(
+            String path, byte[] data, Optional<Long> optExpectedVersion, 
EnumSet<CreateOption> options) {
+        CompletableFuture<Void> parentsCreated = createParents(path);
+        return parentsCreated.thenCompose(
+                __ -> {
+                    var expectedVersion = optExpectedVersion;
+                    if (expectedVersion.isPresent()
+                            && expectedVersion.get() != -1L
+                            && options.contains(CreateOption.Sequential)) {
+                        return CompletableFuture.failedFuture(
+                                new MetadataStoreException(
+                                        "Can't have expectedVersion and 
Sequential at the same time"));
+                    }
+                    CompletableFuture<String> actualPath;
+                    if (options.contains(CreateOption.Sequential)) {
+                        var parent = parent(path);
+                        var parentPath = parent == null ? "/" : parent;
+
+                        actualPath =
+                                client
+                                        .put(parentPath, new byte[] {})
+                                        .thenApply(
+                                                r -> String.format("%s%010d", 
path, r.version().modificationsCount()));
+                        expectedVersion = Optional.of(-1L);
+                    } else {
+                        actualPath = CompletableFuture.completedFuture(path);
+                    }
+                    var versionCondition =
+                            expectedVersion
+                                    .map(
+                                            ver -> {
+                                                if (ver == -1) {
+                                                    return 
PutOption.IfRecordDoesNotExist;
+                                                }
+                                                return 
PutOption.ifVersionIdEquals(ver);
+                                            })
+                                    .orElse(PutOption.Unconditionally);
+                    var putOptions =
+                            options.contains(CreateOption.Ephemeral)
+                                    ? new PutOption[] 
{PutOption.AsEphemeralRecord, versionCondition}
+                                    : new PutOption[] {versionCondition};
+                    return actualPath
+                            .thenCompose(
+                                    aPath ->
+                                            client
+                                                    .put(aPath, data, 
putOptions)
+                                                    .thenApply(res -> new 
PathWithPutResult(aPath, res)))
+                            .thenApply(res -> convertStat(res.path(), 
res.result().version()))
+                            .exceptionallyCompose(this::convertException);
+                });
+    }
+
+    private <T> CompletionStage<T> convertException(Throwable ex) {
+        if (ex.getCause() instanceof UnexpectedVersionIdException
+                || ex.getCause() instanceof KeyAlreadyExistsException) {
+            return CompletableFuture.failedFuture(
+                    new 
MetadataStoreException.BadVersionException(ex.getCause()));
+        } else if (ex.getCause() instanceof IllegalStateException) {
+            return CompletableFuture.failedFuture(new 
MetadataStoreException.AlreadyClosedException(ex.getCause()));
+        } else {
+            return CompletableFuture.failedFuture(ex.getCause());
+        }
+    }
+
+    private CompletableFuture<Void> createParents(String path) {
+        var parent = parent(path);
+        if (parent == null || parent.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return exists(parent)
+                .thenCompose(
+                        exists -> {
+                            if (exists) {
+                                return CompletableFuture.completedFuture(null);
+                            } else {
+                                return client
+                                        .put(parent, new byte[] {}, 
PutOption.IfRecordDoesNotExist)
+                                        .thenCompose(__ -> 
createParents(parent));
+                            }
+                        })
+                .exceptionallyCompose(
+                        ex -> {
+                            if (ex.getCause() instanceof 
KeyAlreadyExistsException) {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                            return 
CompletableFuture.failedFuture(ex.getCause());
+                        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (client != null) {
+            client.close();
+        }
+        super.close();
+    }
+
+    public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
+        return synchronizer;
+    }
+
+    private record PathWithPutResult(String path, PutResult result) {}
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java
new file mode 100644
index 00000000000..a4c52134a8a
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.oxia;
+
+import lombok.NonNull;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
+
+public class OxiaMetadataStoreProvider implements MetadataStoreProvider {
+    // declare the specific namespace to avoid any changes in the future.
+    public static final String DefaultNamespace = "default";
+
+    public static final  String OXIA_SCHEME = "oxia";
+    public static final String OXIA_SCHEME_IDENTIFIER = OXIA_SCHEME + ":";
+
+    @Override
+    public String urlScheme() {
+        return OXIA_SCHEME;
+    }
+
+    @Override
+    public @NonNull MetadataStore create(
+            String metadataURL, MetadataStoreConfig metadataStoreConfig, 
boolean enableSessionWatcher)
+            throws MetadataStoreException {
+        var serviceAddress = getServiceAddressAndNamespace(metadataURL);
+        try {
+            return new OxiaMetadataStore(
+                    serviceAddress.getLeft(),
+                    serviceAddress.getRight(),
+                    metadataStoreConfig,
+                    enableSessionWatcher);
+        } catch (Exception e) {
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    @NonNull
+    Pair<String, String> getServiceAddressAndNamespace(String metadataURL)
+            throws MetadataStoreException {
+        if (metadataURL == null || !metadataURL.startsWith(urlScheme() + 
"://")) {
+            throw new MetadataStoreException("Invalid metadata URL. Must start 
with 'oxia://'.");
+        }
+        final var addressWithNamespace = 
metadataURL.substring("oxia://".length());
+        final var split = addressWithNamespace.split("/");
+        if (split.length > 2) {
+            throw new MetadataStoreException(
+                    "Invalid metadata URL."
+                            + " the oxia metadata format should be 
'oxia://host:port/[namespace]'.");
+        }
+        if (split.length == 1) {
+            // Use default namespace
+            return Pair.of(split[0], DefaultNamespace);
+        }
+        return Pair.of(split[0], split[1]);
+    }
+}
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/package-info.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/package-info.java
new file mode 100644
index 00000000000..d63afa5b0a8
--- /dev/null
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.oxia;
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index 411ee038c48..491e3d0b964 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import io.etcd.jetcd.launcher.EtcdCluster;
 import io.etcd.jetcd.test.EtcdClusterExtension;
+import io.streamnative.oxia.testcontainers.OxiaContainer;
 import java.io.File;
 import java.net.URI;
 import java.util.UUID;
@@ -39,6 +40,8 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
     protected TestZKServer zks;
     protected EtcdCluster etcdCluster;
 
+    protected OxiaContainer oxiaServer;
+
     @BeforeClass(alwaysRun = true)
     @Override
     public void setup() throws Exception {
@@ -59,6 +62,11 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
             etcdCluster.close();
             etcdCluster = null;
         }
+
+        if (oxiaServer != null) {
+            oxiaServer.close();
+            oxiaServer = null;
+        }
     }
 
     private static String createTempFolder() {
@@ -79,6 +87,7 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
                 {"Memory", stringSupplier(() -> "memory:" + 
UUID.randomUUID())},
                 {"RocksDB", stringSupplier(() -> "rocksdb:" + 
createTempFolder())},
                 {"Etcd", stringSupplier(() -> "etcd:" + 
getEtcdClusterConnectString())},
+                {"Oxia", stringSupplier(() -> "oxia://" + 
getOxiaServerConnectString())},
         };
     }
 
@@ -87,9 +96,18 @@ public abstract class BaseMetadataStoreTest extends 
TestRetrySupport {
         return new Object[][]{
                 {"ZooKeeper", stringSupplier(() -> zks.getConnectionString())},
                 {"Etcd", stringSupplier(() -> "etcd:" + 
getEtcdClusterConnectString())},
+                {"Oxia", stringSupplier(() -> "oxia://" + 
getOxiaServerConnectString())},
         };
     }
 
+    private synchronized String getOxiaServerConnectString() {
+        if (oxiaServer == null) {
+            oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME);
+            oxiaServer.start();
+        }
+        return oxiaServer.getServiceAddress();
+    }
+
     private synchronized String getEtcdClusterConnectString() {
         if (etcdCluster == null) {
             etcdCluster = 
EtcdClusterExtension.builder().withClusterName("test").withNodes(1).withSsl(false).build()
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
index 227c0e2c9dc..b3b95ddc580 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.testng.annotations.Test;
 
 @Slf4j
-public class MetadataBenchmark extends MetadataStoreTest {
+public class MetadataBenchmark extends BaseMetadataStoreTest {
 
     @Test(dataProvider = "impl", enabled = false)
     public void testGet(String provider, Supplier<String> urlSupplier) throws 
Exception {
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 244ed025e3e..b1578188c68 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -62,27 +62,28 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
 
     @Test(dataProvider = "impl")
     public void emptyStoreTest(String provider, Supplier<String> urlSupplier) 
throws Exception {
+        String prefix = newKey();
         @Cleanup
         MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
                 MetadataStoreConfig.builder().fsyncEnable(false).build());
 
-        assertFalse(store.exists("/non-existing-key").join());
-        assertFalse(store.exists("/non-existing-key/child").join());
-        assertFalse(store.get("/non-existing-key").join().isPresent());
-        assertFalse(store.get("/non-existing-key/child").join().isPresent());
+        assertFalse(store.exists(prefix + "/non-existing-key").join());
+        assertFalse(store.exists(prefix + "/non-existing-key/child").join());
+        assertFalse(store.get(prefix + 
"/non-existing-key").join().isPresent());
+        assertFalse(store.get(prefix + 
"/non-existing-key/child").join().isPresent());
 
-        assertEquals(store.getChildren("/non-existing-key").join(), 
Collections.emptyList());
-        assertEquals(store.getChildren("/non-existing-key/child").join(), 
Collections.emptyList());
+        assertEquals(store.getChildren(prefix + "/non-existing-key").join(), 
Collections.emptyList());
+        assertEquals(store.getChildren(prefix + 
"/non-existing-key/child").join(), Collections.emptyList());
 
         try {
-            store.delete("/non-existing-key", Optional.empty()).join();
+            store.delete(prefix + "/non-existing-key", 
Optional.empty()).join();
             fail("Should have failed");
         } catch (CompletionException e) {
             assertException(e, NotFoundException.class);
         }
 
         try {
-            store.delete("/non-existing-key", Optional.of(1L)).join();
+            store.delete(prefix + "/non-existing-key", Optional.of(1L)).join();
             fail("Should have failed");
         } catch (CompletionException e) {
             assertTrue(NotFoundException.class.isInstance(e.getCause()) || 
BadVersionException.class.isInstance(
@@ -400,6 +401,10 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
 
     @Test(dataProvider = "impl")
     public void testDeleteUnusedDirectories(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        if (provider.equals("Oxia")) {
+            return;
+        }
+
         @Cleanup
         MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(),
                 MetadataStoreConfig.builder().fsyncEnable(false).build());
@@ -710,10 +715,11 @@ public class MetadataStoreTest extends 
BaseMetadataStoreTest {
         assertTrue(store1.exists(parent).get());
         assertFalse(store1.exists(parent + "/a").get());
         store2.put(parent + "/a", value, Optional.empty()).get();
-        assertTrue(store1.exists(parent + "/a").get());
+
+        Awaitility.await()
+                .untilAsserted(() -> assertTrue(store1.exists(parent + 
"/a").get()));
+
         // There is a chance watcher event is not triggered before the 
store1.exists() call.
-        Awaitility.await().atMost(3, TimeUnit.SECONDS)
-                .pollInterval(100, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> assertFalse(store1.exists(parent + 
"/b").get()));
+        assertFalse(store1.exists(parent + "/b").get());
     }
 }

Reply via email to