gemmellr commented on code in PR #5920:
URL: https://github.com/apache/activemq-artemis/pull/5920#discussion_r2343541349


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void 
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
          remoteServer.stop();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy1 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy1.setName("local-test-policy");
+      localAddressPolicy1.addToIncludes("test");
+      localAddressPolicy1.setAutoDelete(false);
+      localAddressPolicy1.setAutoDeleteDelay(-1L);
+      localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy2 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy2.setName("remote-test-policy");
+      localAddressPolicy2.addToIncludes("test");
+      localAddressPolicy2.setAutoDelete(false);
+      localAddressPolicy2.setAutoDeleteDelay(-1L);
+      localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy2.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederatedBrokerConnectionElement element1 = new 
AMQPFederatedBrokerConnectionElement();
+      element1.setName(getTestName() + ":1");
+      element1.addLocalAddressPolicy(localAddressPolicy1);
+
+      final AMQPFederatedBrokerConnectionElement element2 = new 
AMQPFederatedBrokerConnectionElement();
+      element2.setName(getTestName() + "2");

Review Comment:
   Use ":2" here for consistency with element1, which uses ":1"?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -6880,6 +6880,159 @@ private void 
doTestDrainReceiverOnTransientErrorsConfiguredAtFederationLevel(boo
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand() throws 
Exception {
+      
doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand(true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testFederationSourceDoesNotTreatTargetFilteredFederationReceiversAsLocalDemand()
 throws Exception {
+      
doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand(false);
+   }

Review Comment:
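   Are the boolean arguments the wrong way round? The test without "Filtered" in its name passes true, while the "Filtered" variant passes false.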



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void 
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
          remoteServer.stop();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy1 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy1.setName("local-test-policy");
+      localAddressPolicy1.addToIncludes("test");
+      localAddressPolicy1.setAutoDelete(false);
+      localAddressPolicy1.setAutoDeleteDelay(-1L);
+      localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy2 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy2.setName("remote-test-policy");
+      localAddressPolicy2.addToIncludes("test");
+      localAddressPolicy2.setAutoDelete(false);
+      localAddressPolicy2.setAutoDeleteDelay(-1L);
+      localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy2.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederatedBrokerConnectionElement element1 = new 
AMQPFederatedBrokerConnectionElement();
+      element1.setName(getTestName() + ":1");
+      element1.addLocalAddressPolicy(localAddressPolicy1);
+
+      final AMQPFederatedBrokerConnectionElement element2 = new 
AMQPFederatedBrokerConnectionElement();
+      element2.setName(getTestName() + "2");
+      element2.addLocalAddressPolicy(localAddressPolicy2);
+
+      final AMQPBrokerConnectConfiguration amqpConnection1 =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection1.addElement(element1);
+
+      final AMQPBrokerConnectConfiguration amqpConnection2 =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT);
+      amqpConnection2.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection2.addElement(element2);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection1);
+      remoteServer.getConfiguration().addAMQPConnection(amqpConnection2);
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      try (Connection connectionL = factoryLocal.createConnection();
+           Connection connectionR = factoryRemote.createConnection()) {
+
+         final Session sessionL = 
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Session sessionR = 
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+         final Topic topic = sessionL.createTopic("test");
+
+         final MessageConsumer consumerL = sessionL.createConsumer(topic);
+         final MessageConsumer consumerR = sessionR.createConsumer(topic);
+
+         connectionL.start();
+         connectionR.start();
+
+         final SimpleString addressName = SimpleString.of("test");
+
+         Wait.assertTrue(() -> server.addressQuery(addressName).isExists(), 
5_000, 50);
+         Wait.assertTrue(() -> 
remoteServer.addressQuery(addressName).isExists(), 5_000, 50);
+
+         // Captures state of JMS consumer and federation consumer attached on 
each node
+         Wait.assertTrue(() -> server.bindingQuery(addressName, 
false).getQueueNames().size() == 2, 10_000, 50);
+         Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() == 2, 10_000, 50);
+
+         // Without active consumers the federation bindings should not 
sustain each other
+
+         consumerL.close();
+
+         Wait.assertTrue(() -> server.bindingQuery(addressName, 
false).getQueueNames().size() == 1, 10_000, 50);
+         Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() == 1, 10_000, 50);
+
+         consumerR.close();
+
+         Wait.assertTrue(() -> server.bindingQuery(addressName, 
false).getQueueNames().size() == 0, 10_000, 50);
+         Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() == 0, 10_000, 50);
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederatedOnOneConnectionsDoesNotCreateSelfSustainingLoop() throws 
Exception {

Review Comment:
   Typo in the method name: OneConnections -> OneConnection



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void 
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
          remoteServer.stop();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy1 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy1.setName("local-test-policy");
+      localAddressPolicy1.addToIncludes("test");
+      localAddressPolicy1.setAutoDelete(false);
+      localAddressPolicy1.setAutoDeleteDelay(-1L);
+      localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy2 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy2.setName("remote-test-policy");

Review Comment:
   Should this be "local-test-policy-2"? The name "remote-test-policy" is being set on localAddressPolicy2.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to