gemmellr commented on code in PR #5920:
URL: https://github.com/apache/activemq-artemis/pull/5920#discussion_r2343541349
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
remoteServer.stop();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws
Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy1 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy1.setName("local-test-policy");
+ localAddressPolicy1.addToIncludes("test");
+ localAddressPolicy1.setAutoDelete(false);
+ localAddressPolicy1.setAutoDeleteDelay(-1L);
+ localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy2 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy2.setName("remote-test-policy");
+ localAddressPolicy2.addToIncludes("test");
+ localAddressPolicy2.setAutoDelete(false);
+ localAddressPolicy2.setAutoDeleteDelay(-1L);
+ localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy2.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederatedBrokerConnectionElement element1 = new
AMQPFederatedBrokerConnectionElement();
+ element1.setName(getTestName() + ":1");
+ element1.addLocalAddressPolicy(localAddressPolicy1);
+
+ final AMQPFederatedBrokerConnectionElement element2 = new
AMQPFederatedBrokerConnectionElement();
+ element2.setName(getTestName() + "2");
Review Comment:
Use ":2" here for consistency with the ":1" suffix used for element1?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -6880,6 +6880,159 @@ private void
doTestDrainReceiverOnTransientErrorsConfiguredAtFederationLevel(boo
}
}
+ @Test
+ @Timeout(20)
+ public void
testFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand() throws
Exception {
+
doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand(true);
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testFederationSourceDoesNotTreatTargetFilteredFederationReceiversAsLocalDemand()
throws Exception {
+
doTestFederationSourceDoesNotTreatTargetFederationReceiversAsLocalDemand(false);
+ }
Review Comment:
Are the boolean args the wrong way round? The "Filtered" variant passes false while the unfiltered variant passes true.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
remoteServer.stop();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws
Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy1 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy1.setName("local-test-policy");
+ localAddressPolicy1.addToIncludes("test");
+ localAddressPolicy1.setAutoDelete(false);
+ localAddressPolicy1.setAutoDeleteDelay(-1L);
+ localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy2 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy2.setName("remote-test-policy");
+ localAddressPolicy2.addToIncludes("test");
+ localAddressPolicy2.setAutoDelete(false);
+ localAddressPolicy2.setAutoDeleteDelay(-1L);
+ localAddressPolicy2.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy2.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederatedBrokerConnectionElement element1 = new
AMQPFederatedBrokerConnectionElement();
+ element1.setName(getTestName() + ":1");
+ element1.addLocalAddressPolicy(localAddressPolicy1);
+
+ final AMQPFederatedBrokerConnectionElement element2 = new
AMQPFederatedBrokerConnectionElement();
+ element2.setName(getTestName() + "2");
+ element2.addLocalAddressPolicy(localAddressPolicy2);
+
+ final AMQPBrokerConnectConfiguration amqpConnection1 =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection1.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection1.addElement(element1);
+
+ final AMQPBrokerConnectConfiguration amqpConnection2 =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT);
+ amqpConnection2.setReconnectAttempts(10);// Limit reconnects
+ amqpConnection2.addElement(element2);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection1);
+ remoteServer.getConfiguration().addAMQPConnection(amqpConnection2);
+ remoteServer.start();
+ server.start();
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+ try (Connection connectionL = factoryLocal.createConnection();
+ Connection connectionR = factoryRemote.createConnection()) {
+
+ final Session sessionL =
connectionL.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Session sessionR =
connectionR.createSession(Session.AUTO_ACKNOWLEDGE);
+
+ final Topic topic = sessionL.createTopic("test");
+
+ final MessageConsumer consumerL = sessionL.createConsumer(topic);
+ final MessageConsumer consumerR = sessionR.createConsumer(topic);
+
+ connectionL.start();
+ connectionR.start();
+
+ final SimpleString addressName = SimpleString.of("test");
+
+ Wait.assertTrue(() -> server.addressQuery(addressName).isExists(),
5_000, 50);
+ Wait.assertTrue(() ->
remoteServer.addressQuery(addressName).isExists(), 5_000, 50);
+
+ // Captures state of JMS consumer and federation consumer attached on
each node
+ Wait.assertTrue(() -> server.bindingQuery(addressName,
false).getQueueNames().size() == 2, 10_000, 50);
+ Wait.assertTrue(() -> remoteServer.bindingQuery(addressName,
false).getQueueNames().size() == 2, 10_000, 50);
+
+ // Without active consumers the federation bindings should not
sustain each other
+
+ consumerL.close();
+
+ Wait.assertTrue(() -> server.bindingQuery(addressName,
false).getQueueNames().size() == 1, 10_000, 50);
+ Wait.assertTrue(() -> remoteServer.bindingQuery(addressName,
false).getQueueNames().size() == 1, 10_000, 50);
+
+ consumerR.close();
+
+ Wait.assertTrue(() -> server.bindingQuery(addressName,
false).getQueueNames().size() == 0, 10_000, 50);
+ Wait.assertTrue(() -> remoteServer.bindingQuery(addressName,
false).getQueueNames().size() == 0, 10_000, 50);
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederatedOnOneConnectionsDoesNotCreateSelfSustainingLoop() throws
Exception {
Review Comment:
Typo in the test method name: OneConnections -> OneConnection
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationServerToServerTest.java:
##########
@@ -1923,4 +1924,163 @@ private void
doTestRemoteAddressFederationAppliesConsumerFilterIfConfigured(bool
remoteServer.stop();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testAddressFederatedOnTwoConnectionsDoesNotCreateSelfSustainingLoop() throws
Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy1 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy1.setName("local-test-policy");
+ localAddressPolicy1.addToIncludes("test");
+ localAddressPolicy1.setAutoDelete(false);
+ localAddressPolicy1.setAutoDeleteDelay(-1L);
+ localAddressPolicy1.setAutoDeleteMessageCount(-1L);
+ localAddressPolicy1.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederationAddressPolicyElement localAddressPolicy2 = new
AMQPFederationAddressPolicyElement();
+ localAddressPolicy2.setName("remote-test-policy");
Review Comment:
Should this be named "local-test-policy-2" instead of "remote-test-policy", to match the localAddressPolicy2 variable?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact