[ 
https://issues.apache.org/jira/browse/ARTEMIS-2937?focusedWorklogId=504732&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-504732
 ]

ASF GitHub Bot logged work on ARTEMIS-2937:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Oct/20 14:11
            Start Date: 26/Oct/20 14:11
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on a change in pull request 
#3294:
URL: https://github.com/apache/activemq-artemis/pull/3294#discussion_r511989530



##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
##########
@@ -245,6 +254,142 @@ private String getText(boolean large, int i) {
       }
    }
 
+   /**
+    * Delivery annotations should be gone on the receiving side
+    * @throws Exception
+    */
+   @Test
+   public void testLargeMessagesWithDeliveryAnnotations() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      // Get the Queue View early to avoid racing the delivery.
+      final Queue queueView = locateQueue(server_2, "TEST");
+      final Queue queueViewReplica = locateQueue(server_2, "TEST");
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");
+            message.setText(getText(true, i));
+            sender.send(message);
+         }
+         sender.close();
+         connection.close();
+      }
+
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueView::getMessageCount);
+      Wait.assertEquals(NUMBER_OF_MESSAGES, queueViewReplica::getMessageCount);
+
+      { // receiver on replica
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+         // Now try and get the message
+
+         AmqpReceiver receiver = session.createReceiver("TEST");
+         receiver.flow(NUMBER_OF_MESSAGES);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+            Assert.assertEquals(getText(true, i), received.getText());
+            Assert.assertNull(received.getDeliveryAnnotation("gone"));
+         }
+         receiver.flow(1);
+         Assert.assertNull(receiver.receiveNoWait());
+
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testReplicaNoAddressOnMessage() throws Exception {
+      server.setIdentity("targetServer");
+      server.start();
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new 
AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + 
AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new 
AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true);
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int NUMBER_OF_MESSAGES = 20;
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new 
AddressInfo("TEST").addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server_2.createQueue(new 
QueueConfiguration("TEST").setRoutingType(RoutingType.ANYCAST).setAddress("TEST").setAutoCreated(false));
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
+
+      { // sender
+         AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + 
AMQP_PORT_2), null, null);
+         AmqpConnection connection = addConnection(client.connect());
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender("TEST");
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setDeliveryAnnotation("gone", "test");

Review comment:
       There was actually already a comment on the test:
      /**
       * Delivery annotations should be gone on the receiving side
       * @throws Exception
       */
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 504732)
    Time Spent: 23h 20m  (was: 23h 10m)

> AMQP Server Connectivity
> ------------------------
>
>                 Key: ARTEMIS-2937
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2937
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>          Components: AMQP
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.16.0
>
>          Time Spent: 23h 20m
>  Remaining Estimate: 0h
>
> This feature adds server side connectivity.
>  
> It is possible to link two brokers directly using AMQP with this feature, and 
> have a Queue transferring messages to another broker directly. 
>  
> For this we would have options called <sender and <receiver
>  
>  
> It would also be possible to use qpid-dispatch as an intermediary between 
> clients and the brokers (or eventually between brokers); in that case the 
> option will be <peer
>  
> It would also be possible to use <mirror with a few options to replicate data 
> between two brokers, bringing the possibility of using it for Disaster 
> Recovery and Failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to