[ 
https://issues.apache.org/jira/browse/ARTEMIS-5656?focusedWorklogId=982899&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-982899
 ]

ASF GitHub Bot logged work on ARTEMIS-5656:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Sep/25 09:13
            Start Date: 12/Sep/25 09:13
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #5920:
URL: https://github.com/apache/activemq-artemis/pull/5920#discussion_r2343541349


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void 
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
          remoteServer.stop();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy1 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy1.setName("local-test-policy");
+      localAddressPolicy1.addToIncludes("test");
+      localAddressPolicy1.setAutoDelete(false);
+      localAddressPolicy1.setAutoDeleteDelay(-1L);
+      localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy2 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy2.setName("remote-test-policy");
+      localAddressPolicy2.addToIncludes("test");
+      localAddressPolicy2.setAutoDelete(false);
+      localAddressPolicy2.setAutoDeleteDelay(-1L);
+      localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy2.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederatedBrokerConnectionElement element1 = new 
AMQPFederatedBrokerConnectionElement();
+      element1.setName(getTestName() + ":1");
+      element1.addLocalAddressPolicy(localAddressPolicy1);
+
+      final AMQPFederatedBrokerConnectionElement element2 = new 
AMQPFederatedBrokerConnectionElement();
+      element2.setName(getTestName() + "2");

Review Comment:
   ":2" for consistency?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -6880,6 +6880,159 @@ private void 
doTestDrainReceiverOnTransientErrorsConfiguredAtFederationLevel(boo
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand() throws 
Exception {
+      
doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand(true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testFederationSourceDoesNotTreatTargetFilteredFederationReceiversAsLocalDemand()
 throws Exception {
+      
doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand(false);
+   }

Review Comment:
   Boolean args the wrong way round?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void 
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
          remoteServer.stop();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy1 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy1.setName("local-test-policy");
+      localAddressPolicy1.addToIncludes("test");
+      localAddressPolicy1.setAutoDelete(false);
+      localAddressPolicy1.setAutoDeleteDelay(-1L);
+      localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy2 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy2.setName("remote-test-policy");
+      localAddressPolicy2.addToIncludes("test");
+      localAddressPolicy2.setAutoDelete(false);
+      localAddressPolicy2.setAutoDeleteDelay(-1L);
+      localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy2.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederatedBrokerConnectionElement element1 = new 
AMQPFederatedBrokerConnectionElement();
+      element1.setName(getTestName() + ":1");
+      element1.addLocalAddressPolicy(localAddressPolicy1);
+
+      final AMQPFederatedBrokerConnectionElement element2 = new 
AMQPFederatedBrokerConnectionElement();
+      element2.setName(getTestName() + "2");
+      element2.addLocalAddressPolicy(localAddressPolicy2);
+
+      final AMQPBrokerConnectConfiguration amqpConnection1 =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection1.addElement(element1);
+
+      final AMQPBrokerConnectConfiguration amqpConnection2 =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT);
+      amqpConnection2.setReconnectAttempts(10);// Limit reconnects
+      amqpConnection2.addElement(element2);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection1);
+      remoteServer.getConfiguration().addAMQPConnection(amqpConnection2);
+      remoteServer.start();
+      server.start();
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      try (Connection connectionL = factoryLocal.createConnection();
+           Connection connectionR = factoryRemote.createConnection()) {
+
+         final Session sessionL = 
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Session sessionR = 
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+         final Topic topic = sessionL.createTopic("test");
+
+         final MessageConsumer consumerL = sessionL.createConsumer(topic);
+         final MessageConsumer consumerR = sessionR.createConsumer(topic);
+
+         connectionL.start();
+         connectionR.start();
+
+         final SimpleString addressName = SimpleString.of("test");
+
+         Wait.assertTrue(() -> server.addressQuery(addressName).isExists(), 
5_000, 50);
+         Wait.assertTrue(() -> 
remoteServer.addressQuery(addressName).isExists(), 5_000, 50);
+
+         // Captures state of JMS consumer and federation consumer attached on 
each node
+         Wait.assertTrue(() -> server.bindingQuery(addressName, 
false).getQueueNames().size() == 2, 10_000, 50);
+         Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() == 2, 10_000, 50);
+
+         // Without active consumers the federation bindings should not 
sustain each other
+
+         consumerL.close();
+
+         Wait.assertTrue(() -> server.bindingQuery(addressName, 
false).getQueueNames().size() == 1, 10_000, 50);
+         Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() == 1, 10_000, 50);
+
+         consumerR.close();
+
+         Wait.assertTrue(() -> server.bindingQuery(addressName, 
false).getQueueNames().size() == 0, 10_000, 50);
+         Wait.assertTrue(() -> remoteServer.bindingQuery(addressName, 
false).getQueueNames().size() == 0, 10_000, 50);
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederatedOnOneConnectionsDoesNotCreateSelfSustainingLoop() throws 
Exception {

Review Comment:
   OneConnections -> OneConnection



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void 
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
          remoteServer.stop();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy1 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy1.setName("local-test-policy");
+      localAddressPolicy1.addToIncludes("test");
+      localAddressPolicy1.setAutoDelete(false);
+      localAddressPolicy1.setAutoDeleteDelay(-1L);
+      localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+      localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+      final AMQPFederationAddressPolicyElement localAddressPolicy2 = new 
AMQPFederationAddressPolicyElement();
+      localAddressPolicy2.setName("remote-test-policy");

Review Comment:
   local-test-policy-2 ?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 982899)
    Time Spent: 0.5h  (was: 20m)

> Prevent bi-directional AMQP Address Federation from creating unneeded links
> ---------------------------------------------------------------------------
>
>                 Key: ARTEMIS-5656
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5656
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>          Components: AMQP
>    Affects Versions: 2.42.0
>            Reporter: Timothy A. Bish
>            Assignee: Timothy A. Bish
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 2.43.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> When bi-directional address federation is configured, links from the opposing 
> broker are treated as demand when they should generally not be, since only 
> non-federation consumers should cause creation of return links to the source 
> broker.  By preventing this artificial demand we can reduce the number of 
> sustained federation links that either can't send back to the source because 
> we prevent loops or are sending to a remote that might have no actual local 
> demand in more complex configurations like a hub and spoke topology.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to