gemmellr commented on code in PR #5793: URL: https://github.com/apache/activemq-artemis/pull/5793#discussion_r2163692859
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java: ########## @@ -195,4 +196,18 @@ public boolean isReceiverDemandTrackingDisabled() { return configuration.isReceiverDemandTrackingDisabled(); } } + + /** + * {@return <code>true</code> if bridge from address policies are configured to prefer using shared durable address subscriptions} + */ + public boolean isPreferSharedDuableSubscriptions() { Review Comment: Duable -> Durable ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java: ########## @@ -1270,4 +1274,99 @@ public void testDurableAddressSubscriptionRecoveredOnRestart() throws Exception assertEquals("red", receivedAfter.getStringProperty("color")); } } + + @Test + @Timeout(20) + public void testSharedDurableAddressSubscriptionRecoveredOnRestart() throws Exception { + logger.info("Test started: {}", getTestName()); + + final String filterString = "color='red'"; + + final AMQPBridgeAddressPolicyElement bridgeAddressPolicy = new AMQPBridgeAddressPolicyElement(); + bridgeAddressPolicy.setName("test-policy"); + bridgeAddressPolicy.setUseDurableSubscriptions(true); + bridgeAddressPolicy.setFilter(filterString); + bridgeAddressPolicy.addToIncludes(getTestName()); + + final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement(); + element.setName(getTestName()); + element.addBridgeFromAddressPolicy(bridgeAddressPolicy); + element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true"); + + final AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + SERVER_PORT_REMOTE); + amqpConnection.setReconnectAttempts(10); // Limit reconnects + amqpConnection.setRetryInterval(50); + amqpConnection.addElement(element); + + server.getConfiguration().addAMQPConnection(amqpConnection); + 
remoteServer.start(); + server.start(); + + // Create an address with a binding to simulate demand from a consumer + server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST) + .setAddress(getTestName()) + .setAutoCreated(false)); + // Wait for the bridge to form to the remote and capture the durable subscription name + Wait.assertEquals(1L, () -> remoteServer.bindingQuery(SimpleString.of(getTestName()), false).getQueueNames().size(), 500_000, 50); + + // The actual subscription queue for the "shared" bridge receivers should be a stable queue + final String subscriptionQueueName = remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString(); + + assertNotNull(subscriptionQueueName); + assertTrue(subscriptionQueueName.contains("amqp-bridge-")); + assertTrue(subscriptionQueueName.contains(getTestName())); + + final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = remoteServer.locateQueue(subscriptionQueueName); + + assertNotNull(subscriptionQueue); + Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000, 100); + assertTrue(subscriptionQueue.isDurable()); + assertEquals(filterString, subscriptionQueue.getFilter().getFilterString().toString()); + + server.stop(); + + Wait.assertEquals(1L, () -> remoteServer.bindingQuery(SimpleString.of(getTestName()), false).getQueueNames().size(), 5_000, 50); + Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000, 100); + + final ConnectionFactory factoryRemote = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE); + + try (Connection connection = factoryRemote.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Topic topic = session.createTopic(getTestName()); + final MessageProducer producerL = session.createProducer(topic); + final TextMessage message = session.createTextMessage("Hello World"); + + 
message.setStringProperty("color", "green"); + producerL.send(message); + message.setStringProperty("color", "red"); + producerL.send(message); + + Wait.assertEquals(1L, () -> subscriptionQueue.getMessageCount(), 5_000, 100); + } + + server.start(); + + // Server should re-attach and recover the subscription and take the message + Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000, 100); + Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000, 100); + + final ConnectionFactory factoryLocal = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT); + + try (Connection connection = factoryLocal.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final Topic topic = session.createTopic(getTestName() + "::" + getTestName()); // Access our pre-created queue via FQQN + final MessageConsumer consumer = session.createConsumer(topic); + + connection.start(); + + final Message receivedAfter = consumer.receive(5_000); + + assertNotNull(receivedAfter); + assertTrue(receivedAfter instanceof TextMessage); + assertEquals("Hello World", ((TextMessage) receivedAfter).getText()); + assertTrue(receivedAfter.propertyExists("color")); + assertEquals("red", receivedAfter.getStringProperty("color")); + } Review Comment: For completeness I'd send another matching message after it's restored to verify that also comes through as expected. 
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java: ########## @@ -237,26 +237,30 @@ public ProtonClientProtocolManager createClientManager() { @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { - return internalConnectionEntry(remotingConnection, false, null, null); + return internalConnectionEntry(remotingConnection, false, null, null, null, null); } /** * This method is not part of the ProtocolManager interface because it only makes sense on AMQP. More specifically on * AMQP Bridges */ public ConnectionEntry createOutgoingConnectionEntry(Connection remotingConnection) { - return internalConnectionEntry(remotingConnection, true, null, null); + return internalConnectionEntry(remotingConnection, true, null, null, null, null); } public ConnectionEntry createOutgoingConnectionEntry(Connection remotingConnection, ClientSASLFactory saslFactory) { - return internalConnectionEntry(remotingConnection, true, saslFactory, null); + return internalConnectionEntry(remotingConnection, true, saslFactory, null, null, null); } public ConnectionEntry createOutgoingConnectionEntry(Connection remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> connectionProperties) { - return internalConnectionEntry(remotingConnection, true, saslFactory, connectionProperties); + return internalConnectionEntry(remotingConnection, true, saslFactory, connectionProperties, null, null); } - private ConnectionEntry internalConnectionEntry(Connection remotingConnection, boolean outgoing, ClientSASLFactory saslFactory, Map<Symbol, Object> connectionProperties) { + public ConnectionEntry createOutgoingConnectionEntry(Connection remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> connectionProperties, Symbol[] offeredCapabilities, Symbol[] desiredCapabilities) { + return internalConnectionEntry(remotingConnection, true, 
saslFactory, connectionProperties, offeredCapabilities, desiredCapabilities); Review Comment: offeredCapabilities doesn't look to be used ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java: ########## @@ -362,4 +364,18 @@ public long getAutoDeleteDurableSubscriptionDelay() { return DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY; } } + + /** + * {@return <code>true</code> if bridge from address policies are configured to prefer using shared durable address subscriptions} + */ + public boolean isPreferSharedDuableSubscriptions() { Review Comment: Duable -> Durable ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java: ########## @@ -3644,6 +3647,238 @@ public void testBridgeCreateJMSStyleDurableSubscriptionWhenConfiguredTo() throws } } + @Test + @Timeout(20) + public void testNewSharedDurableBridgeReceiverCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond().withOfferedCapabilities(SHARED_SUBS.toString()); + peer.expectBegin().respond(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.info("Test started, peer listening on: {}", remoteURI); + + final AMQPBridgeAddressPolicyElement receiveFromAddress = new AMQPBridgeAddressPolicyElement(); + receiveFromAddress.setName("address-policy"); + receiveFromAddress.setUseDurableSubscriptions(true); + receiveFromAddress.addToIncludes(getTestName()); + + final AMQPBridgeBrokerConnectionElement element = new AMQPBridgeBrokerConnectionElement(); + element.setName(getTestName()); + element.addBridgeFromAddressPolicy(receiveFromAddress); + element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0); + element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true"); + + final 
AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.addElement(element); + + final AtomicReference<Attach> capturedAttach1 = new AtomicReference<>(); + final AtomicReference<Attach> capturedAttach2 = new AtomicReference<>(); + + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withCapture(attach -> capturedAttach1.set(attach)) + .withTarget().withAddress(getTestName()).also() + .withSource().withAddress(getTestName()) + .withDurable(TerminusDurability.UNSETTLED_STATE) + .withExpiryPolicy(TerminusExpiryPolicy.NEVER) + .withDistributionMode(AmqpSupport.COPY.toString()) + .withCapabilities(SHARED.toString()) + .also() + .withName(allOf(containsString(getTestName()), + containsString("address-receiver"), + containsString("amqp-bridge"), + containsString(server.getNodeID().toString()))) + .respond(); + peer.expectFlow().withLinkCredit(1000); + peer.expectFlow().withLinkCredit(1000).withDrain(true) + .respond() + .withLinkCredit(0).withDeliveryCount(1000).withDrain(true); + peer.expectDetach().respond().afterDelay(50); // Defer the detach response for a bit + + server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), RoutingType.MULTICAST)); + + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + // Create demand on the address which creates a bridge receiver then let it close which + // should shut down that bridge receiver. We removed the idle timeout wait so that we + // send a detach almost immediately and then add demand again before the remote has likely + // sent its detach response. 
+ try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createTopic(getTestName())); + + connection.start(); + + consumer.receiveNoWait(); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofReceiver() + .withCapture(attach -> capturedAttach2.set(attach)) + .withTarget().withAddress(getTestName()).also() + .withSource().withAddress(getTestName()) + .withDurable(TerminusDurability.UNSETTLED_STATE) + .withExpiryPolicy(TerminusExpiryPolicy.NEVER) + .withDistributionMode(AmqpSupport.COPY.toString()) + .withCapabilities(SHARED.toString()) + .also() + .withName(allOf(containsString(getTestName()), + containsString("address-receiver"), + containsString("amqp-bridge"), + containsString(server.getNodeID().toString()))) + .respond(); + peer.expectFlow().withLinkCredit(1000); + peer.expectFlow().withLinkCredit(1000).withDrain(true) + .respond() + .withLinkCredit(0).withDeliveryCount(1000).withDrain(true); + peer.expectDetach().respond(); + + // Create demand on the address which creates a bridge receiver again quickly which + // can trigger a new receiver before the previous one was fully closed with a Detach + // response and get stuck because it will steal the link in proton and not be treated + // as a new attach for this consumer. 
+ try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session.createConsumer(session.createTopic(getTestName())); + + connection.start(); + + consumer.receiveNoWait(); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + // Shared subs should be used and the sequence number is in the link name means they are not euqal Review Comment: euqal - > equal -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact