This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new d9d98df ARTEMIS-2910: consider message annotations when determining
routing type used for auto-creation with anonymous producers
new ddef389 This closes #3269
d9d98df is described below
commit d9d98dfa8ac50468acd7c06194d9fb7ebda71dd9
Author: Robbie Gemmell <[email protected]>
AuthorDate: Mon Sep 21 18:07:26 2020 +0100
ARTEMIS-2910: consider message annotations when determining routing type
used for auto-creation with anonymous producers
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 8 +-
.../transport/amqp/client/AmqpMessage.java | 20 ++
.../integration/amqp/AmqpAnonymousRelayTest.java | 236 +++++++++++++++++++++
.../integration/amqp/JMSMessageProducerTest.java | 46 +++-
4 files changed, 304 insertions(+), 6 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index bd17817..25cb4ab 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -464,6 +464,7 @@ public class AMQPSessionCallback implements SessionCallback
{
context.incrementSettle();
+ RoutingType routingType = null;
if (address != null) {
message.setAddress(address);
} else {
@@ -474,10 +475,15 @@ public class AMQPSessionCallback implements
SessionCallback {
rejectMessage(context, delivery, Symbol.valueOf("failed"),
"Missing 'to' field for message sent to an anonymous producer");
return;
}
+
+ routingType = message.getRoutingType();
}
//here check queue-autocreation
- RoutingType routingType = context.getRoutingType(receiver, address);
+ if (routingType == null) {
+ routingType = context.getRoutingType(receiver, address);
+ }
+
if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
}
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index bd5551c..99ade07 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -32,6 +32,7 @@ import
org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message;
@@ -612,6 +613,25 @@ public class AmqpMessage {
}
/**
+ * Attempts to retrieve the message body as a String from an AmqpValue body.
+ *
+ * @return the string
+ * @throws NoSuchElementException if the body does not contain a AmqpValue
with String.
+ */
+ public String getText() throws NoSuchElementException {
+ Section body = getWrappedMessage().getBody();
+ if (body instanceof AmqpValue) {
+ AmqpValue value = (AmqpValue) body;
+
+ if (value.getValue() instanceof String) {
+ return (String) value.getValue();
+ }
+ }
+
+ throw new NoSuchElementException("Message does not contain a String
body");
+ }
+
+ /**
* Sets a byte array value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java
index 1743624..98938d7 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnonymousRelayTest.java
@@ -18,6 +18,12 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.AddressQueryResult;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -28,6 +34,10 @@ import org.junit.Test;
public class AmqpAnonymousRelayTest extends AmqpClientTestSupport {
+ private static final String AUTO_CREATION_QUEUE_PREFIX =
"AmqpAnonymousRelayTest-AutoCreateQueues.";
+ private static final String AUTO_CREATION_TOPIC_PREFIX =
"AmqpAnonymousRelayTest-AutoCreateTopics.";
+
+ // Disable auto-creation in the general config created by the superclass,
we add specific prefixed areas with it enabled
@Override
protected boolean isAutoCreateQueues() {
return false;
@@ -38,6 +48,232 @@ public class AmqpAnonymousRelayTest extends
AmqpClientTestSupport {
return false;
}
+ // Additional address configuration for auto creation of queues and topics
+ @Override
+ protected void configureAddressPolicy(ActiveMQServer server) {
+ super.configureAddressPolicy(server);
+
+ AddressSettings autoCreateQueueAddressSettings = new AddressSettings();
+ autoCreateQueueAddressSettings.setAutoCreateQueues(true);
+ autoCreateQueueAddressSettings.setAutoCreateAddresses(true);
+
autoCreateQueueAddressSettings.setDefaultAddressRoutingType(RoutingType.ANYCAST);
+
autoCreateQueueAddressSettings.setDefaultQueueRoutingType(RoutingType.ANYCAST);
+
+
server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_QUEUE_PREFIX
+ "#", autoCreateQueueAddressSettings);
+
+ AddressSettings autoCreateTopicAddressSettings = new AddressSettings();
+ autoCreateTopicAddressSettings.setAutoCreateQueues(true);
+ autoCreateTopicAddressSettings.setAutoCreateAddresses(true);
+
autoCreateTopicAddressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
+
autoCreateTopicAddressSettings.setDefaultQueueRoutingType(RoutingType.MULTICAST);
+
+
server.getConfiguration().getAddressesSettings().put(AUTO_CREATION_TOPIC_PREFIX
+ "#", autoCreateTopicAddressSettings);
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageOnAnonymousProducerCausesQueueAutoCreation()
throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // We use an address in the QUEUE prefixed auto-creation area to ensure
the broker picks this up
+ // and creates a queue, in the absense of any other message annotation /
terminus capability config.
+ String queueName = AUTO_CREATION_QUEUE_PREFIX + getQueueName();
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createAnonymousSender();
+ AmqpMessage message = new AmqpMessage();
+ message.setAddress(queueName);
+ message.setText(getTestName());
+
+ AddressQueryResult addressQueryResult =
server.addressQuery(SimpleString.toSimpleString(queueName));
+ assertFalse(addressQueryResult.isExists());
+
+ sender.send(message);
+ sender.close();
+
+ addressQueryResult =
server.addressQuery(SimpleString.toSimpleString(queueName));
+ assertTrue(addressQueryResult.isExists());
+
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
+ assertTrue(addressQueryResult.isAutoCreated());
+
+ // Create a receiver and verify it can consume the message from the
auto-created queue
+ AmqpReceiver receiver = session.createReceiver(queueName);
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received);
+ assertEquals(getTestName(), received.getText());
+ received.accept();
+
+ receiver.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageOnAnonymousProducerCausesTopicAutoCreation()
throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ // We use an address in the TOPIC prefixed auto-creation area to ensure
the broker picks this up
+ // and creates a topic, in the absense of any other message annotation /
terminus capability config.
+ String topicName = AUTO_CREATION_TOPIC_PREFIX + getTopicName();
+
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createAnonymousSender();
+ AmqpMessage message = new AmqpMessage();
+
+ message.setAddress(topicName);
+ message.setText("creating-topic-address");
+
+ AddressQueryResult addressQueryResult =
server.addressQuery(SimpleString.toSimpleString(topicName));
+ assertFalse(addressQueryResult.isExists());
+
+ sender.send(message);
+
+ addressQueryResult =
server.addressQuery(SimpleString.toSimpleString(topicName));
+ assertTrue(addressQueryResult.isExists());
+
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
+ assertTrue(addressQueryResult.isAutoCreated());
+
+ // Create 2 receivers and verify they can both consume a new message
sent to the auto-created topic
+ AmqpReceiver receiver1 = session.createReceiver(topicName);
+ AmqpReceiver receiver2 = session.createReceiver(topicName);
+ receiver1.flow(1);
+ receiver2.flow(1);
+
+ AmqpMessage message2 = new AmqpMessage();
+ message2.setAddress(topicName);
+ message2.setText(getTestName());
+
+ sender.send(message2);
+
+ AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received1);
+ assertEquals(getTestName(), received1.getText());
+ received1.accept();
+
+ AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received2);
+ assertEquals(getTestName(), received2.getText());
+ received1.accept();
+
+ receiver1.close();
+ receiver2.close();
+ sender.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void
testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesQueueAutoCreation()
throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createAnonymousSender();
+ AmqpMessage message = new AmqpMessage();
+
message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(),
AMQPMessageSupport.QUEUE_TYPE);
+
+ // We deliberately use the TOPIC prefixed auto-creation area, not the
QUEUE prefix, to ensure
+ // we get a queue because the broker inspects the value we send on
the message, and not just
+ // because it was taken as a default from the address settings.
+ String queueName = AUTO_CREATION_TOPIC_PREFIX + getQueueName();
+
+ message.setAddress(queueName);
+ message.setText(getTestName());
+
+ AddressQueryResult addressQueryResult =
server.addressQuery(SimpleString.toSimpleString(queueName));
+ assertFalse(addressQueryResult.isExists());
+
+ sender.send(message);
+ sender.close();
+
+ addressQueryResult =
server.addressQuery(SimpleString.toSimpleString(queueName));
+ assertTrue(addressQueryResult.isExists());
+
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
+ assertTrue(addressQueryResult.isAutoCreated());
+
+ // Create a receiver and verify it can consume the message from the
auto-created queue
+ AmqpReceiver receiver = session.createReceiver(queueName);
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received);
+ assertEquals(getTestName(), received.getText());
+ received.accept();
+
+ receiver.close();
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void
testSendMessageOnAnonymousProducerWithDestinationTypeAnnotationCausesTopicAutoCreation()
throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createAnonymousSender();
+ AmqpMessage message = new AmqpMessage();
+
message.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(),
AMQPMessageSupport.TOPIC_TYPE);
+
+ // We deliberately use the QUEUE prefixed auto-creation area, not the
TOPIC prefix, to ensure
+ // we get a topic because the broker inspects the value we send on
the message, and not just
+ // because it was taken as a default from the address settings.
+ String topicName = AUTO_CREATION_QUEUE_PREFIX + getTopicName();
+ message.setAddress(topicName);
+ message.setText("creating-topic-address");
+
+ AddressQueryResult addressQueryResult =
server.addressQuery(SimpleString.toSimpleString(topicName));
+ assertFalse(addressQueryResult.isExists());
+
+ sender.send(message);
+
+ addressQueryResult =
server.addressQuery(SimpleString.toSimpleString(topicName));
+ assertTrue(addressQueryResult.isExists());
+
assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
+ assertTrue(addressQueryResult.isAutoCreated());
+
+ // Create 2 receivers and verify they can both consume a new message
sent to the auto-created topic
+ AmqpReceiver receiver1 = session.createReceiver(topicName);
+ AmqpReceiver receiver2 = session.createReceiver(topicName);
+ receiver1.flow(1);
+ receiver2.flow(1);
+
+ AmqpMessage message2 = new AmqpMessage();
+
message2.setMessageAnnotation(AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION.toString(),
AMQPMessageSupport.TOPIC_TYPE);
+ message2.setAddress(topicName);
+ message2.setText(getTestName());
+
+ sender.send(message2);
+
+ AmqpMessage received1 = receiver1.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received1);
+ assertEquals(getTestName(), received1.getText());
+ received1.accept();
+
+ AmqpMessage received2 = receiver2.receive(10, TimeUnit.SECONDS);
+ assertNotNull("Should have read message", received2);
+ assertEquals(getTestName(), received2.getText());
+ received1.accept();
+
+ receiver1.close();
+ receiver2.close();
+ sender.close();
+ } finally {
+ connection.close();
+ }
+ }
+
@Test(timeout = 60000)
public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws
Exception {
AmqpClient client = createAmqpClient();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
index 2125ed8..fddc19c 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageProducerTest.java
@@ -37,6 +37,35 @@ import org.junit.Test;
public class JMSMessageProducerTest extends JMSClientTestSupport {
@Test(timeout = 30000)
+ public void testAnonymousProducerWithQueueAutoCreation() throws Exception {
+ Connection connection = createConnection();
+
+ try {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ String queueName = UUID.randomUUID().toString() + ":" +
getQueueName();
+ Queue queue = session.createQueue(queueName);
+ MessageProducer p = session.createProducer(null);
+
+ TextMessage message = session.createTextMessage();
+ message.setText(getTestName());
+ // This will auto-create the address, and be retained for subsequent
consumption
+ p.send(queue, message);
+
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+ p.send(queue, message);
+ Message msg = consumer.receive(2000);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals(getTestName(), ((TextMessage)msg).getText());
+ consumer.close();
+ }
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 30000)
public void testAnonymousProducer() throws Exception {
Connection connection = createConnection();
@@ -71,25 +100,32 @@ public class JMSMessageProducerTest extends
JMSClientTestSupport {
}
@Test(timeout = 30000)
- public void testAnonymousProducerWithAutoCreation() throws Exception {
+ public void testAnonymousProducerWithTopicAutoCreation() throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic(UUID.randomUUID().toString());
+ String topicName = UUID.randomUUID().toString() + ":" +
getQueueName();
+ Topic topic = session.createTopic(topicName);
MessageProducer p = session.createProducer(null);
TextMessage message = session.createTextMessage();
- message.setText("hello");
- // this will auto-create the address
+ message.setText("creating-topic-address");
+ // This will auto-create the address, but msg will be discarded as
there are no consumers
p.send(topic, message);
{
+ // This will create a new consumer, on the topic address,
verifying it can attach
+ // and then receives a further sent message
MessageConsumer consumer = session.createConsumer(topic);
- p.send(topic, message);
+ Message message2 = message =
session.createTextMessage(getTestName());
+
+ p.send(topic, message2);
+
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
+ assertEquals(getTestName(), ((TextMessage)msg).getText());
consumer.close();
}
} finally {