gemmellr commented on code in PR #5793:
URL: https://github.com/apache/activemq-artemis/pull/5793#discussion_r2163692859
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java:
##########
@@ -195,4 +196,18 @@ public boolean isReceiverDemandTrackingDisabled() {
return configuration.isReceiverDemandTrackingDisabled();
}
}
+
+ /**
+ * {@return <code>true</code> if bridge from address policies are
configured to prefer using shared durable address subscriptions}
+ */
+ public boolean isPreferSharedDuableSubscriptions() {
Review Comment:
Duable -> Durable
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java:
##########
@@ -1270,4 +1274,99 @@ public void
testDurableAddressSubscriptionRecoveredOnRestart() throws Exception
assertEquals("red", receivedAfter.getStringProperty("color"));
}
}
+
+ @Test
+ @Timeout(20)
+ public void testSharedDurableAddressSubscriptionRecoveredOnRestart() throws
Exception {
+ logger.info("Test started: {}", getTestName());
+
+ final String filterString = "color='red'";
+
+ final AMQPBridgeAddressPolicyElement bridgeAddressPolicy = new
AMQPBridgeAddressPolicyElement();
+ bridgeAddressPolicy.setName("test-policy");
+ bridgeAddressPolicy.setUseDurableSubscriptions(true);
+ bridgeAddressPolicy.setFilter(filterString);
+ bridgeAddressPolicy.addToIncludes(getTestName());
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromAddressPolicy(bridgeAddressPolicy);
+ element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:"
+ SERVER_PORT_REMOTE);
+ amqpConnection.setReconnectAttempts(10); // Limit reconnects
+ amqpConnection.setRetryInterval(50);
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ remoteServer.start();
+ server.start();
+
+ // Create an address with a binding to simulate demand from a consumer
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+ // Wait for the bridge to form to the remote and capture the durable
subscription name
+ Wait.assertEquals(1L, () ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size(), 500_000, 50);
+
+ // The actual subscription queue for the "shared" bridge receivers
should be a stable queue
+ final String subscriptionQueueName =
remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+ assertNotNull(subscriptionQueueName);
+ assertTrue(subscriptionQueueName.contains("amqp-bridge-"));
+ assertTrue(subscriptionQueueName.contains(getTestName()));
+
+ final org.apache.activemq.artemis.core.server.Queue subscriptionQueue =
remoteServer.locateQueue(subscriptionQueueName);
+
+ assertNotNull(subscriptionQueue);
+ Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000,
100);
+ assertTrue(subscriptionQueue.isDurable());
+ assertEquals(filterString,
subscriptionQueue.getFilter().getFilterString().toString());
+
+ server.stop();
+
+ Wait.assertEquals(1L, () ->
remoteServer.bindingQuery(SimpleString.of(getTestName()),
false).getQueueNames().size(), 5_000, 50);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000,
100);
+
+ final ConnectionFactory factoryRemote =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+ try (Connection connection = factoryRemote.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName());
+ final MessageProducer producerL = session.createProducer(topic);
+ final TextMessage message = session.createTextMessage("Hello World");
+
+ message.setStringProperty("color", "green");
+ producerL.send(message);
+ message.setStringProperty("color", "red");
+ producerL.send(message);
+
+ Wait.assertEquals(1L, () -> subscriptionQueue.getMessageCount(),
5_000, 100);
+ }
+
+ server.start();
+
+ // Server should re-attach and recover the subscription and take the
message
+ Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000,
100);
+ Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000,
100);
+
+ final ConnectionFactory factoryLocal =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+ try (Connection connection = factoryLocal.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic(getTestName() + "::" +
getTestName()); // Access our pre-created queue via FQQN
+ final MessageConsumer consumer = session.createConsumer(topic);
+
+ connection.start();
+
+ final Message receivedAfter = consumer.receive(5_000);
+
+ assertNotNull(receivedAfter);
+ assertTrue(receivedAfter instanceof TextMessage);
+ assertEquals("Hello World", ((TextMessage) receivedAfter).getText());
+ assertTrue(receivedAfter.propertyExists("color"));
+ assertEquals("red", receivedAfter.getStringProperty("color"));
+ }
Review Comment:
For completeness I'd send another matching message after its restored to
verify that also comes through as expected.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java:
##########
@@ -237,26 +237,30 @@ public ProtonClientProtocolManager createClientManager() {
@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed,
Connection remotingConnection) {
- return internalConnectionEntry(remotingConnection, false, null, null);
+ return internalConnectionEntry(remotingConnection, false, null, null,
null, null);
}
/**
* This method is not part of the ProtocolManager interface because it only
makes sense on AMQP. More specifically on
* AMQP Bridges
*/
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection) {
- return internalConnectionEntry(remotingConnection, true, null, null);
+ return internalConnectionEntry(remotingConnection, true, null, null,
null, null);
}
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory) {
- return internalConnectionEntry(remotingConnection, true, saslFactory,
null);
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
null, null, null);
}
public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object>
connectionProperties) {
- return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties);
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties, null, null);
}
- private ConnectionEntry internalConnectionEntry(Connection
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory,
Map<Symbol, Object> connectionProperties) {
+ public ConnectionEntry createOutgoingConnectionEntry(Connection
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object>
connectionProperties, Symbol[] offeredCapabilities, Symbol[]
desiredCapabilities) {
+ return internalConnectionEntry(remotingConnection, true, saslFactory,
connectionProperties, offeredCapabilities, desiredCapabilities);
Review Comment:
offeredCapabilities doesnt look to be used
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java:
##########
@@ -362,4 +364,18 @@ public long getAutoDeleteDurableSubscriptionDelay() {
return DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY;
}
}
+
+ /**
+ * {@return <code>true</code> if bridge from address policies are
configured to prefer using shared durable address subscriptions}
+ */
+ public boolean isPreferSharedDuableSubscriptions() {
Review Comment:
Duable -> Durable
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java:
##########
@@ -3644,6 +3647,238 @@ public void
testBridgeCreateJMSStyleDurableSubscriptionWhenConfiguredTo() throws
}
}
+ @Test
+ @Timeout(20)
+ public void
testNewSharedDurableBridgeReceiverCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+
peer.expectOpen().respond().withOfferedCapabilities(SHARED_SUBS.toString());
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBridgeAddressPolicyElement receiveFromAddress = new
AMQPBridgeAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.setUseDurableSubscriptions(true);
+ receiveFromAddress.addToIncludes(getTestName());
+
+ final AMQPBridgeBrokerConnectionElement element = new
AMQPBridgeBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addBridgeFromAddressPolicy(receiveFromAddress);
+ element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+ element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ final AtomicReference<Attach> capturedAttach1 = new
AtomicReference<>();
+ final AtomicReference<Attach> capturedAttach2 = new
AtomicReference<>();
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+ .withCapture(attach -> capturedAttach1.set(attach))
+ .withTarget().withAddress(getTestName()).also()
+ .withSource().withAddress(getTestName())
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+
.withDistributionMode(AmqpSupport.COPY.toString())
+ .withCapabilities(SHARED.toString())
+ .also()
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+ containsString("amqp-bridge"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond().afterDelay(50); // Defer the detach
response for a bit
+
+ server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()),
RoutingType.MULTICAST));
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ // Create demand on the address which creates a bridge receiver then
let it close which
+ // should shut down that bridge receiver. We removed the idle timeout
wait so that we
+ // send a detach almost immediately and then add demand again before
the remote has likely
+ // sent its detach response.
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createTopic(getTestName()));
+
+ connection.start();
+
+ consumer.receiveNoWait();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+ .withCapture(attach -> capturedAttach2.set(attach))
+ .withTarget().withAddress(getTestName()).also()
+ .withSource().withAddress(getTestName())
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+
.withDistributionMode(AmqpSupport.COPY.toString())
+ .withCapabilities(SHARED.toString())
+ .also()
+ .withName(allOf(containsString(getTestName()),
+ containsString("address-receiver"),
+ containsString("amqp-bridge"),
+
containsString(server.getNodeID().toString())))
+ .respond();
+ peer.expectFlow().withLinkCredit(1000);
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach().respond();
+
+ // Create demand on the address which creates a bridge receiver again
quickly which
+ // can trigger a new receiver before the previous one was fully
closed with a Detach
+ // response and get stuck because it will steal the link in proton
and not be treated
+ // as a new attach for this consumer.
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createTopic(getTestName()));
+
+ connection.start();
+
+ consumer.receiveNoWait();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ // Shared subs should be used and the sequence number is in the link
name means they are not euqal
Review Comment:
euqal - > equal
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact