This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new fad1f5274c ARTEMIS-4799 Fix broker connection receiver attach handling
fad1f5274c is described below
commit fad1f5274cc609d01c8b83514c9cfa4ad0b4fa2e
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Jun 6 14:53:02 2024 -0400
ARTEMIS-4799 Fix broker connection receiver attach handling
The receiver attach in a broker connection does not wait for the remote
attach to arrive before creating the broker-side receiver plumbing, which
leads to the broker treating the remote sender as an anonymous relay when
it is not and should not be. Await the remote attach that answers the
attach sent by the broker connection before finishing the link setup, but
use the locally defined target address rather than the one supplied by the
remote to route the incoming messages.
---
.../amqp/connect/AMQPBrokerConnection.java | 125 ++++++--
.../logger/ActiveMQAMQPProtocolMessageBundle.java | 2 +
.../connect/AMQPBrokerConnectionReceiverTest.java | 332 +++++++++++++++++++++
3 files changed, 432 insertions(+), 27 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 6b3c6b45da..7617b95921 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -75,6 +75,8 @@ import
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederati
import
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import
org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
+import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
@@ -84,6 +86,7 @@ import
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreLargeMes
import
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageWriter;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.MessageWriter;
+import
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
@@ -284,13 +287,10 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
Symbol[] dispatchCapability = new
Symbol[]{AMQPMirrorControllerSource.QPID_DISPATCH_WAYPOINT_CAPABILITY};
connectSender(queue, queue.getAddress().toString(), null, null, null,
null, dispatchCapability, null);
connectReceiver(protonRemotingConnection, session, sessionContext,
queue, dispatchCapability);
- } else {
- if (connectionElement.getType() ==
AMQPBrokerConnectionAddressType.SENDER) {
- connectSender(queue, queue.getAddress().toString(), null, null,
null, null, null, null);
- }
- if (connectionElement.getType() ==
AMQPBrokerConnectionAddressType.RECEIVER) {
- connectReceiver(protonRemotingConnection, session, sessionContext,
queue);
- }
+ } else if (connectionElement.getType() ==
AMQPBrokerConnectionAddressType.SENDER) {
+ connectSender(queue, queue.getAddress().toString(), null, null, null,
null, null, null);
+ } else if (connectionElement.getType() ==
AMQPBrokerConnectionAddressType.RECEIVER) {
+ connectReceiver(protonRemotingConnection, session, sessionContext,
queue);
}
}
@@ -644,34 +644,105 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
}
protonRemotingConnection.getAmqpConnection().runLater(() -> {
-
- if (receivers.contains(queue)) {
+ if (!receivers.add(queue)) {
logger.debug("Receiver for queue {} already exists, just giving
up", queue);
return;
}
- receivers.add(queue);
- Receiver receiver = session.receiver(queue.getAddress().toString() +
":" + UUIDGenerator.getInstance().generateStringUUID());
- receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
- Target target = new Target();
- target.setAddress(queue.getAddress().toString());
- receiver.setTarget(target);
-
- Source source = new Source();
- source.setAddress(queue.getAddress().toString());
- receiver.setSource(source);
-
- if (capabilities != null) {
- source.setCapabilities(capabilities);
- }
- receiver.open();
- protonRemotingConnection.getAmqpConnection().flush();
try {
- sessionContext.addReceiver(receiver);
+ final String linkName = queue.getAddress().toString() + ":" +
UUIDGenerator.getInstance().generateStringUUID();
+ final Receiver receiver = session.receiver(linkName);
+ final String queueAddress = queue.getAddress().toString();
+
+ final Target target = new Target();
+ target.setAddress(queueAddress);
+ final Source source = new Source();
+ source.setAddress(queueAddress);
+ if (capabilities != null) {
+ source.setCapabilities(capabilities);
+ }
+
+ receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ receiver.setTarget(target);
+ receiver.setSource(source);
+ receiver.open();
+
+ final ScheduledFuture<?> openTimeoutTask;
+ final AtomicBoolean openTimedOut = new AtomicBoolean(false);
+
+ if (getConnectionTimeout() > 0) {
+ openTimeoutTask = server.getScheduledPool().schedule(() -> {
+ openTimedOut.set(true);
+
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(),
lastRetryCounter);
+ }, getConnectionTimeout(), TimeUnit.MILLISECONDS);
+ } else {
+ openTimeoutTask = null;
+ }
+
+ // Await the remote attach before creating the broker receiver in
order to impose a timeout
+ // on the attach response and then try and create the local server
receiver context and finish
+ // the wiring.
+ receiver.attachments().set(AMQP_LINK_INITIALIZER_KEY,
Runnable.class, () -> {
+ try {
+ if (openTimeoutTask != null) {
+ openTimeoutTask.cancel(false);
+ }
+
+ if (openTimedOut.get()) {
+ return; // Timed out before remote attach arrived
+ }
+
+ if (receiver.getRemoteSource() != null) {
+ logger.trace("AMQP Broker Connection Receiver {}
completed open", linkName);
+ } else {
+ logger.debug("AMQP Broker Connection Receiver {} rejected
by remote", linkName);
+
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.receiverLinkRefused(queueAddress),
lastRetryCounter);
+ return;
+ }
+
+ sessionContext.addReceiver(receiver, (r, s) -> {
+ // Returns a customized server receiver context that will
respect the locally initiated state
+ // when the receiver is initialized vs the remotely sent
target as we want to ensure we attach
+ // the receiver to the address we set in our local state.
+ return new
ProtonServerReceiverContext(sessionContext.getSessionSPI(),
+
sessionContext.getAMQPConnectionContext(),
+ sessionContext,
receiver) {
+
+ @Override
+ public void initialize() throws Exception {
+ initialized = true;
+ address = SimpleString.of(target.getAddress());
+ defRoutingType =
getRoutingType(target.getCapabilities(), address);
+
+ try {
+ // Check if the queue that triggered the attach
still exists or has it been removed
+ // before the attach response arrived from the
remote peer.
+ if (!sessionSPI.queueQuery(queue.getName(),
queue.getRoutingType(), false).isExists()) {
+ throw
ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(address.toString());
+ }
+ } catch (ActiveMQAMQPException e) {
+ receivers.remove(queue);
+ throw e;
+ } catch (Exception e) {
+ logger.debug(e.getMessage(), e);
+ receivers.remove(queue);
+ throw new
ActiveMQAMQPInternalErrorException(e.getMessage(), e);
+ }
+
+ flow();
+ }
+ };
+ });
+ } catch (Exception e) {
+ error(e);
+ }
+ });
} catch (Exception e) {
error(e);
}
+
+ protonRemotingConnection.getAmqpConnection().flush();
});
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
index 64ca31ed82..68be6c3278 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java
@@ -115,4 +115,6 @@ public interface ActiveMQAMQPProtocolMessageBundle {
@Message(id = 119028, value = "Malformed Federation event message: {}")
ActiveMQException malformedFederationEventMessage(String message);
+ @Message(id = 119029, value = "Receiver link refused for address {}")
+ ActiveMQAMQPIllegalStateException receiverLinkRefused(String address);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java
new file mode 100644
index 0000000000..199cf48d06
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBrokerConnectionReceiverTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.qpid.proton.amqp.transport.LinkError;
+import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the Receiver functionality on AMQP broker connections
+ */
+@Timeout(20)
+public class AMQPBrokerConnectionReceiverTest extends AmqpClientTestSupport {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,CORE";
+ }
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ // Creates the broker used to make the outgoing connection. The port
passed is for
+ // that brokers acceptor. The test server connected to by the broker
binds to a random port.
+ return createServer(AMQP_PORT, false);
+ }
+
+ @Test
+ public void testBrokerConnectionCreatesReceiverOnRemote() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respondInKind();
+ peer.expectFlow();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBrokerConnectionElement element = new
AMQPBrokerConnectionElement();
+ element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+ element.setName(getTestName());
+ element.setMatchAddress("test");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);
+ amqpConnection.addElement(element);
+ amqpConnection.setAutostart(true);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+ server.createQueue(new
QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
+ .setAddress("test")
+
.setAutoCreated(false));
+
+ peer.waitForScriptToComplete();
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete();
+ peer.close();
+ }
+ }
+
+ @Test
+ public void testIncomingMessageWithNoToFieldArrivesOnConfiguredAddress()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBrokerConnectionElement element = new
AMQPBrokerConnectionElement();
+ element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+ element.setName(getTestName());
+ element.setMatchAddress("test");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);
+ amqpConnection.addElement(element);
+ amqpConnection.setAutostart(true);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
peer.expectAttach().ofReceiver().withSource().withAddress("test").and()
+
.withTarget().withAddress("test").and()
+ .respondInKind();
+ peer.expectFlow();
+ peer.remoteTransfer().withDeliveryId(1)
+ .withBody().withString("test-body").also()
+ .queue();
+ peer.expectDisposition().withState().accepted();
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic("test");
+ final MessageConsumer consumer = session.createConsumer(topic);
+
+ connection.start();
+
+ final Message received = consumer.receive(5_000);
+ assertNotNull(received);
+
+ consumer.close();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete();
+ peer.close();
+ }
+ }
+
+ @Test
+ public void testIncomingMessageWithToFieldArrivesOnConfiguredAddress()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBrokerConnectionElement element = new
AMQPBrokerConnectionElement();
+ element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+ element.setName(getTestName());
+ element.setMatchAddress("test");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);
+ amqpConnection.addElement(element);
+ amqpConnection.setAutostart(true);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
peer.expectAttach().ofReceiver().withSource().withAddress("test").and()
+
.withTarget().withAddress("test").and()
+ .respondInKind();
+ peer.expectFlow();
+ peer.remoteTransfer().withDeliveryId(1)
+
.withProperties().withTo("should-not-be-used").also()
+ .withBody().withString("test-body").also()
+ .queue();
+ peer.expectDisposition().withState().accepted();
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic topic = session.createTopic("test");
+ final MessageConsumer consumer = session.createConsumer(topic);
+
+ connection.start();
+
+ final Message received = consumer.receive(5_000);
+ assertNotNull(received);
+
+ consumer.close();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete();
+ peer.close();
+ }
+ }
+
+ @Test
+ public void testBrokerConnectionRetriesReceiverOnRemoteIfAttachRejected()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().reject(true,
LinkError.DETACH_FORCED.toString(), "Attach refused");
+ peer.expectDetach().optional();
+ peer.expectClose().optional();
+ peer.expectConnectionToDrop();
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respondInKind();
+ peer.expectFlow();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBrokerConnectionElement element = new
AMQPBrokerConnectionElement();
+ element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+ element.setName(getTestName());
+ element.setMatchAddress("test");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(10);
+ amqpConnection.setRetryInterval(100);
+ amqpConnection.addElement(element);
+ amqpConnection.setAutostart(true);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+ server.createQueue(new
QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
+ .setAddress("test")
+
.setAutoCreated(false));
+
+ peer.waitForScriptToComplete();
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete();
+ peer.close();
+ }
+ }
+
+ @Test
+ public void
testBrokerConnectionRetriesReceiverOnRemoteIfTargetQueueRemovedAndLaterAddedBack()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.execute(() -> {
+ try {
+ server.createQueue(new
QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
+
.setAddress("test")
+
.setAutoCreated(false));
+ } catch (Exception e) {
+ LOG.warn("Error on creating server address and queue: ", e);
+ }
+ }).queue();
+ peer.expectAttach().ofReceiver();
+ peer.execute(() -> {
+ try {
+ server.removeAddressInfo(SimpleString.of("test"), null, true);
+ } catch (Exception e) {
+ LOG.warn("Error on removing server address and queue: ", e);
+ }
+ peer.respondToLastAttach().now();
+ }).queue();
+ peer.expectDetach().respond();
+ peer.execute(() -> {
+ try {
+ server.createQueue(new
QueueConfiguration("test").setRoutingType(RoutingType.ANYCAST)
+
.setAddress("test")
+
.setAutoCreated(false));
+ } catch (Exception e) {
+ LOG.warn("Error on creating server address and queue: ", e);
+ }
+ }).queue();
+ peer.expectAttach().ofReceiver().respondInKind();
+ peer.expectFlow();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPBrokerConnectionElement element = new
AMQPBrokerConnectionElement();
+ element.setType(AMQPBrokerConnectionAddressType.RECEIVER);
+ element.setName(getTestName());
+ element.setMatchAddress("test");
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(10);
+ amqpConnection.setRetryInterval(50);
+ amqpConnection.addElement(element);
+ amqpConnection.setAutostart(true);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete();
+ peer.expectClose();
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete();
+ peer.close();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact