This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new fad1f5274c ARTEMIS-4799 Fix broker connection receiver attach handling
fad1f5274c is described below

commit fad1f5274cc609d01c8b83514c9cfa4ad0b4fa2e
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jun 6 14:53:02 2024 -0400

    ARTEMIS-4799 Fix broker connection receiver attach handling
    
    The receiver attach in broker connection does not wait for the remote
    attach to arrive before creating the broker side receiver plumbing, which
    leads to the broker treating the remote sender as an anonymous relay when
    it is not and should not be. Await the remote attach in response to the
    attach sent by the broker connection to finish the link setup, but use the
    locally defined target address rather than the remote one to route the
    incoming messages.
---
 .../amqp/connect/AMQPBrokerConnection.java         | 125 ++++++--
 .../logger/ActiveMQAMQPProtocolMessageBundle.java  |   2 +
 .../connect/AMQPBrokerConnectionReceiverTest.java  | 332 +++++++++++++++++++++
 3 files changed, 432 insertions(+), 27 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 6b3c6b45da..7617b95921 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -75,6 +75,8 @@ import 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederati
 import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
 import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
 import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
+import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
@@ -84,6 +86,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMes
 import 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageWriter;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter;
+import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
 import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
@@ -284,13 +287,10 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
          Symbol[] dispatchCapability = new 
Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
          connectSender(queue, queue.getAddress().toString(), null, null, null, 
null, dispatchCapability, null);
          connectReceiver(protonRemotingConnection, session, sessionContext, 
queue, dispatchCapability);
-      } else {
-         if (connectionElement.getType() == 
AMQPBrokerConnectionAddressType.SENDER) {
-            connectSender(queue, queue.getAddress().toString(), null, null, 
null, null, null, null);
-         }
-         if (connectionElement.getType() == 
AMQPBrokerConnectionAddressType.RECEIVER) {
-            connectReceiver(protonRemotingConnection, session, sessionContext, 
queue);
-         }
+      } else if (connectionElement.getType() == 
AMQPBrokerConnectionAddressType.SENDER) {
+         connectSender(queue, queue.getAddress().toString(), null, null, null, 
null, null, null);
+      } else if (connectionElement.getType() == 
AMQPBrokerConnectionAddressType.RECEIVER) {
+         connectReceiver(protonRemotingConnection, session, sessionContext, 
queue);
       }
    }
 
@@ -644,34 +644,105 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
       }
 
       protonRemotingConnection.getAmqpConnection().runLater(() -> {
-
-         if (receivers.contains(queue)) {
+         if (!receivers.add(queue)) {
             logger.debug("Receiver for queue {} already exists, just giving 
up", queue);
             return;
          }
-         receivers.add(queue);
-         Receiver receiver = session.receiver(queue.getAddress().toString() + 
":" + UUIDGenerator.getInstance().generateStringUUID());
-         receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
-         receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
-         Target target = new Target();
-         target.setAddress(queue.getAddress().toString());
-         receiver.setTarget(target);
-
-         Source source = new Source();
-         source.setAddress(queue.getAddress().toString());
-         receiver.setSource(source);
-
-         if (capabilities != null) {
-            source.setCapabilities(capabilities);
-         }
 
-         receiver.open();
-         protonRemotingConnection.getAmqpConnection().flush();
          try {
-            sessionContext.addReceiver(receiver);
+            final String linkName = queue.getAddress().toString() + ":" + 
UUIDGenerator.getInstance().generateStringUUID();
+            final Receiver receiver = session.receiver(linkName);
+            final String queueAddress = queue.getAddress().toString();
+
+            final Target target = new Target();
+            target.setAddress(queueAddress);
+            final Source source = new Source();
+            source.setAddress(queueAddress);
+            if (capabilities != null) {
+               source.setCapabilities(capabilities);
+            }
+
+            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+            receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+            receiver.setTarget(target);
+            receiver.setSource(source);
+            receiver.open();
+
+            final ScheduledFuture<?> openTimeoutTask;
+            final AtomicBoolean openTimedOut = new AtomicBoolean(false);
+
+            if (getConnectionTimeout() > 0) {
+               openTimeoutTask = server.getScheduledPool().schedule(() -> {
+                  openTimedOut.set(true);
+                  
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), 
lastRetryCounter);
+               }, getConnectionTimeout(), TimeUnit.MILLISECONDS);
+            } else {
+               openTimeoutTask = null;
+            }
+
+            // Await the remote attach before creating the broker receiver in 
order to impose a timeout
+            // on the attach response and then try and create the local server 
receiver context and finish
+            // the wiring.
+            receiver.attachments().set(AMQP_LINK_INITIALIZER_KEY, 
Runnable.class, () -> {
+               try {
+                  if (openTimeoutTask != null) {
+                     openTimeoutTask.cancel(false);
+                  }
+
+                  if (openTimedOut.get()) {
+                     return; // Timed out before remote attach arrived
+                  }
+
+                  if (receiver.getRemoteSource() != null) {
+                     logger.trace("AMQP Broker Connection Receiver {} 
completed open", linkName);
+                  } else {
+                     logger.debug("AMQP Broker Connection Receiver {} rejected 
by remote", linkName);
+                     
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.receiverLinkRefused(queueAddress),
 lastRetryCounter);
+                     return;
+                  }
+
+                  sessionContext.addReceiver(receiver, (r, s) -> {
+                     // Returns a customized server receiver context that will 
respect the locally initiated state
+                     // when the receiver is initialized vs the remotely sent 
target as we want to ensure we attach
+                     // the receiver to the address we set in our local state.
+                     return new 
ProtonServerReceiverContext(sessionContext.getSessionSPI(),
+                                                            
sessionContext.getAMQPConnectionContext(),
+                                                            sessionContext, 
receiver) {
+
+                        @Override
+                        public void initialize() throws Exception {
+                           initialized = true;
+                           address = SimpleString.of(target.getAddress());
+                           defRoutingType = 
getRoutingType(target.getCapabilities(), address);
+
+                           try {
+                              // Check if the queue that triggered the attach 
still exists or has it been removed
+                              // before the attach response arrived from the 
remote peer.
+                              if (!sessionSPI.queueQuery(queue.getName(), 
queue.getRoutingType(), false).isExists()) {
+                                 throw 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString());
+                              }
+                           } catch (ActiveMQAMQPException e) {
+                              receivers.remove(queue);
+                              throw e;
+                           } catch (Exception e) {
+                              logger.debug(e.getMessage(), e);
+                              receivers.remove(queue);
+                              throw new 
ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+                           }
+
+                           flow();
+                        }
+                     };
+                  });
+               } catch (Exception e) {
+                  error(e);
+               }
+            });
          } catch (Exception e) {
             error(e);
          }
+
+         protonRemotingConnection.getAmqpConnection().flush();
       });
    }
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
index 64ca31ed82..68be6c3278 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
@@ -115,4 +115,6 @@ public interface ActiveMQAMQPProtocolMessageBundle {
    @Message(id = 119028, value = "Malformed Federation event message: {}")
    ActiveMQException malformedFederationEventMessage(String message);
 
+   @Message(id = 119029, value =  "Receiver link refused for address {}")
+   ActiveMQAMQPIllegalStateException receiverLinkRefused(String address);
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java
new file mode 100644
index 0000000000..199cf48d06
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.qpid.proton.amqp.transport.LinkError;
+import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the Receiver functionality on AMQP broker connections
+ */
+@Timeout(20)
+public class AMQPBrokerConnectionReceiverTest extends AmqpClientTestSupport {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,CORE";
+   }
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      // Creates the broker used to make the outgoing connection. The port 
passed is for
+      // that brokers acceptor. The test server connected to by the broker 
binds to a random port.
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Test
+   public void testBrokerConnectionCreatesReceiverOnRemote() throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofReceiver().respondInKind();
+         peer.expectFlow();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         LOG.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBrokerConnectionElement element = new 
AMQPBrokerConnectionElement();
+         element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+         element.setName(getTestName());
+         element.setMatchAddress("test");
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);
+         amqpConnection.addElement(element);
+         amqpConnection.setAutostart(true);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+         server.createQueue(new 
QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
+                                                          .setAddress("test")
+                                                          
.setAutoCreated(false));
+
+         peer.waitForScriptToComplete();
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete();
+         peer.close();
+      }
+   }
+
+   @Test
+   public void testIncomingMessageWithNoToFieldArrivesOnConfiguredAddress() 
throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         LOG.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBrokerConnectionElement element = new 
AMQPBrokerConnectionElement();
+         element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+         element.setName(getTestName());
+         element.setMatchAddress("test");
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);
+         amqpConnection.addElement(element);
+         amqpConnection.setAutostart(true);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         
peer.expectAttach().ofReceiver().withSource().withAddress("test").and()
+                                         
.withTarget().withAddress("test").and()
+                                         .respondInKind();
+         peer.expectFlow();
+         peer.remoteTransfer().withDeliveryId(1)
+                              .withBody().withString("test-body").also()
+                              .queue();
+         peer.expectDisposition().withState().accepted();
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final Topic topic = session.createTopic("test");
+            final MessageConsumer consumer = session.createConsumer(topic);
+
+            connection.start();
+
+            final Message received = consumer.receive(5_000);
+            assertNotNull(received);
+
+            consumer.close();
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete();
+         peer.close();
+      }
+   }
+
+   @Test
+   public void testIncomingMessageWithToFieldArrivesOnConfiguredAddress() 
throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         LOG.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBrokerConnectionElement element = new 
AMQPBrokerConnectionElement();
+         element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+         element.setName(getTestName());
+         element.setMatchAddress("test");
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);
+         amqpConnection.addElement(element);
+         amqpConnection.setAutostart(true);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         
peer.expectAttach().ofReceiver().withSource().withAddress("test").and()
+                                         
.withTarget().withAddress("test").and()
+                                         .respondInKind();
+         peer.expectFlow();
+         peer.remoteTransfer().withDeliveryId(1)
+                              
.withProperties().withTo("should-not-be-used").also()
+                              .withBody().withString("test-body").also()
+                              .queue();
+         peer.expectDisposition().withState().accepted();
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final Topic topic = session.createTopic("test");
+            final MessageConsumer consumer = session.createConsumer(topic);
+
+            connection.start();
+
+            final Message received = consumer.receive(5_000);
+            assertNotNull(received);
+
+            consumer.close();
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete();
+         peer.close();
+      }
+   }
+
+   @Test
+   public void testBrokerConnectionRetriesReceiverOnRemoteIfAttachRejected() 
throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofReceiver().reject(true, 
LinkError.DETACH_FORCED.toString(), "Attach refused");
+         peer.expectDetach().optional();
+         peer.expectClose().optional();
+         peer.expectConnectionToDrop();
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofReceiver().respondInKind();
+         peer.expectFlow();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         LOG.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBrokerConnectionElement element = new 
AMQPBrokerConnectionElement();
+         element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+         element.setName(getTestName());
+         element.setMatchAddress("test");
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(10);
+         amqpConnection.setRetryInterval(100);
+         amqpConnection.addElement(element);
+         amqpConnection.setAutostart(true);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+         server.createQueue(new 
QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
+                                                          .setAddress("test")
+                                                          
.setAutoCreated(false));
+
+         peer.waitForScriptToComplete();
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete();
+         peer.close();
+      }
+   }
+
+   @Test
+   public void 
testBrokerConnectionRetriesReceiverOnRemoteIfTargetQueueRemovedAndLaterAddedBack()
 throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.execute(() -> {
+            try {
+               server.createQueue(new 
QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
+                                                                
.setAddress("test")
+                                                                
.setAutoCreated(false));
+            } catch (Exception e) {
+               LOG.warn("Error on creating server address and queue: ", e);
+            }
+         }).queue();
+         peer.expectAttach().ofReceiver();
+         peer.execute(() -> {
+            try {
+               server.removeAddressInfo(SimpleString.of("test"), null, true);
+            } catch (Exception e) {
+               LOG.warn("Error on removing server address and queue: ", e);
+            }
+            peer.respondToLastAttach().now();
+         }).queue();
+         peer.expectDetach().respond();
+         peer.execute(() -> {
+            try {
+               server.createQueue(new 
QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
+                                                                
.setAddress("test")
+                                                                
.setAutoCreated(false));
+            } catch (Exception e) {
+               LOG.warn("Error on creating server address and queue: ", e);
+            }
+         }).queue();
+         peer.expectAttach().ofReceiver().respondInKind();
+         peer.expectFlow();
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         LOG.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPBrokerConnectionElement element = new 
AMQPBrokerConnectionElement();
+         element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+         element.setName(getTestName());
+         element.setMatchAddress("test");
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(10);
+         amqpConnection.setRetryInterval(50);
+         amqpConnection.addElement(element);
+         amqpConnection.setAutostart(true);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete();
+         peer.expectClose();
+         peer.remoteClose().now();
+         peer.waitForScriptToComplete();
+         peer.close();
+      }
+   }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to