[ https://issues.apache.org/jira/browse/ARTEMIS-2937?focusedWorklogId=504703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-504703 ]
ASF GitHub Bot logged work on ARTEMIS-2937:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Oct/20 13:08
            Start Date: 26/Oct/20 13:08
    Worklog Time Spent: 10m
      Work Description: gemmellr commented on a change in pull request #3294:
URL: https://github.com/apache/activemq-artemis/pull/3294#discussion_r511909881

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -42,9 +42,9 @@ private static final Logger logger = Logger.getLogger(AMQPMirrorControllerSource.class);

-   public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-activemq-mirror-type");
-   public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-activemq-mirror-address");
-   public static final Symbol QUEUE = Symbol.getSymbol("x-opt-activemq-mirror-queue");
+   public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-AMQ-ev-type");
+   public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-AMQ-adr");
+   public static final Symbol QUEUE = Symbol.getSymbol("x-opt-AMQ-queue");

Review comment: Whilst I know it's an abbreviation, I think mixing lower and upper case like this is best avoided, and 'x-opt-amq-<foo>' would be better. It reads nicer, I think, and I'd expect it to be less likely to be used incorrectly later (case being important). I would personally keep the 'mirror' distinction in the names somehow, even if just an 'm' somewhere; otherwise they seem rather generic and could potentially clash with other functionality later.
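To illustrate why the casing matters, here is a minimal, self-contained sketch. It uses plain strings and a `HashMap` rather than the proton-j `Symbol` type, and the key names (with an `mr` marker standing in for "mirror") are hypothetical examples of the 'x-opt-amq-<foo>' shape, not names agreed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

public class MirrorAnnotationKeys {
    // Hypothetical all-lowercase names; the final names are not settled in this review.
    static final String EVENT_TYPE = "x-opt-amq-mr-ev-type";
    static final String ADDRESS = "x-opt-amq-mr-adr";
    static final String QUEUE = "x-opt-amq-mr-queue";

    // Annotation keys are compared exactly, so a lookup is case-sensitive.
    static Object lookup(Map<String, Object> annotations, String key) {
        return annotations.get(key);
    }

    public static void main(String[] args) {
        Map<String, Object> annotations = new HashMap<>();
        annotations.put(ADDRESS, "TEST");

        // Same spelling and case: the value is found.
        System.out.println(lookup(annotations, ADDRESS));
        // Mixed-case variant of the same name: nothing is found.
        System.out.println(lookup(annotations, "x-opt-AMQ-mr-adr"));
    }
}
```

With a single casing convention there is only one way to spell each key, which is the reviewer's point about later misuse.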
########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java ########## @@ -245,6 +254,142 @@ private String getText(boolean large, int i) { } } + /** + * Delivery annotations should be gone on the receiving side + * @throws Exception + */ + @Test + public void testLargeMessagesWithDeliveryAnnotations() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + // Get the Queue View early to avoid racing the delivery. 
+ final Queue queueView = locateQueue(server_2, "TEST"); + final Queue queueViewReplica = locateQueue(server_2, "TEST"); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test"); + message.setText(getText(true, i)); + sender.send(message); + } + sender.close(); + connection.close(); + } + + Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount); + + { // receiver on replica + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + // Now try and get the message + + AmqpReceiver receiver = session.createReceiver("TEST"); + receiver.flow(NUMBER_OF_MESSAGES); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + Assert.assertEquals(getText(true, i), received.getText()); + Assert.assertNull(received.getDeliveryAnnotation("gone")); + } + receiver.flow(1); + Assert.assertNull(receiver.receiveNoWait()); + + connection.close(); + } + } + + + @Test + public void testReplicaNoAddressOnMessage() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test");

Review comment: If this isn't under test here (it appears not, since it's being tested above), it should be removed. Perhaps a comment saying what IS under test would be good (the other test seemingly already does the same).
########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java ########## @@ -245,6 +254,142 @@ private String getText(boolean large, int i) { } } + /** + * Delivery annotations should be gone on the receiving side + * @throws Exception + */ + @Test + public void testLargeMessagesWithDeliveryAnnotations() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + // Get the Queue View early to avoid racing the delivery. 
+ final Queue queueView = locateQueue(server_2, "TEST"); + final Queue queueViewReplica = locateQueue(server_2, "TEST"); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test"); + message.setText(getText(true, i)); + sender.send(message); + } + sender.close(); + connection.close(); + } + + Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount); + + { // receiver on replica + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + // Now try and get the message + + AmqpReceiver receiver = session.createReceiver("TEST"); + receiver.flow(NUMBER_OF_MESSAGES); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + Assert.assertEquals(getText(true, i), received.getText()); + Assert.assertNull(received.getDeliveryAnnotation("gone")); + } + receiver.flow(1); + Assert.assertNull(receiver.receiveNoWait()); + + connection.close(); + } + } + + + @Test + public void testReplicaNoAddressOnMessage() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test"); + message.setText(getText(false, i)); + sender.send(message); + } + sender.close(); + connection.close(); + } + + { // receiver on replica + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + // Now try and get the message + + AmqpReceiver receiver = session.createReceiver("TEST"); + receiver.flow(NUMBER_OF_MESSAGES); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + Assert.assertEquals(getText(false, i), received.getText()); + Assert.assertNull(received.getDeliveryAnnotation("gone")); + } + receiver.flow(1); + Assert.assertNull(receiver.receiveNoWait());

Review comment: The AMQP test client does receiveNoWait entirely locally, so this won't have the desired effect.
Need to do something else (JMX?), add a delay, or just remove it, since it's not the focus of the test.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -321,11 +322,16 @@ private void sendMessage(AMQPMessage message) throws Exception {
       }

       Long internalID = (Long) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
+      String internalAddress = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_DESTINATION);

       if (internalID != null) {
          message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
       }

+      if (internalAddress != null) {
+         message.setAddress(internalAddress);
+      }

Review comment: Is this for the case where an AMQP message doesn't have 'to' populated? (I see a test that looks to be checking that this works.) Passing the queue name and setting the 'message address' so the second broker can handle it? Would it still do the same thing when the original message did have an AMQP 'to' field set? It seems like it might be a waste in that case; can it perhaps be made to discern that case and not send the annotation?
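One way the suggestion in the comment above might look, as a rough sketch only: the source attaches the destination annotation only when the message's own 'to' field does not already name the address it was routed to, and the target falls back to 'to' when the annotation is absent. The class, method names, and annotation key below are hypothetical stand-ins built on plain maps and strings; they are not the Artemis mirror controller API.

```java
import java.util.HashMap;
import java.util.Map;

public class DestinationAnnotationSketch {
    // Hypothetical key name, following the lowercase naming suggested earlier in the review.
    static final String INTERNAL_DESTINATION = "x-opt-amq-mr-dest";

    // Source side: only pay for the annotation when 'to' cannot be relied on.
    static Map<String, Object> annotationsFor(String to, String routedAddress) {
        Map<String, Object> annotations = new HashMap<>();
        if (!routedAddress.equals(to)) {
            annotations.put(INTERNAL_DESTINATION, routedAddress);
        }
        return annotations;
    }

    // Target side: prefer the annotation, otherwise use the message's own 'to'.
    static String resolveAddress(String to, Map<String, Object> annotations) {
        Object internal = annotations.get(INTERNAL_DESTINATION);
        return internal != null ? (String) internal : to;
    }

    public static void main(String[] args) {
        // 'to' already set to the routed address: no annotation is sent.
        System.out.println(annotationsFor("TEST", "TEST"));
        // 'to' missing: the annotation carries the address to the target.
        Map<String, Object> annotations = annotationsFor(null, "TEST");
        System.out.println(resolveAddress(null, annotations));
    }
}
```

Comparing against the routed address, rather than only checking for a missing 'to', also covers a message whose 'to' names a different address than the one it was actually sent to.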
########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java ########## @@ -245,6 +254,142 @@ private String getText(boolean large, int i) { } } + /** + * Delivery annotations should be gone on the receiving side + * @throws Exception + */ + @Test + public void testLargeMessagesWithDeliveryAnnotations() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + // Get the Queue View early to avoid racing the delivery. 
+ final Queue queueView = locateQueue(server_2, "TEST"); + final Queue queueViewReplica = locateQueue(server_2, "TEST"); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test"); + message.setText(getText(true, i)); + sender.send(message); + } + sender.close(); + connection.close(); + } + + Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount); + + { // receiver on replica + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + // Now try and get the message + + AmqpReceiver receiver = session.createReceiver("TEST"); + receiver.flow(NUMBER_OF_MESSAGES); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + Assert.assertEquals(getText(true, i), received.getText()); + Assert.assertNull(received.getDeliveryAnnotation("gone")); + } + receiver.flow(1); + Assert.assertNull(receiver.receiveNoWait());

Review comment: The AMQP test client does receiveNoWait entirely locally, so this won't have the desired effect. Need to do something else (JMX?), add a delay, or just remove it, since it's not the focus of the test.
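The concern about a purely local no-wait receive can be sketched without the AMQP test client at all. In the example below a `LinkedBlockingQueue` stands in for a client's local buffer and a scheduled task stands in for a broker delivery still in flight; both are assumptions made for illustration, and `pollTwice` is a hypothetical helper, not part of the test suite.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LocalPollSketch {
    // Returns {result of an immediate local poll, result of a poll that waits}.
    static String[] pollTwice(long deliveryDelayMillis) {
        BlockingQueue<String> localBuffer = new LinkedBlockingQueue<>();
        ScheduledExecutorService broker = Executors.newSingleThreadScheduledExecutor();
        try {
            // The "broker" delivers a message a little later.
            broker.schedule(() -> localBuffer.offer("late message"),
                            deliveryDelayMillis, TimeUnit.MILLISECONDS);

            // A no-wait check only inspects what has already arrived locally,
            // so it can report "nothing there" while a delivery is still on its way.
            String immediate = localBuffer.poll();

            // Waiting for a bounded time gives the in-flight delivery a chance to show up.
            String waited = localBuffer.poll(5, TimeUnit.SECONDS);
            return new String[] {immediate, waited};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        } finally {
            broker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String[] results = pollTwice(200);
        System.out.println("immediate poll: " + results[0]);
        System.out.println("timed poll:     " + results[1]);
    }
}
```

An assertion that the immediate poll returns nothing would pass here even though a message does arrive, which is why such a check says little about what the broker will or will not send.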
########## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java ########## @@ -56,7 +56,8 @@ public static final Symbol POST_ACK = Symbol.getSymbol("postAck"); // Delivery annotation property used on mirror control routing and Ack - public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-ativemq-internal-id"); + public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-AMQ-id"); + public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-AMQ-dest"); Review comment: Same as above comment. ########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java ########## @@ -245,6 +254,142 @@ private String getText(boolean large, int i) { } } + /** + * Delivery annotations should be gone on the receiving side + * @throws Exception + */ + @Test + public void testLargeMessagesWithDeliveryAnnotations() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); Review comment: Can we use a 
test-named queue here and below? It makes life simpler later than reusing the same names for every test. Using a parameter rather than lots of literals also makes it easier to see where the name is used and to change it if needed.

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) { } } + /** + * Delivery annotations should be gone on the receiving side + * @throws Exception + */ + @Test + public void testLargeMessagesWithDeliveryAnnotations() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + // Get the Queue View early to avoid racing the delivery.
+ final Queue queueView = locateQueue(server_2, "TEST"); + final Queue queueViewReplica = locateQueue(server_2, "TEST"); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test"); + message.setText(getText(true, i)); + sender.send(message); + } + sender.close(); + connection.close(); + } + + Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount); + + { // receiver on replica + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + // Now try and get the message + + AmqpReceiver receiver = session.createReceiver("TEST"); + receiver.flow(NUMBER_OF_MESSAGES); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + Assert.assertEquals(getText(true, i), received.getText()); + Assert.assertNull(received.getDeliveryAnnotation("gone")); + } + receiver.flow(1); + Assert.assertNull(receiver.receiveNoWait()); + + connection.close(); + } + } + + + @Test + public void testReplicaNoAddressOnMessage() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test"); + message.setText(getText(false, i)); + sender.send(message); + } + sender.close(); + connection.close(); + } + + { // receiver on replica + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + // Now try and get the message + + AmqpReceiver receiver = session.createReceiver("TEST"); + receiver.flow(NUMBER_OF_MESSAGES); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + Assert.assertEquals(getText(false, i), received.getText()); + Assert.assertNull(received.getDeliveryAnnotation("gone"));

Review comment: Should be removed, as above. In its place I'd expect to see an assertion on the message address being null (that seems to be what is under test?).
########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java ########## @@ -245,6 +254,142 @@ private String getText(boolean large, int i) { } } + /** + * Delivery annotations should be gone on the receiving side + * @throws Exception + */ + @Test + public void testLargeMessagesWithDeliveryAnnotations() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false)); + + Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214")); + + // Get the Queue View early to avoid racing the delivery. 
+ final Queue queueView = locateQueue(server_2, "TEST"); + final Queue queueViewReplica = locateQueue(server_2, "TEST"); + + { // sender + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("TEST"); + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage message = new AmqpMessage(); + message.setDeliveryAnnotation("gone", "test"); + message.setText(getText(true, i)); + sender.send(message); + } + sender.close(); + connection.close(); + } + + Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount); + Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount); + + { // receiver on replica + AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + // Now try and get the message + + AmqpReceiver receiver = session.createReceiver("TEST"); + receiver.flow(NUMBER_OF_MESSAGES); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + Assert.assertEquals(getText(true, i), received.getText()); + Assert.assertNull(received.getDeliveryAnnotation("gone")); + } + receiver.flow(1); + Assert.assertNull(receiver.receiveNoWait()); + + connection.close(); + } + } + + + @Test + public void testReplicaNoAddressOnMessage() throws Exception { + server.setIdentity("targetServer"); + server.start(); + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + + int NUMBER_OF_MESSAGES = 20; + + server_2.start(); + Wait.assertTrue(server_2::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false)); + server_2.createQueue(new QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));

Review comment: Same comment as for the previous test.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 504703)
    Time Spent: 20h 50m  (was: 20h 40m)

> AMQP Server Connectivity
> ------------------------
>
>                 Key: ARTEMIS-2937
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2937
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>          Components: AMQP
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.16.0
>
>          Time Spent: 20h 50m
>  Remaining Estimate: 0h
>
> This feature adds server-side connectivity.
>
> With this feature it is possible to link two brokers directly using AMQP, and
> to have a Queue transfer messages to another broker directly.
>
> For this we would have options called <sender> and <receiver>.
>
> It would also be possible to use qpid-dispatch as an intermediary between
> clients and the brokers (or eventually between brokers); in that case the
> option would be <peer>.
>
> It would also be possible to use <mirror> with a few options to replicate data
> between two brokers, bringing the possibility of using it for Disaster
> Recovery and Failover.