This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2b75ca0e02c [improve] PIP-335: Pulsar with Oxia integration test (#22045) 2b75ca0e02c is described below commit 2b75ca0e02c10262813de509b96f5678faffc934 Author: Matteo Merli <mme...@apache.org> AuthorDate: Fri Feb 9 10:50:33 2024 -0800 [improve] PIP-335: Pulsar with Oxia integration test (#22045) --- .../docker-images/latest-version-image/Dockerfile | 2 +- .../latest-version-image/scripts/init-cluster.sh | 38 --- .../latest-version-image/scripts/run-bookie.sh | 1 - .../latest-version-image/scripts/run-broker.sh | 1 - .../scripts/run-functions-worker.sh | 1 - .../latest-version-image/scripts/run-proxy.sh | 1 - .../latest-version-image/scripts/run-websocket.sh | 1 - .../containers/PulsarInitMetadataContainer.java | 76 +++++ .../tests/integration/oxia/OxiaContainer.java | 72 +++++ .../tests/integration/oxia/OxiaSmokeTest.java | 48 +++ .../integration/topologies/PulsarCluster.java | 330 ++++++++++++--------- .../integration/topologies/PulsarClusterSpec.java | 3 + .../src/test/resources/pulsar-messaging.xml | 2 + 13 files changed, 397 insertions(+), 179 deletions(-) diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile index 4973bec0441..f019af5c926 100644 --- a/tests/docker-images/latest-version-image/Dockerfile +++ b/tests/docker-images/latest-version-image/Dockerfile @@ -50,7 +50,7 @@ COPY conf/supervisord.conf /etc/supervisord.conf COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf \ conf/proxy.conf conf/websocket.conf /etc/supervisord/conf.d/ -COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \ +COPY scripts/run-global-zk.sh scripts/run-local-zk.sh \ scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh \ scripts/run-standalone.sh scripts/run-websocket.sh \ /pulsar/bin/ diff --git a/tests/docker-images/latest-version-image/scripts/init-cluster.sh b/tests/docker-images/latest-version-image/scripts/init-cluster.sh deleted file mode 100755 index 926845d5a77..00000000000 --- a/tests/docker-images/latest-version-image/scripts/init-cluster.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -set -x - -ZNODE="/initialized-$clusterName" - -bin/watch-znode.py -z $zkServers -p / -w - -bin/watch-znode.py -z $zkServers -p $ZNODE -e -if [ $? != 0 ]; then - echo Initializing cluster - bin/apply-config-from-env.py conf/bookkeeper.conf && - bin/pulsar initialize-cluster-metadata --cluster $clusterName --zookeeper $zkServers \ - --configuration-store $configurationStore --web-service-url http://$pulsarNode:8080/ \ - --broker-service-url pulsar://$pulsarNode:6650/ && - bin/watch-znode.py -z $zkServers -p $ZNODE -c - echo Initialized -else - echo Already Initialized -fi diff --git a/tests/docker-images/latest-version-image/scripts/run-bookie.sh b/tests/docker-images/latest-version-image/scripts/run-bookie.sh index 64466eb2d9a..e454e667645 100755 --- a/tests/docker-images/latest-version-image/scripts/run-bookie.sh +++ b/tests/docker-images/latest-version-image/scripts/run-bookie.sh @@ -29,6 +29,5 @@ if [ -z "$NO_AUTOSTART" ]; then sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/bookie.conf fi -bin/watch-znode.py -z $zkServers -p /initialized-$clusterName -w exec /usr/bin/supervisord -c /etc/supervisord.conf diff --git a/tests/docker-images/latest-version-image/scripts/run-broker.sh b/tests/docker-images/latest-version-image/scripts/run-broker.sh index 6ed5d60c39e..4f89f145f2b 100755 --- a/tests/docker-images/latest-version-image/scripts/run-broker.sh +++ b/tests/docker-images/latest-version-image/scripts/run-broker.sh @@ -25,6 +25,5 @@ if [ -z "$NO_AUTOSTART" ]; then sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/broker.conf fi -bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w exec /usr/bin/supervisord -c /etc/supervisord.conf diff --git a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh index 3fadf960ee3..cd9d7593dbf 100755 --- a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh +++ b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh @@ -26,6 +26,5 @@ if [ -z "$NO_AUTOSTART" ]; then sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/functions_worker.conf fi -bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w exec /usr/bin/supervisord -c /etc/supervisord.conf diff --git a/tests/docker-images/latest-version-image/scripts/run-proxy.sh b/tests/docker-images/latest-version-image/scripts/run-proxy.sh index 4836a890bda..f44ed0bb658 100755 --- a/tests/docker-images/latest-version-image/scripts/run-proxy.sh +++ b/tests/docker-images/latest-version-image/scripts/run-proxy.sh @@ -25,5 +25,4 @@ if [ -z "$NO_AUTOSTART" ]; then sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/proxy.conf fi -bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w exec /usr/bin/supervisord -c /etc/supervisord.conf diff --git a/tests/docker-images/latest-version-image/scripts/run-websocket.sh b/tests/docker-images/latest-version-image/scripts/run-websocket.sh index a49ee111768..34e4b9016af 100755 --- a/tests/docker-images/latest-version-image/scripts/run-websocket.sh +++ b/tests/docker-images/latest-version-image/scripts/run-websocket.sh @@ -25,5 +25,4 @@ if [ -z "$NO_AUTOSTART" ]; then sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/websocket.conf fi -bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w exec /usr/bin/supervisord -c /etc/supervisord.conf diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java new file mode 100644 index 00000000000..4251ed3bd57 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarInitMetadataContainer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.tests.integration.containers; + +import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; + +/** + * Initialize the Pulsar metadata + */ +@Slf4j +public class PulsarInitMetadataContainer extends GenericContainer<PulsarInitMetadataContainer> { + + public static final String NAME = "init-metadata"; + + private final String clusterName; + private final String metadataStoreUrl; + private final String configurationMetadataStoreUrl; + private final String brokerHostname; + + public PulsarInitMetadataContainer(Network network, + String clusterName, + String metadataStoreUrl, + String configurationMetadataStoreUrl, + String brokerHostname) { + this.clusterName = clusterName; + this.metadataStoreUrl = metadataStoreUrl; + this.configurationMetadataStoreUrl = configurationMetadataStoreUrl; + this.brokerHostname = brokerHostname; + setDockerImageName(PulsarContainer.DEFAULT_IMAGE_NAME); + withNetwork(network); + + setCommand("sleep 1000000"); + } + + + public void initialize() throws Exception { + start(); + ExecResult res = this.execInContainer( + "/pulsar/bin/pulsar", "initialize-cluster-metadata", + "--cluster", clusterName, + "--metadata-store", metadataStoreUrl, + "--configuration-metadata-store", configurationMetadataStoreUrl, + "--web-service-url", "http://" + brokerHostname + ":8080/", + "--broker-service-url", "pulsar://" + brokerHostname + ":6650/" + ); + + if (res.getExitCode() == 0) { + log.info("Successfully initialized cluster"); + } else { + log.warn("Failed to initialize Pulsar cluster. exit code: " + res.getExitCode()); + log.warn("STDOUT: " + res.getStdout()); + log.warn("STDERR: " + res.getStderr()); + throw new IOException("Failed to initialized Pulsar Cluster"); + } + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java new file mode 100644 index 00000000000..18d2dd77b7d --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaContainer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.tests.integration.oxia; + +import java.time.Duration; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.containers.PulsarContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class OxiaContainer extends ChaosContainer<OxiaContainer> { + + public static final String NAME = "oxia"; + + public static final int OXIA_PORT = 6648; + public static final int METRICS_PORT = 8080; + private static final int DEFAULT_SHARDS = 1; + + private static final String DEFAULT_IMAGE_NAME = "streamnative/oxia:main"; + + public OxiaContainer(String clusterName) { + this(clusterName, DEFAULT_IMAGE_NAME, DEFAULT_SHARDS); + } + + @SuppressWarnings("resource") + OxiaContainer(String clusterName, String imageName, int shards) { + super(clusterName, imageName); + if (shards <= 0) { + throw new IllegalArgumentException("shards must be greater than zero"); + } + addExposedPorts(OXIA_PORT, METRICS_PORT); + this.withCreateContainerCmdModifier(createContainerCmd -> { + createContainerCmd.withHostName("oxia"); + createContainerCmd.withName(getContainerName()); + }); + setCommand("oxia", "standalone", + "--shards=" + shards, + "--wal-sync-data=false"); + waitingFor( + Wait.forHttp("/metrics") + .forPort(METRICS_PORT) + .forStatusCode(200) + .withStartupTimeout(Duration.ofSeconds(30))); + + PulsarContainer.configureLeaveContainerRunning(this); + } + + public String getServiceAddress() { + return OxiaContainer.NAME + ":" + OXIA_PORT; + } + + @Override + public String getContainerName() { + return clusterName + "-oxia"; + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java new file mode 100644 index 00000000000..d55c437b4f8 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/oxia/OxiaSmokeTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.oxia; + +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.testng.annotations.Test; + +/** + * Test pulsar produce/consume semantics + */ +@Slf4j +public class OxiaSmokeTest extends PulsarTestSuite { + + protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster( + String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { + specBuilder.enableOxia(true); + return specBuilder; + } + + // + // Test Basic Publish & Consume Operations + // + + @Test(dataProvider = "ServiceUrlAndTopics") + public void testPublishAndConsume(Supplier<String> serviceUrl, boolean isPersistent) throws Exception { + super.testPublishAndConsume(serviceUrl.get(), isPersistent); + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 88ff778732e..bc9b1e267b9 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.Function; +import lombok.Cleanup; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; @@ -44,9 +45,11 @@ import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.containers.CSContainer; import org.apache.pulsar.tests.integration.containers.ProxyContainer; import org.apache.pulsar.tests.integration.containers.PulsarContainer; +import org.apache.pulsar.tests.integration.containers.PulsarInitMetadataContainer; import org.apache.pulsar.tests.integration.containers.WorkerContainer; import org.apache.pulsar.tests.integration.containers.ZKContainer; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.oxia.OxiaContainer; import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -69,9 +72,12 @@ public class PulsarCluster { * @return the built pulsar cluster */ public static PulsarCluster forSpec(PulsarClusterSpec spec) { - CSContainer csContainer = new CSContainer(spec.clusterName) - .withNetwork(Network.newNetwork()) - .withNetworkAliases(CSContainer.NAME); + CSContainer csContainer = null; + if (!spec.enableOxia) { + csContainer = new CSContainer(spec.clusterName) + .withNetwork(Network.newNetwork()) + .withNetworkAliases(CSContainer.NAME); + } return new PulsarCluster(spec, csContainer, false); } @@ -86,6 +92,8 @@ public class PulsarCluster { private final String clusterName; private final Network network; private final ZKContainer zkContainer; + + private final OxiaContainer oxiaContainer; private final CSContainer csContainer; private final boolean sharedCsContainer; private final Map<String, BKContainer> bookieContainers; @@ -95,22 +103,44 @@ public class PulsarCluster { private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap(); private Map<String, Map<String, String>> externalServiceEnvs; + private final String metadataStoreUrl; + private final String configurationMetadataStoreUrl; + private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) { this.spec = spec; this.sharedCsContainer = sharedCsContainer; this.clusterName = spec.clusterName(); - this.network = csContainer.getNetwork(); - - this.zkContainer = new ZKContainer(clusterName); - this.zkContainer - .withNetwork(network) - .withNetworkAliases(appendClusterName(ZKContainer.NAME)) - .withEnv("clusterName", clusterName) - .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) - .withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT) - .withEnv("forceSync", "no") - .withEnv("pulsarNode", appendClusterName("pulsar-broker-0")); + if (csContainer != null ) { + this.network = csContainer.getNetwork(); + } else { + this.network = Network.newNetwork(); + } + + + + if (spec.enableOxia) { + this.zkContainer = null; + this.oxiaContainer = new OxiaContainer(clusterName); + this.oxiaContainer + .withNetwork(network) + .withNetworkAliases(appendClusterName(OxiaContainer.NAME)); + metadataStoreUrl = "oxia://" + oxiaContainer.getServiceAddress(); + configurationMetadataStoreUrl = metadataStoreUrl; + } else { + this.oxiaContainer = null; + this.zkContainer = new ZKContainer(clusterName); + this.zkContainer + .withNetwork(network) + .withNetworkAliases(appendClusterName(ZKContainer.NAME)) + .withEnv("clusterName", clusterName) + .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) + .withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT) + .withEnv("forceSync", "no") + .withEnv("pulsarNode", appendClusterName("pulsar-broker-0")); + metadataStoreUrl = appendClusterName(ZKContainer.NAME); + configurationMetadataStoreUrl = CSContainer.NAME + ":" + CS_PORT; + } this.csContainer = csContainer; @@ -121,27 +151,29 @@ public class PulsarCluster { this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls) .withNetwork(network) .withNetworkAliases(appendClusterName("pulsar-proxy")) - .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) - .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME)) - .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT) + .withEnv("metadataStoreUrl", metadataStoreUrl) + .withEnv("configurationMetadataStoreUrl", configurationMetadataStoreUrl) .withEnv("clusterName", clusterName); - // enable mTLS + // enable mTLS if (spec.enableTls) { proxyContainer - .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT)) - .withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS)) - .withEnv("forwardAuthorizationCredentials", "true") - .withEnv("tlsRequireTrustedClientCertOnConnect", "true") - .withEnv("tlsAllowInsecureConnection", "false") - .withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem") - .withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem") - .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem") - .withEnv("brokerClientAuthenticationPlugin", AuthenticationTls.class.getName()) - .withEnv("brokerClientAuthenticationParameters", String.format("tlsCertFile:%s,tlsKeyFile:%s", "/pulsar/certificate-authority/client-keys/admin.cert.pem", "/pulsar/certificate-authority/client-keys/admin.key-pk8.pem")) - .withEnv("tlsEnabledWithBroker", "true") - .withEnv("brokerClientTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem") - .withEnv("brokerClientCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem") - .withEnv("brokerClientKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem"); + .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT)) + .withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS)) + .withEnv("forwardAuthorizationCredentials", "true") + .withEnv("tlsRequireTrustedClientCertOnConnect", "true") + .withEnv("tlsAllowInsecureConnection", "false") + .withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem") + .withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem") + .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem") + .withEnv("brokerClientAuthenticationPlugin", AuthenticationTls.class.getName()) + .withEnv("brokerClientAuthenticationParameters", String.format("tlsCertFile:%s,tlsKeyFile:%s", + "/pulsar/certificate-authority/client-keys/admin.cert.pem", + "/pulsar/certificate-authority/client-keys/admin.key-pk8.pem")) + .withEnv("tlsEnabledWithBroker", "true") + .withEnv("brokerClientTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem") + .withEnv("brokerClientCertificateFilePath", + "/pulsar/certificate-authority/server-keys/proxy.cert.pem") + .withEnv("brokerClientKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem"); } if (spec.proxyEnvs != null) { @@ -157,7 +189,7 @@ public class PulsarCluster { BKContainer bookieContainer = new BKContainer(clusterName, name) .withNetwork(network) .withNetworkAliases(appendClusterName(name)) - .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) + .withEnv("metadataServiceUri", "metadata-store:" + metadataStoreUrl) .withEnv("useHostNameAsBookieID", "true") // Disable fsyncs for tests since they're slow within the containers .withEnv("journalSyncData", "false") @@ -179,50 +211,56 @@ public class PulsarCluster { // create brokers brokerContainers.putAll( - runNumContainers("broker", spec.numBrokers(), (name) -> { - BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name), spec.enableTls) - .withNetwork(network) - .withNetworkAliases(appendClusterName(name)) - .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) - .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME)) - .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT) - .withEnv("clusterName", clusterName) - .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1") - .withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1") - // used in s3 tests - .withEnv("AWS_ACCESS_KEY_ID", "accesskey").withEnv("AWS_SECRET_KEY", "secretkey") - .withEnv("maxMessageSize", "" + spec.maxMessageSize); - if (spec.enableTls) { - // enable mTLS - brokerContainer - .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT)) - .withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS)) - .withEnv("authenticateOriginalAuthData", "true") - .withEnv("tlsAllowInsecureConnection", "false") - .withEnv("tlsRequireTrustedClientCertOnConnect", "true") - .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem") - .withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem") - .withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem"); - } - if (spec.queryLastMessage) { - brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10"); - brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false"); - } - if (spec.brokerEnvs != null) { - brokerContainer.withEnv(spec.brokerEnvs); - } - if (spec.brokerMountFiles != null) { - spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind); - } - if (spec.brokerAdditionalPorts() != null) { - spec.brokerAdditionalPorts().forEach(brokerContainer::addExposedPort); - } - return brokerContainer; - } - )); + runNumContainers("broker", spec.numBrokers(), (name) -> { + BrokerContainer brokerContainer = + new BrokerContainer(clusterName, appendClusterName(name), spec.enableTls) + .withNetwork(network) + .withNetworkAliases(appendClusterName(name)) + .withEnv("metadataStoreUrl", metadataStoreUrl) + .withEnv("configurationMetadataStoreUrl", configurationMetadataStoreUrl) + .withEnv("clusterName", clusterName) + .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1") + .withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1") + // used in s3 tests + .withEnv("AWS_ACCESS_KEY_ID", "accesskey").withEnv("AWS_SECRET_KEY", + "secretkey") + .withEnv("maxMessageSize", "" + spec.maxMessageSize); + if (spec.enableTls) { + // enable mTLS + brokerContainer + .withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT)) + .withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS)) + .withEnv("authenticateOriginalAuthData", "true") + .withEnv("tlsAllowInsecureConnection", "false") + .withEnv("tlsRequireTrustedClientCertOnConnect", "true") + .withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca" + + ".cert.pem") + .withEnv("tlsCertificateFilePath", + "/pulsar/certificate-authority/server-keys/broker.cert.pem") + .withEnv("tlsKeyFilePath", + "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem"); + } + if (spec.queryLastMessage) { + brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10"); + brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false"); + } + if (spec.brokerEnvs != null) { + brokerContainer.withEnv(spec.brokerEnvs); + } + if (spec.brokerMountFiles != null) { + spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind); + } + if (spec.brokerAdditionalPorts() != null) { + spec.brokerAdditionalPorts().forEach(brokerContainer::addExposedPort); + } + return brokerContainer; + } + )); spec.classPathVolumeMounts.forEach((key, value) -> { - zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE); + if (zkContainer != null) { + zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE); + } proxyContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE); bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE)); @@ -278,20 +316,33 @@ public class PulsarCluster { } public void start() throws Exception { - // start the local zookeeper - zkContainer.start(); - log.info("Successfully started local zookeeper container."); - // start the configuration store - if (!sharedCsContainer) { - csContainer.start(); - log.info("Successfully started configuration store container."); + if (!spec.enableOxia) { + // start the local zookeeper + zkContainer.start(); + log.info("Successfully started local zookeeper container."); + + // start the configuration store + if (!sharedCsContainer) { + csContainer.start(); + log.info("Successfully started configuration store container."); + } + } else { + oxiaContainer.start(); } - // init the cluster - zkContainer.execCmd( - "bin/init-cluster.sh"); - log.info("Successfully initialized the cluster."); + { + // Run cluster metadata initialization + @Cleanup + PulsarInitMetadataContainer init = new PulsarInitMetadataContainer( + network, + clusterName, + metadataStoreUrl, + configurationMetadataStoreUrl, + appendClusterName("pulsar-broker-0") + ); + init.initialize(); + } // start bookies bookieContainers.values().forEach(BKContainer::start); @@ -318,7 +369,8 @@ public class PulsarCluster { serviceContainer.withNetwork(network); serviceContainer.withNetworkAliases(service.getKey()); if (null != externalServiceEnvs && null != externalServiceEnvs.get(service.getKey())) { - Map<String, String> env = externalServiceEnvs.getOrDefault(service.getKey(), Collections.emptyMap()); + Map<String, String> env = + externalServiceEnvs.getOrDefault(service.getKey(), Collections.emptyMap()); serviceContainer.withEnv(env); } PulsarContainer.configureLeaveContainerRunning(serviceContainer); @@ -390,6 +442,10 @@ public class PulsarCluster { zkContainer.stop(); } + if (oxiaContainer != null) { + oxiaContainer.stop(); + } + try { network.close(); } catch (Exception e) { @@ -403,7 +459,8 @@ public class PulsarCluster { .forEach(GenericContainer::stop); } - public synchronized void setupFunctionWorkers(String suffix, FunctionRuntimeType runtimeType, int numFunctionWorkers) { + public synchronized void setupFunctionWorkers(String suffix, FunctionRuntimeType runtimeType, + int numFunctionWorkers) { switch (runtimeType) { case THREAD: startFunctionWorkersWithThreadContainerFactory(suffix, numFunctionWorkers); @@ -418,23 +475,23 @@ public class PulsarCluster { String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT; String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT; workerContainers.putAll(runNumContainers( - "functions-worker-process-" + suffix, - numFunctionWorkers, - (name) -> new WorkerContainer(clusterName, name) - .withNetwork(network) - .withNetworkAliases(name) - // worker settings - .withEnv("PF_workerId", name) - .withEnv("PF_workerHostname", name) - .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT) - .withEnv("PF_pulsarFunctionsCluster", clusterName) - .withEnv("PF_pulsarServiceUrl", serviceUrl) - .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl) - // script - .withEnv("clusterName", clusterName) - .withEnv("zookeeperServers", ZKContainer.NAME) - // bookkeeper tools - .withEnv("zkServers", ZKContainer.NAME) + "functions-worker-process-" + suffix, + numFunctionWorkers, + (name) -> new WorkerContainer(clusterName, name) + .withNetwork(network) + .withNetworkAliases(name) + // worker settings + .withEnv("PF_workerId", name) + .withEnv("PF_workerHostname", name) + .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT) + .withEnv("PF_pulsarFunctionsCluster", clusterName) + .withEnv("PF_pulsarServiceUrl", serviceUrl) + .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl) + // script + .withEnv("clusterName", clusterName) + .withEnv("zookeeperServers", ZKContainer.NAME) + // bookkeeper tools + .withEnv("zkServers", ZKContainer.NAME) )); this.startWorkers(); } @@ -443,25 +500,26 @@ public class PulsarCluster { String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT; String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT; workerContainers.putAll(runNumContainers( - "functions-worker-thread-" + suffix, - numFunctionWorkers, - (name) -> new WorkerContainer(clusterName, name) - .withNetwork(network) - .withNetworkAliases(name) - // worker settings - .withEnv("PF_workerId", name) - .withEnv("PF_workerHostname", name) - .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT) - .withEnv("PF_pulsarFunctionsCluster", clusterName) - .withEnv("PF_pulsarServiceUrl", serviceUrl) - .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl) - .withEnv("PF_functionRuntimeFactoryClassName", "org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory") - .withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName", "pf-container-group") - // script - .withEnv("clusterName", clusterName) - .withEnv("zookeeperServers", ZKContainer.NAME) - // bookkeeper tools - .withEnv("zkServers", ZKContainer.NAME) + "functions-worker-thread-" + suffix, + numFunctionWorkers, + (name) -> new WorkerContainer(clusterName, name) + .withNetwork(network) + .withNetworkAliases(name) + // worker settings + .withEnv("PF_workerId", name) + .withEnv("PF_workerHostname", name) + .withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT) + .withEnv("PF_pulsarFunctionsCluster", clusterName) + .withEnv("PF_pulsarServiceUrl", serviceUrl) + .withEnv("PF_pulsarWebServiceUrl", httpServiceUrl) + .withEnv("PF_functionRuntimeFactoryClassName", + "org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory") + .withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName", "pf-container-group") + // script + .withEnv("clusterName", clusterName) + .withEnv("zookeeperServers", ZKContainer.NAME) + // bookkeeper tools + .withEnv("zkServers", ZKContainer.NAME) )); this.startWorkers(); } @@ -502,9 +560,9 @@ public class PulsarCluster { containers.forEach((name, container) -> { PulsarContainer.configureLeaveContainerRunning(container); container - .withNetwork(network) - .withNetworkAliases(name) - .start(); + .withNetwork(network) + .withNetworkAliases(name) + .start(); log.info("Successfully start container {}.", name); }); } @@ -576,15 +634,16 @@ public class PulsarCluster { return zkContainer; } - public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception { + public ContainerExecResult runAdminCommandOnAnyBroker(String... commands) throws Exception { return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands); } - public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String...commands) throws Exception { + public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String... commands) throws Exception { return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, commands); } - private ContainerExecResult runCommandOnAnyBrokerWithScript(String scriptType, String...commands) throws Exception { + private ContainerExecResult runCommandOnAnyBrokerWithScript(String scriptType, String... commands) + throws Exception { BrokerContainer container = getAnyBroker(); String[] cmds = new String[commands.length + 1]; cmds[0] = scriptType; @@ -618,8 +677,8 @@ public class PulsarCluster { public ContainerExecResult createNamespace(String nsName) throws Exception { return runAdminCommandOnAnyBroker( - "namespaces", "create", "public/" + nsName, - "--clusters", clusterName); + "namespaces", "create", "public/" + nsName, + "--clusters", clusterName); } public ContainerExecResult createPartitionedTopic(String topicName, int partitions) throws Exception { @@ -630,8 +689,8 @@ public class PulsarCluster { public ContainerExecResult enableDeduplication(String nsName, boolean enabled) throws Exception { return runAdminCommandOnAnyBroker( - "namespaces", "set-deduplication", "public/" + nsName, - enabled ? "--enable" : "--disable"); + "namespaces", "set-deduplication", "public/" + nsName, + enabled ? "--enable" : "--disable"); } public void dumpFunctionLogs(String name) { @@ -644,7 +703,8 @@ public class PulsarCluster { }); log.info("Function {} logs {}", name, logs); } catch (com.github.dockerjava.api.exception.NotFoundException notFound) { - log.info("Cannot download {} logs from {} not found exception {}", name, container.getContainerName(), notFound.toString()); + log.info("Cannot download {} logs from {} not found exception {}", name, container.getContainerName(), + notFound.toString()); } catch (Throwable err) { log.info("Cannot download {} logs from {}", name, container.getContainerName(), err); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java index 81e7ae70eff..b705b347cff 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java @@ -175,4 +175,7 @@ public class PulsarClusterSpec { */ @Default boolean enableTls = false; + + @Default + boolean enableOxia = false; } diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml index cfbdb225870..603756fab68 100644 --- a/tests/integration/src/test/resources/pulsar-messaging.xml +++ b/tests/integration/src/test/resources/pulsar-messaging.xml @@ -29,6 +29,8 @@ <class name="org.apache.pulsar.tests.integration.messaging.ReaderMessagingTest" /> <class name="org.apache.pulsar.tests.integration.messaging.NonDurableConsumerMessagingTest" /> <class name="org.apache.pulsar.tests.integration.admin.AdminTest" /> + + <class name="org.apache.pulsar.tests.integration.oxia.OxiaSmokeTest" /> </classes> </test> </suite> \ No newline at end of file