This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9579c4dea91 [improve][ci] Upgrade/Downgrade test (#22988)
9579c4dea91 is described below

commit 9579c4dea9120ed29523d7eb56c1b0637aaa9bc4
Author: Andrey Yegorov <[email protected]>
AuthorDate: Tue Oct 8 15:22:41 2024 -0700

    [improve][ci] Upgrade/Downgrade test (#22988)
---
 .github/workflows/pulsar-ci.yaml                   |   3 +
 build/run_integration_group.sh                     |   4 +
 .../integration/containers/PulsarContainer.java    |   5 +
 .../integration/topologies/PulsarCluster.java      |  58 +++++--
 .../integration/topologies/PulsarClusterSpec.java  |   6 +
 .../topologies/PulsarClusterTestBase.java          |   6 +-
 .../upgrade/PulsarUpgradeDowngradeTest.java        | 175 +++++++++++++++++++++
 .../src/test/resources/pulsar-upgrade.xml          |   2 +-
 8 files changed, 243 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 091dab25ec6..47a39bef9c9 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -603,6 +603,9 @@ jobs:
           - name: Metrics
             group: METRICS
 
+          - name: Upgrade
+            group: UPGRADE
+
     steps:
       - name: checkout
         uses: actions/checkout@v4
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index 2d82fce0887..63b92d4e0a7 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -177,6 +177,10 @@ test_group_standalone() {
   mvn_run_integration_test "$@" 
-DintegrationTestSuiteFile=pulsar-standalone.xml -DintegrationTests
 }
 
+test_group_upgrade() {
+  mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-upgrade.xml -DintegrationTests
+}
+
 test_group_transaction() {
   mvn_run_integration_test "$@" 
-DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 77cdc1bfd28..3cdb048aea5 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -51,8 +51,13 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
     public static final int BROKER_HTTP_PORT = 8080;
     public static final int BROKER_HTTPS_PORT = 8081;
 
+    public static final String ALPINE_IMAGE_NAME = "alpine:3.20";
     public static final String DEFAULT_IMAGE_NAME = 
System.getenv().getOrDefault("PULSAR_TEST_IMAGE_NAME",
             "apachepulsar/pulsar-test-latest-version:latest");
+    public static final String UPGRADE_TEST_IMAGE_NAME = 
System.getenv().getOrDefault("PULSAR_UPGRADE_TEST_IMAGE_NAME",
+            DEFAULT_IMAGE_NAME);
+    public static final String LAST_RELEASE_IMAGE_NAME = 
System.getenv().getOrDefault("PULSAR_LAST_RELEASE_IMAGE_NAME",
+            "apachepulsar/pulsar:3.0.7");
     public static final String DEFAULT_HTTP_PATH = "/metrics";
     public static final String PULSAR_2_5_IMAGE_NAME = 
"apachepulsar/pulsar:2.5.0";
     public static final String PULSAR_2_4_IMAGE_NAME = 
"apachepulsar/pulsar:2.4.0";
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 90f08a96394..35fb453c4bb 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -72,22 +72,28 @@ public class PulsarCluster {
      * @return the built pulsar cluster
      */
     public static PulsarCluster forSpec(PulsarClusterSpec spec) {
+        return forSpec(spec, Network.newNetwork());
+    }
+
+    public static PulsarCluster forSpec(PulsarClusterSpec spec, Network 
network) {
+        checkArgument(network != null, "Network should not be null");
         CSContainer csContainer = null;
         if (!spec.enableOxia) {
             csContainer = new CSContainer(spec.clusterName)
-                    .withNetwork(Network.newNetwork())
+                    .withNetwork(network)
                     .withNetworkAliases(CSContainer.NAME);
         }
-        return new PulsarCluster(spec, csContainer, false);
+        return new PulsarCluster(spec, network, csContainer, false);
     }
 
     public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer 
csContainer) {
-        return new PulsarCluster(spec, csContainer, true);
+        return new PulsarCluster(spec, csContainer.getNetwork(), csContainer, 
true);
     }
 
     @Getter
     private final PulsarClusterSpec spec;
 
+    public boolean closeNetworkOnExit = true;
     @Getter
     private final String clusterName;
     private final Network network;
@@ -108,19 +114,18 @@ public class PulsarCluster {
     private final String metadataStoreUrl;
     private final String configurationMetadataStoreUrl;
 
-    private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, 
boolean sharedCsContainer) {
-
+    private PulsarCluster(PulsarClusterSpec spec, Network network, CSContainer 
csContainer, boolean sharedCsContainer) {
         this.spec = spec;
         this.sharedCsContainer = sharedCsContainer;
         this.clusterName = spec.clusterName();
-        if (csContainer != null ) {
+        if (network != null) {
+            this.network = network;
+        } else if (csContainer != null) {
             this.network = csContainer.getNetwork();
         } else {
             this.network = Network.newNetwork();
         }
 
-
-
         if (spec.enableOxia) {
             this.zkContainer = null;
             this.oxiaContainer = new OxiaContainer(clusterName);
@@ -203,7 +208,9 @@ public class PulsarCluster {
                             .withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", 
"0.95")
                             .withEnv("diskUsageThreshold", "0.99")
                             .withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", 
"0.97")
-                            .withEnv("nettyMaxFrameSizeBytes", 
String.valueOf(spec.maxMessageSize));
+                            .withEnv("nettyMaxFrameSizeBytes", 
String.valueOf(spec.maxMessageSize))
+                            .withEnv("ledgerDirectories", "data/bookkeeper/" + 
name + "/ledgers")
+                            .withEnv("journalDirectory", "data/bookkeeper/" + 
name + "/journal");
                     if (spec.bookkeeperEnvs != null) {
                         bookieContainer.withEnv(spec.bookkeeperEnvs);
                     }
@@ -262,10 +269,27 @@ public class PulsarCluster {
                         }
                 ));
 
+        if (spec.dataContainer != null) {
+            if (!sharedCsContainer && csContainer != null) {
+                csContainer.withVolumesFrom(spec.dataContainer, 
BindMode.READ_WRITE);
+            }
+            if (zkContainer != null) {
+                zkContainer.withVolumesFrom(spec.dataContainer, 
BindMode.READ_WRITE);
+            }
+            proxyContainer.withVolumesFrom(spec.dataContainer, 
BindMode.READ_WRITE);
+
+            bookieContainers.values().forEach(c -> 
c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
+            brokerContainers.values().forEach(c -> 
c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
+            workerContainers.values().forEach(c -> 
c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
+        }
+
         spec.classPathVolumeMounts.forEach((key, value) -> {
             if (zkContainer != null) {
                 zkContainer.withClasspathResourceMapping(key, value, 
BindMode.READ_WRITE);
             }
+            if (!sharedCsContainer && csContainer != null) {
+                csContainer.withClasspathResourceMapping(key, value, 
BindMode.READ_WRITE);
+            }
             proxyContainer.withClasspathResourceMapping(key, value, 
BindMode.READ_WRITE);
 
             bookieContainers.values().forEach(c -> 
c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
@@ -323,6 +347,10 @@ public class PulsarCluster {
     }
 
     public void start() throws Exception {
+        start(true);
+    }
+
+    public void start(boolean doInit) throws Exception {
 
         if (!spec.enableOxia) {
             // start the local zookeeper
@@ -338,7 +366,7 @@ public class PulsarCluster {
             oxiaContainer.start();
         }
 
-        {
+        if (doInit) {
             // Run cluster metadata initialization
             @Cleanup
             PulsarInitMetadataContainer init = new PulsarInitMetadataContainer(
@@ -453,10 +481,12 @@ public class PulsarCluster {
             oxiaContainer.stop();
         }
 
-        try {
-            network.close();
-        } catch (Exception e) {
-            log.info("Failed to shutdown network for pulsar cluster {}", 
clusterName, e);
+        if (closeNetworkOnExit) {
+            try {
+                network.close();
+            } catch (Exception e) {
+                log.info("Failed to shutdown network for pulsar cluster {}", 
clusterName, e);
+            }
         }
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index 8a991be49fa..ca45c9b7c9b 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -124,6 +124,12 @@ public class PulsarClusterSpec {
     @Builder.Default
     Map<String, String> classPathVolumeMounts = new TreeMap<>();
 
+    /**
+     * Data container
+     */
+    @Builder.Default
+    GenericContainer<?> dataContainer = null;
+
     /**
      * Pulsar Test Image Name
      *
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index 93e2221ab24..8b99f213735 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -142,6 +142,10 @@ public abstract class PulsarClusterTestBase extends 
PulsarTestBase {
     }
 
     protected void setupCluster(PulsarClusterSpec spec) throws Exception {
+        setupCluster(spec, true);
+    }
+
+    protected void setupCluster(PulsarClusterSpec spec, boolean doInit) throws 
Exception {
         incrementSetupNumber();
         log.info("Setting up cluster {} with {} bookies, {} brokers",
                 spec.clusterName(), spec.numBookies(), spec.numBrokers());
@@ -150,7 +154,7 @@ public abstract class PulsarClusterTestBase extends 
PulsarTestBase {
 
         beforeStartCluster();
 
-        pulsarCluster.start();
+        pulsarCluster.start(doInit);
 
         pulsarAdmin = 
PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java
new file mode 100644
index 00000000000..ddabd67b229
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.upgrade;
+
+import com.github.dockerjava.api.model.Bind;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testng.annotations.Test;
+import java.util.stream.Stream;
+import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Integration test that runs a Pulsar cluster on one image, restarts it on another image (upgrade)
+ * and then on the first image again (downgrade), all against the same shared data container.
+ * After every step it checks that all messages published so far can still be consumed in order.
+ */
+@Slf4j
+public class PulsarUpgradeDowngradeTest extends PulsarClusterTestBase {
+
+    /**
+     * Upgrades from {@link PulsarContainer#LAST_RELEASE_IMAGE_NAME} to
+     * {@link PulsarContainer#UPGRADE_TEST_IMAGE_NAME} and downgrades back again.
+     */
+    @Test(timeOut=600_000)
+    public void upgradeTest() throws Exception {
+        testUpgradeDowngrade(PulsarContainer.LAST_RELEASE_IMAGE_NAME, PulsarContainer.UPGRADE_TEST_IMAGE_NAME);
+    }
+
+    /**
+     * Runs three cluster generations that share one network and one data container:
+     * the old image (with metadata initialization), the new image, then the old image again.
+     * Each generation publishes 10 more messages and re-reads the topic from the earliest message.
+     *
+     * @param imageOld image to start from and to downgrade back to
+     * @param imageNew image to upgrade to
+     * @throws Exception if a container fails to start or a produce/consume check fails
+     */
+    private void testUpgradeDowngrade(String imageOld, String imageNew) throws Exception {
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
+                .filter(s -> !s.isEmpty())
+                .collect(joining("-"));
+        String topicName = generateTopicName("testupdown", true);
+
+        // @Cleanup closes both resources on every exit path (container first, then network),
+        // so no explicit stop()/close() is needed at the end of the method.
+        @Cleanup
+        Network network = Network.newNetwork();
+        @Cleanup
+        GenericContainer<?> alpine = new GenericContainer<>(PulsarContainer.ALPINE_IMAGE_NAME)
+                .withExposedPorts(80)
+                .withNetwork(network)
+                .withNetworkAliases("shared-storage")
+                .withEnv("MAGIC_NUMBER", "42")
+                .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
+                    .getHostConfig()
+                    .withBinds(Bind.parse("/pulsar/data:/pulsar/data")))
+                // Prepare an empty, writable /pulsar/data, then keep the container alive by
+                // answering on port 80 forever.
+                .withCommand("/bin/sh", "-c",
+                        "mkdir -p /pulsar/data && "
+                                + "chmod -R ug+rwx /pulsar/data && "
+                                + "chown -R 10000:0 /pulsar/data && "
+                                + "rm -rf /pulsar/data/* && "
+                                + "while true; do echo \"$MAGIC_NUMBER\" | nc -l -p 80; done");
+        alpine.start();
+
+        PulsarClusterSpec specOld = buildSpec(clusterName, alpine, imageOld);
+        PulsarClusterSpec specNew = buildSpec(clusterName, alpine, imageNew);
+
+        log.info("Setting up OLD cluster {} with {} bookies, {} brokers using {}",
+                specOld.clusterName(), specOld.numBookies(), specOld.numBrokers(), imageOld);
+
+        // The first generation must run the OLD image; it is the only one that initializes metadata.
+        startCluster(specOld, network, true);
+
+        try {
+            // NOTE(review): presumably retention keeps earlier messages readable by later
+            // generations - confirm.
+            log.info("setting retention");
+            pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "set-retention", "--size", "100M", "--time", "100m", "public/default");
+
+            publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 10);
+        } finally {
+            pulsarCluster.stop();
+        }
+
+        log.info("Upgrading to NEW cluster {} with {} bookies, {} brokers using {}",
+                specNew.clusterName(), specNew.numBookies(), specNew.numBrokers(), imageNew);
+
+        startCluster(specNew, network, false);
+
+        try {
+            publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 20);
+        } finally {
+            pulsarCluster.stop();
+        }
+
+        log.info("Downgrading to OLD cluster {} with {} bookies, {} brokers using {}",
+                specOld.clusterName(), specOld.numBookies(), specOld.numBrokers(), imageOld);
+
+        startCluster(specOld, network, false);
+
+        try {
+            publishAndConsume(topicName, pulsarCluster.getPlainTextServiceUrl(), 10, 30);
+        } finally {
+            pulsarCluster.stop();
+        }
+    }
+
+    /** Builds the 2-bookie / 1-broker spec used by every generation; only the image differs. */
+    private static PulsarClusterSpec buildSpec(String clusterName, GenericContainer<?> dataContainer,
+                                               String image) {
+        return PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(1)
+                .clusterName(clusterName)
+                .dataContainer(dataContainer)
+                .pulsarTestImage(image)
+                .build();
+    }
+
+    /**
+     * Creates the cluster for {@code spec} on the shared network and starts it.
+     * The network is kept open when the cluster stops so the next generation can reuse it.
+     *
+     * @param doInit whether to run cluster metadata initialization while starting
+     */
+    private void startCluster(PulsarClusterSpec spec, Network network, boolean doInit) throws Exception {
+        pulsarCluster = PulsarCluster.forSpec(spec, network);
+        pulsarCluster.closeNetworkOnExit = false;
+        pulsarCluster.start(doInit);
+    }
+
+    /**
+     * Publishes {@code numProduce} messages numbered from {@code numConsume - numProduce} up to
+     * {@code numConsume - 1}, then reads the topic from the earliest message and expects
+     * {@code smoke-message-0} through {@code smoke-message-(numConsume - 1)} in order.
+     *
+     * @param topicName  topic to publish to and consume from
+     * @param serviceUrl Pulsar service URL the client connects to
+     * @param numProduce number of new messages to publish
+     * @param numConsume total number of messages expected on the topic afterwards
+     * @throws Exception if the client fails or a received message differs from the expected one
+     */
+    private void publishAndConsume(String topicName, String serviceUrl, int numProduce, int numConsume)
+            throws Exception {
+        log.info("publishAndConsume: topic name: {}", topicName);
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .build();
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        log.info("Publishing {} messages", numProduce);
+        for (int i = numConsume - numProduce; i < numConsume; i++) {
+            log.info("Publishing message: {}", "smoke-message-" + i);
+            producer.send("smoke-message-" + i);
+        }
+
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        // Rewind explicitly: the subscription is reused across calls on the same topic.
+        consumer.seek(MessageId.earliest);
+
+        log.info("Consuming {} messages", numConsume);
+        for (int i = 0; i < numConsume; i++) {
+            log.info("Waiting for message: {}", i);
+            Message<String> m = consumer.receive();
+            log.info("Received message: {}", m.getValue());
+            // TestNG argument order is (actual, expected).
+            assertEquals(m.getValue(), "smoke-message-" + i);
+        }
+    }
+}
diff --git a/tests/integration/src/test/resources/pulsar-upgrade.xml 
b/tests/integration/src/test/resources/pulsar-upgrade.xml
index a52db547533..dc966b160ba 100644
--- a/tests/integration/src/test/resources/pulsar-upgrade.xml
+++ b/tests/integration/src/test/resources/pulsar-upgrade.xml
@@ -22,7 +22,7 @@
 <suite name="Pulsar Upgrade Integration Tests" verbose="2" annotations="JDK">
     <test name="pulsar-upgrade-test-suite" preserve-order="true" >
         <classes>
-            <class 
name="org.apache.pulsar.tests.integration.upgrade.PulsarZKDowngradeTest" />
+            <class 
name="org.apache.pulsar.tests.integration.upgrade.PulsarUpgradeDowngradeTest" />
         </classes>
     </test>
 </suite>

Reply via email to