[
https://issues.apache.org/jira/browse/ARTEMIS-5656?focusedWorklogId=982899&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-982899
]
ASF GitHub Bot logged work on ARTEMIS-5656:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Sep/25 09:13
Start Date: 12/Sep/25 09:13
Worklog Time Spent: 10m
Work Description: gemmellr commented on code in PR #5920:
URL: https://github.com/apache/activemq-artemis/pull/5920#discussion_r2343541349
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
remoteServer.stop();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws
Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy1 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy1.setName("local-test-policy");
+ localAddressPolicy1.addToIncludes("test");
+ localAddressPolicy1.setAutoDelete(false);
+ localAddressPolicy1.setAutoDeleteDelay(-1L);
+ localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy2 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy2.setName("remote-test-policy");
+ localAddressPolicy2.addToIncludes("test");
+ localAddressPolicy2.setAutoDelete(false);
+ localAddressPolicy2.setAutoDeleteDelay(-1L);
+ localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy2.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederatedBrokerConnectionElement element1 = new
AMQPFederatedBrokerConnectionElement();
+ element1.setName(getTestName() + ":1");
+ element1.addLocalAddressPolicy(localAddressPolicy1);
+
+ final AMQPFederatedBrokerConnectionElement element2 = new
AMQPFederatedBrokerConnectionElement();
+ element2.setName(getTestName() + "2");
Review Comment:
":2" for consistency?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -6880,6 +6880,159 @@ private void
doTestDrainReceiverOnTransientErrorsConfiguredAtFederationLevel(boo
}
}
+ @Test
+ @Timeout(20)
+ public void
testFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand() throws
Exception {
+
doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand(true);
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testFederationSourceDoesNotTreatTargetFilteredFederationReceiversAsLocalDemand()
throws Exception {
+
doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand(false);
+ }
Review Comment:
Boolean args the wrong way round?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
remoteServer.stop();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws
Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy1 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy1.setName("local-test-policy");
+ localAddressPolicy1.addToIncludes("test");
+ localAddressPolicy1.setAutoDelete(false);
+ localAddressPolicy1.setAutoDeleteDelay(-1L);
+ localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy2 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy2.setName("remote-test-policy");
+ localAddressPolicy2.addToIncludes("test");
+ localAddressPolicy2.setAutoDelete(false);
+ localAddressPolicy2.setAutoDeleteDelay(-1L);
+ localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy2.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederatedBrokerConnectionElement element1 = new
AMQPFederatedBrokerConnectionElement();
+ element1.setName(getTestName() + ":1");
+ element1.addLocalAddressPolicy(localAddressPolicy1);
+
+ final AMQPFederatedBrokerConnectionElement element2 = new
AMQPFederatedBrokerConnectionElement();
+ element2.setName(getTestName() + "2");
+ element2.addLocalAddressPolicy(localAddressPolicy2);
+
+ final AMQPBrokerConnectConfiguration amqpConnection1 =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection1.addElement(element1);
+
+ final AMQPBrokerConnectConfiguration amqpConnection2 =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT);
+ amqpConnection2.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection2.addElement(element2);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection1);
+ remoteServer.getConfiguration().addAMQPConnection(amqpConnection2);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+ try (Connection connectionL = factoryLocal.createConnection();
+ Connection connectionR = factoryRemote.createConnection()) {
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topic = sessionL.createTopic("test");
+
+ final MessageConsumer consumerL = sessionL.createConsumer(topic);
+ final MessageConsumer consumerR = sessionR.createConsumer(topic);
+
+ connectionL.start();
+ connectionR.start();
+
+ final SimpleString addressName = SimpleString.of("test");
+
+ Wait.assertTrue(() -> server.addressQuery(addressName).isExists(),
5_000, 50);
+ Wait.assertTrue(() ->
remoteServer.addressQuery(addressName).isExists(), 5_000, 50);
+
+ // Captures state of JMS consumer and federation consumer attached on
each node
+ Wait.assertTrue(() -> server.bindingQuery(addressName,
false).getQueueNames().size() == 2, 10_000, 50);
+ Wait.assertTrue(() -> remoteServer.bindingQuery(addressName,
false).getQueueNames().size() == 2, 10_000, 50);
+
+ // Without active consumers the federation bindings should not
sustain each other
+
+ consumerL.close();
+
+ Wait.assertTrue(() -> server.bindingQuery(addressName,
false).getQueueNames().size() == 1, 10_000, 50);
+ Wait.assertTrue(() -> remoteServer.bindingQuery(addressName,
false).getQueueNames().size() == 1, 10_000, 50);
+
+ consumerR.close();
+
+ Wait.assertTrue(() -> server.bindingQuery(addressName,
false).getQueueNames().size() == 0, 10_000, 50);
+ Wait.assertTrue(() -> remoteServer.bindingQuery(addressName,
false).getQueueNames().size() == 0, 10_000, 50);
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederatedOnOneConnectionsDoesNotCreateSelfSustainingLoop() throws
Exception {
Review Comment:
OneConnections -> OneConnection
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
remoteServer.stop();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws
Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy1 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy1.setName("local-test-policy");
+ localAddressPolicy1.addToIncludes("test");
+ localAddressPolicy1.setAutoDelete(false);
+ localAddressPolicy1.setAutoDeleteDelay(-1L);
+ localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy2 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy2.setName("remote-test-policy");
Review Comment:
local-test-policy-2 ?
Issue Time Tracking
-------------------
Worklog Id: (was: 982899)
Time Spent: 0.5h (was: 20m)
> Prevent bi-directional AMQP Address Federation from creating unneeded links
> ---------------------------------------------------------------------------
>
> Key: ARTEMIS-5656
> URL: https://issues.apache.org/jira/browse/ARTEMIS-5656
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Components: AMQP
> Affects Versions: 2.42.0
> Reporter: Timothy A. Bish
> Assignee: Timothy A. Bish
> Priority: Minor
> Labels: pull-request-available
> Fix For: 2.43.0
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> When bi-directional address federation is configured links from the opposing
> broker are treated as demand when they should generally not be since only
> non-federation consumers should cause creation of return links to the source
> broker. By preventing this artificial demand we can reduce the amount of
> sustained federation links that either can't send back to the source because
> we prevent loops or are sending to a remote that might have to actual local
> demand in more complex configuration like a hub and spoke topology.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact