This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a72743aa7946c4c0ce196abaf2504bc693093111 Author: Lari Hotari <[email protected]> AuthorDate: Wed Dec 17 05:16:28 2025 +0200 [improve][io] Replace Qpid in tests with RabbitMQ in Testcontainers and upgrade RabbitMQ client version (#25085) (cherry picked from commit d80401faba37d3707c5e1eb92fff29f8040c2a64) --- pom.xml | 2 +- pulsar-io/rabbitmq/pom.xml | 19 +----- .../pulsar/io/rabbitmq/RabbitMQBrokerManager.java | 42 +++++++------ .../pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java | 8 +-- .../io/rabbitmq/source/RabbitMQSourceTest.java | 8 +-- pulsar-io/rabbitmq/src/test/resources/qpid.json | 68 ---------------------- 6 files changed, 31 insertions(+), 116 deletions(-) diff --git a/pom.xml b/pom.xml index ea04ffe534a..b736bbaceb2 100644 --- a/pom.xml +++ b/pom.xml @@ -226,7 +226,7 @@ flexible messaging model and an intuitive client API.</description> <cassandra.version>3.11.2</cassandra.version> <aerospike-client.version>4.5.0</aerospike-client.version> <kafka-client.version>3.9.0</kafka-client.version> - <rabbitmq-client.version>5.18.0</rabbitmq-client.version> + <rabbitmq-client.version>5.28.0</rabbitmq-client.version> <aws-sdk.version>1.12.788</aws-sdk.version> <aws-sdk2.version>2.32.28</aws-sdk2.version> <avro.version>1.11.4</avro.version> diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml index 39b2b3f8105..6f0ea7d1ef8 100644 --- a/pulsar-io/rabbitmq/pom.xml +++ b/pulsar-io/rabbitmq/pom.xml @@ -81,24 +81,9 @@ </dependency> <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-broker</artifactId> - <version>9.2.0</version> + <groupId>org.testcontainers</groupId> + <artifactId>rabbitmq</artifactId> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-bdbstore</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-broker-plugins-derby-store</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.awaitility</groupId> diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java index 4ff8c61e4f4..ef69608eec5 100644 --- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java +++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java @@ -18,36 +18,34 @@ */ package org.apache.pulsar.io.rabbitmq; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; -import org.apache.qpid.server.SystemLauncher; -import org.apache.qpid.server.model.SystemConfig; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.utility.DockerImageName; public class RabbitMQBrokerManager { + private RabbitMQContainer rabbitMQContainer; - private final SystemLauncher systemLauncher = new SystemLauncher(); - - public void startBroker(String port) throws Exception { - Map<String, Object> brokerOptions = getBrokerOptions(port); - systemLauncher.startup(brokerOptions); + public void startBroker() throws Exception { + rabbitMQContainer = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine")); + rabbitMQContainer.withVhost("default"); + rabbitMQContainer.start(); } public void stopBroker() { - systemLauncher.shutdown(); + if (rabbitMQContainer != null) { + rabbitMQContainer.stop(); + rabbitMQContainer = null; + } + } + + public int getPort() { + return rabbitMQContainer.getAmqpPort(); } - Map<String, Object> getBrokerOptions(String port) throws Exception { - Path tmpFolder = Files.createTempDirectory("qpidWork"); - Map<String, Object> config = new HashMap<>(); - config.put("qpid.work_dir", tmpFolder.toAbsolutePath().toString()); - config.put("qpid.amqp_port", port); + public String getUser() { + return rabbitMQContainer.getAdminUsername(); + } - Map<String, Object> context = new HashMap<>(); - context.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:qpid.json"); - context.put(SystemConfig.TYPE, "Memory"); - context.put(SystemConfig.CONTEXT, config); - return context; + public String getPassword() { + return rabbitMQContainer.getAdminPassword(); } } diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java index f03a36ce114..51d09163829 100644 --- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java +++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java @@ -40,7 +40,7 @@ public class RabbitMQSinkTest { @BeforeMethod public void setUp() throws Exception { rabbitMQBrokerManager = new RabbitMQBrokerManager(); - rabbitMQBrokerManager.startBroker("5673"); + rabbitMQBrokerManager.startBroker(); } @AfterMethod(alwaysRun = true) @@ -52,10 +52,10 @@ public class RabbitMQSinkTest { public void testOpenAndWriteSink() throws Exception { Map<String, Object> configs = new HashMap<>(); configs.put("host", "localhost"); - configs.put("port", "5673"); + configs.put("port", String.valueOf(rabbitMQBrokerManager.getPort())); configs.put("virtualHost", "default"); - configs.put("username", "guest"); - configs.put("password", "guest"); + configs.put("username", rabbitMQBrokerManager.getUser()); + configs.put("password", rabbitMQBrokerManager.getPassword()); configs.put("connectionName", "test-connection"); configs.put("requestedChannelMax", "0"); configs.put("requestedFrameMax", "0"); diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java index c798179f60e..4f139880ec1 100644 --- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java +++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java @@ -37,7 +37,7 @@ public class RabbitMQSourceTest { @BeforeMethod public void setUp() throws Exception { rabbitMQBrokerManager = new RabbitMQBrokerManager(); - rabbitMQBrokerManager.startBroker("5672"); + rabbitMQBrokerManager.startBroker(); } @AfterMethod(alwaysRun = true) @@ -49,10 +49,10 @@ public class RabbitMQSourceTest { public void testOpenAndWriteSink() throws Exception { Map<String, Object> configs = new HashMap<>(); configs.put("host", "localhost"); - configs.put("port", "5672"); + configs.put("port", String.valueOf(rabbitMQBrokerManager.getPort())); configs.put("virtualHost", "default"); - configs.put("username", "guest"); - configs.put("password", "guest"); + configs.put("username", rabbitMQBrokerManager.getUser()); + configs.put("password", rabbitMQBrokerManager.getPassword()); configs.put("queueName", "test-queue"); configs.put("connectionName", "test-connection"); configs.put("requestedChannelMax", "0"); diff --git a/pulsar-io/rabbitmq/src/test/resources/qpid.json b/pulsar-io/rabbitmq/src/test/resources/qpid.json deleted file mode 100644 index 419e9cc1e4a..00000000000 --- a/pulsar-io/rabbitmq/src/test/resources/qpid.json +++ /dev/null @@ -1,68 +0,0 @@ -{ - "name": "${broker.name}", - "modelVersion": "2.0", - "authenticationproviders": [ - { - "name": "plain", - "type": "Plain", - "secureOnlyMechanisms": [], - "users": [ - { - "name": "guest", - "password": "guest", - "type": "managed" - } - ] - } - ], - "brokerloggers": [ - { - "name": "console", - "type": "Console", - "brokerloginclusionrules": [ - { - "name": "Root", - "type": "NameAndLevel", - "level": "WARN", - "loggerName": "ROOT" - }, - { - "name": "Qpid", - "type": "NameAndLevel", - "level": "INFO", - "loggerName": "org.apache.qpid.*" - }, - { - "name": "Operational", - "type": "NameAndLevel", - "level": "INFO", - "loggerName": "qpid.message.*" - }, - { - "name": "Statistics", - "type": "NameAndLevel", - "level": "INFO", - "loggerName": "qpid.statistics.*" - } - ] - } - ], - "ports": [ - { - "name": "AMQP", - "port": "${qpid.amqp_port}", - "authenticationProvider": "plain", - "protocols": [ - "AMQP_0_9_1" - ] - } - ], - "virtualhostnodes": [ - { - "name": "default", - "type": "Memory", - "defaultVirtualHostNode": "true", - "virtualHostInitialConfiguration": "{\"type\": \"Memory\"}" - } - ] -} \ No newline at end of file
