[ 
https://issues.apache.org/jira/browse/ARTEMIS-2937?focusedWorklogId=505782&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-505782
 ]

ASF GitHub Bot logged work on ARTEMIS-2937:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Oct/20 16:06
            Start Date: 28/Oct/20 16:06
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on a change in pull request #3316:
URL: https://github.com/apache/activemq-artemis/pull/3316#discussion_r513399360



##########
File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
##########
@@ -20,39 +20,82 @@
 import java.util.Comparator;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.function.ToLongFunction;
+
+import io.netty.util.collection.LongObjectHashMap;
 
 /**
  * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any
  * elements added or removed from the queue either directly or via iterators.
- *
+ * <p>
  * This class is not thread safe.
  */
 public class LinkedListImpl<E> implements LinkedList<E> {
 
    private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
 
    private final Node<E> head = new NodeHolder<>(null);
-
+   private final Comparator<E> comparator;
+   LongObjectHashMap<Node<E>> nodeMap;
    private Node<E> tail = null;
-
    private int size;
-
    // We store in an array rather than a Map for the best performance
    private volatile Iterator[] iters;
-
    private int numIters;
-
    private int nextIndex;
-
-   private final Comparator<E> comparator;
+   private ToLongFunction<E> idSupplier;
 
    public LinkedListImpl() {
-      this(null);
+      this(null, null);
    }
 
    public LinkedListImpl(Comparator<E> comparator) {
+      this(comparator, null);
+   }
+
+   public LinkedListImpl(Comparator<E> comparator, ToLongFunction<E> supplier) {
       iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE);
       this.comparator = comparator;
+      this.idSupplier = supplier;
+      if (idSupplier != null) {
+         this.nodeMap = newLongHashMap();
+      } else {
+         this.nodeMap = null;
+      }
+   }
+
+   @Override
+   public void clearID() {
+      idSupplier = null;
+      if (nodeMap != null) {
+         nodeMap.clear();
+         nodeMap = null;
+      }
+   }

Review comment:
       This isn't used or tested, and it has no equivalent on the PriorityLinkedList.
   
   Should it just be removed for now, or added to the PriorityLinkedList and 
tested?

##########
File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
##########
@@ -72,9 +115,43 @@ public void addHead(E e) {
          node.next.prev = node;
       }
 
+      itemAdded(node, e);
+
       size++;
    }
 
+   @Override
+   public E removeWithID(long id) {
+      if (nodeMap == null) {
+         return null;
+      }
+
+      Node<E> node = nodeMap.get(id);
+      if (node == null) {
+         return null;
+      }
+
+      if (node.prev != null) {
+         removeAfter(node.prev);
+      }

Review comment:
       Since there appears to be a fixed 'head' node that always exists, and 
doesn't ever seem to be in the nodeMap, what does it mean if node.prev is null 
here?
   
   I ask because it seems odd that removal by ID could return a value in this 
case while doing nothing at all to update the main list structure. 
When can that happen? (I may be missing something obvious here.)
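   
   To make the question concrete, here is a minimal sketch of the invariant I 
would expect. It is not the PR's code: `SentinelIdList`, its `HashMap`-based 
index, and its tail handling are hypothetical stand-ins. Under these assumptions 
every node in the map has a non-null prev (at worst the sentinel head), so a 
null prev would indicate a corrupt list rather than a normal case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for LinkedListImpl: a doubly linked list with a fixed
// sentinel head and an id -> node index. This is not the code from the PR.
class SentinelIdList<E> {

   private static final class Node<E> {
      final E value;
      Node<E> prev;
      Node<E> next;

      Node(E value) {
         this.value = value;
      }
   }

   private final Node<E> head = new Node<>(null); // sentinel, never in the map
   private Node<E> tail = head;
   private final Map<Long, Node<E>> nodeMap = new HashMap<>();
   private int size;

   void addTail(long id, E value) {
      Node<E> node = new Node<>(value);
      node.prev = tail; // always non-null: at worst it is the sentinel head
      tail.next = node;
      tail = node;
      nodeMap.put(id, node);
      size++;
   }

   E removeWithID(long id) {
      Node<E> node = nodeMap.remove(id);
      if (node == null) {
         return null;
      }
      if (node.prev == null) {
         // A mapped node that is not linked means the structure is corrupt,
         // so fail loudly instead of returning a value and skipping the unlink.
         throw new IllegalStateException("mapped node is not linked: " + id);
      }
      node.prev.next = node.next;
      if (node.next != null) {
         node.next.prev = node.prev;
      } else {
         tail = node.prev;
      }
      node.prev = null;
      node.next = null;
      size--;
      return node.value;
   }

   int size() {
      return size;
   }

   public static void main(String[] args) {
      SentinelIdList<String> list = new SentinelIdList<>();
      list.addTail(1L, "a");
      list.addTail(2L, "b");
      System.out.println(list.removeWithID(1L) + " removed, size=" + list.size());
   }
}
```

   If the real list can legitimately hold a mapped node with a null prev, a 
comment explaining when would help.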

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.BrokerConnection;
+import org.apache.activemq.artemis.core.server.Consumer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
+import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
+import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.jboss.logging.Logger;
+
+public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, ActiveMQServerQueuePlugin, BrokerConnection {
+
+   private static final Logger logger = Logger.getLogger(AMQPBrokerConnection.class);
+
+   private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
+   private final ProtonProtocolManager protonProtocolManager;
+   private final ActiveMQServer server;
+   private final NettyConnector bridgesConnector;
+   private NettyConnection connection;
+   private Session session;
+   private AMQPSessionContext sessionContext;
+   private ActiveMQProtonRemotingConnection protonRemotingConnection;
+   private volatile boolean started = false;
+   private final AMQPBrokerConnectionManager bridgeManager;
+   private int retryCounter = 0;
+   private volatile ScheduledFuture reconnectFuture;
+   private Set<Queue> senders = new HashSet<>();
+   private Set<Queue> receivers = new HashSet<>();
+
+   final Executor connectExecutor;
+   final ScheduledExecutorService scheduledExecutorService;
+
+   /** This is just for logging.
+    *  the actual connection will come from the amqpConnection configuration*/
+   String host;
+
+   /** This is just for logging.
+    *  the actual connection will come from the amqpConnection configuration*/
+   int port;
+
+   public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager, AMQPBrokerConnectConfiguration brokerConnectConfiguration,
+                               ProtonProtocolManager protonProtocolManager,
+                               ActiveMQServer server,
+                               NettyConnector bridgesConnector) {
+      this.bridgeManager = bridgeManager;
+      this.brokerConnectConfiguration = brokerConnectConfiguration;
+      this.protonProtocolManager = protonProtocolManager;
+      this.server = server;
+      this.bridgesConnector = bridgesConnector;
+      connectExecutor = server.getExecutorFactory().getExecutor();
+      scheduledExecutorService = server.getScheduledPool();
+   }
+
+   @Override
+   public String getName() {
+      return brokerConnectConfiguration.getName();
+   }
+
+   @Override
+   public String getProtocol() {
+      return "AMQP";
+   }
+
+   @Override
+   public boolean isStarted() {
+      return started;
+   }
+
+   @Override
+   public void stop() {
+      if (connection != null) {
+         connection.close();
+      }
+      ScheduledFuture scheduledFuture = reconnectFuture;
+      reconnectFuture = null;
+      if (scheduledFuture != null) {
+         scheduledFuture.cancel(true);
+      }
+      started = false;
+   }
+
+   @Override
+   public void start() throws Exception {
+      started = true;
+      server.getConfiguration().registerBrokerPlugin(this);
+      try {
+
+         for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
+            if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) {
+               installMirrorController((AMQPMirrorBrokerConnectionElement)connectionElement, server);
+            }
+         }
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         return;
+      }
+      connectExecutor.execute(() -> doConnect());
+   }
+
+   public NettyConnection getConnection() {
+      return connection;
+   }
+
+   @Override
+   public void afterCreateQueue(Queue queue) {
+      connectExecutor.execute(() -> {
+         for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
+            validateMatching(queue, connectionElement);
+         }
+      });
+   }
+
+   public void validateMatching(Queue queue, AMQPBrokerConnectionElement connectionElement) {
+      if (connectionElement.getType() != AMQPBrokerConnectionAddressType.MIRROR) {
+         if (connectionElement.getQueueName() != null) {
+            if (queue.getName().equals(connectionElement.getQueueName())) {
+               createLink(queue, connectionElement);
+            }
+         } else if (connectionElement.match(queue.getAddress(), server.getConfiguration().getWildcardConfiguration())) {
+            createLink(queue, connectionElement);
+         }
+      }
+   }
+
+   public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) {
+      if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) {
+         connectSender(queue, queue.getAddress().toString(), Symbol.valueOf("qd.waypoint"));
+         connectReceiver(protonRemotingConnection, session, sessionContext, queue, Symbol.valueOf("qd.waypoint"));
+      } else {
+         if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) {
+            connectSender(queue, queue.getAddress().toString());
+         }
+         if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) {
+            connectReceiver(protonRemotingConnection, session, sessionContext, queue);
+         }
+      }
+   }
+
+   private void doConnect() {
+      try {
+         List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations();
+
+         TransportConfiguration tpConfig = configurationList.get(0);
+
+         String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams());
+         int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams());
+         this.host = hostOnParameter;
+         this.port = portOnParameter;
+         connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter);
+
+         if (connection == null) {
+            retryConnection();
+            return;
+         }
+
+         reconnectFuture = null;
+         retryCounter = 0;
+
+         // before we retry the connection we need to remove any previous links
+         // as they will need to be recreated
+         senders.clear();
+         receivers.clear();
+
+         ClientSASLFactory saslFactory = null;
+
+         if (brokerConnectConfiguration.getUser() != null && brokerConnectConfiguration.getPassword() != null) {
+            saslFactory = availableMechanims -> {
+               if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) {
+                  return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword());
+               } else {
+                  return null;
+               }
+            };
+         }
+
+         ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
+         protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
+         connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler()));
+
+         protonRemotingConnection.getAmqpConnection().runLater(() -> {
+            protonRemotingConnection.getAmqpConnection().open();
+            protonRemotingConnection.getAmqpConnection().flush();
+         });
+
+         session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
+         sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
+         protonRemotingConnection.getAmqpConnection().runLater(() -> {
+            session.open();
+            protonRemotingConnection.getAmqpConnection().flush();
+         });
+
+         if (brokerConnectConfiguration.getConnectionElements() != null) {
+            Stream<Binding> bindingStream = server.getPostOffice().getAllBindings();
+
+            bindingStream.forEach(binding -> {
+               if (binding instanceof QueueBinding) {
+                  Queue queue = ((QueueBinding) binding).getQueue();
+                  for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
+                     validateMatching(queue, connectionElement);
+                  }
+               }
+            });
+
+            for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
+               if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) {
+                  AMQPMirrorBrokerConnectionElement replica = (AMQPMirrorBrokerConnectionElement)connectionElement;
+                  Queue queue = server.locateQueue(replica.getSourceMirrorAddress());
+
+                  connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS);
+               }
+            }
+         }
+
+         protonRemotingConnection.getAmqpConnection().flush();

Review comment:
       This section doesn't look thread safe in its use of proton-j.
   
   The initial connection open and flush are offloaded to the connection 
thread. This thread then creates a session object on the connection, but 
offloads the session open and flush back to the connection thread. The next 
bit, where it starts creating senders and receivers on this thread (if not 
more; that depends on whether the Stream returned was sequential or not), is 
probably safe, as I believe those bits offload to the connection thread.
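   
   As a rough illustration of what I mean, here is a minimal sketch of 
confining every engine call to one thread. It does not use proton-j or the 
Artemis `runLater` API: `Engine` is a hypothetical stand-in for a 
non-thread-safe protocol engine, and the single-thread executor stands in for 
the connection thread. The point is only that session creation is submitted 
together with the open and flush calls instead of running on the caller's 
thread in between them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of thread confinement; none of these names come from
// proton-j or from the PR.
class ConfinedEngineSketch {

   // Stand-in for a non-thread-safe engine. It records the first calling
   // thread as its owner and rejects calls from any other thread.
   static final class Engine {
      final List<String> calls = new ArrayList<>(); // unsynchronized on purpose
      private Thread owner;

      void call(String name) {
         Thread current = Thread.currentThread();
         if (owner == null) {
            owner = current;
         }
         if (owner != current) {
            throw new IllegalStateException(name + " called off the owner thread");
         }
         calls.add(name);
      }
   }

   static List<String> connect() {
      ExecutorService connectionThread = Executors.newSingleThreadExecutor();
      Engine engine = new Engine();
      try {
         // Everything that touches the engine, including session creation,
         // runs in one task on the connection thread.
         connectionThread.submit(() -> {
            engine.call("connection.open");
            engine.call("session.create");
            engine.call("session.open");
            engine.call("flush");
         }).get(5, TimeUnit.SECONDS);
         // Future.get() gives the happens-before edge needed to read the list here.
         return engine.calls;
      } catch (Exception e) {
         throw new IllegalStateException("engine work failed", e);
      } finally {
         connectionThread.shutdown();
      }
   }

   public static void main(String[] args) {
      System.out.println(connect());
   }
}
```

   In the PR the equivalent would presumably be moving the session creation 
into the same offloaded task as its open and flush, but that is an assumption 
on my part about how the handler is meant to be used.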

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
##########
@@ -129,7 +156,7 @@ public AMQPConnectionContext(ProtonProtocolManager protocolManager,
       } else {
          nettyExecutor = new ExecutorNettyAdapter(protocolManager.getServer().getExecutorFactory().getExecutor());
       }
-      this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
+      this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection && saslClientFactory == null);

Review comment:
       The modified arg is called "isServer", and this says it may now change 
depending on whether saslClientFactory is null. Yet the new bridge code allows 
it to be either null or non-null, depending on whether a user/pass is given. 
This seems strange.

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -0,0 +1,238 @@
+# Broker Connections
+
+Instead of waiting for clients to connect, a broker can initiate a connection to another endpoint on a specific protocol.
+
+Currently, this feature supports only the AMQP protocol. However, in the future, it might be expanded to other protocols.
+
+You configure broker connections using a `<broker-connections>` XML element in the `broker.xml` configuration file.
+
+```xml
+<broker-connections>
+    ...
+</broker-connections>
+```
+
+# AMQP Server Connections
+
+An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. This means that the broker can connect to another AMQP server (not necessarily ActiveMQ Artemis) and create elements on that connection.
+
+To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections>` element in the `broker.xml` configuration file. For example:
+
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
+         ...
+    </amqp-connection>
+</broker-connections>
+```
+
+- `uri`: tcp://host:myport (this is a required argument)
+- `name`: Name of the connection used for management purposes
+- `user`: User name with which to connect to the endpoint (this is an optional argument)
+- `password`: Password with which to connect to the endpoint (this is an optional argument)
+- `retry-interval`: Time, in milliseconds, to wait before retrying a connection after an error. The default value is `5000`.
+- `reconnect-attempts`: The default is `-1`, meaning infinite
+- `auto-start`: Whether the broker connection starts automatically with the broker. The default is `true`. If `false`, you need to call a management operation to start it.
+
+*Notice*: If you disable auto-start on the broker connection, the broker connection starts only after the management method `startBrokerConnection(connectionName)` is called on the ServerController.
+
+*Important*: The target endpoint needs permission for all operations that you configure. Therefore, if you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions.
+
+# AMQP Server Connection Operations
+The following types of operations are supported on an AMQP server connection:
+
+* Senders
+    * Messages received on specific queues are transferred to another endpoint
+* Receivers
+    * The broker pulls messages from another endpoint
+* Peers
+    * The broker creates both senders and receivers on another endpoint that knows how to handle them. Currently, this is implemented by Apache Qpid Dispatch.
+* Mirrors
+    * The broker uses an AMQP connection to another broker, duplicates messages, and sends acknowledgements over the wire.
+
+## Senders and Receivers
+It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint simply by creating a sender or receiver broker connection element.
+
+For a `sender`, the broker creates a message consumer on a queue that sends messages to another AMQP endpoint.
+
+For a `receiver`, the broker creates a message producer on an address that receives messages from another AMQP endpoint.
+
+Both elements work like a message bridge. However, there is no additional overhead required to process messages. Senders and receivers behave just like any other consumer or producer in ActiveMQ Artemis.
+
+You can configure senders or receivers for specific queues. You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties:
+
+- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression
+- `queue-name`: Configure the sender or receiver for a specific queue
+
+
+Some examples are shown below.
+
+Using address expressions:
+```xml
+<broker-connections>
+        <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+                <sender match="queues.#"/>
+                <!-- notice the local queues for remotequeues.# need to be created on this broker -->
+                <receiver match="remotequeues.#"/>
+        </amqp-connection>
+</broker-connections>
+
+<addresses>
+        <address name="remotequeues.A">
+                <anycast>
+                        <queue name="remoteQueueA"/>
+                </anycast>
+        </address>
+        <address name="queues.B">
+                 <anycast>
+                        <queue name="localQueueB"/>
+                </anycast>
+        </address>
+</addresses>
+```
+
+Using queue names:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+        <receiver queue-name="remoteQueueA"/>
+        <sender queue-name="localQueueB"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="remotequeues.A">
+        <anycast>
+           <queue name="remoteQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+        <anycast>
+           <queue name="localQueueB"/>
+        </anycast>
+     </address>
+</addresses>
+
+```
+*Important*: You can match a receiver only to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses.
+
+*Important*: Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives.
+
+
+# Peers
+A peer broker connection element is a combination of a sender and a receiver. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages.
+
+Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates a sender and receiver pair for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
+
+You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem.
+
+With a peer, you have the same properties that you have on a sender and receiver. For example:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+       <peer match="queues.#"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="queues.A">
+        <anycast>
+           <queue name="localQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+     <anycast>
+        <queue name="localQueueB"/>
+     </anycast>
+    </address>
+</addresses>

Review comment:
       I think a simpler example with matching queue and address names would be 
a better starting point for most people. Regardless, an example with differing 
names should outline what it is actually expected to do. It isn't at all 
clear here whether the waypointed address used by clients connecting to the 
router is meant to be the broker address name or the broker queue name.

##########
File path: examples/features/broker-connection/amqp-receiving-messages/pom.xml
##########
@@ -0,0 +1,165 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq.examples.broker-connection</groupId>
+      <artifactId>broker-connections</artifactId>
+      <version>2.16.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>amqp-receiving-messages</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
+
+   <properties>
+      <activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-jms-client</artifactId>
+         <version>${qpid.jms.version}</version>
+      </dependency>
+   </dependencies>
+
+   <build>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>artemis-maven-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>create0</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <instance>${basedir}/target/server0</instance>
+                     <allowAnonymous>true</allowAnonymous>
+                     <configuration>${basedir}/target/classes/activemq/server0</configuration>
+                     <!-- this makes it easier in certain envs -->
+                     <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>create1</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <instance>${basedir}/target/server1</instance>
+                     <allowAnonymous>true</allowAnonymous>
+                     <configuration>${basedir}/target/classes/activemq/server1</configuration>
+                     <!-- this makes it easier in certain envs -->
+                     <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
+                  </configuration>
+               </execution>
+               <!-- we first start broker 1, to avoid reconnecting statements -->
+               <execution>
+                  <id>start1</id>
+                  <goals>
+                     <goal>cli</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <spawn>true</spawn>
+                     <location>${basedir}/target/server1</location>
+                     <testURI>tcp://localhost:5672</testURI>
+                     <args>
+                        <param>run</param>
+                     </args>
+                     <name>server1</name>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>start0</id>
+                  <goals>
+                     <goal>cli</goal>
+                  </goals>
+                  <configuration>
+                     <spawn>true</spawn>
+                     <ignore>${noServer}</ignore>
+                     <location>${basedir}/target/server0</location>
+                     <testURI>tcp://localhost:5671</testURI>

Review comment:
       I'd avoid using the AMQPS port for non-TLS examples.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
##########
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.BrokerConnection;
+import org.apache.activemq.artemis.core.server.Consumer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
+import 
org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
+import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
+import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
+import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
+import 
org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
+import org.apache.activemq.artemis.protocol.amqp.proton.SenderController;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
+import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import 
org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
+import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.jboss.logging.Logger;
+
+public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener, ActiveMQServerQueuePlugin, BrokerConnection {
+
+   private static final Logger logger = 
Logger.getLogger(AMQPBrokerConnection.class);
+
+   private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
+   private final ProtonProtocolManager protonProtocolManager;
+   private final ActiveMQServer server;
+   private final NettyConnector bridgesConnector;
+   private NettyConnection connection;
+   private Session session;
+   private AMQPSessionContext sessionContext;
+   private ActiveMQProtonRemotingConnection protonRemotingConnection;
+   private volatile boolean started = false;
+   private final AMQPBrokerConnectionManager bridgeManager;
+   private int retryCounter = 0;
+   private volatile ScheduledFuture reconnectFuture;
+   private Set<Queue> senders = new HashSet<>();
+   private Set<Queue> receivers = new HashSet<>();
+
+   final Executor connectExecutor;
+   final ScheduledExecutorService scheduledExecutorService;
+
+   /** This is just for logging.
+    *  the actual connection will come from the amqpConnection configuration*/
+   String host;
+
+   /** This is just for logging.
+    *  the actual connection will come from the amqpConnection configuration*/
+   int port;
+
+   public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager, 
AMQPBrokerConnectConfiguration brokerConnectConfiguration,
+                               ProtonProtocolManager protonProtocolManager,
+                               ActiveMQServer server,
+                               NettyConnector bridgesConnector) {
+      this.bridgeManager = bridgeManager;
+      this.brokerConnectConfiguration = brokerConnectConfiguration;
+      this.protonProtocolManager = protonProtocolManager;
+      this.server = server;
+      this.bridgesConnector = bridgesConnector;
+      connectExecutor = server.getExecutorFactory().getExecutor();
+      scheduledExecutorService = server.getScheduledPool();
+   }
+
+   @Override
+   public String getName() {
+      return brokerConnectConfiguration.getName();
+   }
+
+   @Override
+   public String getProtocol() {
+      return "AMQP";
+   }
+
+   @Override
+   public boolean isStarted() {
+      return started;
+   }
+
+   @Override
+   public void stop() {
+      if (connection != null) {
+         connection.close();
+      }

Review comment:
       If you stop a connection with a mirror config that doesn't specify a source 
mirror address, what happens to the generated mirror queue and its contents?

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
##########
@@ -466,8 +466,10 @@ public void serverSend(ProtonServerReceiverContext context,
 
       RoutingType routingType = null;
       if (address != null) {
-         // Fixed-address producer
-         message.setAddress(address);
+         if (!address.toString().equals(message.getAddress())) {
+            // set Fixed-address producer if the message.properties.to address 
differs from the producer
+            message.setAddress(address);
+         }

Review comment:
       Would read better if the "// Fixed-address producer" bit was outside the 
inner if, where it was originally, to balance with the comment in the other leg 
( // Anonymous-relay producer, message must carry a To value)
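   
   A minimal sketch of the comment placement suggested above, assuming a 
simplified stand-in for the message type. `Msg`, `resolveAddress` and the 
null check in the else leg are hypothetical and only illustrate the layout; 
they are not the actual `serverSend` code.

```java
public class FixedAddressSketch {

   // Hypothetical stand-in for the broker message; only the address matters here
   public static final class Msg {
      private String address;

      public Msg(String address) {
         this.address = address;
      }

      public String getAddress() {
         return address;
      }

      public void setAddress(String address) {
         this.address = address;
      }
   }

   // Returns the address the message ends up with after the producer check
   public static String resolveAddress(String producerAddress, Msg message) {
      if (producerAddress != null) {
         // Fixed-address producer
         if (!producerAddress.equals(message.getAddress())) {
            // only overwrite when message.properties.to differs from the producer address
            message.setAddress(producerAddress);
         }
      } else {
         // Anonymous-relay producer, message must carry a To value
         if (message.getAddress() == null) {
            throw new IllegalArgumentException("anonymous-relay message has no To value");
         }
      }
      return message.getAddress();
   }

   public static void main(String[] args) {
      System.out.println(resolveAddress("queues.A", new Msg("queues.B")));
      System.out.println(resolveAddress(null, new Msg("queues.C")));
   }
}
```

   With the leg-level comments kept at the same depth, the two branches read 
as a pair and the inner comment only has to explain the new condition.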

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
##########
@@ -173,8 +177,35 @@ public ProtonProtocolManager setDirectDeliver(boolean 
directDeliver) {
       return this;
    }
 
+   /** for outgoing */
+   public ProtonClientProtocolManager createClientManager() {
+      ProtonClientProtocolManager clientOutgoing = new 
ProtonClientProtocolManager(factory, server);
+      return clientOutgoing;
+   }
+
    @Override
    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, 
Connection remotingConnection) {
+      return internalConnectionEntry(remotingConnection, false, null);
+   }
+
+   /** This method is not part of the ProtocolManager interface because it 
only makes sense on AMQP.
+    *  More specifically on AMQP Bridges */
+   public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection) {
+      return internalConnectionEntry(remotingConnection, true, null);
+   }
+
+   public ConnectionEntry createOutgoingConnectionEntry(Connection 
remotingConnection, ClientSASLFactory saslFactory) {
+      return internalConnectionEntry(remotingConnection, true, saslFactory);
+   }
+
+   /**
+    * AMQP is an agnostic protocol, client and server.
+    * This method is used also by the AMQPConenctionBridge where there is no 
acceptor in place.

Review comment:
       typo in AMQPConenctionBridge
   
   EDIT: Actually, the comment may be in the wrong place? This is a private 
method so it can't directly be used by AMQPConnectionBridge

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -0,0 +1,238 @@
+# Broker Connections
+
+Instead of waiting for clients to connect, a broker can initiate a connection 
to another endpoint on a specific protocol.
+
+Currently, this feature supports only the AMQP protocol. However, in the 
future, it might be expanded to other protocols.
+
+You configure broker connections using a `<broker-connections>` XML element in 
the `broker.xml` configuration file.
+
+```xml
+<broker-connections>
+    ...
+</broker-connections>
+```
+
+# AMQP Server Connections
+
+An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. 
This means that the broker can connect to another AMQP server (not necessarily 
ActiveMQ Artemis) and create elements on that connection.
+
+To define an AMQP broker connection, add an `<amqp-connection>` element within 
the `<broker-connections` element in the `broker.xml` configuration file. For 
example:
+
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" 
retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
+         ...
+    </amqp-connection>
+</broker-connections>
+```
+
+- `uri`: tcp://host:myport (this is a required argument)
+- `name`: Name of the connection used for management purposes
+- `user`: User name with which to connect to the endpoint (this is an optional 
argument)
+- `password`: Password with which to connect to the endpoint (this is an 
optional argument)
+- `retry-interval`: Time, in milliseconds to wait before retrying a connection 
after an error. The default value is `5000`.
+- `reconnect-attempts`: default is -1 meaning infinite
+- `auto-start` : Should the broker connection start automatically with the 
broker. Default is `true`. If false you need to call a management operation to 
start it.
+
+*Notice*: If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
+
+*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+
+# AMQP Server Connection Operations
+The following types of operations are supported on a AMQP server connection:
+
+* Senders
+    * Messages received on specific queues are transferred to another endpoint
+* Receivers
+    * The broker pulls messages from another endpoint
+* Peers
+    * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
+
+## Senders and Receivers
+It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint 
simply by creating a sender or receiver broker connection element.
+
+For a `sender`, the broker creates a message consumer on a queue that sends 
messages to another AMQP endpoint.
+
+For a `receiver`, the broker creates a message producer on an address that 
receives messages from another AMQP endpoint.
+
+Both elements work like a message bridge. However, there is no additional 
overhead required to process messages. Senders and receivers behave just like 
any other consumer or producer in ActiveMQ Artemis.
+
+You can configure senders or receivers for specific queues. You can also match 
senders and receivers to specific addresses or _sets_ of addresses, using 
wildcard expressions. When configuring a sender or receiver, you can set the 
following properties:
+
+- `match`: Match the sender or receiver to a specific address or __set__ of 
addresses, using a wildcard expression
+- `queue-name`: Configure the sender or receiver for a specific queue
+
+
+Some examples are shown below.
+
+Using address expressions:
+```xml
+<broker-connections>
+        <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+                <sender match="queues.#"/>
+                <!-- notice the local queues for remotequeues.# need to be 
created on this broker -->
+                <receiver match="remotequeues.#"/>
+        </amqp-connection>
+</broker-connections>
+
+<addresses>
+        <address name="remotequeues.A">
+                <anycast>
+                        <queue name="remoteQueueA"/>
+                </anycast>
+        </address>
+        <address name="queues.B">
+                 <anycast>
+                        <queue name="localQueueB"/>
+                </anycast>
+        </address>
+</addresses>
+```
+
+Using queue names:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+        <receiver queue-name="remoteQueueA"/>
+        <sender queue-name="localQueueB"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="remotequeues.A">
+        <anycast>
+           <queue name="remoteQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+        <anycast>
+           <queue name="localQueueB"/>
+        </anycast>
+     </address>
+</addresses>

Review comment:
       Differing indentation levels. See also comment about clarifying what 
actually ends up happening, the *important* note below still doesn't really 
cover it. Maybe some diagrams / ASCII art might.
   
   A simplified example using the same queue and address names might be a 
better start.

##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
##########
@@ -362,6 +362,7 @@ public PagingStore getOwner() {
 
    @Override
    public void setOwner(PagingStore owner) {
+      new Exception("Setting owner as " + 
owner.getStoreName()).printStackTrace();

Review comment:
       Debug stacktrace left over
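   
   If the trace is still useful while debugging, one option is to guard it 
behind a log level rather than printing unconditionally. The sketch below is 
an assumption-laden illustration: `java.util.logging` stands in for the 
project's logger so that it runs on its own, and `OwnerTraceSketch` with a 
`String` owner is a hypothetical substitute for `MessageReferenceImpl` and 
`PagingStore`. Note that the leftover line would also throw a 
NullPointerException if `owner` were ever null.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class OwnerTraceSketch {

   // java.util.logging is a stand-in here, not the logger the project uses
   private static final Logger logger = Logger.getLogger(OwnerTraceSketch.class.getName());

   private String owner;

   public void setOwner(String owner) {
      // Only build the throwable when this level of detail has been requested
      if (logger.isLoggable(Level.FINEST)) {
         logger.log(Level.FINEST, "Setting owner as " + owner, new Exception("trace"));
      }
      this.owner = owner;
   }

   public String getOwner() {
      return owner;
   }

   public static void main(String[] args) {
      OwnerTraceSketch ref = new OwnerTraceSketch();
      ref.setOwner("store-A");
      System.out.println(ref.getOwner());
   }
}
```

   Simply deleting the line is of course the smaller change if the trace is no 
longer needed.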

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -0,0 +1,238 @@
+# Broker Connections
+
+Instead of waiting for clients to connect, a broker can initiate a connection 
to another endpoint on a specific protocol.
+
+Currently, this feature supports only the AMQP protocol. However, in the 
future, it might be expanded to other protocols.
+
+You configure broker connections using a `<broker-connections>` XML element in 
the `broker.xml` configuration file.
+
+```xml
+<broker-connections>
+    ...
+</broker-connections>
+```
+
+# AMQP Server Connections
+
+An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. 
This means that the broker can connect to another AMQP server (not necessarily 
ActiveMQ Artemis) and create elements on that connection.
+
+To define an AMQP broker connection, add an `<amqp-connection>` element within 
the `<broker-connections` element in the `broker.xml` configuration file. For 
example:
+
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" 
retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
+         ...
+    </amqp-connection>
+</broker-connections>
+```
+
+- `uri`: tcp://host:myport (this is a required argument)
+- `name`: Name of the connection used for management purposes
+- `user`: User name with which to connect to the endpoint (this is an optional 
argument)
+- `password`: Password with which to connect to the endpoint (this is an 
optional argument)
+- `retry-interval`: Time, in milliseconds to wait before retrying a connection 
after an error. The default value is `5000`.
+- `reconnect-attempts`: default is -1 meaning infinite
+- `auto-start` : Should the broker connection start automatically with the 
broker. Default is `true`. If false you need to call a management operation to 
start it.
+
+*Notice*: If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
+
+*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+
+# AMQP Server Connection Operations
+The following types of operations are supported on a AMQP server connection:
+
+* Senders
+    * Messages received on specific queues are transferred to another endpoint
+* Receivers
+    * The broker pulls messages from another endpoint
+* Peers
+    * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
+
+## Senders and Receivers
+It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint 
simply by creating a sender or receiver broker connection element.
+
+For a `sender`, the broker creates a message consumer on a queue that sends 
messages to another AMQP endpoint.
+
+For a `receiver`, the broker creates a message producer on an address that 
receives messages from another AMQP endpoint.
+
+Both elements work like a message bridge. However, there is no additional 
overhead required to process messages. Senders and receivers behave just like 
any other consumer or producer in ActiveMQ Artemis.
+
+You can configure senders or receivers for specific queues. You can also match 
senders and receivers to specific addresses or _sets_ of addresses, using 
wildcard expressions. When configuring a sender or receiver, you can set the 
following properties:
+
+- `match`: Match the sender or receiver to a specific address or __set__ of 
addresses, using a wildcard expression
+- `queue-name`: Configure the sender or receiver for a specific queue
+
+
+Some examples are shown below.
+
+Using address expressions:
+```xml
+<broker-connections>
+        <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+                <sender match="queues.#"/>
+                <!-- notice the local queues for remotequeues.# need to be 
created on this broker -->
+                <receiver match="remotequeues.#"/>
+        </amqp-connection>
+</broker-connections>
+
+<addresses>
+        <address name="remotequeues.A">
+                <anycast>
+                        <queue name="remoteQueueA"/>
+                </anycast>
+        </address>
+        <address name="queues.B">
+                 <anycast>
+                        <queue name="localQueueB"/>
+                </anycast>
+        </address>
+</addresses>
+```
+
+Using queue names:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+        <receiver queue-name="remoteQueueA"/>
+        <sender queue-name="localQueueB"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="remotequeues.A">
+        <anycast>
+           <queue name="remoteQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+        <anycast>
+           <queue name="localQueueB"/>
+        </anycast>
+     </address>
+</addresses>
+
+```
+*Important*: You can match a receiver only to a local queue that already 
exists. Therefore, if you are using receivers, make sure that you pre-create 
the queue locally. Otherwise, the broker cannot match the remote queues and 
addresses.
+
+*Important*: Do not create a sender and a receiver to the same destination. 
This creates an infinite loop of sends and receives.
+
+
+# Peers
+A peer broker connection element is a combination of sender and receivers. The 
ActiveMQ Artemis broker creates both a sender and a receiver for a peer 
element, and the endpoint knows how to deal with the pair without creating an 
infinite loop of sending and receiving messages.
+
+Currently, [Apache Qpid Dispatch 
Router](https://qpid.apache.org/components/dispatch-router/index.html) is a 
peer. ActiveMQ Artemis creates the pair of receivers and sender for each 
matching destination. These senders and receivers have special configuration to 
let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.

Review comment:
       Not unless you tell the router to, which this section doesn't currently 
clarify in any way. It's not clear from this what the feature even does. I don't 
think anyone reading this without fairly deep implementation knowledge would be 
able to make this feature work using the information currently here.
   
   This section should mention that it configures the broker to connect to the 
router and make itself available for storing messages on a waypoint address, 
such that the messages sent to the router on the address are message-routed 
to/from the broker. There needs to be an example of the address configuration 
required for Dispatch to treat an address as a waypoint.

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import 
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.jboss.logging.Logger;
+
+public class AMQPMirrorControllerSource implements MirrorController, 
ActiveMQComponent {
+
+   private static final Logger logger = 
Logger.getLogger(AMQPMirrorControllerSource.class);
+
+   public static final Symbol EVENT_TYPE = 
Symbol.getSymbol("x-opt-amq-mr-ev-type");
+   public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr");
+   public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
+
+   // Events:
+   public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress");
+   public static final Symbol DELETE_ADDRESS = 
Symbol.getSymbol("deleteAddress");
+   public static final Symbol CREATE_QUEUE = Symbol.getSymbol("createQueue");
+   public static final Symbol DELETE_QUEUE = Symbol.getSymbol("deleteQueue");
+   public static final Symbol ADDRESS_SCAN_START = 
Symbol.getSymbol("AddressCanStart");
+   public static final Symbol ADDRESS_SCAN_END = 
Symbol.getSymbol("AddressScanEnd");

Review comment:
       Starts with Capital while all the others are camelCase
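   
   A sketch of what consistent naming could look like, with a small check that 
every event name starts in lower case. Plain strings stand in for 
`Symbol.getSymbol(...)` so the example has no dependencies, and the values 
`addressScanStart` / `addressScanEnd` are assumed names, not ones taken from 
the PR. The quoted `"AddressCanStart"` also looks like it may be missing an 
"S" in "Scan", though that is a guess from the constant name.

```java
import java.util.Arrays;
import java.util.List;

public class MirrorEventNamesSketch {

   // Plain strings stand in for Symbol.getSymbol(...) in this sketch
   public static final String ADD_ADDRESS = "addAddress";
   public static final String DELETE_ADDRESS = "deleteAddress";
   public static final String CREATE_QUEUE = "createQueue";
   public static final String DELETE_QUEUE = "deleteQueue";
   // Assumed replacements for the two capitalised values in the diff
   public static final String ADDRESS_SCAN_START = "addressScanStart";
   public static final String ADDRESS_SCAN_END = "addressScanEnd";

   public static final List<String> ALL = Arrays.asList(
      ADD_ADDRESS, DELETE_ADDRESS, CREATE_QUEUE, DELETE_QUEUE,
      ADDRESS_SCAN_START, ADDRESS_SCAN_END);

   // True when the name begins with a lower-case letter, as camelCase names do
   public static boolean startsLowerCase(String name) {
      return !name.isEmpty() && Character.isLowerCase(name.charAt(0));
   }

   public static void main(String[] args) {
      for (String name : ALL) {
         System.out.println(name + " -> " + startsLowerCase(name));
      }
   }
}
```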

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -0,0 +1,238 @@
+# Broker Connections
+
+Instead of waiting for clients to connect, a broker can initiate a connection 
to another endpoint on a specific protocol.
+
+Currently, this feature supports only the AMQP protocol. However, in the 
future, it might be expanded to other protocols.
+
+You configure broker connections using a `<broker-connections>` XML element in 
the `broker.xml` configuration file.
+
+```xml
+<broker-connections>
+    ...
+</broker-connections>
+```
+
+# AMQP Server Connections
+
+An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. 
This means that the broker can connect to another AMQP server (not necessarily 
ActiveMQ Artemis) and create elements on that connection.
+
+To define an AMQP broker connection, add an `<amqp-connection>` element within 
the `<broker-connections` element in the `broker.xml` configuration file. For 
example:
+
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" 
retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
+         ...
+    </amqp-connection>
+</broker-connections>
+```
+
+- `uri`: tcp://host:myport (this is a required argument)
+- `name`: Name of the connection used for management purposes
+- `user`: User name with which to connect to the endpoint (this is an optional 
argument)
+- `password`: Password with which to connect to the endpoint (this is an 
optional argument)
+- `retry-interval`: Time, in milliseconds to wait before retrying a connection 
after an error. The default value is `5000`.
+- `reconnect-attempts`: default is -1 meaning infinite
+- `auto-start` : Should the broker connection start automatically with the 
broker. Default is `true`. If false you need to call a management operation to 
start it.
+
+*Notice*: If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
+
+*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+
+# AMQP Server Connection Operations
+The following types of operations are supported on a AMQP server connection:
+
+* Senders
+    * Messages received on specific queues are transferred to another endpoint
+* Receivers
+    * The broker pulls messages from another endpoint
+* Peers
+    * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
+* Mirrors
+    * The broker uses an AMQP connection to another broker and duplicate 
messages and sends acknowledgements over the wire.
+
+## Senders and Receivers
+It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint 
simply by creating a sender or receiver broker connection element.
+
+For a `sender`, the broker creates a message consumer on a queue that sends 
messages to another AMQP endpoint.
+
+For a `receiver`, the broker creates a message producer on an address that 
receives messages from another AMQP endpoint.
+
+Both elements work like a message bridge. However, there is no additional 
overhead required to process messages. Senders and receivers behave just like 
any other consumer or producer in ActiveMQ Artemis.
+
+You can configure senders or receivers for specific queues. You can also match 
senders and receivers to specific addresses or _sets_ of addresses, using 
wildcard expressions. When configuring a sender or receiver, you can set the 
following properties:

Review comment:
       Why the asymmetry that you can 'match' on addresses, but only specify 
exact queues? Why aren't they both for addresses, or both for queues?
   
   It's also unclear from the doc here - is the sender created then sending from 
the queue(s) matched by the address 'match', or is it sending from the address 
matched to the config? (Distinction being important if address and queue names 
differ, like in the examples shown later)
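   
   To make the distinction concrete, here is a simplified matcher, assuming 
the default wildcard syntax where `.` separates words, `#` matches zero or 
more words and `*` matches exactly one word. It is a hypothetical sketch, not 
the broker's matching code. With the names from the doc example, `queues.#` 
matches the address `queues.B` but not the queue name `localQueueB`, so which 
of the two names is tested decides whether a sender gets created.

```java
public class WildcardMatchSketch {

   // Entry point: split both sides into words and match them one by one
   public static boolean matches(String pattern, String name) {
      return match(pattern.split("\\."), 0, name.split("\\."), 0);
   }

   private static boolean match(String[] p, int i, String[] n, int j) {
      if (i == p.length) {
         // Pattern exhausted: succeed only if the name is exhausted too
         return j == n.length;
      }
      if (p[i].equals("#")) {
         // '#' may consume zero or more words; try every possible split
         for (int k = j; k <= n.length; k++) {
            if (match(p, i + 1, n, k)) {
               return true;
            }
         }
         return false;
      }
      if (j == n.length) {
         return false;
      }
      // '*' matches any single word, otherwise the words must be equal
      if (p[i].equals("*") || p[i].equals(n[j])) {
         return match(p, i + 1, n, j + 1);
      }
      return false;
   }

   public static void main(String[] args) {
      System.out.println(matches("queues.#", "queues.B"));
      System.out.println(matches("queues.#", "localQueueB"));
   }
}
```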

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -0,0 +1,238 @@
+# Broker Connections
+
+Instead of waiting for clients to connect, a broker can initiate a connection 
to another endpoint on a specific protocol.
+
+Currently, this feature supports only the AMQP protocol. However, in the 
future, it might be expanded to other protocols.
+
+You configure broker connections using a `<broker-connections>` XML element in 
the `broker.xml` configuration file.
+
+```xml
+<broker-connections>
+    ...
+</broker-connections>
+```
+
+# AMQP Server Connections
+
+An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. 
This means that the broker can connect to another AMQP server (not necessarily 
ActiveMQ Artemis) and create elements on that connection.
+
+To define an AMQP broker connection, add an `<amqp-connection>` element within 
the `<broker-connections` element in the `broker.xml` configuration file. For 
example:
+
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" 
retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
+         ...
+    </amqp-connection>
+</broker-connections>
+```
+
+- `uri`: tcp://host:myport (this is a required argument)
+- `name`: Name of the connection used for management purposes
+- `user`: User name with which to connect to the endpoint (this is an optional 
argument)
+- `password`: Password with which to connect to the endpoint (this is an 
optional argument)
+- `retry-interval`: Time, in milliseconds to wait before retrying a connection 
after an error. The default value is `5000`.
+- `reconnect-attempts`: default is -1 meaning infinite
+- `auto-start` : Should the broker connection start automatically with the 
broker. Default is `true`. If false you need to call a management operation to 
start it.
+
+*Notice*: If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
+
+*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, If you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+
+# AMQP Server Connection Operations
+The following types of operations are supported on a AMQP server connection:
+
+* Senders
+    * Messages received on specific queues are transferred to another endpoint
+* Receivers
+    * The broker pulls messages from another endpoint
+* Peers
+    * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
+* Mirrors
+    * The broker uses an AMQP connection to another broker, duplicates 
messages, and sends acknowledgements over the wire.
+
+## Senders and Receivers
+It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint 
simply by creating a sender or receiver broker connection element.
+
+For a `sender`, the broker creates a message consumer on a queue that sends 
messages to another AMQP endpoint.
+
+For a `receiver`, the broker creates a message producer on an address that 
receives messages from another AMQP endpoint.
+
+Both elements work like a message bridge. However, there is no additional 
overhead required to process messages. Senders and receivers behave just like 
any other consumer or producer in ActiveMQ Artemis.
+
+You can configure senders or receivers for specific queues. You can also match 
senders and receivers to specific addresses or _sets_ of addresses, using 
wildcard expressions. When configuring a sender or receiver, you can set the 
following properties:
+
+- `match`: Match the sender or receiver to a specific address or __set__ of 
addresses, using a wildcard expression
+- `queue-name`: Configure the sender or receiver for a specific queue
+
+
+Some examples are shown below.
+
+Using address expressions:
+```xml
+<broker-connections>
+        <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+                <sender match="queues.#"/>
+                <!-- notice the local queues for remotequeues.# need to be 
created on this broker -->
+                <receiver match="remotequeues.#"/>
+        </amqp-connection>
+</broker-connections>
+
+<addresses>
+        <address name="remotequeues.A">
+                <anycast>
+                        <queue name="remoteQueueA"/>
+                </anycast>
+        </address>
+        <address name="queues.B">
+                 <anycast>
+                        <queue name="localQueueB"/>
+                </anycast>
+        </address>
+</addresses>
+```
+
+Using queue names:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+        <receiver queue-name="remoteQueueA"/>
+        <sender queue-name="localQueueB"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="remotequeues.A">
+        <anycast>
+           <queue name="remoteQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+        <anycast>
+           <queue name="localQueueB"/>
+        </anycast>
+     </address>
+</addresses>
+
+```
+*Important*: You can match a receiver only to a local queue that already 
exists. Therefore, if you are using receivers, make sure that you pre-create 
the queue locally. Otherwise, the broker cannot match the remote queues and 
addresses.
+
+*Important*: Do not create a sender and a receiver to the same destination. 
This creates an infinite loop of sends and receives.
+
+
+# Peers
+A peer broker connection element is a combination of sender and receivers. The 
ActiveMQ Artemis broker creates both a sender and a receiver for a peer 
element, and the endpoint knows how to deal with the pair without creating an 
infinite loop of sending and receiving messages.
+
+Currently, [Apache Qpid Dispatch 
Router](https://qpid.apache.org/components/dispatch-router/index.html) is a 
peer. ActiveMQ Artemis creates the pair of receivers and sender for each 
matching destination. These senders and receivers have special configuration to 
let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
+
+You can experiment with advanced networking scenarios with Qpid Dispatch 
Router and get a lot of benefit from the AMQP protocol and its ecosystem.
+
+With a peer, you have the same properties that you have on a sender and 
receiver. For example:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">

Review comment:
       Perhaps "my-router" would be a better name?

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -0,0 +1,238 @@
+# Broker Connections
+
+Instead of waiting for clients to connect, a broker can initiate a connection 
to another endpoint on a specific protocol.
+
+Currently, this feature supports only the AMQP protocol. However, in the 
future, it might be expanded to other protocols.
+
+You configure broker connections using a `<broker-connections>` XML element in 
the `broker.xml` configuration file.
+
+```xml
+<broker-connections>
+    ...
+</broker-connections>
+```
+
+# AMQP Server Connections
+
+An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. 
This means that the broker can connect to another AMQP server (not necessarily 
ActiveMQ Artemis) and create elements on that connection.
+
+To define an AMQP broker connection, add an `<amqp-connection>` element within 
the `<broker-connections>` element in the `broker.xml` configuration file. For 
example:
+
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" 
retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
+         ...
+    </amqp-connection>
+</broker-connections>
+```
+
+- `uri`: tcp://host:myport (this is a required argument)
+- `name`: Name of the connection used for management purposes
+- `user`: User name with which to connect to the endpoint (this is an optional 
argument)
+- `password`: Password with which to connect to the endpoint (this is an 
optional argument)
+- `retry-interval`: Time, in milliseconds, to wait before retrying a connection 
after an error. The default value is `5000`.
+- `reconnect-attempts`: Number of reconnection attempts. The default value is 
`-1`, meaning infinite.
+- `auto-start`: Whether the broker connection starts automatically with the 
broker. The default value is `true`. If `false`, you need to call a management 
operation to start it.
+
+*Notice*: If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
+
+*Important*: The target endpoint needs permission for all operations that you 
configure. Therefore, if you are using a security manager, ensure that you 
perform the configured operations as a user with sufficient permissions.
+
+# AMQP Server Connection Operations
+The following types of operations are supported on an AMQP server connection:
+
+* Senders
+    * Messages received on specific queues are transferred to another endpoint
+* Receivers
+    * The broker pulls messages from another endpoint
+* Peers
+    * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
+* Mirrors
+    * The broker uses an AMQP connection to another broker, duplicates 
messages, and sends acknowledgements over the wire.
+
+## Senders and Receivers
+It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint 
simply by creating a sender or receiver broker connection element.
+
+For a `sender`, the broker creates a message consumer on a queue that sends 
messages to another AMQP endpoint.
+
+For a `receiver`, the broker creates a message producer on an address that 
receives messages from another AMQP endpoint.
+
+Both elements work like a message bridge. However, there is no additional 
overhead required to process messages. Senders and receivers behave just like 
any other consumer or producer in ActiveMQ Artemis.
+
+You can configure senders or receivers for specific queues. You can also match 
senders and receivers to specific addresses or _sets_ of addresses, using 
wildcard expressions. When configuring a sender or receiver, you can set the 
following properties:
+
+- `match`: Match the sender or receiver to a specific address or __set__ of 
addresses, using a wildcard expression
+- `queue-name`: Configure the sender or receiver for a specific queue
+
+
+Some examples are shown below.
+
+Using address expressions:
+```xml
+<broker-connections>
+        <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+                <sender match="queues.#"/>
+                <!-- notice the local queues for remotequeues.# need to be 
created on this broker -->
+                <receiver match="remotequeues.#"/>
+        </amqp-connection>
+</broker-connections>
+
+<addresses>
+        <address name="remotequeues.A">
+                <anycast>
+                        <queue name="remoteQueueA"/>
+                </anycast>
+        </address>
+        <address name="queues.B">
+                 <anycast>
+                        <queue name="localQueueB"/>
+                </anycast>
+        </address>
+</addresses>
+```
+
+Using queue names:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+        <receiver queue-name="remoteQueueA"/>
+        <sender queue-name="localQueueB"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="remotequeues.A">
+        <anycast>
+           <queue name="remoteQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+        <anycast>
+           <queue name="localQueueB"/>
+        </anycast>
+     </address>
+</addresses>
+
+```
+*Important*: You can match a receiver only to a local queue that already 
exists. Therefore, if you are using receivers, make sure that you pre-create 
the queue locally. Otherwise, the broker cannot match the remote queues and 
addresses.
+
+*Important*: Do not create a sender and a receiver to the same destination. 
This creates an infinite loop of sends and receives.
+
+
+# Peers
+A peer broker connection element is a combination of sender and receivers. The 
ActiveMQ Artemis broker creates both a sender and a receiver for a peer 
element, and the endpoint knows how to deal with the pair without creating an 
infinite loop of sending and receiving messages.
+
+Currently, [Apache Qpid Dispatch 
Router](https://qpid.apache.org/components/dispatch-router/index.html) is a 
peer. ActiveMQ Artemis creates the pair of receivers and sender for each 
matching destination. These senders and receivers have special configuration to 
let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
+
+You can experiment with advanced networking scenarios with Qpid Dispatch 
Router and get a lot of benefit from the AMQP protocol and its ecosystem.
+
+With a peer, you have the same properties that you have on a sender and 
receiver. For example:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+       <peer match="queues.#"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="queues.A">
+        <anycast>
+           <queue name="localQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+     <anycast>
+        <queue name="localQueueB"/>
+     </anycast>
+    </address>
+</addresses>
+```
+
+*Important:* Do not use this feature to connect to another broker, otherwise 
any message sent will be immediately ready to consume creating an infinite echo 
of sends and receives.
+
+# Mirror 
+The mirror option on the broker connection can capture events from the broker 
and pass them over the wire to another broker. This enables you to capture 
multiple asynchronous replicas. The following types of events are captured:
+
+* Message routing
+* Message acknowledgement
+* Queue and address creation
+* Queue and address deletion
+
+When you configure a mirror, these events are captured from the broker, stored 
on a local queue, and later forwarded to a target destination on another 
ActiveMQ Artemis broker.
+
+To configure a mirror, you add a `<mirror>` element within the 
`<amqp-connection>` element.
+
+The local queue is called `source-mirror-address`
+
+You can specify the following optional arguments.
+
+* `queue-removal`: Specifies whether a queue- or address-removal event is 
sent. The default value is `true`.
+* `message-acknowledgements`: Specifies whether message acknowledgements are 
sent. The default value is `true`.
+* `queue-creation`: Specifies whether a queue- or address-creation event is 
sent. The default value is `true`.
+* `source-mirror-address`: By default, the mirror creates a non-durable 
temporary queue to store messages before they are sent to the other broker. If 
you define a name value for this property, an ANYCAST durable queue and address 
is created with the specified name.
+
+An example of a mirror configuration is shown below:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+            <mirror  queue-removal="true" queue-creation="true" 
message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/>
+    </amqp-connection>
+</broker-connections>
+```
+
+## Catch up on Mirror
+The broker will not send past events over the mirror. As the broker sends and 
receives messages, only a natural catch up would eventually happen.

Review comment:
       I think this could be clarified; I'm not sure what is meant by 'natural 
catch up'. Is it saying that only events occurring and messages arriving on a 
server after the mirror is started will be mirrored? That is, existing messages 
on a queue, present before a mirror is configured, will not be mirrored?

##########
File path: examples/features/broker-connection/amqp-sending-messages/pom.xml
##########
@@ -0,0 +1,165 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq.examples.broker-connection</groupId>
+      <artifactId>broker-connections</artifactId>
+      <version>2.16.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>amqp-sending-messages</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>

Review comment:
       The name needs to be updated.

##########
File path: 
examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+/**
+ * This example demonstrates how messages are transferred from one broker 
to another broker
+ * through the sender element on an AMQP Broker Connection.
+ */
+public class BrokerConnectionSenderSSL {
+
+   public static void main(final String[] args) throws Exception {
+      Connection connectionOnServer0 = null;
+      ConnectionFactory connectionFactoryServer0 = new 
JmsConnectionFactory("amqp://localhost:5671");

Review comment:
       I know the target of the example is SSL between the brokers, but it 
might make more sense to use SSL for the clients too. Then you would only need 
one acceptor.
   
   As written, this makes it seem like SSL isn't being used.
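
A rough sketch of what the client side could look like if it also used SSL. The helper class, the trust store file name and the password are all hypothetical placeholders; `transport.trustStoreLocation` and `transport.trustStorePassword` are the Qpid JMS transport options I would expect to be needed, but the exact values depend on how the example's certificates are generated.

```java
import java.net.URI;

public class AmqpsClientUri {

    // Hypothetical helper: builds an amqps:// URI carrying the Qpid JMS
    // trust store options so the client connects to the SSL acceptor.
    static String amqpsUri(String host, int port, String trustStorePath, String trustStorePassword) {
        return "amqps://" + host + ":" + port
            + "?transport.trustStoreLocation=" + trustStorePath
            + "&transport.trustStorePassword=" + trustStorePassword;
    }

    public static void main(String[] args) {
        // Placeholder values; the real example would use its own trust store.
        String uri = amqpsUri("localhost", 5671, "client.ts", "changeit");

        // Sanity check that the string parses as a URI with the amqps scheme.
        URI parsed = URI.create(uri);
        System.out.println(parsed.getScheme() + " on port " + parsed.getPort());

        // In the example this string would replace the plain amqp:// one:
        // ConnectionFactory cf = new JmsConnectionFactory(uri);
    }
}
```

With the clients on `amqps://`, the same SSL acceptor could serve both the broker connection and the example client.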

##########
File path: 
examples/features/broker-connection/amqp-receiving-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionReceiver.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.jms.example;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+
+/**
+ * This example demonstrates how sessions created from a single connection can 
be load
+ * balanced across the different nodes of the cluster.
+ * <p>
+ * In this example there are three nodes and we use a round-robin client side 
load-balancing
+ * policy.
+ */

Review comment:
       Stale comment

##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.server.ActiveMQComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.AckReason;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.server.mirror.MirrorController;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import 
org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.jboss.logging.Logger;
+
+public class AMQPMirrorControllerSource implements MirrorController, 
ActiveMQComponent {
+
+   private static final Logger logger = 
Logger.getLogger(AMQPMirrorControllerSource.class);
+
+   public static final Symbol EVENT_TYPE = 
Symbol.getSymbol("x-opt-amq-mr-ev-type");
+   public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr");
+   public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu");
+
+   // Events:
+   public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress");
+   public static final Symbol DELETE_ADDRESS = 
Symbol.getSymbol("deleteAddress");
+   public static final Symbol CREATE_QUEUE = Symbol.getSymbol("createQueue");
+   public static final Symbol DELETE_QUEUE = Symbol.getSymbol("deleteQueue");
+   public static final Symbol ADDRESS_SCAN_START = 
Symbol.getSymbol("AddressCanStart");
+   public static final Symbol ADDRESS_SCAN_END = 
Symbol.getSymbol("AddressScanEnd");
+   public static final Symbol POST_ACK = Symbol.getSymbol("postAck");
+
+   // Delivery annotation property used on mirror control routing and Ack
+   public static final Symbol INTERNAL_ID = 
Symbol.getSymbol("x-opt-amq-mr-id");
+   public static final Symbol INTERNAL_DESTINATION = 
Symbol.getSymbol("x-opt-amq-mr-dst");
+
+   private static final ThreadLocal<MirrorControlRouting> mirrorControlRouting 
= ThreadLocal.withInitial(() -> new MirrorControlRouting(null));
+
+   final Queue snfQueue;
+   final ActiveMQServer server;
+   final boolean acks;
+
+   boolean started;
+
+   @Override
+   public void start() throws Exception {
+   }
+
+   @Override
+   public void stop() throws Exception {
+   }
+
+   @Override
+   public boolean isStarted() {
+      return started;
+   }
+
+   public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, 
boolean acks) {
+      this.snfQueue = snfQueue;
+      this.server = server;
+      this.acks = acks;
+   }
+
+   @Override
+   public void startAddressScan() throws Exception {
+      Message message = createMessage(null, null, ADDRESS_SCAN_START, null);
+      route(server, message);
+   }
+
+   @Override
+   public void endAddressScan() throws Exception {
+      Message message = createMessage(null, null, ADDRESS_SCAN_END, null);
+      route(server, message);
+   }
+
+   @Override
+   public void addAddress(AddressInfo addressInfo) throws Exception {
+      Message message = createMessage(addressInfo.getName(), null, 
ADD_ADDRESS, addressInfo.toJSON());
+      route(server, message);
+   }
+
+   @Override
+   public void deleteAddress(AddressInfo addressInfo) throws Exception {
+      Message message = createMessage(addressInfo.getName(), null, 
DELETE_ADDRESS, addressInfo.toJSON());
+      route(server, message);
+   }
+
+   @Override
+   public void createQueue(QueueConfiguration queueConfiguration) throws 
Exception {
+      Message message = createMessage(queueConfiguration.getAddress(), 
queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON());
+      route(server, message);
+   }
+
+   @Override
+   public void deleteQueue(SimpleString address, SimpleString queue) throws 
Exception {
+      Message message = createMessage(address, queue, DELETE_QUEUE, 
queue.toString());
+      route(server, message);
+   }
+
+   @Override
+   public void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs) {
+
+      try {
+         context.setReusable(false);
+         PagingStore storeOwner = null;
+         if (refs.size() > 0) {
+            storeOwner = refs.get(0).getOwner();
+         }
+         if (storeOwner != null && 
!storeOwner.getAddress().equals(message.getAddressSimpleString())) {
+            storeOwner = 
server.getPagingManager().getPageStore(message.getAddressSimpleString());
+         }
+         MessageReference ref = 
MessageReference.Factory.createReference(message, snfQueue, storeOwner);
+
+         snfQueue.refUp(ref);
+
+         Map<Symbol, Object> symbolObjectMap = new HashMap<>();

Review comment:
       "daMap" might be more descriptive, help readability

##########
File path: examples/features/broker-connection/amqp-sending-messages/pom.xml
##########
@@ -0,0 +1,165 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq.examples.broker-connection</groupId>
+      <artifactId>broker-connections</artifactId>
+      <version>2.16.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>amqp-sending-messages</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
+
+   <properties>
+      <activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-jms-client</artifactId>
+         <version>${qpid.jms.version}</version>
+      </dependency>
+   </dependencies>
+
+   <build>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>artemis-maven-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>create0</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <instance>${basedir}/target/server0</instance>
+                     <allowAnonymous>true</allowAnonymous>
+                     
<configuration>${basedir}/target/classes/activemq/server0</configuration>
+                     <!-- this makes it easier in certain envs -->
+                     <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>create1</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <instance>${basedir}/target/server1</instance>
+                     <allowAnonymous>true</allowAnonymous>
+                     
<configuration>${basedir}/target/classes/activemq/server1</configuration>
+                     <!-- this makes it easier in certain envs -->
+                     <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
+                  </configuration>
+               </execution>
+               <!-- we first start broker 1, to avoid reconnecting statements 
-->
+               <execution>
+                  <id>start1</id>
+                  <goals>
+                     <goal>cli</goal>
+                  </goals>
+                  <configuration>
+                     <ignore>${noServer}</ignore>
+                     <spawn>true</spawn>
+                     <location>${basedir}/target/server1</location>
+                     <testURI>tcp://localhost:5672</testURI>
+                     <args>
+                        <param>run</param>
+                     </args>
+                     <name>server1</name>
+                  </configuration>
+               </execution>
+               <execution>
+                  <id>start0</id>
+                  <goals>
+                     <goal>cli</goal>
+                  </goals>
+                  <configuration>
+                     <spawn>true</spawn>
+                     <ignore>${noServer}</ignore>
+                     <location>${basedir}/target/server0</location>
+                     <testURI>tcp://localhost:5671</testURI>

Review comment:
       I'd avoid using the AMQPS port for non-TLS examples.

##########
File path: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
##########
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.net.URL;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import 
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.ExecuteUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** This test will only be executed if you have qdrouterd available on your system, otherwise it is ignored by an assume exception. */
+public class QpidDispatchPeerTest extends AmqpClientTestSupport {
+
+   ExecuteUtil.ProcessHolder qpidProcess;
+
+   /**
+    * This will validate if the environment has qdrouterd installed and if this test can be used or not.
+    */
+   @BeforeClass
+   public static void validateqdrotuer() {
+      try {
+         int result = ExecuteUtil.runCommand(true, "qdrouterd", "--version");
+         Assume.assumeTrue("qdrouterd does not exist", result == 0);
+      } catch (Exception e) {
+         e.printStackTrace();

Review comment:
       Given this will be the typical case, I think it would be better not to 
print the stacktrace (perhaps debug-log it instead?), and to have the 
assumption check say why the test is being skipped.
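
One way to act on this, as a minimal sketch rather than the PR's code: probe for the binary with the JDK's `ProcessBuilder` and hand back a reason string instead of printing the exception. `CommandProbe`, `Result`, and `probe` are hypothetical names introduced here for illustration; the PR itself goes through `ExecuteUtil.runCommand`.

```java
import java.io.IOException;

/** Hypothetical helper (not part of the PR): checks whether an external command can be run. */
final class CommandProbe {

   /** Outcome of a probe: whether the command ran successfully, plus a human-readable reason. */
   static final class Result {
      final boolean available;
      final String reason;

      Result(boolean available, String reason) {
         this.available = available;
         this.reason = reason;
      }
   }

   /** Runs the command, discards its output, and reports why it is or is not usable. */
   static Result probe(String... command) {
      if (command.length == 0) {
         return new Result(false, "no command given");
      }
      String name = command[0];
      try {
         Process process = new ProcessBuilder(command)
            .redirectErrorStream(true)
            .redirectOutput(ProcessBuilder.Redirect.DISCARD)
            .start();
         int exit = process.waitFor();
         if (exit == 0) {
            return new Result(true, name + " is available");
         }
         return new Result(false, name + " exited with code " + exit);
      } catch (IOException e) {
         // Expected on machines without the tool: keep the message, skip the stack trace.
         return new Result(false, name + " could not be started: " + e.getMessage());
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return new Result(false, "interrupted while probing " + name);
      }
   }
}
```

In the `@BeforeClass` method the result could then feed the assumption, for example `Assume.assumeTrue(result.reason, result.available)`, so the skip message explains itself and the exception can go to debug logging if it is wanted at all.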

##########
File path: examples/features/broker-connection/amqp-receiving-messages/pom.xml
##########
@@ -0,0 +1,165 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq.examples.broker-connection</groupId>
+      <artifactId>broker-connections</artifactId>
+      <version>2.16.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>amqp-receiving-messages</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>

Review comment:
       Name needs updating

##########
File path: 
tests/integration-tests/src/test/resources/QpidRouterPeerTest-qpidr.conf
##########
@@ -0,0 +1,51 @@
+ #
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #     http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #
+
+ router {
+     mode: standalone
+     id: INT.A
+ }
+#log {
+    #module: DEFAULT
+    #enable: trace+
+    #outputFile: /tmp/qdrouterd.log
+#}
+
+ # The broker connects into this port
+ listener {
+     saslMechanisms: ANONYMOUS
+     host: 0.0.0.0
+     role: route-container
+     linkCapacity: 1123
+     authenticatePeer: no
+     port: 24621
+ }
+

Review comment:
       Per the offline discussion yesterday, this listener isn't required and 
is somewhat misleading, implying it has to be a route-container. The single 
normal (client) listener can be used.

##########
File path: docs/user-manual/en/amqp-broker-connections.md
##########
@@ -0,0 +1,238 @@
+# Broker Connections
+
+Instead of waiting for clients to connect, a broker can initiate a connection 
to another endpoint on a specific protocol.
+
+Currently, this feature supports only the AMQP protocol. However, in the 
future, it might be expanded to other protocols.
+
+You configure broker connections using a `<broker-connections>` XML element in 
the `broker.xml` configuration file.
+
+```xml
+<broker-connections>
+    ...
+</broker-connections>
+```
+
+# AMQP Server Connections
+
+An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. 
This means that the broker can connect to another AMQP server (not necessarily 
ActiveMQ Artemis) and create elements on that connection.
+
+To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections>` element in the `broker.xml` configuration file. For example:
+
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" 
retry-interval="100" reconnect-attempts="-1" user="john" password="doe">
+         ...
+    </amqp-connection>
+</broker-connections>
+```
+
+- `uri`: tcp://host:myport (this is a required argument)
+- `name`: Name of the connection used for management purposes
+- `user`: User name with which to connect to the endpoint (this is an optional 
argument)
+- `password`: Password with which to connect to the endpoint (this is an 
optional argument)
+- `retry-interval`: Time, in milliseconds to wait before retrying a connection 
after an error. The default value is `5000`.
+- `reconnect-attempts`: default is -1 meaning infinite
+- `auto-start` : Should the broker connection start automatically with the 
broker. Default is `true`. If false you need to call a management operation to 
start it.
+
+*Notice*: If you disable auto-start on the broker connection, the start of the 
broker connection will only happen after the management method 
`startBrokerConnection(connectionName)` is called on the ServerController.
+
+*Important*: The target endpoint needs permission for all operations that you configure. Therefore, if you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions.
+
+# AMQP Server Connection Operations
+The following types of operations are supported on an AMQP server connection:
+
+* Senders
+    * Messages received on specific queues are transferred to another endpoint
+* Receivers
+    * The broker pulls messages from another endpoint
+* Peers
+    * The broker creates both senders and receivers on another endpoint that 
knows how to handle them. Currently, this is implemented by Apache Qpid 
Dispatch.
+* Mirrors
+    * The broker uses an AMQP connection to another broker to duplicate messages and send acknowledgements over the wire.
+
+## Senders and Receivers
+It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint 
simply by creating a sender or receiver broker connection element.
+
+For a `sender`, the broker creates a message consumer on a queue that sends 
messages to another AMQP endpoint.
+
+For a `receiver`, the broker creates a message producer on an address that 
receives messages from another AMQP endpoint.
+
+Both elements work like a message bridge. However, there is no additional 
overhead required to process messages. Senders and receivers behave just like 
any other consumer or producer in ActiveMQ Artemis.
+
+You can configure senders or receivers for specific queues. You can also match 
senders and receivers to specific addresses or _sets_ of addresses, using 
wildcard expressions. When configuring a sender or receiver, you can set the 
following properties:
+
+- `match`: Match the sender or receiver to a specific address or __set__ of 
addresses, using a wildcard expression
+- `queue-name`: Configure the sender or receiver for a specific queue
+
+
+Some examples are shown below.
+
+Using address expressions:
+```xml
+<broker-connections>
+        <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+                <sender match="queues.#"/>
+                <!-- notice the local queues for remotequeues.# need to be 
created on this broker -->
+                <receiver match="remotequeues.#"/>
+        </amqp-connection>
+</broker-connections>
+
+<addresses>
+        <address name="remotequeues.A">
+                <anycast>
+                        <queue name="remoteQueueA"/>
+                </anycast>
+        </address>
+        <address name="queues.B">
+                 <anycast>
+                        <queue name="localQueueB"/>
+                </anycast>
+        </address>
+</addresses>
+```
+
+Using queue names:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+        <receiver queue-name="remoteQueueA"/>
+        <sender queue-name="localQueueB"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="remotequeues.A">
+        <anycast>
+           <queue name="remoteQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+        <anycast>
+           <queue name="localQueueB"/>
+        </anycast>
+     </address>
+</addresses>
+
+```
+*Important*: You can match a receiver only to a local queue that already 
exists. Therefore, if you are using receivers, make sure that you pre-create 
the queue locally. Otherwise, the broker cannot match the remote queues and 
addresses.
+
+*Important*: Do not create a sender and a receiver to the same destination. 
This creates an infinite loop of sends and receives.
+
+
+# Peers
+A peer broker connection element is a combination of a sender and a receiver. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages.
+
+Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates a sender and receiver pair for each matching destination. These senders and receivers have special configuration that lets Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
+
+You can experiment with advanced networking scenarios with Qpid Dispatch 
Router and get a lot of benefit from the AMQP protocol and its ecosystem.
+
+With a peer, you have the same properties that you have on a sender and 
receiver. For example:
+```xml
+<broker-connections>
+    <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
+       <peer match="queues.#"/>
+    </amqp-connection>
+</broker-connections>
+
+<addresses>
+     <address name="queues.A">
+        <anycast>
+           <queue name="localQueueA"/>
+        </anycast>
+     </address>
+     <address name="queues.B">
+     <anycast>
+        <queue name="localQueueB"/>
+     </anycast>
+    </address>
+</addresses>
+```
+
+*Important:* Do not use this feature to connect to another broker. Otherwise, any message sent is immediately ready to be consumed, creating an infinite echo of sends and receives.
+
+# Mirror 

Review comment:
       It occurred to me while looking this over again that a given broker 
can't be the target of mirrors for more than one broker, since the message IDs 
passed back and forth between them (for the acking) are likely to clash. It may 
be worth stating something about this in the docs.
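
To illustrate the concern, here is a toy model only, not how Artemis actually stores or acknowledges mirrored messages. It assumes each source broker numbers its messages from its own local counter, and that the target tracks mirrored messages by that number alone. Every class and method name below is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only: every name here is hypothetical and none of it is Artemis code. */
final class MirrorIdClashSketch {

   /** A source broker that numbers its messages from its own local counter, starting at 1. */
   static final class Source {
      final String name;
      private long nextId = 1;

      Source(String name) {
         this.name = name;
      }

      long nextMessageId() {
         return nextId++;
      }
   }

   /** Target tracks messages by ID alone; returns how many entries were overwritten. */
   static int clashesWhenKeyedByIdOnly(int messagesPerSource, Source... sources) {
      Map<Long, String> target = new HashMap<>();
      int clashes = 0;
      for (Source source : sources) {
         for (int i = 0; i < messagesPerSource; i++) {
            if (target.put(source.nextMessageId(), source.name) != null) {
               clashes++;
            }
         }
      }
      return clashes;
   }

   /** Same traffic, but the key also carries the source name, shown purely for contrast. */
   static int clashesWhenKeyedBySourceAndId(int messagesPerSource, Source... sources) {
      Map<String, String> target = new HashMap<>();
      int clashes = 0;
      for (Source source : sources) {
         for (int i = 0; i < messagesPerSource; i++) {
            String key = source.name + ":" + source.nextMessageId();
            if (target.put(key, source.name) != null) {
               clashes++;
            }
         }
      }
      return clashes;
   }
}
```

Under these assumptions, two sources sending three messages each collide on every ID the second source sends, so an ack for ID 2 would be ambiguous on the target. The second method is there only to show where the ambiguity comes from; it is not a proposal for the PR.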

##########
File path: examples/features/broker-connection/amqp-receiving-messages/pom.xml
##########
@@ -0,0 +1,165 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq.examples.broker-connection</groupId>
+      <artifactId>broker-connections</artifactId>
+      <version>2.16.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>amqp-receiving-messages</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
+
+   <properties>
+      <activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-jms-client</artifactId>
+         <version>${qpid.jms.version}</version>

Review comment:
       Is the version not dependency-managed somewhere already?

##########
File path: examples/features/broker-connection/amqp-sending-overssl/readme.md
##########
@@ -0,0 +1,15 @@
+# JMS SSL Example

Review comment:
       Readme needs updating

##########
File path: 
tests/integration-tests/src/test/resources/QpidRouterPeerTest-qpidr.conf
##########
@@ -0,0 +1,51 @@
+ #
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #     http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #
+
+ router {
+     mode: standalone
+     id: INT.A
+ }
+#log {
+    #module: DEFAULT
+    #enable: trace+
+    #outputFile: /tmp/qdrouterd.log
+#}
+
+ # The broker connects into this port
+ listener {
+     saslMechanisms: ANONYMOUS
+     host: 0.0.0.0
+     role: route-container
+     linkCapacity: 1123
+     authenticatePeer: no
+     port: 24621
+ }
+
+ # Clients connect to this port
+ listener {
+     saslMechanisms: ANONYMOUS
+     host: 0.0.0.0
+     linkCapacity: 555
+     role: normal
+     authenticatePeer: no
+     port: 24622
+ }

Review comment:
       I would remove the capacity config personally; most people won't need 
that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 505782)
    Time Spent: 27h 50m  (was: 27h 40m)

> AMQP Server Connectivity
> ------------------------
>
>                 Key: ARTEMIS-2937
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2937
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>          Components: AMQP
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.16.0
>
>          Time Spent: 27h 50m
>  Remaining Estimate: 0h
>
> This feature adds server-side connectivity.
>  
> With this feature it is possible to link two brokers directly using AMQP, and 
> to have a queue transfer messages directly to another broker.
>  
> For this we would have options called <sender> and <receiver>.
>  
>  
> It would also be possible to use qpid-dispatch as an intermediary between 
> clients and the brokers (or eventually between brokers); in that case the 
> option would be <peer>.
>  
> It would also be possible to use <mirror> with a few options to replicate data 
> between two brokers, making it usable for disaster recovery and failover.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
