This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b75ca0e02c [improve] PIP-335: Pulsar with Oxia integration test (#22045)
2b75ca0e02c is described below

commit 2b75ca0e02c10262813de509b96f5678faffc934
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri Feb 9 10:50:33 2024 -0800

    [improve] PIP-335: Pulsar with Oxia integration test (#22045)
---
 .../docker-images/latest-version-image/Dockerfile  |   2 +-
 .../latest-version-image/scripts/init-cluster.sh   |  38 ---
 .../latest-version-image/scripts/run-bookie.sh     |   1 -
 .../latest-version-image/scripts/run-broker.sh     |   1 -
 .../scripts/run-functions-worker.sh                |   1 -
 .../latest-version-image/scripts/run-proxy.sh      |   1 -
 .../latest-version-image/scripts/run-websocket.sh  |   1 -
 .../containers/PulsarInitMetadataContainer.java    |  76 +++++
 .../tests/integration/oxia/OxiaContainer.java      |  72 +++++
 .../tests/integration/oxia/OxiaSmokeTest.java      |  48 +++
 .../integration/topologies/PulsarCluster.java      | 330 ++++++++++++---------
 .../integration/topologies/PulsarClusterSpec.java  |   3 +
 .../src/test/resources/pulsar-messaging.xml        |   2 +
 13 files changed, 397 insertions(+), 179 deletions(-)

diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile
index 4973bec0441..f019af5c926 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -50,7 +50,7 @@ COPY conf/supervisord.conf /etc/supervisord.conf
 COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf 
conf/functions_worker.conf \
      conf/proxy.conf conf/websocket.conf /etc/supervisord/conf.d/
 
-COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
+COPY scripts/run-global-zk.sh scripts/run-local-zk.sh \
      scripts/run-bookie.sh scripts/run-broker.sh 
scripts/run-functions-worker.sh scripts/run-proxy.sh \
      scripts/run-standalone.sh scripts/run-websocket.sh \
      /pulsar/bin/
diff --git a/tests/docker-images/latest-version-image/scripts/init-cluster.sh b/tests/docker-images/latest-version-image/scripts/init-cluster.sh
deleted file mode 100755
index 926845d5a77..00000000000
--- a/tests/docker-images/latest-version-image/scripts/init-cluster.sh
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-set -x
-
-ZNODE="/initialized-$clusterName"
-
-bin/watch-znode.py -z $zkServers -p / -w
-
-bin/watch-znode.py -z $zkServers -p $ZNODE -e
-if [ $? != 0 ]; then
-    echo Initializing cluster
-    bin/apply-config-from-env.py conf/bookkeeper.conf &&
-        bin/pulsar initialize-cluster-metadata --cluster $clusterName 
--zookeeper $zkServers \
-                   --configuration-store $configurationStore --web-service-url 
http://$pulsarNode:8080/ \
-                   --broker-service-url pulsar://$pulsarNode:6650/ &&
-        bin/watch-znode.py -z $zkServers -p $ZNODE -c
-    echo Initialized
-else
-    echo Already Initialized
-fi
diff --git a/tests/docker-images/latest-version-image/scripts/run-bookie.sh b/tests/docker-images/latest-version-image/scripts/run-bookie.sh
index 64466eb2d9a..e454e667645 100755
--- a/tests/docker-images/latest-version-image/scripts/run-bookie.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-bookie.sh
@@ -29,6 +29,5 @@ if [ -z "$NO_AUTOSTART" ]; then
     sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/bookie.conf
 fi
 
-bin/watch-znode.py -z $zkServers -p /initialized-$clusterName -w
 exec /usr/bin/supervisord -c /etc/supervisord.conf
 
diff --git a/tests/docker-images/latest-version-image/scripts/run-broker.sh b/tests/docker-images/latest-version-image/scripts/run-broker.sh
index 6ed5d60c39e..4f89f145f2b 100755
--- a/tests/docker-images/latest-version-image/scripts/run-broker.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-broker.sh
@@ -25,6 +25,5 @@ if [ -z "$NO_AUTOSTART" ]; then
     sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/broker.conf
 fi
 
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
 exec /usr/bin/supervisord -c /etc/supervisord.conf
 
diff --git a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
index 3fadf960ee3..cd9d7593dbf 100755
--- a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
@@ -26,6 +26,5 @@ if [ -z "$NO_AUTOSTART" ]; then
     sed -i 's/autostart=.*/autostart=true/' 
/etc/supervisord/conf.d/functions_worker.conf
 fi
 
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
 exec /usr/bin/supervisord -c /etc/supervisord.conf
 
diff --git a/tests/docker-images/latest-version-image/scripts/run-proxy.sh b/tests/docker-images/latest-version-image/scripts/run-proxy.sh
index 4836a890bda..f44ed0bb658 100755
--- a/tests/docker-images/latest-version-image/scripts/run-proxy.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-proxy.sh
@@ -25,5 +25,4 @@ if [ -z "$NO_AUTOSTART" ]; then
     sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/proxy.conf
 fi
 
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
 exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/docker-images/latest-version-image/scripts/run-websocket.sh b/tests/docker-images/latest-version-image/scripts/run-websocket.sh
index a49ee111768..34e4b9016af 100755
--- a/tests/docker-images/latest-version-image/scripts/run-websocket.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-websocket.sh
@@ -25,5 +25,4 @@ if [ -z "$NO_AUTOSTART" ]; then
     sed -i 's/autostart=.*/autostart=true/' 
/etc/supervisord/conf.d/websocket.conf
 fi
 
-bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
 exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
new file mode 100644
index 00000000000..4251ed3bd57
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.tests.integration.containers;
+
+import java.io.IOException;
+import lombok.extern.slf4j.Slf4j;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+
+/**
+ * Initialize the Pulsar metadata
+ */
+@Slf4j
+public class PulsarInitMetadataContainer extends 
GenericContainer<PulsarInitMetadataContainer> {
+
+    public static final String NAME = "init-metadata";
+
+    private final String clusterName;
+    private final String metadataStoreUrl;
+    private final String configurationMetadataStoreUrl;
+    private final String brokerHostname;
+
+    public PulsarInitMetadataContainer(Network network,
+                                       String clusterName,
+                                       String metadataStoreUrl,
+                                       String configurationMetadataStoreUrl,
+                                       String brokerHostname) {
+        this.clusterName = clusterName;
+        this.metadataStoreUrl = metadataStoreUrl;
+        this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
+        this.brokerHostname = brokerHostname;
+        setDockerImageName(PulsarContainer.DEFAULT_IMAGE_NAME);
+        withNetwork(network);
+
+        setCommand("sleep 1000000");
+    }
+
+
+    public void initialize() throws Exception {
+        start();
+        ExecResult res = this.execInContainer(
+                "/pulsar/bin/pulsar", "initialize-cluster-metadata",
+                "--cluster", clusterName,
+                "--metadata-store", metadataStoreUrl,
+                "--configuration-metadata-store", 
configurationMetadataStoreUrl,
+                "--web-service-url", "http://" + brokerHostname + ":8080/",
+                "--broker-service-url", "pulsar://" + brokerHostname + ":6650/"
+        );
+
+        if (res.getExitCode() == 0) {
+            log.info("Successfully initialized cluster");
+        } else {
+            log.warn("Failed to initialize Pulsar cluster. exit code: " + 
res.getExitCode());
+            log.warn("STDOUT: " + res.getStdout());
+            log.warn("STDERR: " + res.getStderr());
+            throw new IOException("Failed to initialized Pulsar Cluster");
+        }
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
new file mode 100644
index 00000000000..18d2dd77b7d
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.tests.integration.oxia;
+
+import java.time.Duration;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class OxiaContainer extends ChaosContainer<OxiaContainer> {
+
+    public static final String NAME = "oxia";
+
+    public static final int OXIA_PORT = 6648;
+    public static final int METRICS_PORT = 8080;
+    private static final int DEFAULT_SHARDS = 1;
+
+    private static final String DEFAULT_IMAGE_NAME = "streamnative/oxia:main";
+
+    public OxiaContainer(String clusterName) {
+        this(clusterName, DEFAULT_IMAGE_NAME, DEFAULT_SHARDS);
+    }
+
+    @SuppressWarnings("resource")
+    OxiaContainer(String clusterName, String imageName, int shards) {
+        super(clusterName, imageName);
+        if (shards <= 0) {
+            throw new IllegalArgumentException("shards must be greater than 
zero");
+        }
+        addExposedPorts(OXIA_PORT, METRICS_PORT);
+        this.withCreateContainerCmdModifier(createContainerCmd -> {
+            createContainerCmd.withHostName("oxia");
+            createContainerCmd.withName(getContainerName());
+        });
+        setCommand("oxia", "standalone",
+                "--shards=" + shards,
+                "--wal-sync-data=false");
+        waitingFor(
+                Wait.forHttp("/metrics")
+                        .forPort(METRICS_PORT)
+                        .forStatusCode(200)
+                        .withStartupTimeout(Duration.ofSeconds(30)));
+
+        PulsarContainer.configureLeaveContainerRunning(this);
+    }
+
+    public String getServiceAddress() {
+        return OxiaContainer.NAME + ":" + OXIA_PORT;
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName + "-oxia";
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java
new file mode 100644
index 00000000000..d55c437b4f8
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.oxia;
+
+import java.util.function.Supplier;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.testng.annotations.Test;
+
+/**
+ * Test pulsar produce/consume semantics
+ */
+@Slf4j
+public class OxiaSmokeTest extends PulsarTestSuite {
+
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+            String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder 
specBuilder) {
+        specBuilder.enableOxia(true);
+        return specBuilder;
+    }
+
+    //
+    // Test Basic Publish & Consume Operations
+    //
+
+    @Test(dataProvider = "ServiceUrlAndTopics")
+    public void testPublishAndConsume(Supplier<String> serviceUrl, boolean 
isPersistent) throws Exception {
+        super.testPublishAndConsume(serviceUrl.get(), isPersistent);
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 88ff778732e..bc9b1e267b9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Function;
+import lombok.Cleanup;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.IOUtils;
@@ -44,9 +45,11 @@ import 
org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.CSContainer;
 import org.apache.pulsar.tests.integration.containers.ProxyContainer;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import 
org.apache.pulsar.tests.integration.containers.PulsarInitMetadataContainer;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
 import org.apache.pulsar.tests.integration.containers.ZKContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.oxia.OxiaContainer;
 import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
@@ -69,9 +72,12 @@ public class PulsarCluster {
      * @return the built pulsar cluster
      */
     public static PulsarCluster forSpec(PulsarClusterSpec spec) {
-        CSContainer csContainer = new CSContainer(spec.clusterName)
-                .withNetwork(Network.newNetwork())
-                .withNetworkAliases(CSContainer.NAME);
+        CSContainer csContainer = null;
+        if (!spec.enableOxia) {
+            csContainer = new CSContainer(spec.clusterName)
+                    .withNetwork(Network.newNetwork())
+                    .withNetworkAliases(CSContainer.NAME);
+        }
         return new PulsarCluster(spec, csContainer, false);
     }
 
@@ -86,6 +92,8 @@ public class PulsarCluster {
     private final String clusterName;
     private final Network network;
     private final ZKContainer zkContainer;
+
+    private final OxiaContainer oxiaContainer;
     private final CSContainer csContainer;
     private final boolean sharedCsContainer;
     private final Map<String, BKContainer> bookieContainers;
@@ -95,22 +103,44 @@ public class PulsarCluster {
     private Map<String, GenericContainer<?>> externalServices = 
Collections.emptyMap();
     private Map<String, Map<String, String>> externalServiceEnvs;
 
+    private final String metadataStoreUrl;
+    private final String configurationMetadataStoreUrl;
+
     private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, 
boolean sharedCsContainer) {
 
         this.spec = spec;
         this.sharedCsContainer = sharedCsContainer;
         this.clusterName = spec.clusterName();
-        this.network = csContainer.getNetwork();
-
-        this.zkContainer = new ZKContainer(clusterName);
-        this.zkContainer
-            .withNetwork(network)
-            .withNetworkAliases(appendClusterName(ZKContainer.NAME))
-            .withEnv("clusterName", clusterName)
-            .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
-            .withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT)
-            .withEnv("forceSync", "no")
-            .withEnv("pulsarNode", appendClusterName("pulsar-broker-0"));
+        if (csContainer != null ) {
+            this.network = csContainer.getNetwork();
+        } else {
+            this.network = Network.newNetwork();
+        }
+
+
+
+        if (spec.enableOxia) {
+            this.zkContainer = null;
+            this.oxiaContainer = new OxiaContainer(clusterName);
+            this.oxiaContainer
+                    .withNetwork(network)
+                    .withNetworkAliases(appendClusterName(OxiaContainer.NAME));
+            metadataStoreUrl = "oxia://" + oxiaContainer.getServiceAddress();
+            configurationMetadataStoreUrl = metadataStoreUrl;
+        } else {
+            this.oxiaContainer = null;
+            this.zkContainer = new ZKContainer(clusterName);
+            this.zkContainer
+                    .withNetwork(network)
+                    .withNetworkAliases(appendClusterName(ZKContainer.NAME))
+                    .withEnv("clusterName", clusterName)
+                    .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
+                    .withEnv("configurationStore", CSContainer.NAME + ":" + 
CS_PORT)
+                    .withEnv("forceSync", "no")
+                    .withEnv("pulsarNode", 
appendClusterName("pulsar-broker-0"));
+            metadataStoreUrl = appendClusterName(ZKContainer.NAME);
+            configurationMetadataStoreUrl = CSContainer.NAME + ":" + CS_PORT;
+        }
 
         this.csContainer = csContainer;
 
@@ -121,27 +151,29 @@ public class PulsarCluster {
         this.proxyContainer = new 
ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, 
spec.enableTls)
                 .withNetwork(network)
                 .withNetworkAliases(appendClusterName("pulsar-proxy"))
-                .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
-                .withEnv("zookeeperServers", 
appendClusterName(ZKContainer.NAME))
-                .withEnv("configurationStoreServers", CSContainer.NAME + ":" + 
CS_PORT)
+                .withEnv("metadataStoreUrl", metadataStoreUrl)
+                .withEnv("configurationMetadataStoreUrl", 
configurationMetadataStoreUrl)
                 .withEnv("clusterName", clusterName);
-                // enable mTLS
+        // enable mTLS
         if (spec.enableTls) {
             proxyContainer
-                .withEnv("webServicePortTls", 
String.valueOf(BROKER_HTTPS_PORT))
-                .withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS))
-                .withEnv("forwardAuthorizationCredentials", "true")
-                .withEnv("tlsRequireTrustedClientCertOnConnect", "true")
-                .withEnv("tlsAllowInsecureConnection", "false")
-                .withEnv("tlsCertificateFilePath", 
"/pulsar/certificate-authority/server-keys/proxy.cert.pem")
-                .withEnv("tlsKeyFilePath", 
"/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem")
-                .withEnv("tlsTrustCertsFilePath", 
"/pulsar/certificate-authority/certs/ca.cert.pem")
-                .withEnv("brokerClientAuthenticationPlugin", 
AuthenticationTls.class.getName())
-                .withEnv("brokerClientAuthenticationParameters", 
String.format("tlsCertFile:%s,tlsKeyFile:%s", 
"/pulsar/certificate-authority/client-keys/admin.cert.pem", 
"/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"))
-                .withEnv("tlsEnabledWithBroker", "true")
-                .withEnv("brokerClientTrustCertsFilePath", 
"/pulsar/certificate-authority/certs/ca.cert.pem")
-                .withEnv("brokerClientCertificateFilePath", 
"/pulsar/certificate-authority/server-keys/proxy.cert.pem")
-                .withEnv("brokerClientKeyFilePath", 
"/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem");
+                    .withEnv("webServicePortTls", 
String.valueOf(BROKER_HTTPS_PORT))
+                    .withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS))
+                    .withEnv("forwardAuthorizationCredentials", "true")
+                    .withEnv("tlsRequireTrustedClientCertOnConnect", "true")
+                    .withEnv("tlsAllowInsecureConnection", "false")
+                    .withEnv("tlsCertificateFilePath", 
"/pulsar/certificate-authority/server-keys/proxy.cert.pem")
+                    .withEnv("tlsKeyFilePath", 
"/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem")
+                    .withEnv("tlsTrustCertsFilePath", 
"/pulsar/certificate-authority/certs/ca.cert.pem")
+                    .withEnv("brokerClientAuthenticationPlugin", 
AuthenticationTls.class.getName())
+                    .withEnv("brokerClientAuthenticationParameters", 
String.format("tlsCertFile:%s,tlsKeyFile:%s",
+                            
"/pulsar/certificate-authority/client-keys/admin.cert.pem",
+                            
"/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"))
+                    .withEnv("tlsEnabledWithBroker", "true")
+                    .withEnv("brokerClientTrustCertsFilePath", 
"/pulsar/certificate-authority/certs/ca.cert.pem")
+                    .withEnv("brokerClientCertificateFilePath",
+                            
"/pulsar/certificate-authority/server-keys/proxy.cert.pem")
+                    .withEnv("brokerClientKeyFilePath", 
"/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem");
 
         }
         if (spec.proxyEnvs != null) {
@@ -157,7 +189,7 @@ public class PulsarCluster {
                     BKContainer bookieContainer = new BKContainer(clusterName, 
name)
                             .withNetwork(network)
                             .withNetworkAliases(appendClusterName(name))
-                            .withEnv("zkServers", 
appendClusterName(ZKContainer.NAME))
+                            .withEnv("metadataServiceUri", "metadata-store:" + 
metadataStoreUrl)
                             .withEnv("useHostNameAsBookieID", "true")
                             // Disable fsyncs for tests since they're slow 
within the containers
                             .withEnv("journalSyncData", "false")
@@ -179,50 +211,56 @@ public class PulsarCluster {
 
         // create brokers
         brokerContainers.putAll(
-            runNumContainers("broker", spec.numBrokers(), (name) -> {
-                BrokerContainer brokerContainer = new 
BrokerContainer(clusterName, appendClusterName(name), spec.enableTls)
-                        .withNetwork(network)
-                        .withNetworkAliases(appendClusterName(name))
-                        .withEnv("zkServers", 
appendClusterName(ZKContainer.NAME))
-                        .withEnv("zookeeperServers", 
appendClusterName(ZKContainer.NAME))
-                        .withEnv("configurationStoreServers", CSContainer.NAME 
+ ":" + CS_PORT)
-                        .withEnv("clusterName", clusterName)
-                        
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
-                        .withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
-                        // used in s3 tests
-                        .withEnv("AWS_ACCESS_KEY_ID", 
"accesskey").withEnv("AWS_SECRET_KEY", "secretkey")
-                        .withEnv("maxMessageSize", "" + spec.maxMessageSize);
-                    if (spec.enableTls) {
-                        // enable mTLS
-                        brokerContainer
-                            .withEnv("webServicePortTls", 
String.valueOf(BROKER_HTTPS_PORT))
-                            .withEnv("brokerServicePortTls", 
String.valueOf(BROKER_PORT_TLS))
-                            .withEnv("authenticateOriginalAuthData", "true")
-                            .withEnv("tlsAllowInsecureConnection", "false")
-                            .withEnv("tlsRequireTrustedClientCertOnConnect", 
"true")
-                            .withEnv("tlsTrustCertsFilePath", 
"/pulsar/certificate-authority/certs/ca.cert.pem")
-                            .withEnv("tlsCertificateFilePath", 
"/pulsar/certificate-authority/server-keys/broker.cert.pem")
-                            .withEnv("tlsKeyFilePath", 
"/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
-                    }
-                    if (spec.queryLastMessage) {
-                        
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
-                        brokerContainer.withEnv("bookkeeperUseV2WireProtocol", 
"false");
-                    }
-                    if (spec.brokerEnvs != null) {
-                        brokerContainer.withEnv(spec.brokerEnvs);
-                    }
-                    if (spec.brokerMountFiles != null) {
-                        
spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind);
-                    }
-                    if (spec.brokerAdditionalPorts() != null) {
-                        
spec.brokerAdditionalPorts().forEach(brokerContainer::addExposedPort);
-                    }
-                    return brokerContainer;
-                }
-            ));
+                runNumContainers("broker", spec.numBrokers(), (name) -> {
+                            BrokerContainer brokerContainer =
+                                    new BrokerContainer(clusterName, 
appendClusterName(name), spec.enableTls)
+                                            .withNetwork(network)
+                                            
.withNetworkAliases(appendClusterName(name))
+                                            .withEnv("metadataStoreUrl", 
metadataStoreUrl)
+                                            
.withEnv("configurationMetadataStoreUrl", configurationMetadataStoreUrl)
+                                            .withEnv("clusterName", 
clusterName)
+                                            
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
+                                            
.withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
+                                            // used in s3 tests
+                                            .withEnv("AWS_ACCESS_KEY_ID", 
"accesskey").withEnv("AWS_SECRET_KEY",
+                                                    "secretkey")
+                                            .withEnv("maxMessageSize", "" + 
spec.maxMessageSize);
+                            if (spec.enableTls) {
+                                // enable mTLS
+                                brokerContainer
+                                        .withEnv("webServicePortTls", 
String.valueOf(BROKER_HTTPS_PORT))
+                                        .withEnv("brokerServicePortTls", 
String.valueOf(BROKER_PORT_TLS))
+                                        
.withEnv("authenticateOriginalAuthData", "true")
+                                        .withEnv("tlsAllowInsecureConnection", 
"false")
+                                        
.withEnv("tlsRequireTrustedClientCertOnConnect", "true")
+                                        .withEnv("tlsTrustCertsFilePath", 
"/pulsar/certificate-authority/certs/ca"
+                                                + ".cert.pem")
+                                        .withEnv("tlsCertificateFilePath",
+                                                
"/pulsar/certificate-authority/server-keys/broker.cert.pem")
+                                        .withEnv("tlsKeyFilePath",
+                                                
"/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
+                            }
+                            if (spec.queryLastMessage) {
+                                
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
+                                
brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
+                            }
+                            if (spec.brokerEnvs != null) {
+                                brokerContainer.withEnv(spec.brokerEnvs);
+                            }
+                            if (spec.brokerMountFiles != null) {
+                                
spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind);
+                            }
+                            if (spec.brokerAdditionalPorts() != null) {
+                                
spec.brokerAdditionalPorts().forEach(brokerContainer::addExposedPort);
+                            }
+                            return brokerContainer;
+                        }
+                ));
 
         spec.classPathVolumeMounts.forEach((key, value) -> {
-            zkContainer.withClasspathResourceMapping(key, value, 
BindMode.READ_WRITE);
+            if (zkContainer != null) {
+                zkContainer.withClasspathResourceMapping(key, value, 
BindMode.READ_WRITE);
+            }
             proxyContainer.withClasspathResourceMapping(key, value, 
BindMode.READ_WRITE);
 
             bookieContainers.values().forEach(c -> 
c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
@@ -278,20 +316,33 @@ public class PulsarCluster {
     }
 
     public void start() throws Exception {
-        // start the local zookeeper
-        zkContainer.start();
-        log.info("Successfully started local zookeeper container.");
 
-        // start the configuration store
-        if (!sharedCsContainer) {
-            csContainer.start();
-            log.info("Successfully started configuration store container.");
+        if (!spec.enableOxia) {
+            // start the local zookeeper
+            zkContainer.start();
+            log.info("Successfully started local zookeeper container.");
+
+            // start the configuration store
+            if (!sharedCsContainer) {
+                csContainer.start();
+                log.info("Successfully started configuration store 
container.");
+            }
+        } else {
+            oxiaContainer.start();
         }
 
-        // init the cluster
-        zkContainer.execCmd(
-            "bin/init-cluster.sh");
-        log.info("Successfully initialized the cluster.");
+        {
+            // Run cluster metadata initialization
+            @Cleanup
+            PulsarInitMetadataContainer init = new PulsarInitMetadataContainer(
+                    network,
+                    clusterName,
+                    metadataStoreUrl,
+                    configurationMetadataStoreUrl,
+                    appendClusterName("pulsar-broker-0")
+            );
+            init.initialize();
+        }
 
         // start bookies
         bookieContainers.values().forEach(BKContainer::start);
@@ -318,7 +369,8 @@ public class PulsarCluster {
                 serviceContainer.withNetwork(network);
                 serviceContainer.withNetworkAliases(service.getKey());
                 if (null != externalServiceEnvs && null != 
externalServiceEnvs.get(service.getKey())) {
-                    Map<String, String> env = 
externalServiceEnvs.getOrDefault(service.getKey(), Collections.emptyMap());
+                    Map<String, String> env =
+                            externalServiceEnvs.getOrDefault(service.getKey(), 
Collections.emptyMap());
                     serviceContainer.withEnv(env);
                 }
                 
PulsarContainer.configureLeaveContainerRunning(serviceContainer);
@@ -390,6 +442,10 @@ public class PulsarCluster {
             zkContainer.stop();
         }
 
+        if (oxiaContainer != null) {
+            oxiaContainer.stop();
+        }
+
         try {
             network.close();
         } catch (Exception e) {
@@ -403,7 +459,8 @@ public class PulsarCluster {
                 .forEach(GenericContainer::stop);
     }
 
-    public synchronized void setupFunctionWorkers(String suffix, 
FunctionRuntimeType runtimeType, int numFunctionWorkers) {
+    public synchronized void setupFunctionWorkers(String suffix, 
FunctionRuntimeType runtimeType,
+                                                  int numFunctionWorkers) {
         switch (runtimeType) {
             case THREAD:
                 startFunctionWorkersWithThreadContainerFactory(suffix, 
numFunctionWorkers);
@@ -418,23 +475,23 @@ public class PulsarCluster {
         String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
         String httpServiceUrl = "http://pulsar-broker-0:" + 
PulsarContainer.BROKER_HTTP_PORT;
         workerContainers.putAll(runNumContainers(
-            "functions-worker-process-" + suffix,
-            numFunctionWorkers,
-            (name) -> new WorkerContainer(clusterName, name)
-                .withNetwork(network)
-                .withNetworkAliases(name)
-                // worker settings
-                .withEnv("PF_workerId", name)
-                .withEnv("PF_workerHostname", name)
-                .withEnv("PF_workerPort", "" + 
PulsarContainer.BROKER_HTTP_PORT)
-                .withEnv("PF_pulsarFunctionsCluster", clusterName)
-                .withEnv("PF_pulsarServiceUrl", serviceUrl)
-                .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
-                // script
-                .withEnv("clusterName", clusterName)
-                .withEnv("zookeeperServers", ZKContainer.NAME)
-                // bookkeeper tools
-                .withEnv("zkServers", ZKContainer.NAME)
+                "functions-worker-process-" + suffix,
+                numFunctionWorkers,
+                (name) -> new WorkerContainer(clusterName, name)
+                        .withNetwork(network)
+                        .withNetworkAliases(name)
+                        // worker settings
+                        .withEnv("PF_workerId", name)
+                        .withEnv("PF_workerHostname", name)
+                        .withEnv("PF_workerPort", "" + 
PulsarContainer.BROKER_HTTP_PORT)
+                        .withEnv("PF_pulsarFunctionsCluster", clusterName)
+                        .withEnv("PF_pulsarServiceUrl", serviceUrl)
+                        .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+                        // script
+                        .withEnv("clusterName", clusterName)
+                        .withEnv("zookeeperServers", ZKContainer.NAME)
+                        // bookkeeper tools
+                        .withEnv("zkServers", ZKContainer.NAME)
         ));
         this.startWorkers();
     }
@@ -443,25 +500,26 @@ public class PulsarCluster {
         String serviceUrl = "pulsar://pulsar-broker-0:" + 
PulsarContainer.BROKER_PORT;
         String httpServiceUrl = "http://pulsar-broker-0:" + 
PulsarContainer.BROKER_HTTP_PORT;
         workerContainers.putAll(runNumContainers(
-            "functions-worker-thread-" + suffix,
-            numFunctionWorkers,
-            (name) -> new WorkerContainer(clusterName, name)
-                .withNetwork(network)
-                .withNetworkAliases(name)
-                // worker settings
-                .withEnv("PF_workerId", name)
-                .withEnv("PF_workerHostname", name)
-                .withEnv("PF_workerPort", "" + 
PulsarContainer.BROKER_HTTP_PORT)
-                .withEnv("PF_pulsarFunctionsCluster", clusterName)
-                .withEnv("PF_pulsarServiceUrl", serviceUrl)
-                .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
-                .withEnv("PF_functionRuntimeFactoryClassName", 
"org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory")
-                .withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName", 
"pf-container-group")
-                // script
-                .withEnv("clusterName", clusterName)
-                .withEnv("zookeeperServers", ZKContainer.NAME)
-                // bookkeeper tools
-                .withEnv("zkServers", ZKContainer.NAME)
+                "functions-worker-thread-" + suffix,
+                numFunctionWorkers,
+                (name) -> new WorkerContainer(clusterName, name)
+                        .withNetwork(network)
+                        .withNetworkAliases(name)
+                        // worker settings
+                        .withEnv("PF_workerId", name)
+                        .withEnv("PF_workerHostname", name)
+                        .withEnv("PF_workerPort", "" + 
PulsarContainer.BROKER_HTTP_PORT)
+                        .withEnv("PF_pulsarFunctionsCluster", clusterName)
+                        .withEnv("PF_pulsarServiceUrl", serviceUrl)
+                        .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
+                        .withEnv("PF_functionRuntimeFactoryClassName",
+                                
"org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory")
+                        
.withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName", 
"pf-container-group")
+                        // script
+                        .withEnv("clusterName", clusterName)
+                        .withEnv("zookeeperServers", ZKContainer.NAME)
+                        // bookkeeper tools
+                        .withEnv("zkServers", ZKContainer.NAME)
         ));
         this.startWorkers();
     }
@@ -502,9 +560,9 @@ public class PulsarCluster {
         containers.forEach((name, container) -> {
             PulsarContainer.configureLeaveContainerRunning(container);
             container
-                .withNetwork(network)
-                .withNetworkAliases(name)
-                .start();
+                    .withNetwork(network)
+                    .withNetworkAliases(name)
+                    .start();
             log.info("Successfully start container {}.", name);
         });
     }
@@ -576,15 +634,16 @@ public class PulsarCluster {
         return zkContainer;
     }
 
-    public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) 
throws Exception {
+    public ContainerExecResult runAdminCommandOnAnyBroker(String... commands) 
throws Exception {
         return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
     }
 
-    public ContainerExecResult 
runPulsarBaseCommandOnAnyBroker(String...commands) throws Exception {
+    public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String... 
commands) throws Exception {
         return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, 
commands);
     }
 
-    private ContainerExecResult runCommandOnAnyBrokerWithScript(String 
scriptType, String...commands) throws Exception {
+    private ContainerExecResult runCommandOnAnyBrokerWithScript(String 
scriptType, String... commands)
+            throws Exception {
         BrokerContainer container = getAnyBroker();
         String[] cmds = new String[commands.length + 1];
         cmds[0] = scriptType;
@@ -618,8 +677,8 @@ public class PulsarCluster {
 
     public ContainerExecResult createNamespace(String nsName) throws Exception 
{
         return runAdminCommandOnAnyBroker(
-            "namespaces", "create", "public/" + nsName,
-            "--clusters", clusterName);
+                "namespaces", "create", "public/" + nsName,
+                "--clusters", clusterName);
     }
 
     public ContainerExecResult createPartitionedTopic(String topicName, int 
partitions) throws Exception {
@@ -630,8 +689,8 @@ public class PulsarCluster {
 
     public ContainerExecResult enableDeduplication(String nsName, boolean 
enabled) throws Exception {
         return runAdminCommandOnAnyBroker(
-            "namespaces", "set-deduplication", "public/" + nsName,
-            enabled ? "--enable" : "--disable");
+                "namespaces", "set-deduplication", "public/" + nsName,
+                enabled ? "--enable" : "--disable");
     }
 
     public void dumpFunctionLogs(String name) {
@@ -644,7 +703,8 @@ public class PulsarCluster {
                 });
                 log.info("Function {} logs {}", name, logs);
             } catch (com.github.dockerjava.api.exception.NotFoundException 
notFound) {
-                log.info("Cannot download {} logs from {} not found exception 
{}", name, container.getContainerName(), notFound.toString());
+                log.info("Cannot download {} logs from {} not found exception 
{}", name, container.getContainerName(),
+                        notFound.toString());
             } catch (Throwable err) {
                 log.info("Cannot download {} logs from {}", name, 
container.getContainerName(), err);
             }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index 81e7ae70eff..b705b347cff 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -175,4 +175,7 @@ public class PulsarClusterSpec {
      */
     @Default
     boolean enableTls = false;
+
+    @Default
+    boolean enableOxia = false;
 }
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml 
b/tests/integration/src/test/resources/pulsar-messaging.xml
index cfbdb225870..603756fab68 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -29,6 +29,8 @@
             <class 
name="org.apache.pulsar.tests.integration.messaging.ReaderMessagingTest" />
             <class 
name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest"
 />
             <class name="org.apache.pulsar.tests.integration.admin.AdminTest" 
/>
+
+            <class 
name="org.apache.pulsar.tests.integration.oxia.OxiaSmokeTest" />
         </classes>
     </test>
 </suite>
\ No newline at end of file


Reply via email to