This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d80401faba3 [improve][io] Replace Qpid in tests with RabbitMQ in
Testcontainers and upgrade RabbitMQ client version (#25085)
d80401faba3 is described below
commit d80401faba37d3707c5e1eb92fff29f8040c2a64
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Dec 17 05:16:28 2025 +0200
[improve][io] Replace Qpid in tests with RabbitMQ in Testcontainers and
upgrade RabbitMQ client version (#25085)
---
pom.xml | 2 +-
pulsar-io/rabbitmq/pom.xml | 19 +-----
.../pulsar/io/rabbitmq/RabbitMQBrokerManager.java | 42 +++++++------
.../pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java | 8 +--
.../io/rabbitmq/source/RabbitMQSourceTest.java | 8 +--
pulsar-io/rabbitmq/src/test/resources/qpid.json | 68 ----------------------
6 files changed, 31 insertions(+), 116 deletions(-)
diff --git a/pom.xml b/pom.xml
index 4f665a86c0d..a88767dcfd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -232,7 +232,7 @@ flexible messaging model and an intuitive client API.</description>
<cassandra.version>3.11.2</cassandra.version>
<aerospike-client.version>4.5.0</aerospike-client.version>
<kafka-client.version>3.9.1</kafka-client.version>
- <rabbitmq-client.version>5.18.0</rabbitmq-client.version>
+ <rabbitmq-client.version>5.28.0</rabbitmq-client.version>
<aws-sdk.version>1.12.788</aws-sdk.version>
<aws-sdk2.version>2.32.28</aws-sdk2.version>
<avro.version>1.12.0</avro.version>
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 60fed90d55a..bd3282ae207 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -81,24 +81,9 @@
</dependency>
<dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker</artifactId>
- <version>9.2.0</version>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>rabbitmq</artifactId>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-bdbstore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-broker-plugins-derby-store</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
index 4ff8c61e4f4..ef69608eec5 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
@@ -18,36 +18,34 @@
*/
package org.apache.pulsar.io.rabbitmq;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.qpid.server.SystemLauncher;
-import org.apache.qpid.server.model.SystemConfig;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
public class RabbitMQBrokerManager {
+ private RabbitMQContainer rabbitMQContainer;
- private final SystemLauncher systemLauncher = new SystemLauncher();
-
- public void startBroker(String port) throws Exception {
- Map<String, Object> brokerOptions = getBrokerOptions(port);
- systemLauncher.startup(brokerOptions);
+ public void startBroker() throws Exception {
+ rabbitMQContainer = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.7.25-management-alpine"));
+ rabbitMQContainer.withVhost("default");
+ rabbitMQContainer.start();
}
public void stopBroker() {
- systemLauncher.shutdown();
+ if (rabbitMQContainer != null) {
+ rabbitMQContainer.stop();
+ rabbitMQContainer = null;
+ }
+ }
+
+ public int getPort() {
+ return rabbitMQContainer.getAmqpPort();
}
- Map<String, Object> getBrokerOptions(String port) throws Exception {
- Path tmpFolder = Files.createTempDirectory("qpidWork");
- Map<String, Object> config = new HashMap<>();
- config.put("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
- config.put("qpid.amqp_port", port);
+ public String getUser() {
+ return rabbitMQContainer.getAdminUsername();
+ }
- Map<String, Object> context = new HashMap<>();
- context.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:qpid.json");
- context.put(SystemConfig.TYPE, "Memory");
- context.put(SystemConfig.CONTEXT, config);
- return context;
+ public String getPassword() {
+ return rabbitMQContainer.getAdminPassword();
}
}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
index f03a36ce114..51d09163829 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
@@ -40,7 +40,7 @@ public class RabbitMQSinkTest {
@BeforeMethod
public void setUp() throws Exception {
rabbitMQBrokerManager = new RabbitMQBrokerManager();
- rabbitMQBrokerManager.startBroker("5673");
+ rabbitMQBrokerManager.startBroker();
}
@AfterMethod(alwaysRun = true)
@@ -52,10 +52,10 @@ public class RabbitMQSinkTest {
public void testOpenAndWriteSink() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "localhost");
- configs.put("port", "5673");
+ configs.put("port", String.valueOf(rabbitMQBrokerManager.getPort()));
configs.put("virtualHost", "default");
- configs.put("username", "guest");
- configs.put("password", "guest");
+ configs.put("username", rabbitMQBrokerManager.getUser());
+ configs.put("password", rabbitMQBrokerManager.getPassword());
configs.put("connectionName", "test-connection");
configs.put("requestedChannelMax", "0");
configs.put("requestedFrameMax", "0");
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
index c798179f60e..4f139880ec1 100644
--- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
@@ -37,7 +37,7 @@ public class RabbitMQSourceTest {
@BeforeMethod
public void setUp() throws Exception {
rabbitMQBrokerManager = new RabbitMQBrokerManager();
- rabbitMQBrokerManager.startBroker("5672");
+ rabbitMQBrokerManager.startBroker();
}
@AfterMethod(alwaysRun = true)
@@ -49,10 +49,10 @@ public class RabbitMQSourceTest {
public void testOpenAndWriteSink() throws Exception {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "localhost");
- configs.put("port", "5672");
+ configs.put("port", String.valueOf(rabbitMQBrokerManager.getPort()));
configs.put("virtualHost", "default");
- configs.put("username", "guest");
- configs.put("password", "guest");
+ configs.put("username", rabbitMQBrokerManager.getUser());
+ configs.put("password", rabbitMQBrokerManager.getPassword());
configs.put("queueName", "test-queue");
configs.put("connectionName", "test-connection");
configs.put("requestedChannelMax", "0");
diff --git a/pulsar-io/rabbitmq/src/test/resources/qpid.json b/pulsar-io/rabbitmq/src/test/resources/qpid.json
deleted file mode 100644
index 419e9cc1e4a..00000000000
--- a/pulsar-io/rabbitmq/src/test/resources/qpid.json
+++ /dev/null
@@ -1,68 +0,0 @@
-{
- "name": "${broker.name}",
- "modelVersion": "2.0",
- "authenticationproviders": [
- {
- "name": "plain",
- "type": "Plain",
- "secureOnlyMechanisms": [],
- "users": [
- {
- "name": "guest",
- "password": "guest",
- "type": "managed"
- }
- ]
- }
- ],
- "brokerloggers": [
- {
- "name": "console",
- "type": "Console",
- "brokerloginclusionrules": [
- {
- "name": "Root",
- "type": "NameAndLevel",
- "level": "WARN",
- "loggerName": "ROOT"
- },
- {
- "name": "Qpid",
- "type": "NameAndLevel",
- "level": "INFO",
- "loggerName": "org.apache.qpid.*"
- },
- {
- "name": "Operational",
- "type": "NameAndLevel",
- "level": "INFO",
- "loggerName": "qpid.message.*"
- },
- {
- "name": "Statistics",
- "type": "NameAndLevel",
- "level": "INFO",
- "loggerName": "qpid.statistics.*"
- }
- ]
- }
- ],
- "ports": [
- {
- "name": "AMQP",
- "port": "${qpid.amqp_port}",
- "authenticationProvider": "plain",
- "protocols": [
- "AMQP_0_9_1"
- ]
- }
- ],
- "virtualhostnodes": [
- {
- "name": "default",
- "type": "Memory",
- "defaultVirtualHostNode": "true",
- "virtualHostInitialConfiguration": "{\"type\": \"Memory\"}"
- }
- ]
-}
\ No newline at end of file