This is an automated email from the ASF dual-hosted git repository. fmariani pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 99b01346845 CAMEL-19610: Upgrade to RocketMQ v5 99b01346845 is described below commit 99b0134684548c775c27ba1870084d437c7ce15b Author: Croway <federico.mariani.1...@gmail.com> AuthorDate: Tue Jul 18 12:07:26 2023 +0200 CAMEL-19610: Upgrade to RocketMQ v5 --- components/camel-rocketmq/pom.xml | 31 +----- .../rocketmq/RocketMQRequestReplyRouteTest.java | 39 ++++--- .../component/rocketmq/RocketMQRouteTest.java | 30 +++--- .../rocketmq/infra/EmbeddedRocketMQServer.java | 101 ------------------- parent/pom.xml | 2 +- pom.xml | 1 + test-infra/camel-test-infra-rocketmq/pom.xml | 54 ++++++++++ .../src/main/resources/META-INF/MANIFEST.MF | 0 .../infra/rocketmq/common/RocketMQProperties.java | 31 ++++++ .../rocketmq/services/RocketMQBrokerContainer.java | 49 +++++++++ .../infra/rocketmq/services/RocketMQContainer.java | 112 +++++++++++++++++++++ .../services/RocketMQNameserverContainer.java | 39 +++++++ .../infra/rocketmq/services/RocketMQService.java | 35 +++++++ .../rocketmq/services/RocketMQServiceFactory.java | 35 +++++++ .../src/test/resources/broker1/broker1.conf | 24 +++++ .../src/test/resources/broker2/broker2.conf | 24 +++++ test-infra/pom.xml | 1 + 17 files changed, 440 insertions(+), 168 deletions(-) diff --git a/components/camel-rocketmq/pom.xml b/components/camel-rocketmq/pom.xml index 695f40801d1..8aa781e868e 100644 --- a/components/camel-rocketmq/pom.xml +++ b/components/camel-rocketmq/pom.xml @@ -32,10 +32,6 @@ <name>Camel :: RocketMQ</name> <description>Camel RocketMQ Component</description> - <properties> - <camel.surefire.fork.additional-vmargs>--add-opens java.base/java.nio=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED</camel.surefire.fork.additional-vmargs> - </properties> - <dependencies> <dependency> <groupId>org.apache.camel</groupId> @@ -54,31 +50,14 @@ <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-test-junit5</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-namesrv</artifactId> - <version>${rocketmq-version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-broker</artifactId> - <version>${rocketmq-version}</version> + <artifactId>camel-test-infra-rocketmq</artifactId> + <version>${project.version}</version> + <type>test-jar</type> <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-test</artifactId> - <version>${rocketmq-version}</version> - <exclusions> - <exclusion> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-junit5</artifactId> <scope>test</scope> </dependency> </dependencies> diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java index 78f801834ae..8d0650b1517 100644 --- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java +++ b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRequestReplyRouteTest.java @@ -17,34 +17,31 @@ package org.apache.camel.component.rocketmq; +import java.io.IOException; import java.nio.charset.StandardCharsets; import org.apache.camel.CamelContext; import org.apache.camel.ExchangePattern; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer; +import org.apache.camel.test.infra.rocketmq.services.RocketMQService; +import org.apache.camel.test.infra.rocketmq.services.RocketMQServiceFactory; import org.apache.camel.test.junit5.CamelTestSupport; -import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.namesrv.NamesrvController; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; public class RocketMQRequestReplyRouteTest extends CamelTestSupport { - private static final int NAMESRV_PORT = 59877; - - private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT; - private static final String START_ENDPOINT_URI = "rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1"; private static final String INTERMEDIATE_ENDPOINT_URI = "rocketmq:INTERMEDIATE_TOPIC" + @@ -58,23 +55,20 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport { private static final String EXPECTED_MESSAGE = "Hi."; - private static NamesrvController namesrvController; - - private static BrokerController brokerController; - private MockEndpoint resultEndpoint; private DefaultMQPushConsumer replierConsumer; private DefaultMQProducer replierProducer; + @RegisterExtension + public static RocketMQService rocketMQService = RocketMQServiceFactory.createService(); + @BeforeAll static void beforeAll() throws Exception { - namesrvController = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT); - brokerController = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR); - EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "START_TOPIC"); - EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "INTERMEDIATE_TOPIC"); - EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "REPLY_TO_TOPIC"); + rocketMQService.createTopic("START_TOPIC"); + rocketMQService.createTopic("INTERMEDIATE_TOPIC"); + rocketMQService.createTopic("REPLY_TO_TOPIC"); } @Override @@ -83,10 +77,10 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport { super.setUp(); resultEndpoint = (MockEndpoint) context.getEndpoint(RESULT_ENDPOINT_URI); replierProducer = new DefaultMQProducer("replierProducer"); - replierProducer.setNamesrvAddr(NAMESRV_ADDR); + replierProducer.setNamesrvAddr(rocketMQService.nameserverAddress()); replierProducer.start(); replierConsumer = new DefaultMQPushConsumer("replierConsumer"); - replierConsumer.setNamesrvAddr(NAMESRV_ADDR); + replierConsumer.setNamesrvAddr(rocketMQService.nameserverAddress()); replierConsumer.subscribe("INTERMEDIATE_TOPIC", "*"); replierConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, unused) -> { MessageExt messageExt = msgs.get(0); @@ -106,7 +100,7 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport { protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext(); RocketMQComponent rocketMQComponent = new RocketMQComponent(); - rocketMQComponent.setNamesrvAddr(NAMESRV_ADDR); + rocketMQComponent.setNamesrvAddr(rocketMQService.nameserverAddress()); camelContext.addComponent("rocketmq", rocketMQComponent); return camelContext; } @@ -141,8 +135,9 @@ public class RocketMQRequestReplyRouteTest extends CamelTestSupport { } @AfterAll - public static void afterAll() { - brokerController.shutdown(); - namesrvController.shutdown(); + public static void afterAll() throws IOException, InterruptedException { + rocketMQService.deleteTopic("START_TOPIC"); + rocketMQService.deleteTopic("INTERMEDIATE_TOPIC"); + rocketMQService.deleteTopic("REPLY_TO_TOPIC"); } } diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java index 8deba43ad8d..d1eaed5c72e 100644 --- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java +++ b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/RocketMQRouteTest.java @@ -17,41 +17,36 @@ package org.apache.camel.component.rocketmq; +import java.io.IOException; + import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.component.rocketmq.infra.EmbeddedRocketMQServer; +import org.apache.camel.test.infra.rocketmq.services.RocketMQService; +import org.apache.camel.test.infra.rocketmq.services.RocketMQServiceFactory; import org.apache.camel.test.junit5.CamelTestSupport; -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.namesrv.NamesrvController; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; public class RocketMQRouteTest extends CamelTestSupport { public static final String EXPECTED_MESSAGE = "hello, RocketMQ."; - private static final int NAMESRV_PORT = 59876; - - private static final String NAMESRV_ADDR = "127.0.0.1:" + NAMESRV_PORT; - private static final String START_ENDPOINT_URI = "rocketmq:START_TOPIC?producerGroup=p1&consumerGroup=c1&sendTag=startTag"; private static final String RESULT_ENDPOINT_URI = "mock:result"; - private static NamesrvController namesrvController; - - private static BrokerController brokerController; - private MockEndpoint resultEndpoint; + @RegisterExtension + public static RocketMQService rocketMQService = RocketMQServiceFactory.createService(); + @BeforeAll static void beforeAll() throws Exception { - namesrvController = EmbeddedRocketMQServer.createAndStartNamesrv(NAMESRV_PORT); - brokerController = EmbeddedRocketMQServer.createAndStartBroker(NAMESRV_ADDR); - EmbeddedRocketMQServer.createTopic(NAMESRV_ADDR, "DefaultCluster", "START_TOPIC"); + rocketMQService.createTopic("START_TOPIC"); } @Override @@ -65,7 +60,7 @@ public class RocketMQRouteTest extends CamelTestSupport { protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext(); RocketMQComponent rocketMQComponent = new RocketMQComponent(); - rocketMQComponent.setNamesrvAddr(NAMESRV_ADDR); + rocketMQComponent.setNamesrvAddr(rocketMQService.nameserverAddress()); camelContext.addComponent("rocketmq", rocketMQComponent); return camelContext; } @@ -93,8 +88,7 @@ public class RocketMQRouteTest extends CamelTestSupport { } @AfterAll - public static void afterAll() { - brokerController.shutdown(); - namesrvController.shutdown(); + public static void afterAll() throws IOException, InterruptedException { + rocketMQService.deleteTopic("START_TOPIC"); } } diff --git a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java b/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java deleted file mode 100644 index 153f6c4ca47..00000000000 --- a/components/camel-rocketmq/src/test/java/org/apache/camel/component/rocketmq/infra/EmbeddedRocketMQServer.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.component.rocketmq.infra; - -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.namesrv.NamesrvConfig; -import org.apache.rocketmq.namesrv.NamesrvController; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.apache.rocketmq.test.util.MQAdmin; -import org.apache.rocketmq.test.util.TestUtils; - -public final class EmbeddedRocketMQServer { - - private static final AtomicInteger BROKER_INDEX = new AtomicInteger(); - - private static final AtomicInteger BROKER_PORTS = new AtomicInteger(61000); - - private EmbeddedRocketMQServer() { - } - - public static NamesrvController createAndStartNamesrv(int port) throws Exception { - NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(port); - NamesrvController result = new NamesrvController(new NamesrvConfig(), serverConfig); - result.initialize(); - result.start(); - return result; - } - - public static BrokerController createAndStartBroker(String nsAddr) throws Exception { - NettyServerConfig nettyServerConfig = new NettyServerConfig(); - nettyServerConfig.setListenPort(BROKER_PORTS.getAndIncrement()); - BrokerController result = new BrokerController( - prepareBrokerConfig(nsAddr), nettyServerConfig, new NettyClientConfig(), prepareMessageStoreConfig()); - result.initialize(); - result.start(); - return result; - } - - private static BrokerConfig prepareBrokerConfig(final String nsAddr) { - BrokerConfig brokerConfig = new BrokerConfig(); - brokerConfig.setNamesrvAddr(nsAddr); - brokerConfig.setBrokerName("CamelRocketMQBroker" + BROKER_INDEX.getAndIncrement()); - brokerConfig.setBrokerIP1("127.0.0.1"); - brokerConfig.setSendMessageThreadPoolNums(1); - brokerConfig.setPutMessageFutureThreadPoolNums(1); - brokerConfig.setPullMessageThreadPoolNums(1); - brokerConfig.setProcessReplyMessageThreadPoolNums(1); - brokerConfig.setQueryMessageThreadPoolNums(1); - brokerConfig.setAdminBrokerThreadPoolNums(1); - brokerConfig.setClientManageThreadPoolNums(1); - brokerConfig.setConsumerManageThreadPoolNums(1); - brokerConfig.setHeartbeatThreadPoolNums(1); - return brokerConfig; - } - - private static MessageStoreConfig prepareMessageStoreConfig() { - String baseDir = System.getProperty("java.io.tmpdir") + "/embedded_rocketmq/" + System.currentTimeMillis(); - MessageStoreConfig storeConfig = new MessageStoreConfig(); - storeConfig.setStorePathRootDir(baseDir); - storeConfig.setStorePathCommitLog(baseDir + "/commitlog"); - storeConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10); - storeConfig.setMaxIndexNum(100); - storeConfig.setMaxHashSlotNum(400); - storeConfig.setHaListenPort(BROKER_PORTS.getAndIncrement()); - return storeConfig; - } - - public static void createTopic(String namesrvAddr, String defaultCluster, String topic) throws TimeoutException { - long startTime = System.currentTimeMillis(); - while (!MQAdmin.createTopic(namesrvAddr, defaultCluster, topic, 4, 3)) { - if (System.currentTimeMillis() - startTime > 30 * 1000) { - throw new TimeoutException( - String.format("Failed to create topic [%s] after %d ms", topic, - System.currentTimeMillis() - startTime)); - } - TestUtils.waitForMoment(500); - } - } -} diff --git a/parent/pom.xml b/parent/pom.xml index ce6e4fd75e2..2ba089114b4 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -415,7 +415,7 @@ <rest-assured-version>5.3.1</rest-assured-version> <roaster-version>2.28.0.Final</roaster-version> <robotframework-version>4.1.2</robotframework-version> - <rocketmq-version>4.9.7</rocketmq-version> + <rocketmq-version>5.1.3</rocketmq-version> <rome-version>2.1.0</rome-version> <rssreader-version>3.4.5</rssreader-version> <rxjava2-version>2.2.21</rxjava2-version> diff --git a/pom.xml b/pom.xml index 68269a4c855..1b9a1ec7b7c 100644 --- a/pom.xml +++ b/pom.xml @@ -343,6 +343,7 @@ </excludes> <mapping> <Adapter>CAMEL_PROPERTIES_STYLE</Adapter> + <conf>SCRIPT_STYLE</conf> <Dockerfile.jvm>SCRIPT_STYLE</Dockerfile.jvm> <Dockerfile.legacy-jar>SCRIPT_STYLE</Dockerfile.legacy-jar> <Dockerfile.native-micro>SCRIPT_STYLE</Dockerfile.native-micro> diff --git a/test-infra/camel-test-infra-rocketmq/pom.xml b/test-infra/camel-test-infra-rocketmq/pom.xml new file mode 100644 index 00000000000..696f098d2ee --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>camel-test-infra-parent</artifactId> + <groupId>org.apache.camel</groupId> + <relativePath>../camel-test-infra-parent/pom.xml</relativePath> + <version>4.0.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>camel-test-infra-rocketmq</artifactId> + <name>Camel :: Test Infra :: RocketMQ</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>${awaitility-version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <version>${testcontainers-version}</version> + </dependency> + </dependencies> + + +</project> diff --git a/test-infra/camel-test-infra-rocketmq/src/main/resources/META-INF/MANIFEST.MF b/test-infra/camel-test-infra-rocketmq/src/main/resources/META-INF/MANIFEST.MF new file mode 100644 index 00000000000..e69de29bb2d diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java new file mode 100644 index 00000000000..065d1040e5e --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/common/RocketMQProperties.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.infra.rocketmq.common; + +public final class RocketMQProperties { + + public static final String ROCKETMQ_VERSION_PROPERTY = "itest.rocketmq.container.image.version"; + public static final String ROCKETMQ_IMAGE_PROPERTY = "itest.rocketmq.container.image"; + public static final int ROCKETMQ_NAMESRV_PORT = 9876; + public static final int ROCKETMQ_BROKER1_PORT = 10909; + public static final int ROCKETMQ_BROKER2_PORT = 10911; + public static final int ROCKETMQ_BROKER3_PORT = 10912; + + private RocketMQProperties() { + + } +} diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java new file mode 100644 index 00000000000..a3b5f48a132 --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQBrokerContainer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.infra.rocketmq.services; + +import java.util.Collections; + +import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; + +public class RocketMQBrokerContainer extends GenericContainer<RocketMQBrokerContainer> { + + public RocketMQBrokerContainer(Network network, String confName) { + super(RocketMQContainer.ROCKETMQ_IMAGE); + + withNetwork(network); + withExposedPorts(RocketMQProperties.ROCKETMQ_BROKER3_PORT, + RocketMQProperties.ROCKETMQ_BROKER2_PORT, + RocketMQProperties.ROCKETMQ_BROKER1_PORT); + withEnv("NAMESRV_ADDR", "nameserver:9876"); + withClasspathResourceMapping(confName + "/" + confName + ".conf", + "/opt/rocketmq-" + RocketMQContainer.ROCKETMQ_VERSION + "/conf/broker.conf", + BindMode.READ_WRITE); + + withTmpFs(Collections.singletonMap("/home/rocketmq/store", "rw")); + withTmpFs(Collections.singletonMap("/home/rocketmq/logs", "rw")); + withCommand("sh", "mqbroker", + "-c", "/opt/rocketmq-" + RocketMQContainer.ROCKETMQ_VERSION + "/conf/broker.conf"); + + waitingFor(Wait.forListeningPort()); + withCreateContainerCmdModifier(cmd -> cmd.withName(confName)); + } +} diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java new file mode 100644 index 00000000000..8850356e393 --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQContainer.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.infra.rocketmq.services; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.test.infra.common.services.ContainerService; +import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.Network; + +public class RocketMQContainer implements RocketMQService, ContainerService<RocketMQNameserverContainer> { + private static final Logger LOG = LoggerFactory.getLogger(RocketMQContainer.class); + public static final String ROCKETMQ_VERSION = System.getProperty(RocketMQProperties.ROCKETMQ_VERSION_PROPERTY, + "5.1.3"); + public static final String ROCKETMQ_IMAGE = System.getProperty(RocketMQProperties.ROCKETMQ_IMAGE_PROPERTY, + "apache/rocketmq:" + ROCKETMQ_VERSION); + + private final RocketMQNameserverContainer nameserverContainer; + private final RocketMQBrokerContainer brokerContainer1; + private final RocketMQBrokerContainer brokerContainer2; + + public RocketMQContainer() { + Network network = Network.newNetwork(); + + nameserverContainer = new RocketMQNameserverContainer(network); + + brokerContainer1 = new RocketMQBrokerContainer(network, "broker1"); + brokerContainer2 = new RocketMQBrokerContainer(network, "broker2"); + } + + @Override + public RocketMQNameserverContainer getContainer() { + return nameserverContainer; + } + + @Override + public void registerProperties() { + + } + + @Override + public void initialize() { + nameserverContainer.start(); + LOG.info("Apache RocketMQ running at address {}", nameserverAddress()); + + brokerContainer1.start(); + brokerContainer2.start(); + } + + @Override + public void shutdown() { + nameserverContainer.stop(); + brokerContainer1.stop(); + brokerContainer2.stop(); + } + + @Override + public void afterTestExecution(ExtensionContext extensionContext) throws Exception { + + } + + @Override + public void beforeTestExecution(ExtensionContext extensionContext) throws Exception { + + } + + public void createTopic(String topic) { + Awaitility.await() + .atMost(20, TimeUnit.SECONDS) + .pollDelay(100, TimeUnit.MILLISECONDS).until(() -> { + Container.ExecResult execResult = brokerContainer1.execInContainer( + "sh", "mqadmin", "updateTopic", "-n", "nameserver:9876", "-t", + topic, "-c", "DefaultCluster"); + + LOG.info(execResult.getExitCode() + " " + execResult.getStderr() + " " + execResult.getStdout()); + + return execResult.getStdout() != null && execResult.getStdout().contains("success"); + }); + } + + public void deleteTopic(String topic) throws IOException, InterruptedException { + brokerContainer1.execInContainer( + "sh", "mqadmin", "deleteTopic", "-n", "nameserver:9876", "-t", + topic); + } + + @Override + public String nameserverAddress() { + return nameserverContainer.getHost() + ":" + + nameserverContainer.getMappedPort(RocketMQProperties.ROCKETMQ_NAMESRV_PORT); + } +} diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java new file mode 100644 index 00000000000..fb3e57d5256 --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQNameserverContainer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.infra.rocketmq.services; + +import java.util.Collections; + +import org.apache.camel.test.infra.rocketmq.common.RocketMQProperties; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; + +public class RocketMQNameserverContainer extends GenericContainer<RocketMQNameserverContainer> { + public RocketMQNameserverContainer(Network network) { + super(RocketMQContainer.ROCKETMQ_IMAGE); + + withNetwork(network); + withNetworkAliases("nameserver"); + addExposedPort(RocketMQProperties.ROCKETMQ_NAMESRV_PORT); + withTmpFs(Collections.singletonMap("/home/rocketmq/logs", "rw")); + withCommand("sh", "mqnamesrv"); + withCreateContainerCmdModifier(cmd -> cmd.withName("nameserver")); + + waitingFor(Wait.forListeningPort()); + } +} diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java new file mode 100644 index 00000000000..0334269d392 --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.infra.rocketmq.services; + +import java.io.IOException; + +import org.apache.camel.test.infra.common.services.TestService; +import org.junit.jupiter.api.extension.AfterTestExecutionCallback; +import org.junit.jupiter.api.extension.BeforeTestExecutionCallback; + +public interface RocketMQService extends TestService, BeforeTestExecutionCallback, AfterTestExecutionCallback { + String nameserverAddress(); + + default String defaultCluster() { + return "DefaultCluster"; + } + + void createTopic(String topic); + + void deleteTopic(String topic) throws IOException, InterruptedException; +} diff --git a/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java new file mode 100644 index 00000000000..58124d9f73e --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/src/test/java/org/apache/camel/test/infra/rocketmq/services/RocketMQServiceFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.test.infra.rocketmq.services; + +import org.apache.camel.test.infra.common.services.SimpleTestServiceBuilder; + +public final class RocketMQServiceFactory { + private RocketMQServiceFactory() { + + } + + public static SimpleTestServiceBuilder<RocketMQService> builder() { + return new SimpleTestServiceBuilder<>("rocketmq"); + } + + public static RocketMQService createService() { + return builder() + .addLocalMapping(RocketMQContainer::new) + .build(); + } +} diff --git a/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf new file mode 100644 index 00000000000..4e24028a2af --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker1/broker1.conf @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +brokerClusterName = DefaultCluster +brokerName = broker-a +brokerId = 0 +deleteWhen = 04 +fileReservedTime = 48 +brokerRole = ASYNC_MASTER +flushDiskType = ASYNC_FLUSH \ No newline at end of file diff --git a/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf new file mode 100644 index 00000000000..66b7fde12ba --- /dev/null +++ b/test-infra/camel-test-infra-rocketmq/src/test/resources/broker2/broker2.conf @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +brokerClusterName = DefaultCluster +brokerName = broker-b +brokerId = 0 +deleteWhen = 04 +fileReservedTime = 48 +brokerRole = ASYNC_MASTER +flushDiskType = ASYNC_FLUSH \ No newline at end of file diff --git a/test-infra/pom.xml b/test-infra/pom.xml index 5a0627502bd..f27953aa22f 100644 --- a/test-infra/pom.xml +++ b/test-infra/pom.xml @@ -60,6 +60,7 @@ <module>camel-test-infra-nats</module> <module>camel-test-infra-pulsar</module> <module>camel-test-infra-redis</module> + <module>camel-test-infra-rocketmq</module> <module>camel-test-infra-xmpp</module> <module>camel-test-infra-zookeeper</module> <module>camel-test-infra-postgres</module>