[ https://issues.apache.org/jira/browse/ARTEMIS-2937?focusedWorklogId=505782&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-505782 ]
ASF GitHub Bot logged work on ARTEMIS-2937:
-------------------------------------------

Author: ASF GitHub Bot
Created on: 28/Oct/20 16:06
Start Date: 28/Oct/20 16:06
Worklog Time Spent: 10m
Work Description: gemmellr commented on a change in pull request #3316:
URL: https://github.com/apache/activemq-artemis/pull/3316#discussion_r513399360

########## File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java ##########
@@ -20,39 +20,82 @@ import java.util.Comparator; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.function.ToLongFunction; + +import io.netty.util.collection.LongObjectHashMap; /** * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any * elements added or removed from the queue either directly or via iterators. - * + * <p> * This class is not thread safe. */ public class LinkedListImpl<E> implements LinkedList<E> { private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10; private final Node<E> head = new NodeHolder<>(null); - + private final Comparator<E> comparator; + LongObjectHashMap<Node<E>> nodeMap; private Node<E> tail = null; - private int size; - // We store in an array rather than a Map for the best performance private volatile Iterator[] iters; - private int numIters; - private int nextIndex; - - private final Comparator<E> comparator; + private ToLongFunction<E> idSupplier; public LinkedListImpl() { - this(null); + this(null, null); } public LinkedListImpl(Comparator<E> comparator) { + this(comparator, null); + } + + public LinkedListImpl(Comparator<E> comparator, ToLongFunction<E> supplier) { iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE); this.comparator = comparator; + this.idSupplier = supplier; + if (idSupplier != null) { + this.nodeMap = newLongHashMap(); + } else { + this.nodeMap = null; + } + } + + @Override + public void clearID() { + idSupplier = null; + if (nodeMap !=
null) { + nodeMap.clear(); + nodeMap = null; + } + }

Review comment: This isn't used, tested, or similarly included on the PriorityLinkedList. Should it just be removed for now, or added to the PriorityLinkedList and tested?

########## File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java ##########
@@ -72,9 +115,43 @@ public void addHead(E e) { node.next.prev = node; } + itemAdded(node, e); + size++; } + @Override + public E removeWithID(long id) { + if (nodeMap == null) { + return null; + } + + Node<E> node = nodeMap.get(id); + if (node == null) { + return null; + } + + if (node.prev != null) { + removeAfter(node.prev); + }

Review comment: Since there appears to be a fixed 'head' node that always exists, and it doesn't seem to ever be in the nodeMap, what does it mean if node.prev is null here? I ask because it seems odd that removal with id could return a value, as in this case, but appear not to do anything at all to update the main list structure. When can that happen? (I may be missing something obvious here.)

########## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java ##########
@@ -0,0 +1,556 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.connect; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.BrokerConnection; +import org.apache.activemq.artemis.core.server.Consumer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; 
+import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; +import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; +import org.apache.activemq.artemis.protocol.amqp.proton.SenderController; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory; +import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.ConfigurationHelper; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.jboss.logging.Logger; + +public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener, ActiveMQServerQueuePlugin, BrokerConnection { + + private static final Logger logger = Logger.getLogger(AMQPBrokerConnection.class); + + private final AMQPBrokerConnectConfiguration brokerConnectConfiguration; + private final ProtonProtocolManager protonProtocolManager; + private final ActiveMQServer server; + private final NettyConnector bridgesConnector; + private NettyConnection connection; + private Session session; + private AMQPSessionContext sessionContext; + private ActiveMQProtonRemotingConnection protonRemotingConnection; + private volatile boolean started = false; + private final AMQPBrokerConnectionManager bridgeManager; + private int retryCounter = 0; + private volatile ScheduledFuture reconnectFuture; + private Set<Queue> senders = new HashSet<>(); + private Set<Queue> receivers = new HashSet<>(); + + final Executor connectExecutor; + final ScheduledExecutorService scheduledExecutorService; + + /** This is just for logging. + * the actual connection will come from the amqpConnection configuration*/ + String host; + + /** This is just for logging. 
+ * the actual connection will come from the amqpConnection configuration*/ + int port; + + public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager, AMQPBrokerConnectConfiguration brokerConnectConfiguration, + ProtonProtocolManager protonProtocolManager, + ActiveMQServer server, + NettyConnector bridgesConnector) { + this.bridgeManager = bridgeManager; + this.brokerConnectConfiguration = brokerConnectConfiguration; + this.protonProtocolManager = protonProtocolManager; + this.server = server; + this.bridgesConnector = bridgesConnector; + connectExecutor = server.getExecutorFactory().getExecutor(); + scheduledExecutorService = server.getScheduledPool(); + } + + @Override + public String getName() { + return brokerConnectConfiguration.getName(); + } + + @Override + public String getProtocol() { + return "AMQP"; + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void stop() { + if (connection != null) { + connection.close(); + } + ScheduledFuture scheduledFuture = reconnectFuture; + reconnectFuture = null; + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + started = false; + } + + @Override + public void start() throws Exception { + started = true; + server.getConfiguration().registerBrokerPlugin(this); + try { + + for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) { + if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) { + installMirrorController((AMQPMirrorBrokerConnectionElement)connectionElement, server); + } + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + return; + } + connectExecutor.execute(() -> doConnect()); + } + + public NettyConnection getConnection() { + return connection; + } + + @Override + public void afterCreateQueue(Queue queue) { + connectExecutor.execute(() -> { + for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) { + 
validateMatching(queue, connectionElement); + } + }); + } + + public void validateMatching(Queue queue, AMQPBrokerConnectionElement connectionElement) { + if (connectionElement.getType() != AMQPBrokerConnectionAddressType.MIRROR) { + if (connectionElement.getQueueName() != null) { + if (queue.getName().equals(connectionElement.getQueueName())) { + createLink(queue, connectionElement); + } + } else if (connectionElement.match(queue.getAddress(), server.getConfiguration().getWildcardConfiguration())) { + createLink(queue, connectionElement); + } + } + } + + public void createLink(Queue queue, AMQPBrokerConnectionElement connectionElement) { + if (connectionElement.getType() == AMQPBrokerConnectionAddressType.PEER) { + connectSender(queue, queue.getAddress().toString(), Symbol.valueOf("qd.waypoint")); + connectReceiver(protonRemotingConnection, session, sessionContext, queue, Symbol.valueOf("qd.waypoint")); + } else { + if (connectionElement.getType() == AMQPBrokerConnectionAddressType.SENDER) { + connectSender(queue, queue.getAddress().toString()); + } + if (connectionElement.getType() == AMQPBrokerConnectionAddressType.RECEIVER) { + connectReceiver(protonRemotingConnection, session, sessionContext, queue); + } + } + } + + private void doConnect() { + try { + List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations(); + + TransportConfiguration tpConfig = configurationList.get(0); + + String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams()); + int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams()); + this.host = hostOnParameter; + this.port = portOnParameter; + connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter); + + if (connection == null) { + retryConnection(); + return; + } + + reconnectFuture = 
null; + retryCounter = 0; + + // before we retry the connection we need to remove any previous links + // as they will need to be recreated + senders.clear(); + receivers.clear(); + + ClientSASLFactory saslFactory = null; + + if (brokerConnectConfiguration.getUser() != null && brokerConnectConfiguration.getPassword() != null) { + saslFactory = availableMechanims -> { + if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) { + return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword()); + } else { + return null; + } + }; + } + + ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory); + protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection; + connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler())); + + protonRemotingConnection.getAmqpConnection().runLater(() -> { + protonRemotingConnection.getAmqpConnection().open(); + protonRemotingConnection.getAmqpConnection().flush(); + }); + + session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session(); + sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session); + protonRemotingConnection.getAmqpConnection().runLater(() -> { + session.open(); + protonRemotingConnection.getAmqpConnection().flush(); + }); + + if (brokerConnectConfiguration.getConnectionElements() != null) { + Stream<Binding> bindingStream = server.getPostOffice().getAllBindings(); + + bindingStream.forEach(binding -> { + if (binding instanceof QueueBinding) { + Queue queue = ((QueueBinding) binding).getQueue(); + for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) { + validateMatching(queue, connectionElement); + } + } + }); + + for (AMQPBrokerConnectionElement 
connectionElement : brokerConnectConfiguration.getConnectionElements()) { + if (connectionElement.getType() == AMQPBrokerConnectionAddressType.MIRROR) { + AMQPMirrorBrokerConnectionElement replica = (AMQPMirrorBrokerConnectionElement)connectionElement; + Queue queue = server.locateQueue(replica.getSourceMirrorAddress()); + + connectSender(queue, ProtonProtocolManager.MIRROR_ADDRESS); + } + } + } + + protonRemotingConnection.getAmqpConnection().flush();

Review comment: This section looks like it probably isn't thread safe in its use of proton-j. The initial connection open and flush are offloaded to the connection thread, then this thread proceeds to create a session object on the connection, though it then offloads the session open and flush back to the connection thread. The next bit, where it starts creating senders and receivers on this thread (if not more threads, depending on whether the Stream returned was sequential or not), is probably safe, as I believe those bits offload to the connection thread.

########## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java ##########
@@ -129,7 +156,7 @@ public AMQPConnectionContext(ProtonProtocolManager protocolManager, } else { nettyExecutor = new ExecutorNettyAdapter(protocolManager.getServer().getExecutorFactory().getExecutor()); } - this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection); + this.handler = new ProtonHandler(nettyExecutor, protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection && saslClientFactory == null);

Review comment: The arg modified is called "isServer", and this says it may now change depending on whether saslClientFactory is null - yet the new bridge code allows it to be either null or not, depending on whether a user/pass is given. This seems strange.
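
A minimal sketch of the thread-confinement concern raised for doConnect() above; it is not code from the PR. The names `ConfinementSketch`, `FakeEngine` and `connection-thread` are hypothetical, and a plain single-thread executor stands in for the connection thread that `runLater` hands work to. The point it illustrates is that every call touching a non-thread-safe engine object, including session creation, runs inside a task on that one thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; none of these types exist in Artemis or proton-j. */
public class ConfinementSketch {

   /** Stand-in for a non-thread-safe engine object; records which thread ran each call. */
   static final class FakeEngine {
      final List<String> log = new ArrayList<>();

      void op(String name) {
         log.add(name + "@" + Thread.currentThread().getName());
      }
   }

   /** Runs open, session creation, session open and flush as one task on the single engine thread. */
   static List<String> connect() throws Exception {
      FakeEngine engine = new FakeEngine();
      ExecutorService connectionThread =
         Executors.newSingleThreadExecutor(r -> new Thread(r, "connection-thread"));
      try {
         // The calling thread only submits work; it never touches the engine itself.
         connectionThread.submit(() -> {
            engine.op("connection.open");
            engine.op("connection.session");
            engine.op("session.open");
            engine.op("flush");
         }).get(5, TimeUnit.SECONDS);
      } finally {
         connectionThread.shutdown();
      }
      // Reading the log here is safe: Future.get() orders the task's writes before this read.
      return engine.log;
   }

   public static void main(String[] args) throws Exception {
      connect().forEach(System.out::println);
   }
}
```

Whether session creation can be moved into the same task in the real class depends on how the `session` and `sessionContext` fields are used afterwards, which this sketch does not cover.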
########## File path: docs/user-manual/en/amqp-broker-connections.md ########## @@ -0,0 +1,238 @@ +# Broker Connections + +Instead of waiting for clients to connect, a broker can initiate a connection to another endpoint on a specific protocol. + +Currently, this feature supports only the AMQP protocol. However, in the future, it might be expanded to other protocols. + +You configure broker connections using a `<broker-connections>` XML element in the `broker.xml` configuration file. + +```xml +<broker-connections> + ... +</broker-connections> +``` + +# AMQP Server Connections + +An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. This means that the broker can connect to another AMQP server (not necessarily ActiveMQ Artemis) and create elements on that connection. + +To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections` element in the `broker.xml` configuration file. For example: + +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" retry-interval="100" reconnect-attempts="-1" user="john" password="doe"> + ... + </amqp-connection> +</broker-connections> +``` + +- `uri`: tcp://host:myport (this is a required argument) +- `name`: Name of the connection used for management purposes +- `user`: User name with which to connect to the endpoint (this is an optional argument) +- `password`: Password with which to connect to the endpoint (this is an optional argument) +- `retry-interval`: Time, in milliseconds to wait before retrying a connection after an error. The default value is `5000`. +- `reconnect-attempts`: default is -1 meaning infinite +- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it. 
+ +*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController. + +*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions. + +# AMQP Server Connection Operations +The following types of operations are supported on a AMQP server connection: + +* Senders + * Messages received on specific queues are transferred to another endpoint +* Receivers + * The broker pulls messages from another endpoint +* Peers + * The broker creates both senders and receivers on another endpoint that knows how to handle them. Currently, this is implemented by Apache Qpid Dispatch. +* Mirrors + * The broker uses an AMQP connection to another broker and duplicate messages and sends acknowledgements over the wire. + +## Senders and Receivers +It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint simply by creating a sender or receiver broker connection element. + +For a `sender`, the broker creates a message consumer on a queue that sends messages to another AMQP endpoint. + +For a `receiver`, the broker creates a message producer on an address that receives messages from another AMQP endpoint. + +Both elements work like a message bridge. However, there is no additional overhead required to process messages. Senders and receivers behave just like any other consumer or producer in ActiveMQ Artemis. + +You can configure senders or receivers for specific queues. You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. 
When configuring a sender or receiver, you can set the following properties: + +- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression +- `queue-name`: Configure the sender or receiver for a specific queue + + +Some examples are shown below. + +Using address expressions: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <sender match="queues.#"/> + <!-- notice the local queues for remotequeues.# need to be created on this broker --> + <receiver match="remotequeues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> +``` + +Using queue names: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <receiver queue-name="remoteQueueA"/> + <sender queue-name="localQueueB"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> + +``` +*Important*: You can match a receiver only to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses. + +*Important*: Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives. + + +# Peers +A peer broker connection element is a combination of sender and receivers. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages. 
+ +Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis. + +You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem. + +With a peer, you have the same properties that you have on a sender and receiver. For example: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <peer match="queues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="queues.A"> + <anycast> + <queue name="localQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses>

Review comment: I think a simpler example with matching queue and address names would be a better starting point for most people. Regardless, an example with differing names should outline what it is actually expected to do. It isn't at all clear here whether the waypointed address used by clients connecting to the router is meant to be the broker address name or the broker queue name.

########## File path: examples/features/broker-connection/amqp-receiving-messages/pom.xml ##########
@@ -0,0 +1,165 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.activemq.examples.broker-connection</groupId> + <artifactId>broker-connections</artifactId> + <version>2.16.0-SNAPSHOT</version> + </parent> + + <artifactId>amqp-receiving-messages</artifactId> + <packaging>jar</packaging> + <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name> + + <properties> + <activemq.basedir>${project.basedir}/../../../..</activemq.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-jms-client</artifactId> + <version>${qpid.jms.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.activemq</groupId> + <artifactId>artemis-maven-plugin</artifactId> + <executions> + <execution> + <id>create0</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <ignore>${noServer}</ignore> + <instance>${basedir}/target/server0</instance> + <allowAnonymous>true</allowAnonymous> + <configuration>${basedir}/target/classes/activemq/server0</configuration> + <!-- this makes it easier in certain envs --> + <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions> + </configuration> + </execution> + <execution> + <id>create1</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <ignore>${noServer}</ignore> + 
<instance>${basedir}/target/server1</instance> + <allowAnonymous>true</allowAnonymous> + <configuration>${basedir}/target/classes/activemq/server1</configuration> + <!-- this makes it easier in certain envs --> + <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions> + </configuration> + </execution> + <!-- we first start broker 1, to avoid reconnecting statements --> + <execution> + <id>start1</id> + <goals> + <goal>cli</goal> + </goals> + <configuration> + <ignore>${noServer}</ignore> + <spawn>true</spawn> + <location>${basedir}/target/server1</location> + <testURI>tcp://localhost:5672</testURI> + <args> + <param>run</param> + </args> + <name>server1</name> + </configuration> + </execution> + <execution> + <id>start0</id> + <goals> + <goal>cli</goal> + </goals> + <configuration> + <spawn>true</spawn> + <ignore>${noServer}</ignore> + <location>${basedir}/target/server0</location> + <testURI>tcp://localhost:5671</testURI>

Review comment: I'd avoid using the AMQPS port for non-TLS examples.

########## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java ##########
@@ -0,0 +1,556 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.connect; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.BrokerConnection; +import org.apache.activemq.artemis.core.server.Consumer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; 
+import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; +import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; +import org.apache.activemq.artemis.protocol.amqp.proton.SenderController; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL; +import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory; +import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.ConfigurationHelper; +import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.jboss.logging.Logger; + +public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener, ActiveMQServerQueuePlugin, BrokerConnection { + + private static final Logger logger = Logger.getLogger(AMQPBrokerConnection.class); + + private final AMQPBrokerConnectConfiguration brokerConnectConfiguration; + private final ProtonProtocolManager protonProtocolManager; + private final ActiveMQServer server; + private final NettyConnector bridgesConnector; + private NettyConnection connection; + private Session session; + private AMQPSessionContext sessionContext; + private ActiveMQProtonRemotingConnection protonRemotingConnection; + private volatile boolean started = false; + private final AMQPBrokerConnectionManager bridgeManager; + private int retryCounter = 0; + private volatile ScheduledFuture reconnectFuture; + private Set<Queue> senders = new HashSet<>(); + private Set<Queue> receivers = new HashSet<>(); + + final Executor connectExecutor; + final ScheduledExecutorService scheduledExecutorService; + + /** This is just for logging. + * the actual connection will come from the amqpConnection configuration*/ + String host; + + /** This is just for logging. 
+ * the actual connection will come from the amqpConnection configuration*/ + int port; + + public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager, AMQPBrokerConnectConfiguration brokerConnectConfiguration, + ProtonProtocolManager protonProtocolManager, + ActiveMQServer server, + NettyConnector bridgesConnector) { + this.bridgeManager = bridgeManager; + this.brokerConnectConfiguration = brokerConnectConfiguration; + this.protonProtocolManager = protonProtocolManager; + this.server = server; + this.bridgesConnector = bridgesConnector; + connectExecutor = server.getExecutorFactory().getExecutor(); + scheduledExecutorService = server.getScheduledPool(); + } + + @Override + public String getName() { + return brokerConnectConfiguration.getName(); + } + + @Override + public String getProtocol() { + return "AMQP"; + } + + @Override + public boolean isStarted() { + return started; + } + + @Override + public void stop() { + if (connection != null) { + connection.close(); + }
Review comment: If you stop a connection with a mirror config that doesn't specify a source mirror address, what happens to the generated mirror queue and its contents?
########## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ########## @@ -466,8 +466,10 @@ public void serverSend(ProtonServerReceiverContext context, RoutingType routingType = null; if (address != null) { - // Fixed-address producer - message.setAddress(address); + if (!address.toString().equals(message.getAddress())) { + // set Fixed-address producer if the message.properties.to address differs from the producer + message.setAddress(address); + } Review comment: Would read better if the "// Fixed-address producer" bit was outside the inner if, where it was originally, to balance with the comment in the other leg ( // Anonymous-relay producer, message must carry a To value) ########## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java ########## @@ -173,8 +177,35 @@ public ProtonProtocolManager setDirectDeliver(boolean directDeliver) { return this; } + /** for outgoing */ + public ProtonClientProtocolManager createClientManager() { + ProtonClientProtocolManager clientOutgoing = new ProtonClientProtocolManager(factory, server); + return clientOutgoing; + } + @Override public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { + return internalConnectionEntry(remotingConnection, false, null); + } + + /** This method is not part of the ProtocolManager interface because it only makes sense on AMQP. + * More specifically on AMQP Bridges */ + public ConnectionEntry createOutgoingConnectionEntry(Connection remotingConnection) { + return internalConnectionEntry(remotingConnection, true, null); + } + + public ConnectionEntry createOutgoingConnectionEntry(Connection remotingConnection, ClientSASLFactory saslFactory) { + return internalConnectionEntry(remotingConnection, true, saslFactory); + } + + /** + * AMQP is an agnostic protocol, client and server. 
+ * This method is used also by the AMQPConenctionBridge where there is no acceptor in place.
Review comment: typo in AMQPConenctionBridge. EDIT: Actually, the comment may be in the wrong place? This is a private method so it can't directly be used by AMQPConnectionBridge.
########## File path: docs/user-manual/en/amqp-broker-connections.md ##########
@@ -0,0 +1,238 @@ +# Broker Connections + +Instead of waiting for clients to connect, a broker can initiate a connection to another endpoint on a specific protocol. + +Currently, this feature supports only the AMQP protocol. However, in the future, it might be expanded to other protocols. + +You configure broker connections using a `<broker-connections>` XML element in the `broker.xml` configuration file. + +```xml +<broker-connections> + ... +</broker-connections> +``` + +# AMQP Server Connections + +An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. This means that the broker can connect to another AMQP server (not necessarily ActiveMQ Artemis) and create elements on that connection. + +To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections` element in the `broker.xml` configuration file. For example: + +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" retry-interval="100" reconnect-attempts="-1" user="john" password="doe"> + ... + </amqp-connection> +</broker-connections> +``` + +- `uri`: tcp://host:myport (this is a required argument) +- `name`: Name of the connection used for management purposes +- `user`: User name with which to connect to the endpoint (this is an optional argument) +- `password`: Password with which to connect to the endpoint (this is an optional argument) +- `retry-interval`: Time, in milliseconds to wait before retrying a connection after an error. The default value is `5000`.
+- `reconnect-attempts`: default is -1 meaning infinite +- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it. + +*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController. + +*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions. + +# AMQP Server Connection Operations +The following types of operations are supported on a AMQP server connection: + +* Senders + * Messages received on specific queues are transferred to another endpoint +* Receivers + * The broker pulls messages from another endpoint +* Peers + * The broker creates both senders and receivers on another endpoint that knows how to handle them. Currently, this is implemented by Apache Qpid Dispatch. +* Mirrors + * The broker uses an AMQP connection to another broker and duplicate messages and sends acknowledgements over the wire. + +## Senders and Receivers +It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint simply by creating a sender or receiver broker connection element. + +For a `sender`, the broker creates a message consumer on a queue that sends messages to another AMQP endpoint. + +For a `receiver`, the broker creates a message producer on an address that receives messages from another AMQP endpoint. + +Both elements work like a message bridge. However, there is no additional overhead required to process messages. Senders and receivers behave just like any other consumer or producer in ActiveMQ Artemis. + +You can configure senders or receivers for specific queues. 
You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties: + +- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression +- `queue-name`: Configure the sender or receiver for a specific queue + + +Some examples are shown below. + +Using address expressions: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <sender match="queues.#"/> + <!-- notice the local queues for remotequeues.# need to be created on this broker --> + <receiver match="remotequeues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> +``` + +Using queue names: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <receiver queue-name="remoteQueueA"/> + <sender queue-name="localQueueB"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses>
Review comment: Differing indentation levels. See also the comment about clarifying what actually ends up happening; the *important* note below still doesn't really cover it. Maybe some diagrams / ascii art might. A simplified example using the same queue and address names might be a better start.
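As a rough sketch of what such a simplified starting example could look like (the `orders` name is purely illustrative and not taken from the PR, and where the messages land on the remote endpoint is exactly what the doc still needs to spell out), assuming a single local anycast queue whose name matches its address:

```xml
<broker-connections>
   <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
      <!-- hypothetical name: the broker consumes from the local queue "orders"
           and sends those messages to the remote AMQP endpoint -->
      <sender queue-name="orders"/>
   </amqp-connection>
</broker-connections>

<addresses>
   <!-- address and queue deliberately share one name to keep the example unambiguous -->
   <address name="orders">
      <anycast>
         <queue name="orders"/>
      </anycast>
   </address>
</addresses>
```

With identical names the reader does not have to work out whether the configuration refers to the queue or to the address, so the differing-name case can be introduced separately afterwards.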
########## File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java ########## @@ -362,6 +362,7 @@ public PagingStore getOwner() { @Override public void setOwner(PagingStore owner) { + new Exception("Setting owner as " + owner.getStoreName()).printStackTrace(); Review comment: Debug stacktrace left over ########## File path: docs/user-manual/en/amqp-broker-connections.md ########## @@ -0,0 +1,238 @@ +# Broker Connections + +Instead of waiting for clients to connect, a broker can initiate a connection to another endpoint on a specific protocol. + +Currently, this feature supports only the AMQP protocol. However, in the future, it might be expanded to other protocols. + +You configure broker connections using a `<broker-connections>` XML element in the `broker.xml` configuration file. + +```xml +<broker-connections> + ... +</broker-connections> +``` + +# AMQP Server Connections + +An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. This means that the broker can connect to another AMQP server (not necessarily ActiveMQ Artemis) and create elements on that connection. + +To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections` element in the `broker.xml` configuration file. For example: + +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" retry-interval="100" reconnect-attempts="-1" user="john" password="doe"> + ... + </amqp-connection> +</broker-connections> +``` + +- `uri`: tcp://host:myport (this is a required argument) +- `name`: Name of the connection used for management purposes +- `user`: User name with which to connect to the endpoint (this is an optional argument) +- `password`: Password with which to connect to the endpoint (this is an optional argument) +- `retry-interval`: Time, in milliseconds to wait before retrying a connection after an error. The default value is `5000`. 
+- `reconnect-attempts`: default is -1 meaning infinite +- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it. + +*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController. + +*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions. + +# AMQP Server Connection Operations +The following types of operations are supported on a AMQP server connection: + +* Senders + * Messages received on specific queues are transferred to another endpoint +* Receivers + * The broker pulls messages from another endpoint +* Peers + * The broker creates both senders and receivers on another endpoint that knows how to handle them. Currently, this is implemented by Apache Qpid Dispatch. +* Mirrors + * The broker uses an AMQP connection to another broker and duplicate messages and sends acknowledgements over the wire. + +## Senders and Receivers +It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint simply by creating a sender or receiver broker connection element. + +For a `sender`, the broker creates a message consumer on a queue that sends messages to another AMQP endpoint. + +For a `receiver`, the broker creates a message producer on an address that receives messages from another AMQP endpoint. + +Both elements work like a message bridge. However, there is no additional overhead required to process messages. Senders and receivers behave just like any other consumer or producer in ActiveMQ Artemis. + +You can configure senders or receivers for specific queues. 
You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties: + +- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression +- `queue-name`: Configure the sender or receiver for a specific queue + + +Some examples are shown below. + +Using address expressions: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <sender match="queues.#"/> + <!-- notice the local queues for remotequeues.# need to be created on this broker --> + <receiver match="remotequeues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> +``` + +Using queue names: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <receiver queue-name="remoteQueueA"/> + <sender queue-name="localQueueB"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> + +``` +*Important*: You can match a receiver only to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses. + +*Important*: Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives. + + +# Peers +A peer broker connection element is a combination of sender and receivers. 
The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages. + +Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis.
Review comment: Not unless you tell the router to, which this section doesn't currently clarify in any way. It's not clear from this what the feature even does. I don't think anyone reading this without fairly deep implementation knowledge would be able to make this feature work using the information currently here. This section should mention that it configures the broker to connect to the router and make itself available for storing messages on a waypoint address, such that the messages sent to the router on the address are message-routed to/from the broker. There needs to be an example of the address configuration required for Dispatch to treat an address as a waypoint.
########## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java ##########
@@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; +import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.jboss.logging.Logger; + +public class AMQPMirrorControllerSource implements MirrorController, ActiveMQComponent { + + private static final Logger logger = 
Logger.getLogger(AMQPMirrorControllerSource.class); + + public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-amq-mr-ev-type"); + public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr"); + public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu"); + + // Events: + public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress"); + public static final Symbol DELETE_ADDRESS = Symbol.getSymbol("deleteAddress"); + public static final Symbol CREATE_QUEUE = Symbol.getSymbol("createQueue"); + public static final Symbol DELETE_QUEUE = Symbol.getSymbol("deleteQueue"); + public static final Symbol ADDRESS_SCAN_START = Symbol.getSymbol("AddressCanStart"); + public static final Symbol ADDRESS_SCAN_END = Symbol.getSymbol("AddressScanEnd"); Review comment: Starts with Capital while all the others are camelCase ########## File path: docs/user-manual/en/amqp-broker-connections.md ########## @@ -0,0 +1,238 @@ +# Broker Connections + +Instead of waiting for clients to connect, a broker can initiate a connection to another endpoint on a specific protocol. + +Currently, this feature supports only the AMQP protocol. However, in the future, it might be expanded to other protocols. + +You configure broker connections using a `<broker-connections>` XML element in the `broker.xml` configuration file. + +```xml +<broker-connections> + ... +</broker-connections> +``` + +# AMQP Server Connections + +An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. This means that the broker can connect to another AMQP server (not necessarily ActiveMQ Artemis) and create elements on that connection. + +To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections` element in the `broker.xml` configuration file. 
For example: + +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" retry-interval="100" reconnect-attempts="-1" user="john" password="doe"> + ... + </amqp-connection> +</broker-connections> +``` + +- `uri`: tcp://host:myport (this is a required argument) +- `name`: Name of the connection used for management purposes +- `user`: User name with which to connect to the endpoint (this is an optional argument) +- `password`: Password with which to connect to the endpoint (this is an optional argument) +- `retry-interval`: Time, in milliseconds to wait before retrying a connection after an error. The default value is `5000`. +- `reconnect-attempts`: default is -1 meaning infinite +- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it. + +*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController. + +*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions. + +# AMQP Server Connection Operations +The following types of operations are supported on a AMQP server connection: + +* Senders + * Messages received on specific queues are transferred to another endpoint +* Receivers + * The broker pulls messages from another endpoint +* Peers + * The broker creates both senders and receivers on another endpoint that knows how to handle them. Currently, this is implemented by Apache Qpid Dispatch. +* Mirrors + * The broker uses an AMQP connection to another broker and duplicate messages and sends acknowledgements over the wire. 
+ +## Senders and Receivers +It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint simply by creating a sender or receiver broker connection element. + +For a `sender`, the broker creates a message consumer on a queue that sends messages to another AMQP endpoint. + +For a `receiver`, the broker creates a message producer on an address that receives messages from another AMQP endpoint. + +Both elements work like a message bridge. However, there is no additional overhead required to process messages. Senders and receivers behave just like any other consumer or producer in ActiveMQ Artemis. + +You can configure senders or receivers for specific queues. You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties:
Review comment: Why the asymmetry that you can 'match' on addresses, but only specify exact queues? Why aren't they both for addresses, or both for queues? It's also unclear from the doc here: is the sender created then sending from the queue(s) matched by the address 'match', or is it sending from the address matched to the config? (The distinction is important if address and queue names differ, like in the examples shown later.)
########## File path: docs/user-manual/en/amqp-broker-connections.md ##########
@@ -0,0 +1,238 @@ +# Broker Connections + +Instead of waiting for clients to connect, a broker can initiate a connection to another endpoint on a specific protocol. + +Currently, this feature supports only the AMQP protocol. However, in the future, it might be expanded to other protocols. + +You configure broker connections using a `<broker-connections>` XML element in the `broker.xml` configuration file. + +```xml +<broker-connections> + ... +</broker-connections> +``` + +# AMQP Server Connections + +An ActiveMQ Artemis broker can initiate connections using the AMQP protocol.
This means that the broker can connect to another AMQP server (not necessarily ActiveMQ Artemis) and create elements on that connection. + +To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections` element in the `broker.xml` configuration file. For example: + +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" retry-interval="100" reconnect-attempts="-1" user="john" password="doe"> + ... + </amqp-connection> +</broker-connections> +``` + +- `uri`: tcp://host:myport (this is a required argument) +- `name`: Name of the connection used for management purposes +- `user`: User name with which to connect to the endpoint (this is an optional argument) +- `password`: Password with which to connect to the endpoint (this is an optional argument) +- `retry-interval`: Time, in milliseconds to wait before retrying a connection after an error. The default value is `5000`. +- `reconnect-attempts`: default is -1 meaning infinite +- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it. + +*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController. + +*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions. 
+ +# AMQP Server Connection Operations +The following types of operations are supported on a AMQP server connection: + +* Senders + * Messages received on specific queues are transferred to another endpoint +* Receivers + * The broker pulls messages from another endpoint +* Peers + * The broker creates both senders and receivers on another endpoint that knows how to handle them. Currently, this is implemented by Apache Qpid Dispatch. +* Mirrors + * The broker uses an AMQP connection to another broker and duplicate messages and sends acknowledgements over the wire. + +## Senders and Receivers +It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint simply by creating a sender or receiver broker connection element. + +For a `sender`, the broker creates a message consumer on a queue that sends messages to another AMQP endpoint. + +For a `receiver`, the broker creates a message producer on an address that receives messages from another AMQP endpoint. + +Both elements work like a message bridge. However, there is no additional overhead required to process messages. Senders and receivers behave just like any other consumer or producer in ActiveMQ Artemis. + +You can configure senders or receivers for specific queues. You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties: + +- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression +- `queue-name`: Configure the sender or receiver for a specific queue + + +Some examples are shown below. 
+ +Using address expressions: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <sender match="queues.#"/> + <!-- notice the local queues for remotequeues.# need to be created on this broker --> + <receiver match="remotequeues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> +``` + +Using queue names: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <receiver queue-name="remoteQueueA"/> + <sender queue-name="localQueueB"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> + +``` +*Important*: You can match a receiver only to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses. + +*Important*: Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives. + + +# Peers +A peer broker connection element is a combination of sender and receivers. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages. + +Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. 
These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis. + +You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem. + +With a peer, you have the same properties that you have on a sender and receiver. For example: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> Review comment: Perhaps "my-router" would be a better name? ########## File path: docs/user-manual/en/amqp-broker-connections.md ########## @@ -0,0 +1,238 @@ +# Broker Connections + +Instead of waiting for clients to connect, a broker can initiate a connection to another endpoint on a specific protocol. + +Currently, this feature supports only the AMQP protocol. However, in the future, it might be expanded to other protocols. + +You configure broker connections using a `<broker-connections>` XML element in the `broker.xml` configuration file. + +```xml +<broker-connections> + ... +</broker-connections> +``` + +# AMQP Server Connections + +An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. This means that the broker can connect to another AMQP server (not necessarily ActiveMQ Artemis) and create elements on that connection. + +To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections` element in the `broker.xml` configuration file. For example: + +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" retry-interval="100" reconnect-attempts="-1" user="john" password="doe"> + ... 
+ </amqp-connection> +</broker-connections> +``` + +- `uri`: tcp://host:myport (this is a required argument) +- `name`: Name of the connection used for management purposes +- `user`: User name with which to connect to the endpoint (this is an optional argument) +- `password`: Password with which to connect to the endpoint (this is an optional argument) +- `retry-interval`: Time, in milliseconds to wait before retrying a connection after an error. The default value is `5000`. +- `reconnect-attempts`: default is -1 meaning infinite +- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it. + +*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController. + +*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions. + +# AMQP Server Connection Operations +The following types of operations are supported on a AMQP server connection: + +* Senders + * Messages received on specific queues are transferred to another endpoint +* Receivers + * The broker pulls messages from another endpoint +* Peers + * The broker creates both senders and receivers on another endpoint that knows how to handle them. Currently, this is implemented by Apache Qpid Dispatch. +* Mirrors + * The broker uses an AMQP connection to another broker and duplicate messages and sends acknowledgements over the wire. + +## Senders and Receivers +It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint simply by creating a sender or receiver broker connection element. 
+ +For a `sender`, the broker creates a message consumer on a queue that sends messages to another AMQP endpoint. + +For a `receiver`, the broker creates a message producer on an address that receives messages from another AMQP endpoint. + +Both elements work like a message bridge. However, there is no additional overhead required to process messages. Senders and receivers behave just like any other consumer or producer in ActiveMQ Artemis. + +You can configure senders or receivers for specific queues. You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties: + +- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression +- `queue-name`: Configure the sender or receiver for a specific queue + + +Some examples are shown below. + +Using address expressions: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <sender match="queues.#"/> + <!-- notice the local queues for remotequeues.# need to be created on this broker --> + <receiver match="remotequeues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> +``` + +Using queue names: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <receiver queue-name="remoteQueueA"/> + <sender queue-name="localQueueB"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> + +``` +*Important*: You can match a receiver only 
to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses. + +*Important*: Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives. + + +# Peers +A peer broker connection element is a combination of sender and receivers. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages. + +Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis. + +You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem. + +With a peer, you have the same properties that you have on a sender and receiver. For example: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <peer match="queues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="queues.A"> + <anycast> + <queue name="localQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> +``` + +*Important:* Do not use this feature to connect to another broker, otherwise any message sent will be immediately ready to consume creating an infinite echo of sends and receives. + +# Mirror +The mirror option on the broker connection can capture events from the broker and pass them over the wire to another broker. This enables you to capture multiple asynchronous replicas. 
The following types of events are captured: + +* Message routing +* Message acknowledgement +* Queue and address creation +* queue and address deletion + +When you configure a mirror, these events are captured from the broker, stored on a local queue, and later forwarded to a target destination on another ActiveMQ Artemis broker. + +To configure a mirror, you add a `<mirror>` element within the `<amqp-connection>` element. + +The local queue is called `source-mirror-address` + +You can specify the following optional arguments. + +* `queue-removal`: Specifies whether a queue- or address-removal event is sent. The default value is `true`. +* `message-acknowledgements`: Specifies whether message acknowledgements are sent. The default value is `true`. +* `queue-creation`: Specifies whether a queue- or address-creation event is sent. The default value is `true`. +* `source-mirror-address`: By default, the mirror creates a non-durable temporary queue to store messages before they are sent to the other broker. If you define a name value for this property, an ANYCAST durable queue and address is created with the specified name. + +An example of a mirror configuration is shown below: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <mirror queue-removal="true" queue-creation="true" message-acknowledgements="true" source-mirror-address="myLocalSNFMirrorQueue"/> + </amqp-connection> +</broker-connections> +``` + +## Catch up on Mirror +The broker will not send past events over the mirror. As the broker sends and receives messages, only a natural catch up would eventually happen.
Review comment: I think this could be clarified; I'm not sure what it means by 'natural catch up'. Is it saying that only events occurring and messages arriving on a server after the mirror is started will be mirrored? That is, existing messages on a queue, present before a mirror is configured, will not be mirrored?
########## File path: examples/features/broker-connection/amqp-sending-messages/pom.xml ########## @@ -0,0 +1,165 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.activemq.examples.broker-connection</groupId> + <artifactId>broker-connections</artifactId> + <version>2.16.0-SNAPSHOT</version> + </parent> + + <artifactId>amqp-sending-messages</artifactId> + <packaging>jar</packaging> + <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
Review comment: Name needs updating
########## File path: examples/features/broker-connection/amqp-sending-overssl/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionSenderSSL.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.jms.JmsConnectionFactory; + +/** + * This example is demonstrating how messages are transferred from one broker towards another broker + * through the sender element on a AMQP Broker Connection. + */ +public class BrokerConnectionSenderSSL { + + public static void main(final String[] args) throws Exception { + Connection connectionOnServer0 = null; + ConnectionFactory connectionFactoryServer0 = new JmsConnectionFactory("amqp://localhost:5671");
Review comment: I know the target of the example is SSL between the brokers, but it might make more sense to use SSL for the clients too. Then you would only need one acceptor. This basically makes it seem like SSL isn't being used.
########## File path: examples/features/broker-connection/amqp-receiving-messages/src/main/java/org/apache/activemq/artemis/jms/example/BrokerConnectionReceiver.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.jms.JmsConnectionFactory; + +/** + * This example demonstrates how sessions created from a single connection can be load + * balanced across the different nodes of the cluster. + * <p> + * In this example there are three nodes and we use a round-robin client side load-balancing + * policy. + */ Review comment: Stale comment ########## File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java ########## @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.paging.PagingStore; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.impl.AckReason; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; +import org.apache.activemq.artemis.core.server.mirror.MirrorController; +import org.apache.activemq.artemis.core.transaction.Transaction; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.jboss.logging.Logger; + +public class AMQPMirrorControllerSource implements MirrorController, ActiveMQComponent { + + private static final Logger logger = 
Logger.getLogger(AMQPMirrorControllerSource.class); + + public static final Symbol EVENT_TYPE = Symbol.getSymbol("x-opt-amq-mr-ev-type"); + public static final Symbol ADDRESS = Symbol.getSymbol("x-opt-amq-mr-adr"); + public static final Symbol QUEUE = Symbol.getSymbol("x-opt-amq-mr-qu"); + + // Events: + public static final Symbol ADD_ADDRESS = Symbol.getSymbol("addAddress"); + public static final Symbol DELETE_ADDRESS = Symbol.getSymbol("deleteAddress"); + public static final Symbol CREATE_QUEUE = Symbol.getSymbol("createQueue"); + public static final Symbol DELETE_QUEUE = Symbol.getSymbol("deleteQueue"); + public static final Symbol ADDRESS_SCAN_START = Symbol.getSymbol("AddressCanStart"); + public static final Symbol ADDRESS_SCAN_END = Symbol.getSymbol("AddressScanEnd"); + public static final Symbol POST_ACK = Symbol.getSymbol("postAck"); + + // Delivery annotation property used on mirror control routing and Ack + public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id"); + public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst"); + + private static final ThreadLocal<MirrorControlRouting> mirrorControlRouting = ThreadLocal.withInitial(() -> new MirrorControlRouting(null)); + + final Queue snfQueue; + final ActiveMQServer server; + final boolean acks; + + boolean started; + + @Override + public void start() throws Exception { + } + + @Override + public void stop() throws Exception { + } + + @Override + public boolean isStarted() { + return started; + } + + public AMQPMirrorControllerSource(Queue snfQueue, ActiveMQServer server, boolean acks) { + this.snfQueue = snfQueue; + this.server = server; + this.acks = acks; + } + + @Override + public void startAddressScan() throws Exception { + Message message = createMessage(null, null, ADDRESS_SCAN_START, null); + route(server, message); + } + + @Override + public void endAddressScan() throws Exception { + Message message = createMessage(null, null, ADDRESS_SCAN_END, 
null); + route(server, message); + } + + @Override + public void addAddress(AddressInfo addressInfo) throws Exception { + Message message = createMessage(addressInfo.getName(), null, ADD_ADDRESS, addressInfo.toJSON()); + route(server, message); + } + + @Override + public void deleteAddress(AddressInfo addressInfo) throws Exception { + Message message = createMessage(addressInfo.getName(), null, DELETE_ADDRESS, addressInfo.toJSON()); + route(server, message); + } + + @Override + public void createQueue(QueueConfiguration queueConfiguration) throws Exception { + Message message = createMessage(queueConfiguration.getAddress(), queueConfiguration.getName(), CREATE_QUEUE, queueConfiguration.toJSON()); + route(server, message); + } + + @Override + public void deleteQueue(SimpleString address, SimpleString queue) throws Exception { + Message message = createMessage(address, queue, DELETE_QUEUE, queue.toString()); + route(server, message); + } + + @Override + public void sendMessage(Message message, RoutingContext context, List<MessageReference> refs) { + + try { + context.setReusable(false); + PagingStore storeOwner = null; + if (refs.size() > 0) { + storeOwner = refs.get(0).getOwner(); + } + if (storeOwner != null && !storeOwner.getAddress().equals(message.getAddressSimpleString())) { + storeOwner = server.getPagingManager().getPageStore(message.getAddressSimpleString()); + } + MessageReference ref = MessageReference.Factory.createReference(message, snfQueue, storeOwner); + + snfQueue.refUp(ref); + + Map<Symbol, Object> symbolObjectMap = new HashMap<>(); Review comment: "daMap" might be more descriptive, help readability ########## File path: examples/features/broker-connection/amqp-sending-messages/pom.xml ########## @@ -0,0 +1,165 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.activemq.examples.broker-connection</groupId> + <artifactId>broker-connections</artifactId> + <version>2.16.0-SNAPSHOT</version> + </parent> + + <artifactId>amqp-sending-messages</artifactId> + <packaging>jar</packaging> + <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name> + + <properties> + <activemq.basedir>${project.basedir}/../../../..</activemq.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-jms-client</artifactId> + <version>${qpid.jms.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.activemq</groupId> + <artifactId>artemis-maven-plugin</artifactId> + <executions> + <execution> + <id>create0</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <ignore>${noServer}</ignore> + <instance>${basedir}/target/server0</instance> + <allowAnonymous>true</allowAnonymous> + <configuration>${basedir}/target/classes/activemq/server0</configuration> + <!-- this makes it easier in certain envs --> + <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions> + </configuration> + </execution> + 
<execution> + <id>create1</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <ignore>${noServer}</ignore> + <instance>${basedir}/target/server1</instance> + <allowAnonymous>true</allowAnonymous> + <configuration>${basedir}/target/classes/activemq/server1</configuration> + <!-- this makes it easier in certain envs --> + <javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions> + </configuration> + </execution> + <!-- we first start broker 1, to avoid reconnecting statements --> + <execution> + <id>start1</id> + <goals> + <goal>cli</goal> + </goals> + <configuration> + <ignore>${noServer}</ignore> + <spawn>true</spawn> + <location>${basedir}/target/server1</location> + <testURI>tcp://localhost:5672</testURI> + <args> + <param>run</param> + </args> + <name>server1</name> + </configuration> + </execution> + <execution> + <id>start0</id> + <goals> + <goal>cli</goal> + </goals> + <configuration> + <spawn>true</spawn> + <ignore>${noServer}</ignore> + <location>${basedir}/target/server0</location> + <testURI>tcp://localhost:5671</testURI> Review comment: I'd avoid using the AMQPS port for non-TLS examples. ########## File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java ########## @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.net.URL; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.ExecuteUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** This test will only be executed if you have qdrouterd available on your system, otherwise is ignored by an assume exception. 
*/ +public class QpidDispatchPeerTest extends AmqpClientTestSupport { + + ExecuteUtil.ProcessHolder qpidProcess; + + /** + * This will validate if the environemnt has qdrouterd installed and if this test can be used or not. + */ + @BeforeClass + public static void validateqdrotuer() { + try { + int result = ExecuteUtil.runCommand(true, "qdrouterd", "--version"); + Assume.assumeTrue("qdrouterd does not exist", result == 0); + } catch (Exception e) { + e.printStackTrace(); Review comment: Given this will be the typical case, I think not printing (perhaps debug logging?) the stacktrace would be good, and instead having the assumption check say why it is skipping ########## File path: examples/features/broker-connection/amqp-receiving-messages/pom.xml ########## @@ -0,0 +1,165 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.activemq.examples.broker-connection</groupId> + <artifactId>broker-connections</artifactId> + <version>2.16.0-SNAPSHOT</version> + </parent> + + <artifactId>amqp-receiving-messages</artifactId> + <packaging>jar</packaging> + <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name> Review comment: Name needs updating ########## File path: tests/integration-tests/src/test/resources/QpidRouterPeerTest-qpidr.conf ########## @@ -0,0 +1,51 @@ + # + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. 
+ # + + router { + mode: standalone + id: INT.A + } +#log { + #module: DEFAULT + #enable: trace+ + #outputFile: /tmp/qdrouterd.log +#} + + # The broker connects into this port + listener { + saslMechanisms: ANONYMOUS + host: 0.0.0.0 + role: route-container + linkCapacity: 1123 + authenticatePeer: no + port: 24621 + } +
Review comment: Per the offline discussion yesterday, this listener isn't required and is somewhat misleading, implying it has to be a route-container. The single normal (client) listener can be used.
########## File path: docs/user-manual/en/amqp-broker-connections.md ########## @@ -0,0 +1,238 @@ +# Broker Connections + +Instead of waiting for clients to connect, a broker can initiate a connection to another endpoint on a specific protocol. + +Currently, this feature supports only the AMQP protocol. However, in the future, it might be expanded to other protocols. + +You configure broker connections using a `<broker-connections>` XML element in the `broker.xml` configuration file. + +```xml +<broker-connections> + ... +</broker-connections> +``` + +# AMQP Server Connections + +An ActiveMQ Artemis broker can initiate connections using the AMQP protocol. This means that the broker can connect to another AMQP server (not necessarily ActiveMQ Artemis) and create elements on that connection. + +To define an AMQP broker connection, add an `<amqp-connection>` element within the `<broker-connections` element in the `broker.xml` configuration file. For example: + +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker" retry-interval="100" reconnect-attempts="-1" user="john" password="doe"> + ...
+ </amqp-connection> +</broker-connections> +``` + +- `uri`: tcp://host:myport (this is a required argument) +- `name`: Name of the connection used for management purposes +- `user`: User name with which to connect to the endpoint (this is an optional argument) +- `password`: Password with which to connect to the endpoint (this is an optional argument) +- `retry-interval`: Time, in milliseconds to wait before retrying a connection after an error. The default value is `5000`. +- `reconnect-attempts`: default is -1 meaning infinite +- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it. + +*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController. + +*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions. + +# AMQP Server Connection Operations +The following types of operations are supported on a AMQP server connection: + +* Senders + * Messages received on specific queues are transferred to another endpoint +* Receivers + * The broker pulls messages from another endpoint +* Peers + * The broker creates both senders and receivers on another endpoint that knows how to handle them. Currently, this is implemented by Apache Qpid Dispatch. +* Mirrors + * The broker uses an AMQP connection to another broker and duplicate messages and sends acknowledgements over the wire. + +## Senders and Receivers +It is possible to connect an ActiveMQ Artemis broker to another AMQP endpoint simply by creating a sender or receiver broker connection element. 
+ +For a `sender`, the broker creates a message consumer on a queue that sends messages to another AMQP endpoint. + +For a `receiver`, the broker creates a message producer on an address that receives messages from another AMQP endpoint. + +Both elements work like a message bridge. However, there is no additional overhead required to process messages. Senders and receivers behave just like any other consumer or producer in ActiveMQ Artemis. + +You can configure senders or receivers for specific queues. You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties: + +- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression +- `queue-name`: Configure the sender or receiver for a specific queue + + +Some examples are shown below. + +Using address expressions: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <sender match="queues.#"/> + <!-- notice the local queues for remotequeues.# need to be created on this broker --> + <receiver match="remotequeues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> +``` + +Using queue names: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <receiver queue-name="remoteQueueA"/> + <sender queue-name="localQueueB"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="remotequeues.A"> + <anycast> + <queue name="remoteQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> + +``` +*Important*: You can match a receiver only 
to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses. + +*Important*: Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives. + + +# Peers +A peer broker connection element is a combination of sender and receivers. The ActiveMQ Artemis broker creates both a sender and a receiver for a peer element, and the endpoint knows how to deal with the pair without creating an infinite loop of sending and receiving messages. + +Currently, [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/index.html) is a peer. ActiveMQ Artemis creates the pair of receivers and sender for each matching destination. These senders and receivers have special configuration to let Qpid Dispatch Router know to collaborate with ActiveMQ Artemis. + +You can experiment with advanced networking scenarios with Qpid Dispatch Router and get a lot of benefit from the AMQP protocol and its ecosystem. + +With a peer, you have the same properties that you have on a sender and receiver. For example: +```xml +<broker-connections> + <amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker"> + <peer match="queues.#"/> + </amqp-connection> +</broker-connections> + +<addresses> + <address name="queues.A"> + <anycast> + <queue name="localQueueA"/> + </anycast> + </address> + <address name="queues.B"> + <anycast> + <queue name="localQueueB"/> + </anycast> + </address> +</addresses> +``` + +*Important:* Do not use this feature to connect to another broker, otherwise any message sent will be immediately ready to consume creating an infinite echo of sends and receives. 
+
+# Mirror

Review comment:
It occurred to me while looking this over again that a given broker can't be the target of mirrors for more than one broker, since the message IDs passed back and forth between them (for the acking) are likely to clash. It may be worth stating something about this in the docs?

##########
File path: examples/features/broker-connection/amqp-receiving-messages/pom.xml
##########
@@ -0,0 +1,165 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.activemq.examples.broker-connection</groupId>
+      <artifactId>broker-connections</artifactId>
+      <version>2.16.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>amqp-receiving-messages</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ Artemis JMS Client Side Load Balancing Example</name>
+
+   <properties>
+      <activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.qpid</groupId>
+         <artifactId>qpid-jms-client</artifactId>
+         <version>${qpid.jms.version}</version>

Review comment:
Is the version not dependency-managed somewhere already?

##########
File path: examples/features/broker-connection/amqp-sending-overssl/readme.md
##########
@@ -0,0 +1,15 @@
+# JMS SSL Example

Review comment:
The readme needs updating.

##########
File path: tests/integration-tests/src/test/resources/QpidRouterPeerTest-qpidr.conf
##########
@@ -0,0 +1,51 @@
+ #
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #
+
+ router {
+     mode: standalone
+     id: INT.A
+ }
+#log {
+    #module: DEFAULT
+    #enable: trace+
+    #outputFile: /tmp/qdrouterd.log
+#}
+
+ # The broker connects into this port
+ listener {
+     saslMechanisms: ANONYMOUS
+     host: 0.0.0.0
+     role: route-container
+     linkCapacity: 1123
+     authenticatePeer: no
+     port: 24621
+ }
+
+ # Clients connect to this port
+ listener {
+     saslMechanisms: ANONYMOUS
+     host: 0.0.0.0
+     linkCapacity: 555
+     role: normal
+     authenticatePeer: no
+     port: 24622
+ }

Review comment:
I would remove the capacity config personally; most people won't need that.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 505782)
    Time Spent: 27h 50m  (was: 27h 40m)

> AMQP Server Connectivity
> ------------------------
>
>                 Key: ARTEMIS-2937
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2937
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>          Components: AMQP
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.16.0
>
>          Time Spent: 27h 50m
>  Remaining Estimate: 0h
>
> This feature adds server side connectivity.
>
> It is possible to link two brokers directly using AMQP with this feature, and
> have a Queue transferring messages to another broker directly.
>
> For this we would have options called <sender and <receiver
>
>
> it would also be possible to use qpid-dispatch as an intermediary between
> clients and the brokers (or eventually between brokers), on that case the
> option will be <peer
>
> it would also be possible to use <mirror with a few option to replicate data
> between two brokers, bringing the possibility of using it for Disaster &
> Recovery and Failover.