This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push:
new edc9dce CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary
reply queue is not handled correctly - Add QueueRecoveryListener to update
replyTo and rebind the new temporary queue - Add integration test using
RabbitMQ Management HTTP API
edc9dce is described below
commit edc9dce8db5e72ffc5097b19fe093655c563b89d
Author: Robert Szczesiak <[email protected]>
AuthorDate: Thu Mar 26 14:35:23 2020 +0100
CAMEL-14789: camel-rabbitmq - Automatic recovery of temporary reply queue
is not handled correctly
- Add QueueRecoveryListener to update replyTo and rebind the new temporary
queue
- Add integration test using RabbitMQ Management HTTP API
---
components/camel-rabbitmq/pom.xml | 5 +
components/camel-rabbitmq/readme.txt | 2 +-
.../rabbitmq/reply/TemporaryQueueReplyManager.java | 17 ++
.../RabbitMQTemporaryQueueAutoRecoveryIntTest.java | 179 +++++++++++++++++++++
4 files changed, 202 insertions(+), 1 deletion(-)
diff --git a/components/camel-rabbitmq/pom.xml
b/components/camel-rabbitmq/pom.xml
index 1749cc7..0c45aee 100644
--- a/components/camel-rabbitmq/pom.xml
+++ b/components/camel-rabbitmq/pom.xml
@@ -104,6 +104,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-http4</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/components/camel-rabbitmq/readme.txt
b/components/camel-rabbitmq/readme.txt
index ef005d5..d812391 100644
--- a/components/camel-rabbitmq/readme.txt
+++ b/components/camel-rabbitmq/readme.txt
@@ -7,7 +7,7 @@ The integration tests requires a running RabbitMQ broker.
The broker can be run via Docker:
- docker run -it -p 5672:5672 -e RABBITMQ_DEFAULT_USER=cameltest -e
RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit
rabbitmq:3
+ docker run -it -p 5672:5672 -p 15672:15672 -e
RABBITMQ_DEFAULT_USER=cameltest -e RABBITMQ_DEFAULT_PASS=cameltest --hostname
my-rabbit --name some-rabbit rabbitmq:3-management
Or to install RabbitMQ as standalone and then configure it:
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
index b375e22..a54b4f2 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -24,6 +24,7 @@ import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -94,6 +95,22 @@ public class TemporaryQueueReplyManager extends
ReplyManagerSupport {
//TODO check for the RabbitMQConstants.EXCHANGE_NAME header
channel.queueBind(getReplyTo(), endpoint.getExchangeName(),
getReplyTo());
+ //Add QueueRecoveryListener to notify when temporary queue name
changes due to recovery
+ if (conn instanceof AutorecoveringConnection) {
+ ((AutorecoveringConnection)
conn).addQueueRecoveryListener((oldName, newName) -> {
+ log.debug("Temporary queue name {} was changed to {}. Updating
replyTo.", oldName, newName);
+ setReplyTo(newName);
+
+ log.debug("Trying to rebind the new temporary queue to update
routingKey");
+ try {
+ channel.queueBind(newName, endpoint.getExchangeName(),
newName);
+ channel.queueUnbind(newName, endpoint.getExchangeName(),
oldName);
+ } catch (IOException e) {
+ log.warn("Failed to bind or unbind a queue. This exception
is ignored.", e);
+ }
+ });
+ }
+
consumer = new RabbitConsumer(this, channel);
consumer.start();
diff --git
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
new file mode 100644
index 0000000..a62a85a
--- /dev/null
+++
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQTemporaryQueueAutoRecoveryIntTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.stream.StreamSupport;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration test to check if temporary queue's name change is properly
handled after auto recovering
+ * caused by connection failure.
+ * This test takes advantage of RabbitMQ Management HTTP API provided by
RabbitMQ Management Plugin.
+ */
+public class RabbitMQTemporaryQueueAutoRecoveryIntTest extends
AbstractRabbitMQIntTest {
+
+ private static final String EXCHANGE = "ex_temp-queue-test";
+ private static final String QUEUE = "q_temp-queue-test";
+ private static final String ROUTING_KEY = "k_temp-queue-test";
+ private static final String TEMP_QUEUE_NAME = "tempQueueName";
+ private static final String TEMP_QUEUE_CONN_NAME = "tempQueueConnName";
+ private static final String REQUEST = "Foo request";
+ private static final String REPLY = "Bar reply";
+
+ @Produce(uri = "direct:rabbitMQ")
+ protected ProducerTemplate directRabbitMQProducer;
+
+ @Produce(uri = "direct:rabbitMQApi-forceCloseConnection")
+ protected ProducerTemplate forceCloseConnectionProducer;
+
+ @Produce(uri = "direct:rabbitMQApi-getExchangeBindings")
+ protected ProducerTemplate getExchangeBindingsProducer;
+
+ @EndpointInject(uri = "rabbitmq:" + EXCHANGE +
"?addresses=localhost:5672&username=cameltest&password=cameltest"
+ + "&autoAck=false&queue=" + QUEUE + "&routingKey=" + ROUTING_KEY)
+ private Endpoint rabbitMQEndpoint;
+
+ @EndpointInject(uri =
"http4:localhost:15672/api?authMethod=Basic&authUsername=cameltest&authPassword=cameltest")
+ private Endpoint rabbitMQApiEndpoint;
+
+ @EndpointInject(uri = "mock:consuming")
+ private MockEndpoint consumingMockEndpoint;
+
+ @EndpointInject(uri = "mock:producing")
+ private MockEndpoint producingMockEndpoint;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+
+ log.info("Building routes...");
+
+ from("direct:rabbitMQ")
+ .id("producingRoute")
+ .log("Sending message to RabbitMQ broker")
+ .to(rabbitMQEndpoint)
+ .to(producingMockEndpoint);
+
+ from(rabbitMQEndpoint)
+ .id("consumingRoute")
+ .log("Receiving message from RabbitMQ broker")
+ .to(consumingMockEndpoint)
+ .setBody(simple(REPLY));
+
+ from("direct:rabbitMQApi-forceCloseConnection")
+ .id("forceCloseConnectionRoute")
+ .log("Getting temporary queue's connection name")
+ .setHeader(Exchange.HTTP_PATH,
simple("/queues/%2F/${header." + TEMP_QUEUE_NAME + "}"))
+ .setHeader(Exchange.HTTP_METHOD, simple("GET"))
+ .to(rabbitMQApiEndpoint)
+ .process(exchange -> {
+ String responseJsonString =
exchange.getMessage().getBody(String.class);
+ ObjectNode node = new
ObjectMapper().readValue(responseJsonString, ObjectNode.class);
+ String connectionName =
node.at("/owner_pid_details/name").asText();
+
exchange.getMessage().setHeader(TEMP_QUEUE_CONN_NAME, connectionName);
+ })
+ .log("Force closing temporary queue's connection")
+ .setHeader(Exchange.HTTP_PATH,
simple("/connections/${header." + TEMP_QUEUE_CONN_NAME + "}"))
+ .setHeader(Exchange.HTTP_METHOD, simple("DELETE"))
+ .to(rabbitMQApiEndpoint);
+
+ from("direct:rabbitMQApi-getExchangeBindings")
+ .id("getExchangeBindingsRoute")
+ .log("Getting temporary queue's routing key to verify
rebinding was successful")
+ .setHeader(Exchange.HTTP_PATH,
simple("/exchanges/%2F/" + EXCHANGE + "/bindings/source"))
+ .setHeader(Exchange.HTTP_METHOD, simple("GET"))
+ .to(rabbitMQApiEndpoint)
+ .process(exchange -> {
+ String responseJsonString =
exchange.getMessage().getBody(String.class);
+ String tempQueueName =
exchange.getMessage().getHeader(TEMP_QUEUE_NAME, String.class);
+ ArrayNode node = new
ObjectMapper().readValue(responseJsonString, ArrayNode.class);
+ String tempQueueRoutingKey =
StreamSupport.stream(node.spliterator(), false)
+ .filter(binding ->
tempQueueName.equals(binding.get("destination").textValue()))
+ .findFirst()
+ .map(binding ->
binding.get("routing_key").textValue())
+ .orElse(null);
+ exchange.getMessage().setBody(tempQueueRoutingKey);
+ });
+ }
+ };
+ }
+
+ /**
+ * <p><b>NOTE:</b>Make sure RabbitMQ Management Plugin is enabled
+ * and ConnectionFactory#automaticRecovery is set to <code>true</code>
(default)</p>
+ * <ul>
+ * <li>Send first PRC request that automatically creates server-named
temporary reply queue</li>
+ * <li>Send another PRC request to verify reply-to property stays the same
+ * if no connection failure occurred</li>
+ * <li>Wait a few seconds to ensure all necessary bindings are created
+ * and seen by the RabbitMQ Management HTTP API</li>
+ * <li>Forcibly close temporary reply queue's connection and wait another
few seconds
+ * to let it recover automatically</li>
+ * <li>Send one last RPC request and verify reply-to property is changed
+ * (assuming the new server-generated name will not be exactly the
same)</li>
+ * <li>Get new temporary queue's bindings and verify routing key matches
queue name</li>
+ * </ul>
+ *
+ * @throws InterruptedException when Thread#sleep is interrupted
+ */
+ @Test
+ public void testReplyToAndBindingsUpdated() throws InterruptedException {
+
+ consumingMockEndpoint.expectedMessageCount(3);
+ producingMockEndpoint.expectedMessageCount(3);
+
+ directRabbitMQProducer.requestBody(REQUEST);
+ String replyToOriginal =
consumingMockEndpoint.getExchanges().get(0).getMessage().getHeader(RabbitMQConstants.REPLY_TO,
String.class);
+
+ directRabbitMQProducer.requestBody(REQUEST);
+ String replyToVerify =
consumingMockEndpoint.getExchanges().get(1).getMessage().getHeader(RabbitMQConstants.REPLY_TO,
String.class);
+
+ Thread.sleep(7000);
+
+ forceCloseConnectionProducer.sendBodyAndHeader(null, TEMP_QUEUE_NAME,
replyToOriginal);
+ Thread.sleep(7000);
+
+ directRabbitMQProducer.requestBody(REQUEST);
+ String replyToRecovered =
consumingMockEndpoint.getExchanges().get(2).getMessage().getHeader(RabbitMQConstants.REPLY_TO,
String.class);
+
+ String tempQueueRoutingKey = (String)
getExchangeBindingsProducer.requestBodyAndHeader(null, TEMP_QUEUE_NAME,
replyToRecovered);
+
+ assertEquals(replyToVerify, replyToOriginal);
+ assertNotEquals(replyToRecovered, replyToOriginal);
+ assertEquals(tempQueueRoutingKey, replyToRecovered);
+ consumingMockEndpoint.assertIsSatisfied();
+ producingMockEndpoint.assertIsSatisfied();
+ }
+}