This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9579c4dea91 [improve][ci] Upgrade/Downgrade test (#22988)
9579c4dea91 is described below
commit 9579c4dea9120ed29523d7eb56c1b0637aaa9bc4
Author: Andrey Yegorov <[email protected]>
AuthorDate: Tue Oct 8 15:22:41 2024 -0700
[improve][ci] Upgrade/Downgrade test (#22988)
---
.github/workflows/pulsar-ci.yaml | 3 +
build/run_integration_group.sh | 4 +
.../integration/containers/PulsarContainer.java | 5 +
.../integration/topologies/PulsarCluster.java | 58 +++++--
.../integration/topologies/PulsarClusterSpec.java | 6 +
.../topologies/PulsarClusterTestBase.java | 6 +-
.../upgrade/PulsarUpgradeDowngradeTest.java | 175 +++++++++++++++++++++
.../src/test/resources/pulsar-upgrade.xml | 2 +-
8 files changed, 243 insertions(+), 16 deletions(-)
diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 091dab25ec6..47a39bef9c9 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -603,6 +603,9 @@ jobs:
- name: Metrics
group: METRICS
+ - name: Upgrade
+ group: UPGRADE
+
steps:
- name: checkout
uses: actions/checkout@v4
diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh
index 2d82fce0887..63b92d4e0a7 100755
--- a/build/run_integration_group.sh
+++ b/build/run_integration_group.sh
@@ -177,6 +177,10 @@ test_group_standalone() {
mvn_run_integration_test "$@"
-DintegrationTestSuiteFile=pulsar-standalone.xml -DintegrationTests
}
+# Runs the integration tests listed in the pulsar-upgrade.xml suite file.
+# All caller-supplied arguments are forwarded to mvn_run_integration_test.
+test_group_upgrade() {
+  mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-upgrade.xml -DintegrationTests
+}
+
# Runs the integration tests listed in the pulsar-transaction.xml suite file.
# All caller-supplied arguments are forwarded to mvn_run_integration_test.
test_group_transaction() {
  mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-transaction.xml -DintegrationTests
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 77cdc1bfd28..3cdb048aea5 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -51,8 +51,13 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
public static final int BROKER_HTTP_PORT = 8080;
public static final int BROKER_HTTPS_PORT = 8081;
+ public static final String ALPINE_IMAGE_NAME = "alpine:3.20";
public static final String DEFAULT_IMAGE_NAME =
System.getenv().getOrDefault("PULSAR_TEST_IMAGE_NAME",
"apachepulsar/pulsar-test-latest-version:latest");
+ public static final String UPGRADE_TEST_IMAGE_NAME =
System.getenv().getOrDefault("PULSAR_UPGRADE_TEST_IMAGE_NAME",
+ DEFAULT_IMAGE_NAME);
+ public static final String LAST_RELEASE_IMAGE_NAME =
System.getenv().getOrDefault("PULSAR_LAST_RELEASE_IMAGE_NAME",
+ "apachepulsar/pulsar:3.0.7");
public static final String DEFAULT_HTTP_PATH = "/metrics";
public static final String PULSAR_2_5_IMAGE_NAME =
"apachepulsar/pulsar:2.5.0";
public static final String PULSAR_2_4_IMAGE_NAME =
"apachepulsar/pulsar:2.4.0";
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 90f08a96394..35fb453c4bb 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -72,22 +72,28 @@ public class PulsarCluster {
* @return the built pulsar cluster
*/
public static PulsarCluster forSpec(PulsarClusterSpec spec) {
+ return forSpec(spec, Network.newNetwork());
+ }
+
+ public static PulsarCluster forSpec(PulsarClusterSpec spec, Network
network) {
+ checkArgument(network != null, "Network should not be null");
CSContainer csContainer = null;
if (!spec.enableOxia) {
csContainer = new CSContainer(spec.clusterName)
- .withNetwork(Network.newNetwork())
+ .withNetwork(network)
.withNetworkAliases(CSContainer.NAME);
}
- return new PulsarCluster(spec, csContainer, false);
+ return new PulsarCluster(spec, network, csContainer, false);
}
public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer
csContainer) {
- return new PulsarCluster(spec, csContainer, true);
+ return new PulsarCluster(spec, csContainer.getNetwork(), csContainer,
true);
}
@Getter
private final PulsarClusterSpec spec;
+ public boolean closeNetworkOnExit = true;
@Getter
private final String clusterName;
private final Network network;
@@ -108,19 +114,18 @@ public class PulsarCluster {
private final String metadataStoreUrl;
private final String configurationMetadataStoreUrl;
- private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer,
boolean sharedCsContainer) {
-
+ private PulsarCluster(PulsarClusterSpec spec, Network network, CSContainer
csContainer, boolean sharedCsContainer) {
this.spec = spec;
this.sharedCsContainer = sharedCsContainer;
this.clusterName = spec.clusterName();
- if (csContainer != null ) {
+ if (network != null) {
+ this.network = network;
+ } else if (csContainer != null) {
this.network = csContainer.getNetwork();
} else {
this.network = Network.newNetwork();
}
-
-
if (spec.enableOxia) {
this.zkContainer = null;
this.oxiaContainer = new OxiaContainer(clusterName);
@@ -203,7 +208,9 @@ public class PulsarCluster {
.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold",
"0.95")
.withEnv("diskUsageThreshold", "0.99")
.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold",
"0.97")
- .withEnv("nettyMaxFrameSizeBytes",
String.valueOf(spec.maxMessageSize));
+ .withEnv("nettyMaxFrameSizeBytes",
String.valueOf(spec.maxMessageSize))
+ .withEnv("ledgerDirectories", "data/bookkeeper/" +
name + "/ledgers")
+ .withEnv("journalDirectory", "data/bookkeeper/" +
name + "/journal");
if (spec.bookkeeperEnvs != null) {
bookieContainer.withEnv(spec.bookkeeperEnvs);
}
@@ -262,10 +269,27 @@ public class PulsarCluster {
}
));
+ if (spec.dataContainer != null) {
+ if (!sharedCsContainer && csContainer != null) {
+ csContainer.withVolumesFrom(spec.dataContainer,
BindMode.READ_WRITE);
+ }
+ if (zkContainer != null) {
+ zkContainer.withVolumesFrom(spec.dataContainer,
BindMode.READ_WRITE);
+ }
+ proxyContainer.withVolumesFrom(spec.dataContainer,
BindMode.READ_WRITE);
+
+ bookieContainers.values().forEach(c ->
c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
+ brokerContainers.values().forEach(c ->
c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
+ workerContainers.values().forEach(c ->
c.withVolumesFrom(spec.dataContainer, BindMode.READ_WRITE));
+ }
+
spec.classPathVolumeMounts.forEach((key, value) -> {
if (zkContainer != null) {
zkContainer.withClasspathResourceMapping(key, value,
BindMode.READ_WRITE);
}
+ if (!sharedCsContainer && csContainer != null) {
+ csContainer.withClasspathResourceMapping(key, value,
BindMode.READ_WRITE);
+ }
proxyContainer.withClasspathResourceMapping(key, value,
BindMode.READ_WRITE);
bookieContainers.values().forEach(c ->
c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
@@ -323,6 +347,10 @@ public class PulsarCluster {
}
public void start() throws Exception {
+ start(true);
+ }
+
+ public void start(boolean doInit) throws Exception {
if (!spec.enableOxia) {
// start the local zookeeper
@@ -338,7 +366,7 @@ public class PulsarCluster {
oxiaContainer.start();
}
- {
+ if (doInit) {
// Run cluster metadata initialization
@Cleanup
PulsarInitMetadataContainer init = new PulsarInitMetadataContainer(
@@ -453,10 +481,12 @@ public class PulsarCluster {
oxiaContainer.stop();
}
- try {
- network.close();
- } catch (Exception e) {
- log.info("Failed to shutdown network for pulsar cluster {}",
clusterName, e);
+ if (closeNetworkOnExit) {
+ try {
+ network.close();
+ } catch (Exception e) {
+ log.info("Failed to shutdown network for pulsar cluster {}",
clusterName, e);
+ }
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index 8a991be49fa..ca45c9b7c9b 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -124,6 +124,12 @@ public class PulsarClusterSpec {
@Builder.Default
Map<String, String> classPathVolumeMounts = new TreeMap<>();
+ /**
+ * Data container
+ */
+ @Builder.Default
+ GenericContainer<?> dataContainer = null;
+
/**
* Pulsar Test Image Name
*
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
index 93e2221ab24..8b99f213735 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java
@@ -142,6 +142,10 @@ public abstract class PulsarClusterTestBase extends
PulsarTestBase {
}
protected void setupCluster(PulsarClusterSpec spec) throws Exception {
+ setupCluster(spec, true);
+ }
+
+ protected void setupCluster(PulsarClusterSpec spec, boolean doInit) throws
Exception {
incrementSetupNumber();
log.info("Setting up cluster {} with {} bookies, {} brokers",
spec.clusterName(), spec.numBookies(), spec.numBrokers());
@@ -150,7 +154,7 @@ public abstract class PulsarClusterTestBase extends
PulsarTestBase {
beforeStartCluster();
- pulsarCluster.start();
+ pulsarCluster.start(doInit);
pulsarAdmin =
PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java
new file mode 100644
index 00000000000..ddabd67b229
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/upgrade/PulsarUpgradeDowngradeTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.upgrade;
+
+import com.github.dockerjava.api.model.Bind;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testng.annotations.Test;
+import java.util.stream.Stream;
+import static java.util.stream.Collectors.joining;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Test upgrading/downgrading Pulsar cluster from major releases.
+ */
+@Slf4j
+public class PulsarUpgradeDowngradeTest extends PulsarClusterTestBase {
+
+ @Test(timeOut=600_000)
+ public void upgradeTest() throws Exception {
+ testUpgradeDowngrade(PulsarContainer.LAST_RELEASE_IMAGE_NAME,
PulsarContainer.UPGRADE_TEST_IMAGE_NAME);
+ }
+
+ private void testUpgradeDowngrade(String imageOld, String imageNew) throws
Exception {
+ final String clusterName = Stream.of(this.getClass().getSimpleName(),
randomName(5))
+ .filter(s -> !s.isEmpty())
+ .collect(joining("-"));
+ String topicName = generateTopicName("testupdown", true);
+
+ @Cleanup
+ Network network = Network.newNetwork();
+ @Cleanup
+ GenericContainer<?> alpine = new
GenericContainer<>(PulsarContainer.ALPINE_IMAGE_NAME)
+ .withExposedPorts(80)
+ .withNetwork(network)
+ .withNetworkAliases("shared-storage")
+ .withEnv("MAGIC_NUMBER", "42")
+ .withCreateContainerCmdModifier(createContainerCmd ->
createContainerCmd
+ .getHostConfig()
+ .withBinds(Bind.parse("/pulsar/data:/pulsar/data")))
+ .withCommand("/bin/sh", "-c",
+ "mkdir -p /pulsar/data && "
+ + "chmod -R ug+rwx /pulsar/data && "
+ + "chown -R 10000:0 /pulsar/data && "
+ + "rm -rf /pulsar/data/* && "
+ + "while true; do echo \"$MAGIC_NUMBER\" | nc
-l -p 80; done");
+ alpine.start();
+
+ PulsarClusterSpec specOld = PulsarClusterSpec.builder()
+ .numBookies(2)
+ .numBrokers(1)
+ .clusterName(clusterName)
+ .dataContainer(alpine)
+ .pulsarTestImage(imageOld)
+ .build();
+
+ PulsarClusterSpec specNew = PulsarClusterSpec.builder()
+ .numBookies(2)
+ .numBrokers(1)
+ .clusterName(clusterName)
+ .dataContainer(alpine)
+ .pulsarTestImage(imageNew)
+ .build();
+
+ log.info("Setting up OLD cluster {} with {} bookies, {} brokers using
{}",
+ specOld.clusterName(), specOld.numBookies(),
specOld.numBrokers(), imageOld);
+
+ pulsarCluster = PulsarCluster.forSpec(specNew, network);
+ pulsarCluster.closeNetworkOnExit = false;
+ pulsarCluster.start(true);
+
+ try {
+ log.info("setting retention");
+ pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+ "set-retention", "--size", "100M", "--time", "100m",
"public/default");
+
+ publishAndConsume(topicName,
pulsarCluster.getPlainTextServiceUrl(), 10, 10);
+ } finally {
+ pulsarCluster.stop();
+ }
+
+ log.info("Upgrading to NEW cluster {} with {} bookies, {} brokers
using {}",
+ specNew.clusterName(), specNew.numBookies(),
specNew.numBrokers(), imageNew);
+
+ pulsarCluster = PulsarCluster.forSpec(specNew, network);
+ pulsarCluster.closeNetworkOnExit = false;
+ pulsarCluster.start(false);
+
+ try {
+ publishAndConsume(topicName,
pulsarCluster.getPlainTextServiceUrl(), 10, 20);
+ } finally {
+ pulsarCluster.stop();
+ }
+
+ log.info("Downgrading to OLD cluster {} with {} bookies, {} brokers
using {}",
+ specOld.clusterName(), specOld.numBookies(),
specOld.numBrokers(), imageOld);
+
+ pulsarCluster = PulsarCluster.forSpec(specOld, network);
+ pulsarCluster.closeNetworkOnExit = false;
+ pulsarCluster.start(false);
+
+ try {
+ publishAndConsume(topicName,
pulsarCluster.getPlainTextServiceUrl(), 10, 30);
+ } finally {
+ pulsarCluster.stop();
+ alpine.stop();
+ network.close();
+ }
+ }
+
+ private void publishAndConsume(String topicName, String serviceUrl, int
numProduce, int numConsume) throws Exception {
+ log.info("publishAndConsume: topic name: {}", topicName);
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(serviceUrl)
+ .build();
+
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+
+ log.info("Publishing {} messages", numProduce);
+ for (int i = numConsume - numProduce; i < numConsume; i++) {
+ log.info("Publishing message: {}", "smoke-message-" + i);
+ producer.send("smoke-message-" + i);
+ }
+
+ @Cleanup
+ Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName("my-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ consumer.seek(MessageId.earliest);
+
+ log.info("Consuming {} messages", numConsume);
+ for (int i = 0; i < numConsume; i++) {
+ log.info("Waiting for message: {}", i);
+ Message<String> m = consumer.receive();
+ log.info("Received message: {}", m.getValue());
+ assertEquals("smoke-message-" + i, m.getValue());
+ }
+ }
+}
diff --git a/tests/integration/src/test/resources/pulsar-upgrade.xml
b/tests/integration/src/test/resources/pulsar-upgrade.xml
index a52db547533..dc966b160ba 100644
--- a/tests/integration/src/test/resources/pulsar-upgrade.xml
+++ b/tests/integration/src/test/resources/pulsar-upgrade.xml
@@ -22,7 +22,7 @@
<suite name="Pulsar Upgrade Integration Tests" verbose="2" annotations="JDK">
<test name="pulsar-upgrade-test-suite" preserve-order="true" >
<classes>
- <class
name="org.apache.pulsar.tests.integration.upgrade.PulsarZKDowngradeTest" />
+ <class
name="org.apache.pulsar.tests.integration.upgrade.PulsarUpgradeDowngradeTest" />
</classes>
</test>
</suite>