gemmellr commented on code in PR #5793:
URL: https://github.com/apache/activemq-artemis/pull/5793#discussion_r2163692859


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeReceiverConfiguration.java:
##########
@@ -195,4 +196,18 @@ public boolean isReceiverDemandTrackingDisabled() {
          return configuration.isReceiverDemandTrackingDisabled();
       }
    }
+
+   /**
+    * {@return <code>true</code> if bridge from address policies are 
configured to prefer using shared durable address subscriptions}
+    */
+   public boolean isPreferSharedDuableSubscriptions() {

Review Comment:
   Duable -> Durable



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeServerToServerTest.java:
##########
@@ -1270,4 +1274,99 @@ public void 
testDurableAddressSubscriptionRecoveredOnRestart() throws Exception
          assertEquals("red", receivedAfter.getStringProperty("color"));
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void testSharedDurableAddressSubscriptionRecoveredOnRestart() throws 
Exception {
+      logger.info("Test started: {}", getTestName());
+
+      final String filterString = "color='red'";
+
+      final AMQPBridgeAddressPolicyElement bridgeAddressPolicy = new 
AMQPBridgeAddressPolicyElement();
+      bridgeAddressPolicy.setName("test-policy");
+      bridgeAddressPolicy.setUseDurableSubscriptions(true);
+      bridgeAddressPolicy.setFilter(filterString);
+      bridgeAddressPolicy.addToIncludes(getTestName());
+
+      final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+      element.setName(getTestName());
+      element.addBridgeFromAddressPolicy(bridgeAddressPolicy);
+      element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+      final AMQPBrokerConnectConfiguration amqpConnection =
+         new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" 
+ SERVER_PORT_REMOTE);
+      amqpConnection.setReconnectAttempts(10); // Limit reconnects
+      amqpConnection.setRetryInterval(50);
+      amqpConnection.addElement(element);
+
+      server.getConfiguration().addAMQPConnection(amqpConnection);
+      remoteServer.start();
+      server.start();
+
+      // Create an address with a binding to simulate demand from a consumer
+      
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
+                                                             
.setAddress(getTestName())
+                                                             
.setAutoCreated(false));
+      // Wait for the bridge to form to the remote and capture the durable 
subscription name
+      Wait.assertEquals(1L, () -> 
remoteServer.bindingQuery(SimpleString.of(getTestName()), 
false).getQueueNames().size(), 500_000, 50);
+
+      // The actual subscription queue for the "shared" bridge receivers 
should be a stable queue
+      final String subscriptionQueueName = 
remoteServer.bindingQuery(SimpleString.of(getTestName())).getQueueNames().get(0).toString();
+
+      assertNotNull(subscriptionQueueName);
+      assertTrue(subscriptionQueueName.contains("amqp-bridge-"));
+      assertTrue(subscriptionQueueName.contains(getTestName()));
+
+      final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = 
remoteServer.locateQueue(subscriptionQueueName);
+
+      assertNotNull(subscriptionQueue);
+      Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000, 
100);
+      assertTrue(subscriptionQueue.isDurable());
+      assertEquals(filterString, 
subscriptionQueue.getFilter().getFilterString().toString());
+
+      server.stop();
+
+      Wait.assertEquals(1L, () -> 
remoteServer.bindingQuery(SimpleString.of(getTestName()), 
false).getQueueNames().size(), 5_000, 50);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getConsumerCount(), 5_000, 
100);
+
+      final ConnectionFactory factoryRemote = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT_REMOTE);
+
+      try (Connection connection = factoryRemote.createConnection()) {
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName());
+         final MessageProducer producerL = session.createProducer(topic);
+         final TextMessage message = session.createTextMessage("Hello World");
+
+         message.setStringProperty("color", "green");
+         producerL.send(message);
+         message.setStringProperty("color", "red");
+         producerL.send(message);
+
+         Wait.assertEquals(1L, () -> subscriptionQueue.getMessageCount(), 
5_000, 100);
+      }
+
+      server.start();
+
+      // Server should re-attach and recover the subscription and take the 
message
+      Wait.assertEquals(1L, () -> subscriptionQueue.getConsumerCount(), 5_000, 
100);
+      Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 5_000, 
100);
+
+      final ConnectionFactory factoryLocal = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + SERVER_PORT);
+
+      try (Connection connection = factoryLocal.createConnection()) {
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final Topic topic = session.createTopic(getTestName() + "::" + 
getTestName()); // Access our pre-created queue via FQQN
+         final MessageConsumer consumer = session.createConsumer(topic);
+
+         connection.start();
+
+         final Message receivedAfter = consumer.receive(5_000);
+
+         assertNotNull(receivedAfter);
+         assertTrue(receivedAfter instanceof TextMessage);
+         assertEquals("Hello World", ((TextMessage) receivedAfter).getText());
+         assertTrue(receivedAfter.propertyExists("color"));
+         assertEquals("red", receivedAfter.getStringProperty("color"));
+      }

Review Comment:
   For completeness I'd send another matching message after it's restored to 
verify that also comes through as expected.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java:
##########
@@ -237,26 +237,30 @@ public ProtonClientProtocolManager createClientManager() {
 
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, 
Connection remotingConnection) {
-      return internalConnectionEntry(remotingConnection, false, null, null);
+      return internalConnectionEntry(remotingConnection, false, null, null, 
null, null);
    }
 
    /**
     * This method is not part of the ProtocolManager interface because it only 
makes sense on AMQP. More specifically on
     * AMQP Bridges
     */
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection) {
-      return internalConnectionEntry(remotingConnection, true, null, null);
+      return internalConnectionEntry(remotingConnection, true, null, null, 
null, null);
    }
 
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory) {
-      return internalConnectionEntry(remotingConnection, true, saslFactory, 
null);
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
null, null, null);
    }
 
    public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> 
connectionProperties) {
-      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties);
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties, null, null);
    }
 
-   private ConnectionEntry internalConnectionEntry(Connection 
remotingConnection, boolean outgoing, ClientSASLFactory saslFactory, 
Map<Symbol, Object> connectionProperties) {
+   public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory, Map<Symbol, Object> 
connectionProperties, Symbol[] offeredCapabilities, Symbol[] 
desiredCapabilities) {
+      return internalConnectionEntry(remotingConnection, true, saslFactory, 
connectionProperties, offeredCapabilities, desiredCapabilities);

Review Comment:
   offeredCapabilities doesn't look to be used



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/bridge/AMQPBridgeConfiguration.java:
##########
@@ -362,4 +364,18 @@ public long getAutoDeleteDurableSubscriptionDelay() {
          return DEFAULT_AUTO_DELETE_DURABLE_SUBSCRIPTION_DELAY;
       }
    }
+
+   /**
+    * {@return <code>true</code> if bridge from address policies are 
configured to prefer using shared durable address subscriptions}
+    */
+   public boolean isPreferSharedDuableSubscriptions() {

Review Comment:
   Duable -> Durable



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeFromAddressTest.java:
##########
@@ -3644,6 +3647,238 @@ public void 
testBridgeCreateJMSStyleDurableSubscriptionWhenConfiguredTo() throws
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testNewSharedDurableBridgeReceiverCreatedWhenDemandRemovedAndAddedWithDelayedPreviousDetach()
 throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         
peer.expectOpen().respond().withOfferedCapabilities(SHARED_SUBS.toString());
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBridgeAddressPolicyElement receiveFromAddress = new 
AMQPBridgeAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.setUseDurableSubscriptions(true);
+         receiveFromAddress.addToIncludes(getTestName());
+
+         final AMQPBridgeBrokerConnectionElement element = new 
AMQPBridgeBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addBridgeFromAddressPolicy(receiveFromAddress);
+         element.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+         element.addProperty(PREFER_SHARED_DURABLE_SUBSCRIPTIONS, "true");
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         final AtomicReference<Attach> capturedAttach1 = new 
AtomicReference<>();
+         final AtomicReference<Attach> capturedAttach2 = new 
AtomicReference<>();
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            .withCapture(attach -> capturedAttach1.set(attach))
+                            .withTarget().withAddress(getTestName()).also()
+                            .withSource().withAddress(getTestName())
+                                         
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                         
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+                                         
.withDistributionMode(AmqpSupport.COPY.toString())
+                                         .withCapabilities(SHARED.toString())
+                                         .also()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            containsString("amqp-bridge"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+         peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                          .respond()
+                          
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+         peer.expectDetach().respond().afterDelay(50); // Defer the detach 
response for a bit
+
+         server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), 
RoutingType.MULTICAST));
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         // Create demand on the address which creates a bridge receiver then 
let it close which
+         // should shut down that bridge receiver. We removed the idle timeout 
wait so that we
+         // send a detach almost immediately and then add demand again before 
the remote has likely
+         // sent its detach response.
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createTopic(getTestName()));
+
+            connection.start();
+
+            consumer.receiveNoWait();
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            .withCapture(attach -> capturedAttach2.set(attach))
+                            .withTarget().withAddress(getTestName()).also()
+                            .withSource().withAddress(getTestName())
+                                         
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                         
.withExpiryPolicy(TerminusExpiryPolicy.NEVER)
+                                         
.withDistributionMode(AmqpSupport.COPY.toString())
+                                         .withCapabilities(SHARED.toString())
+                                         .also()
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("address-receiver"),
+                                            containsString("amqp-bridge"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respond();
+         peer.expectFlow().withLinkCredit(1000);
+         peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                          .respond()
+                          
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+         peer.expectDetach().respond();
+
+         // Create demand on the address which creates a bridge receiver again 
quickly which
+         // can trigger a new receiver before the previous one was fully 
closed with a Detach
+         // response and get stuck because it will steal the link in proton 
and not be treated
+         // as a new attach for this consumer.
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createTopic(getTestName()));
+
+            connection.start();
+
+            consumer.receiveNoWait();
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         // Shared subs should be used and the sequence number is in the link 
name means they are not euqal

Review Comment:
   euqal -> equal



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to