[ 
https://issues.apache.org/jira/browse/ARTEMIS-2937?focusedWorklogId=504703&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-504703
 ]

ASF GitHub Bot logged work on ARTEMIS-2937:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Oct/20 13:08
            Start Date: 26/Oct/20 13:08
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3294:
URL: https://github.com/apache/activemq-artemis/pull/3294#discussion_r511909881



##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -42,9 +42,9 @@
 
    private static final Logger logger = Logger.getLogger(AMQPMirrorControllerSource.class);
 
-   public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-activemq-mirror-type");
-   public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-activemq-mirror-address");
-   public static final Symbol QUEUE = Symbol.getSymbol("x-opt-activemq-mirror-queue");
+   public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-AMQ-ev-type");
+   public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-AMQ-adr");
+   public static final Symbol QUEUE = Symbol.getSymbol("x-opt-AMQ-queue");

Review comment:
       Whilst I know it's an abbreviation, I think mixing lowercase and uppercase 
like this would be best avoided, and 'x-opt-amq-<foo>' would be better. It reads 
more nicely, I think, and I'd expect it to be less likely to be used incorrectly 
later (case being important).
   
   I would personally leave the 'mirror' distinction in their names somehow, 
even if just an 'm' somewhere; otherwise they seem rather generic and could 
potentially clash with other functionality later.
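
   To illustrate the point about case, here is a minimal, self-contained sketch. 
It uses plain string keys in a map as a stand-in for AMQP symbols, and the 
constant values (including the 'mr' marker) are hypothetical examples, not the 
names chosen in the pull request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical key names for illustration only; not the names used in the pull request.
final class MirrorAnnotationNames {
    static final String EVENT_TYPE = "x-opt-amq-mr-ev-type";
    static final String ADDRESS = "x-opt-amq-mr-adr";
    static final String QUEUE = "x-opt-amq-mr-queue";

    private MirrorAnnotationNames() { }

    // Plain string keys stand in for annotation symbols; the lookup is case-sensitive.
    static Object lookup(Map<String, Object> annotations, String key) {
        return annotations.get(key);
    }

    public static void main(String[] args) {
        Map<String, Object> annotations = new HashMap<>();
        annotations.put(QUEUE, "TEST");

        // The all-lowercase constant finds the value.
        System.out.println(lookup(annotations, QUEUE));
        // A mixed-case spelling of the same name is a different key and finds nothing.
        System.out.println(lookup(annotations, "x-opt-AMQ-mr-queue"));
    }
}
```

   Keeping every name lowercase and referring to it only through the constant 
leaves a single spelling to get right.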

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) {
       }
    }
 
+   /**
+    * Delivery annotations should be gone on the receiving side
+    * @throws Exception
+    */
+   @Test
+   public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = locateQueue(server_2, "TEST");
+      final Queue queueViewReplica = locateQueue(server_2, "TEST");
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(true, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount);
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(true, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());
+
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testReplicaNoAddressOnMessage() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");

Review comment:
       If this isn't under test here (it appears not, since it's being tested 
above), it should be removed.
   
   Perhaps a comment saying what IS under test would be good (the other test 
seemingly already doing the same).

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) {
       }
    }
 
+   /**
+    * Delivery annotations should be gone on the receiving side
+    * @throws Exception
+    */
+   @Test
+   public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = locateQueue(server_2, "TEST");
+      final Queue queueViewReplica = locateQueue(server_2, "TEST");
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(true, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount);
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(true, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());
+
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testReplicaNoAddressOnMessage() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(false, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(false, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());

Review comment:
       The AMQP test client does receiveNoWait entirely locally, so this won't 
have the desired effect. Need to do something else (JMX?), add a delay, or 
just remove it since it's not the focus of the test.
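
   As a rough illustration of why a purely local no-wait check proves little, 
the sketch below simulates a client-side buffer that a remote peer fills 
asynchronously. The class and method names are hypothetical and this is not 
the AMQP test client; it only models the timing issue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simulation only: a local buffer filled by a delayed "remote" delivery.
final class LocalPrefetchDemo {
    private final BlockingQueue<String> localBuffer = new LinkedBlockingQueue<>();

    // Simulates a message that is still in flight and arrives after delayMillis.
    void deliverLater(String message, long delayMillis) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                localBuffer.add(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Looks only at what has already arrived locally; never waits for the peer.
    String receiveNoWait() {
        return localBuffer.poll();
    }

    // Waits up to the timeout for a delivery; returns null on timeout or interrupt.
    String receive(long timeout, TimeUnit unit) {
        try {
            return localBuffer.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        LocalPrefetchDemo receiver = new LocalPrefetchDemo();
        receiver.deliverLater("unexpected extra message", 300);

        // An immediate local check misses the in-flight message...
        System.out.println("no-wait: " + receiver.receiveNoWait());
        // ...while a bounded wait gives it time to arrive.
        System.out.println("timed:   " + receiver.receive(5, TimeUnit.SECONDS));
    }
}
```

   A null from the immediate check therefore cannot show that no further 
message is coming, which is why a delay or a broker-side check is suggested.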

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
##########
@@ -321,11 +322,16 @@ private void sendMessage(AMQPMessage message) throws Exception {
       }
 
       Long internalID = (Long) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
+      String internalAddress = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_DESTINATION);
 
       if (internalID != null) {
          message.setBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY, internalID);
       }
 
+      if (internalAddress != null) {
+         message.setAddress(internalAddress);
+      }

Review comment:
       Is this for the case where an AMQP message doesn't have 'to' populated? 
(I see a test that looks to be checking that this works.) Is the idea to pass 
the queue name and set the 'message address' so the second broker can handle it?
   
   Would it still do the same thing when the original message did have an AMQP 
'to' field set? It seems like it might be a waste in that case; can it perhaps 
be made to discern that case and not send the annotation?
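
   One way to picture that suggestion is the sketch below. It is not the 
Artemis implementation: annotations are modelled as a plain map, the key name 
is hypothetical, and whether skipping the annotation is safe in the real 
mirror code is exactly the open question for the pull request author.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: models the "send the annotation only when needed" idea.
final class DestinationAnnotationSketch {
    // Hypothetical annotation key, not necessarily the name used in the pull request.
    static final String INTERNAL_DESTINATION = "x-opt-amq-mr-dest";

    private DestinationAnnotationSketch() { }

    // Source side: attach the routed address only when 'to' does not already carry it.
    static Map<String, Object> annotationsFor(String toField, String routedAddress) {
        Map<String, Object> annotations = new HashMap<>();
        if (toField == null || !toField.equals(routedAddress)) {
            annotations.put(INTERNAL_DESTINATION, routedAddress);
        }
        return annotations;
    }

    // Target side: prefer the annotation when present, otherwise fall back to 'to'.
    static String resolveAddress(String toField, Map<String, Object> annotations) {
        Object annotated = annotations.get(INTERNAL_DESTINATION);
        return annotated != null ? (String) annotated : toField;
    }

    public static void main(String[] args) {
        // No 'to' on the message: the annotation carries the address.
        Map<String, Object> withoutTo = annotationsFor(null, "TEST");
        System.out.println(withoutTo.containsKey(INTERNAL_DESTINATION));
        System.out.println(resolveAddress(null, withoutTo));

        // 'to' already matches: nothing extra is sent and 'to' is used.
        Map<String, Object> withTo = annotationsFor("TEST", "TEST");
        System.out.println(withTo.isEmpty());
        System.out.println(resolveAddress("TEST", withTo));
    }
}
```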

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) {
       }
    }
 
+   /**
+    * Delivery annotations should be gone on the receiving side
+    * @throws Exception
+    */
+   @Test
+   public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = locateQueue(server_2, "TEST");
+      final Queue queueViewReplica = locateQueue(server_2, "TEST");
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(true, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount);
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(true, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());

Review comment:
       The AMQP test client does receiveNoWait entirely locally, so this won't 
have the desired effect. Need to do something else (JMX?), add a delay, or 
just remove it since it's not the focus of the test.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -56,7 +56,8 @@
    public static final Symbol POST_ACK = Symbol.getSymbol("postAck");
 
    // Delivery annotation property used on mirror control routing and Ack
-   public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-ativemq-internal-id");
+   public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-AMQ-id");
+   public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-AMQ-dest");

Review comment:
       Same as above comment.

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) {
       }
    }
 
+   /**
+    * Delivery annotations should be gone on the receiving side
+    * @throws Exception
+    */
+   @Test
+   public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));

Review comment:
       Can we use a test-named queue here and below? It makes life simpler later 
than reusing the same names for every test. Using a parameter rather than lots 
of literals also makes it easier to see where the name is used and/or to 
change it easily if needed.
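
   A minimal sketch of the idea, assuming a hypothetical helper that builds the 
name from the test class and method; the real test base class may already offer 
something equivalent, which would be preferable.

```java
// Hypothetical helper for illustration; not part of the Artemis test code.
final class TestQueueNames {
    private TestQueueNames() { }

    // Builds a per-test queue name so that tests do not all share "TEST".
    static String forTest(Class<?> testClass, String methodName) {
        return testClass.getSimpleName() + "_" + methodName;
    }

    public static void main(String[] args) {
        // Declared once, then passed wherever the literal "TEST" was repeated.
        String queueName = forTest(TestQueueNames.class, "testReplicaNoAddressOnMessage");
        System.out.println(queueName);
    }
}
```

   With a single variable, the address, queue, sender and receiver all pick up 
the same value, and a rename touches one line.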

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) {
       }
    }
 
+   /**
+    * Delivery annotations should be gone on the receiving side
+    * @throws Exception
+    */
+   @Test
+   public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = locateQueue(server_2, "TEST");
+      final Queue queueViewReplica = locateQueue(server_2, "TEST");
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(true, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount);
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(true, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());
+
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testReplicaNoAddressOnMessage() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(false, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(false, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));

Review comment:
       Should be removed, as above. In its place I'd expect to see an assertion 
on the message address being null (that seeming to be what is under test?).

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) {
       }
    }
 
+   /**
+    * Delivery annotations should be gone on the receiving side
+    * @throws Exception
+    */
+   @Test
+   public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = locateQueue(server_2, "TEST");
+      final Queue queueViewReplica = locateQueue(server_2, "TEST");
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(true, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount);
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(true, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());
+
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testReplicaNoAddressOnMessage() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));

Review comment:
       Comment as previous test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 504703)
    Time Spent: 20h 50m  (was: 20h 40m)

> AMQP Server Connectivity
> ------------------------
>
>                 Key: ARTEMIS-2937
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2937
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>          Components: AMQP
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.16.0
>
>          Time Spent: 20h 50m
>  Remaining Estimate: 0h
>
> This feature adds server side connectivity.
>  
> It is possible to link two brokers directly using AMQP with this feature, and 
> have a Queue transferring messages to another broker directly. 
>  
> For this we would have options called <sender and <receiver
>  
>  
> It would also be possible to use qpid-dispatch as an intermediary between 
> clients and the brokers (or eventually between brokers); in that case the 
> option will be <peer
>  
> It would also be possible to use <mirror with a few options to replicate data 
> between two brokers, bringing the possibility of using it for Disaster & 
> Recovery and Failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
