This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 677d71b8e7 ARTEMIS-4366 Missing Mirrored ACKs with MULTICAST and subscriptions 677d71b8e7 is described below commit 677d71b8e7b3bb0afd2fd618af4b86657324e50b Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Fri Jul 14 16:49:31 2023 -0400 ARTEMIS-4366 Missing Mirrored ACKs with MULTICAST and subscriptions --- .../artemis/cli/commands/AbstractAction.java | 17 +- .../utils/collections/NodeStoreFactory.java | 21 ++ .../api/core/management/ManagementHelper.java | 37 ++++ .../amqp/broker/ProtonProtocolManager.java | 8 +- .../connect/mirror/AMQPMirrorControllerSource.java | 6 +- .../connect/mirror/AMQPMirrorControllerTarget.java | 12 +- .../amqp/connect/mirror/ReferenceNodeStore.java | 41 +--- .../connect/mirror/ReferenceNodeStoreFactory.java | 82 ++++++++ .../apache/activemq/artemis/core/server/Queue.java | 3 +- .../artemis/core/server/impl/QueueImpl.java | 9 +- .../core/server/impl/RoutingContextTest.java | 4 +- .../server/impl/ScheduledDeliveryHandlerTest.java | 4 +- .../integration/amqp/connect/AMQPReplicaTest.java | 221 ++++++++++++++++++++ tests/smoke-tests/pom.xml | 32 +++ .../mirrored-subscriptions/broker1/broker.xml | 226 +++++++++++++++++++++ .../mirrored-subscriptions/broker2/broker.xml | 220 ++++++++++++++++++++ .../brokerConnection/MirroredSubscriptionTest.java | 164 +++++++++++++++ .../tests/smoke/common/SimpleManagement.java | 68 +++++++ .../tests/unit/core/postoffice/impl/FakeQueue.java | 4 +- 19 files changed, 1107 insertions(+), 72 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java index 37f08c35f1..3eebaf4e2c 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.cli.commands; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientMessage; -import org.apache.activemq.artemis.api.core.client.ClientRequestor; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; @@ -28,25 +27,13 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; public abstract class AbstractAction extends ConnectionAbstract { + // TODO: This call could be replaced by a direct call into ManagementHelpr.doManagement and their lambdas public void performCoreManagement(ManagementCallback<ClientMessage> cb) throws Exception { - try (ActiveMQConnectionFactory factory = createCoreConnectionFactory(); ServerLocator locator = factory.getServerLocator(); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) { - session.start(); - ClientRequestor requestor = new ClientRequestor(session, "activemq.management"); - ClientMessage message = session.createMessage(false); - - cb.setUpInvocation(message); - - ClientMessage reply = requestor.request(message); - - if (ManagementHelper.hasOperationSucceeded(reply)) { - cb.requestSuccessful(reply); - } else { - cb.requestFailed(reply); - } + ManagementHelper.doManagement(session, cb::setUpInvocation, cb::requestSuccessful, cb::requestFailed); } } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java new file mode 100644 index 0000000000..2bd6c9c292 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +public interface NodeStoreFactory<E> { + NodeStore<E> newNodeStore(); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index 678df28625..69a16a8f4b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -16,6 +16,13 @@ */ package org.apache.activemq.artemis.api.core.management; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientRequestor; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.json.JsonArray; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -87,6 +94,36 @@ public final class ManagementHelper { public static final SimpleString HDR_CLIENT_ID = new SimpleString("_AMQ_Client_ID"); + // Lambda declaration for management function. Pretty much same thing as java.util.function.Consumer but with an exception in the declaration that was needed. + public interface MessageAcceptor { + void accept(ClientMessage message) throws Exception; + } + + /** Utility function to connect to a server and perform a management operation via core. */ + public static void doManagement(String uri, String user, String password, MessageAcceptor setup, MessageAcceptor ok, MessageAcceptor failed) throws Exception { + try (ServerLocator locator = ServerLocatorImpl.newLocator(uri); + ClientSessionFactory sessionFactory = locator.createSessionFactory(); + ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) { + doManagement(session, setup, ok, failed); + } + } + + /** Utility function to reuse a ClientSessionConnection and perform a single management operation via core. */ + public static void doManagement(ClientSession session, MessageAcceptor setup, MessageAcceptor ok, MessageAcceptor failed) throws Exception { + session.start(); + ClientRequestor requestor = new ClientRequestor(session, "activemq.management"); + ClientMessage message = session.createMessage(false); + + setup.accept(message); + + ClientMessage reply = requestor.request(message); + + if (ManagementHelper.hasOperationSucceeded(reply)) { + ok.accept(reply); + } else { + failed.accept(reply); + } + } /** * Stores a resource attribute in a message to retrieve the value from the server resource. diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index a34184ae6b..9355bbdff1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; -import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStore; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStoreFactory; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler; @@ -75,7 +75,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, // We must use one referenceIDSupplier per server. // protocol manager is the perfect aggregation for that. - private ReferenceNodeStore referenceIDSupplier; + private ReferenceNodeStoreFactory referenceIDSupplier; private final ProtonProtocolManagerFactory factory; @@ -125,11 +125,11 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, routingHandler = new AMQPRoutingHandler(server); } - public synchronized ReferenceNodeStore getReferenceIDSupplier() { + public synchronized ReferenceNodeStoreFactory getReferenceIDSupplier() { if (referenceIDSupplier == null) { // we lazy start the instance. // only create it when needed - referenceIDSupplier = new ReferenceNodeStore(server); + referenceIDSupplier = new ReferenceNodeStoreFactory(server); } return referenceIDSupplier; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 160e44c1aa..6d943bf2d6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -92,7 +92,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im final Queue snfQueue; final ActiveMQServer server; - final ReferenceNodeStore idSupplier; + final ReferenceNodeStoreFactory idSupplier; final boolean acks; final boolean addQueues; final boolean deleteQueues; @@ -324,14 +324,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im } } - public static void validateProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { + public static void validateProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) { setProtocolData(referenceIDSupplier, ref); } } /** This method will return the brokerID used by the message */ - private static String setProtocolData(ReferenceNodeStore referenceIDSupplier, MessageReference ref) { + private static String setProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref) { String brokerID = referenceIDSupplier.getServerID(ref); long id = referenceIDSupplier.getID(ref); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index 55f35bf645..fa168005a0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -162,7 +162,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement DuplicateIDCache lruduplicateIDCache; String lruDuplicateIDKey; - private final ReferenceNodeStore referenceNodeStore; + private final ReferenceNodeStoreFactory referenceNodeStore; OperationContext mirrorContext; @@ -367,15 +367,17 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement } private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) { - MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); if (logger.isTraceEnabled()) { - logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}). Ref={}", nodeID, messageID, targetQueue.getName(), reference); + logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", nodeID, messageID, targetQueue.getName()); } + MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); + + if (reference == null) { if (logger.isDebugEnabled()) { - logger.debug("Retrying Reference not found on messageID={}, nodeID={}, currentRetry={}", messageID, nodeID, retry); + logger.debug("Retrying Reference not found on messageID={}, nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry); } switch (retry) { case 0: @@ -404,7 +406,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement if (reference != null) { if (logger.isTraceEnabled()) { - logger.trace("Post ack Server {} worked well for messageID={} nodeID={}", server, messageID, nodeID); + logger.trace("Post ack Server {} worked well for messageID={} nodeID={} queue={}, targetQueue={}", server, messageID, nodeID, reference.getQueue(), targetQueue); } try { switch (reason) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java index d82560cace..a9ae71a273 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java @@ -20,20 +20,16 @@ import java.util.HashMap; import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; -import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; -import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; - public class ReferenceNodeStore implements NodeStore<MessageReference> { - private final String serverID; + private final ReferenceNodeStoreFactory factory; - public ReferenceNodeStore(ActiveMQServer server) { - this.serverID = server.getNodeID().toString(); + public ReferenceNodeStore(ReferenceNodeStoreFactory factory) { + this.factory = factory; } // This is where the messages are stored by server id... @@ -43,10 +39,6 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> { LongObjectHashMap<LinkedListImpl.Node<MessageReference>> lruMap; - public String getDefaultNodeID() { - return serverID; - } - @Override public void storeNode(MessageReference element, LinkedListImpl.Node<MessageReference> node) { String list = getServerID(element); @@ -90,7 +82,7 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> { /** notice getMap should always return an instance. It should never return null. */ private synchronized LongObjectHashMap<LinkedListImpl.Node<MessageReference>> getMap(String serverID) { if (serverID == null) { - serverID = this.serverID; // returning for the localList in case it's null + serverID = factory.getDefaultNodeID(); } if (lruListID != null && lruListID.equals(serverID)) { @@ -113,34 +105,15 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> { } public String getServerID(MessageReference element) { - return getServerID(element.getMessage()); + return factory.getServerID(element); } - public String getServerID(Message message) { - Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY); - if (nodeID != null) { - return nodeID.toString(); - } else { - // it is important to return null here, as the MirrorSource is expecting it to be null - // in the case the nodeID being from the originating server. - // don't be tempted to return this.serverID here. - return null; - } + return factory.getServerID(message); } public long getID(MessageReference element) { - Message message = element.getMessage(); - Long id = getID(message); - if (id == null) { - return element.getMessageID(); - } else { - return id; - } - } - - private Long getID(Message message) { - return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY); + return factory.getID(element); } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java new file mode 100644 index 0000000000..2782f85175 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.protocol.amqp.connect.mirror; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.utils.collections.NodeStore; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; + +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; +import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; + +public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageReference> { + + final ActiveMQServer server; + + private final String serverID; + + public ReferenceNodeStoreFactory(ActiveMQServer server) { + this.server = server; + this.serverID = server.getNodeID().toString(); + + } + + @Override + public NodeStore<MessageReference> newNodeStore() { + return new ReferenceNodeStore(this); + } + + public String getDefaultNodeID() { + return serverID; + } + + public String getServerID(MessageReference element) { + return getServerID(element.getMessage()); + } + + + public String getServerID(Message message) { + Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY); + if (nodeID != null) { + return nodeID.toString(); + } else { + // it is important to return null here, as the MirrorSource is expecting it to be null + // in the case the nodeID being from the originating server. + // don't be tempted to return this.serverID here. + return null; + } + } + + public long getID(MessageReference element) { + Message message = element.getMessage(); + Long id = getID(message); + if (id == null) { + return element.getMessageID(); + } else { + return id; + } + } + + private Long getID(Message message) { + return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY); + } + + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 2c1319bcd4..e9042a9913 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.critical.CriticalComponent; public interface Queue extends Bindable,CriticalComponent { @@ -77,7 +78,7 @@ public interface Queue extends Bindable,CriticalComponent { * If the idSupplier returns {@literal < 0} the ID is considered a non value (null) and it will be ignored. * * @see org.apache.activemq.artemis.utils.collections.LinkedList#setNodeStore(NodeStore) */ - MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore); + MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore); /** * The queue definition could be durable, but the messages could eventually be considered non durable. diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2ff193595b..9f948355d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -116,6 +116,7 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -219,9 +220,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private NodeStore<MessageReference> nodeStore; - private void checkIDSupplier(NodeStore<MessageReference> nodeStore) { - if (this.nodeStore != nodeStore) { - this.nodeStore = nodeStore; + private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory) { + if (this.nodeStore == null) { + this.nodeStore = nodeStoreFactory.newNodeStore(); messageReferences.setNodeStore(nodeStore); } } @@ -3457,7 +3458,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public synchronized MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) { + public synchronized MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) { checkIDSupplier(nodeStore); MessageReference reference = messageReferences.removeWithID(serverID, id); if (reference != null) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java index 90cd723c48..09c96e8745 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java @@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; -import org.apache.activemq.artemis.utils.collections.NodeStore; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.apache.activemq.artemis.utils.critical.CriticalCloseable; import org.junit.Assert; @@ -157,7 +157,7 @@ public class RoutingContextTest { } @Override - public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) { + public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) { return null; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 5e43510f8c..e4cd2acbe6 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -56,8 +56,8 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.UUID; -import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; import org.slf4j.Logger; @@ -868,7 +868,7 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override - public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) { + public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) { return null; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java index 841c8ec1a6..66fafbc3ca 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java @@ -24,10 +24,16 @@ import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.Topic; +import java.lang.invoke.MethodHandles; import java.net.URI; import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -59,9 +65,13 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AMQPReplicaTest extends AmqpClientTestSupport { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected static final int AMQP_PORT_2 = 5673; protected static final int AMQP_PORT_3 = 5674; public static final int TIME_BEFORE_RESTART = 1000; @@ -834,6 +844,8 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { } public Queue locateQueue(ActiveMQServer server, String queueName) throws Exception { + Assert.assertNotNull(queueName); + Assert.assertNotNull(server); Wait.waitFor(() -> server.locateQueue(queueName) != null); return server.locateQueue(queueName); } @@ -1077,4 +1089,213 @@ public class AMQPReplicaTest extends AmqpClientTestSupport { conn.close(); } + private void consumeSubscription(int START_ID, + int LAST_ID, + int port, + String clientID, + String queueName, + String subscriptionName, + boolean assertNull) throws JMSException { + ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port); + Connection conn = cf.createConnection(); + conn.setClientID(clientID); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + + HashSet<Integer> idsReceived = new HashSet<>(); + + Topic topic = sess.createTopic(queueName); + + MessageConsumer consumer = sess.createDurableConsumer(topic, subscriptionName); + for (int i = START_ID; i <= LAST_ID; i++) { + Message message = consumer.receive(3000); + Assert.assertNotNull(message); + Integer id = message.getIntProperty("i"); + Assert.assertNotNull(id); + Assert.assertTrue(idsReceived.add(id)); + } + + if (assertNull) { + Assert.assertNull(consumer.receiveNoWait()); + } + + for (int i = START_ID; i <= LAST_ID; i++) { + Assert.assertTrue(idsReceived.remove(i)); + } + + Assert.assertTrue(idsReceived.isEmpty()); + conn.close(); + } + + @Test + public void testMulticast() throws Exception { + multiCastReplicaTest(false, false, false, false, true); + } + + + @Test + public void testMulticastSerializeConsumption() throws Exception { + multiCastReplicaTest(false, false, false, false, false); + } + + @Test + public void testMulticastTargetPaging() throws Exception { + multiCastReplicaTest(false, true, false, false, true); + } + + @Test + public void testMulticastTargetSourcePaging() throws Exception { + multiCastReplicaTest(false, true, true, true, true); + } + + @Test + public void testMulticastTargetLargeMessage() throws Exception { + multiCastReplicaTest(true, true, true, true, true); + } + + + private void multiCastReplicaTest(boolean largeMessage, + boolean pagingTarget, + boolean pagingSource, + boolean restartBrokerConnection, boolean multiThreadConsumers) throws Exception { + + String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID(); + final ActiveMQServer server = this.server; + server.setIdentity("targetServer"); + + server_2 = createServer(AMQP_PORT_2, false); + server_2.setIdentity("server_2"); + server_2.getConfiguration().setName("thisone"); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100); + AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true); + replica.setName("theReplica"); + amqpConnection.addElement(replica); + server_2.getConfiguration().addAMQPConnection(amqpConnection); + server_2.getConfiguration().setName("server_2"); + + int NUMBER_OF_MESSAGES = 200; + + server_2.start(); + server.start(); + Wait.assertTrue(server_2::isStarted); + Wait.assertTrue(server::isStarted); + + // We create the address to avoid auto delete on the queue + server_2.addAddressInfo(new AddressInfo(getTopicName()).addRoutingType(RoutingType.MULTICAST).setAutoCreated(false)); + + + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getTopicName()); + MessageProducer producer = session.createProducer(topic); + + for (int i = 0; i <= 1; i++) { + // just creating the subscription and not consuming anything + consumeSubscription(0, -1, AMQP_PORT_2, "client" + i, getTopicName(), "subscription" + i, false); + } + + String subs0Name = "client0.subscription0"; + String subs1Name = "client1.subscription1"; + + + Queue subs0Server1 = locateQueue(server, subs0Name); + Queue subs1Server1 = locateQueue(server, subs1Name); + Assert.assertNotNull(subs0Server1); + Assert.assertNotNull(subs1Server1); + + Queue subs0Server2 = locateQueue(server_2, subs0Name); + Queue subs1Server2 = locateQueue(server_2, subs1Name); + Assert.assertNotNull(subs0Server2); + Assert.assertNotNull(subs1Server2); + + if (pagingTarget) { + subs0Server1.getPagingStore().startPaging(); + } + + if (pagingSource) { + subs0Server2.getPagingStore().startPaging(); + } + + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + Message message = session.createTextMessage(getText(largeMessage, i)); + message.setIntProperty("i", i); + producer.send(message); + } + + if (pagingTarget) { + subs0Server1.getPagingStore().startPaging(); + } + + Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF()); + + Assert.assertNotNull(snfreplica); + + Wait.assertEquals(0, snfreplica::getMessageCount); + + Wait.assertEquals(NUMBER_OF_MESSAGES, subs0Server1::getMessageCount, 2000); + Wait.assertEquals(NUMBER_OF_MESSAGES, subs1Server1::getMessageCount, 2000); + + Wait.assertEquals(NUMBER_OF_MESSAGES, subs0Server2::getMessageCount, 2000); + Wait.assertEquals(NUMBER_OF_MESSAGES, subs1Server2::getMessageCount, 2000); + + if (restartBrokerConnection) { + // stop and start the broker connection, making sure we wouldn't duplicate the mirror + server_2.stopBrokerConnection(brokerConnectionName); + Thread.sleep(1000); + server_2.startBrokerConnection(brokerConnectionName); + } + + Assert.assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF())); + + if (pagingTarget) { + assertTrue(subs0Server1.getPagingStore().isPaging()); + assertTrue(subs1Server1.getPagingStore().isPaging()); + } + + ExecutorService executorService = Executors.newFixedThreadPool(2); + runAfter(executorService::shutdownNow); + + CountDownLatch done = new CountDownLatch(2); + AtomicInteger errors = new AtomicInteger(0); + + for (int i = 0; i <= 1; i++) { + CountDownLatch threadDone = new CountDownLatch(1); + int subscriptionID = i; + executorService.execute(() -> { + try { + consumeSubscription(0, NUMBER_OF_MESSAGES - 1, AMQP_PORT_2, "client" + subscriptionID, getTopicName(), "subscription" + subscriptionID, false); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + done.countDown(); + threadDone.countDown(); + } + }); + if (!multiThreadConsumers) { + threadDone.await(1, TimeUnit.MINUTES); + } + } + + Assert.assertTrue(done.await(60, TimeUnit.SECONDS)); + Assert.assertEquals(0, errors.get()); + + // Replica is async, so we need to wait acks to arrive before we finish consuming there + Wait.assertEquals(0, snfreplica::getMessageCount); + Wait.assertEquals(0L, subs0Server1::getMessageCount, 2000, 100); + Wait.assertEquals(0L, subs1Server1::getMessageCount, 2000, 100); + Wait.assertEquals(0L, subs0Server2::getMessageCount, 2000, 100); + Wait.assertEquals(0L, subs1Server2::getMessageCount, 2000, 100); + + + if (largeMessage) { + validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0); + validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(), 0); + } + } + + + } diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 79990fa897..680fc2497a 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -1366,6 +1366,38 @@ </args> </configuration> </execution> + <execution> + <phase>test-compile</phase> + <id>create-test-Mirror1</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <role>amq</role> + <user>admin</user> + <password>admin</password> + <allowAnonymous>true</allowAnonymous> + <noWeb>true</noWeb> + <instance>${basedir}/target/mirrored-subscriptions/broker1</instance> + <configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker1</configuration> + </configuration> + </execution> + <execution> + <phase>test-compile</phase> + <id>create-test-Mirror2</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <role>amq</role> + <user>admin</user> + <password>admin</password> + <allowAnonymous>true</allowAnonymous> + <noWeb>true</noWeb> + <instance>${basedir}/target/mirrored-subscriptions/broker2</instance> + <configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker2</configuration> + </configuration> + </execution> </executions> <dependencies> <dependency> diff --git a/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml new file mode 100644 index 0000000000..763558fec9 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml @@ -0,0 +1,226 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq:core "> + + <name>0.0.0.0</name> + + <persistence-enabled>true</persistence-enabled> + + <!-- this could be ASYNCIO or NIO + --> + <journal-type>NIO</journal-type> + + <paging-directory>./data/paging</paging-directory> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/large-messages</large-messages-directory> + + <journal-datasync>true</journal-datasync> + + <journal-min-files>2</journal-min-files> + + <journal-pool-files>-1</journal-pool-files> + + <message-expiry-scan-period>1000</message-expiry-scan-period> + + <security-enabled>false</security-enabled> + + <!-- + You can verify the network health of a particular NIC by specifying the <network-check-NIC> element. + <network-check-NIC>theNicName</network-check-NIC> + --> + + <!-- + Use this to use an HTTP server to validate the network + <network-check-URL-list>http://www.apache.org</network-check-URL-list> --> + + <!-- <network-check-period>10000</network-check-period> --> + <!-- <network-check-timeout>1000</network-check-timeout> --> + + <!-- this is a comma separated list, no spaces, just DNS or IPs + it should accept IPV6 + + Warning: Make sure you understand your network topology as this is meant to validate if your network is valid. + Using IPs that could eventually disappear or be partially visible may defeat the purpose. + You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running --> + <!-- <network-check-list>10.0.0.1</network-check-list> --> + + <!-- use this to customize the ping used for ipv4 addresses --> + <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> --> + + <!-- use this to customize the ping used for ipv6 addresses --> + <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> --> + + + + + <!-- how often we are looking for how many bytes are being used on the disk in ms --> + <disk-scan-period>5000</disk-scan-period> + + <!-- once the disk hits this limit the system will block, or close the connection in certain protocols + that won't support flow control. --> + <max-disk-usage>90</max-disk-usage> + + <!-- the system will enter into page mode once you hit this limit. + This is an estimate in bytes of how much the messages are using in memory + + The system will use half of the available memory (-Xmx) by default for the global-max-size. + You may specify a different value here if you need to customize it to your needs. + + <global-max-size>100Mb</global-max-size> + + --> + + <acceptors> + + <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it --> + <!-- useKQueue means: it will use Netty kqueue if you are on a system (MacOS) that supports it --> + <!-- amqpCredits: The number of credits sent to AMQP producers --> + <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark --> + + <!-- Acceptor for every supported protocol --> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor> + </acceptors> + + <broker-connections> + <amqp-connection uri="tcp://localhost:61617" name="mirror" retry-interval="100"> + <mirror/> + </amqp-connection> + </broker-connections> + + <security-settings> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="guest"/> + <permission type="deleteNonDurableQueue" roles="guest"/> + <permission type="createDurableQueue" roles="guest"/> + <permission type="deleteDurableQueue" roles="guest"/> + <permission type="createAddress" roles="guest"/> + <permission type="deleteAddress" roles="guest"/> + <permission type="consume" roles="guest"/> + <permission type="browse" roles="guest"/> + <permission type="send" roles="guest"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="guest"/> + </security-setting> + </security-settings> + + <address-settings> + <!-- if you define auto-create on certain queues, management has to be auto-create --> + <address-setting match="activemq.management#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + <!--default for catch all--> + <address-setting match="#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + <address-setting match="myQueue"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <default-max-consumers>1</default-max-consumers> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + + <address-setting match="myTopic"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <default-max-consumers>1</default-max-consumers> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + + <address-setting match="myTopicPaging"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>100K</max-size-bytes> + <default-max-consumers>1</default-max-consumers> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + </address-settings> + + <addresses> + <address name="DLQ"> + <anycast> + <queue name="DLQ"/> + </anycast> + </address> + <address name="ExpiryQueue"> + <anycast> + <queue name="ExpiryQueue"/> + </anycast> + </address> + <address name="myQueue"> + <anycast> + <!-- this should be maxed from the default --> + <queue name="myQueue"> + </queue> + </anycast> + </address> + <address name="myTopic"> + <multicast/> + </address> + <address name="myTopicPaging"> + <multicast/> + </address> + + </addresses> + + </core> +</configuration> diff --git a/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml new file mode 100644 index 0000000000..33c8a7ca3a --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml @@ -0,0 +1,220 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq:core "> + + <name>0.0.0.0</name> + + <persistence-enabled>true</persistence-enabled> + + <!-- this could be ASYNCIO or NIO + --> + <journal-type>NIO</journal-type> + + <paging-directory>./data/paging</paging-directory> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/large-messages</large-messages-directory> + + <journal-datasync>true</journal-datasync> + + <journal-min-files>2</journal-min-files> + + <journal-pool-files>-1</journal-pool-files> + + <message-expiry-scan-period>1000</message-expiry-scan-period> + + <security-enabled>false</security-enabled> + + <!-- + You can verify the network health of a particular NIC by specifying the <network-check-NIC> element. + <network-check-NIC>theNicName</network-check-NIC> + --> + + <!-- + Use this to use an HTTP server to validate the network + <network-check-URL-list>http://www.apache.org</network-check-URL-list> --> + + <!-- <network-check-period>10000</network-check-period> --> + <!-- <network-check-timeout>1000</network-check-timeout> --> + + <!-- this is a comma separated list, no spaces, just DNS or IPs + it should accept IPV6 + + Warning: Make sure you understand your network topology as this is meant to validate if your network is valid. + Using IPs that could eventually disappear or be partially visible may defeat the purpose. + You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running --> + <!-- <network-check-list>10.0.0.1</network-check-list> --> + + <!-- use this to customize the ping used for ipv4 addresses --> + <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> --> + + <!-- use this to customize the ping used for ipv6 addresses --> + <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> --> + + + <!-- how often we are looking for how many bytes are being used on the disk in ms --> + <disk-scan-period>5000</disk-scan-period> + + <!-- once the disk hits this limit the system will block, or close the connection in certain protocols + that won't support flow control. --> + <max-disk-usage>90</max-disk-usage> + + <!-- the system will enter into page mode once you hit this limit. + This is an estimate in bytes of how much the messages are using in memory + + The system will use half of the available memory (-Xmx) by default for the global-max-size. + You may specify a different value here if you need to customize it to your needs. + + <global-max-size>100Mb</global-max-size> + + --> + + <acceptors> + + <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it --> + <!-- useKQueue means: it will use Netty kqueue if you are on a system (MacOS) that supports it --> + <!-- amqpCredits: The number of credits sent to AMQP producers --> + <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark --> + + <!-- Acceptor for every supported protocol --> + <acceptor name="artemis"> + tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300 + </acceptor> + </acceptors> + + <security-settings> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="guest"/> + <permission type="deleteNonDurableQueue" roles="guest"/> + <permission type="createDurableQueue" roles="guest"/> + <permission type="deleteDurableQueue" roles="guest"/> + <permission type="createAddress" roles="guest"/> + <permission type="deleteAddress" roles="guest"/> + <permission type="consume" roles="guest"/> + <permission type="browse" roles="guest"/> + <permission type="send" roles="guest"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="guest"/> + </security-setting> + </security-settings> + + <address-settings> + <!-- if you define auto-create on certain queues, management has to be auto-create --> + <address-setting match="activemq.management#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + <!--default for catch all--> + <address-setting match="#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + <address-setting match="myQueue"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <default-max-consumers>1</default-max-consumers> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + + <address-setting match="myTopic"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <default-max-consumers>1</default-max-consumers> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + + <address-setting match="myTopicPaging"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>100K</max-size-bytes> + <default-max-consumers>1</default-max-consumers> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + </address-settings> + + <addresses> + <address name="DLQ"> + <anycast> + <queue name="DLQ"/> + </anycast> + </address> + <address name="ExpiryQueue"> + <anycast> + <queue name="ExpiryQueue"/> + </anycast> + </address> + <address name="myQueue"> + <anycast> + <!-- this should be maxed from the default --> + <queue name="myQueue"> + </queue> + </anycast> + </address> + <address name="myTopic"> + <multicast/> + </address> + <address name="myTopicPaging"> + <multicast/> + </address> + + </addresses> + + </core> +</configuration> diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java new file mode 100644 index 0000000000..69242281a9 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <br> + * http://www.apache.org/licenses/LICENSE-2.0 + * <br> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.brokerConnection; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import java.lang.invoke.MethodHandles; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.smoke.common.SimpleManagement; +import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.util.ServerUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MirroredSubscriptionTest extends SmokeTestBase { + + public static final String SERVER_NAME_A = "mirrored-subscriptions/broker1"; + public static final String SERVER_NAME_B = "mirrored-subscriptions/broker2"; + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + // Change this to true to generate a print-data in certain cases on this test + private static final boolean PRINT_DATA = false; + private static final String JMX_SERVER_HOSTNAME = "localhost"; + private static final int JMX_SERVER_PORT = 11099; + + Process processB; + Process processA; + + @Before + public void beforeClass() throws Exception { + cleanupData(SERVER_NAME_A); + cleanupData(SERVER_NAME_B); + processB = startServer(SERVER_NAME_B, 1, 0); + processA = startServer(SERVER_NAME_A, 0, 0); + + ServerUtil.waitForServerToStart(1, "B", "B", 30000); + ServerUtil.waitForServerToStart(0, "A", "A", 30000); + } + + @Test + public void testSend() throws Throwable { + + int COMMIT_INTERVAL = 100; + int NUMBER_OF_MESSAGES = 500; + int CLIENTS = 2; + String mainURI = "tcp://localhost:61616"; + String secondURI = "tcp://localhost:61617"; + + String topicName = "myTopic"; + + ConnectionFactory cf = CFUtil.createConnectionFactory("amqp", mainURI); + + for (int i = 0; i < CLIENTS; i++) { + try (Connection connection = cf.createConnection()) { + connection.setClientID("client" + i); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(topicName); + session.createDurableSubscriber(topic, "subscription" + i); + } + } + + try (Connection connection = cf.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(topicName); + MessageProducer producer = session.createProducer(topic); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage("hello " + i)); + if (i % COMMIT_INTERVAL == 0) { + session.commit(); + } + } + session.commit(); + } + + Map<String, Integer> result = SimpleManagement.listQueues(mainURI, null, null, 100); + result.entrySet().forEach(entry -> System.out.println("Queue " + entry.getKey() + "=" + entry.getValue())); + + checkMessages(NUMBER_OF_MESSAGES, CLIENTS, mainURI, secondURI); + + ExecutorService executorService = Executors.newFixedThreadPool(CLIENTS); + runAfter(executorService::shutdownNow); + + CountDownLatch done = new CountDownLatch(CLIENTS); + AtomicInteger errors = new AtomicInteger(0); + + for (int i = 0; i < CLIENTS; i++) { + final int clientID = i; + executorService.execute(() -> { + try (Connection connection = cf.createConnection()) { + connection.setClientID("client" + clientID); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Topic topic = session.createTopic(topicName); + TopicSubscriber subscriber = session.createDurableSubscriber(topic, "subscription" + clientID); + for (int messageI = 0; messageI < NUMBER_OF_MESSAGES; messageI++) { + TextMessage message = (TextMessage) subscriber.receive(5000); + Assert.assertNotNull(message); + if (messageI % COMMIT_INTERVAL == 0) { + session.commit(); + } + } + session.commit(); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + done.countDown(); + } + }); + } + + Assert.assertTrue(done.await(300, TimeUnit.SECONDS)); + Assert.assertEquals(0, errors.get()); + checkMessages(0, CLIENTS, mainURI, secondURI); + } + + private void checkMessages(int NUMBER_OF_MESSAGES, int CLIENTS, String mainURI, String secondURI) throws Exception { + for (int i = 0; i < CLIENTS; i++) { + final int clientID = i; + Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(mainURI, "client" + clientID + ".subscription" + clientID)); + Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(secondURI, "client" + clientID + ".subscription" + clientID)); + } + } + + int getMessageCount(String uri, String queueName) throws Exception { + Map<String, Integer> result = SimpleManagement.listQueues(uri, null, null, 100); + Integer resultReturn = result.get(queueName); + + logger.debug("Result = {}, queueName={}, returnValue = {}", result, queueName, resultReturn); + return resultReturn == null ? 0 : resultReturn; + + } + +} diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java new file mode 100644 index 0000000000..5b9a34b2df --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.smoke.common; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.json.JsonArray; +import org.apache.activemq.artemis.json.JsonObject; + +public class SimpleManagement { + + private static final String SIMPLE_OPTIONS = "{\"field\":\"\",\"value\":\"\",\"operation\":\"\"}"; + + /** Simple management function that will return a list of Pair<Name of Queue, Number of Messages> */ + public static Map<String, Integer> listQueues(String uri, String user, String password, int maxRows) throws Exception { + Map<String, Integer> queues = new HashMap<>(); + ManagementHelper.doManagement(uri, user, password, t -> setupListQueue(t, maxRows), t -> listQueueResult(t, queues), SimpleManagement::failed); + return queues; + } + + private static void setupListQueue(ClientMessage m, int maxRows) throws Exception { + ManagementHelper.putOperationInvocation(m, "broker", "listQueues", SIMPLE_OPTIONS, 1, maxRows); + } + + private static void listQueueResult(ClientMessage message, Map<String, Integer> mapQueues) throws Exception { + + final String result = (String) ManagementHelper.getResult(message, String.class); + + + JsonObject queuesAsJsonObject = JsonUtil.readJsonObject(result); + JsonArray array = queuesAsJsonObject.getJsonArray("data"); + + for (int i = 0; i < array.size(); i++) { + JsonObject object = array.getJsonObject(i); + String name = object.getString("name"); + String messageCount = object.getString("messageCount"); + mapQueues.put(name, Integer.parseInt(messageCount)); + } + + } + + private static void failed(ClientMessage message) throws Exception { + + final String result = (String) ManagementHelper.getResult(message, String.class); + + throw new Exception("Failed " + result); + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index bb779ac2a0..cd30f4b054 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -40,8 +40,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AckReason; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.ReferenceCounter; -import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl; import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer; @@ -143,7 +143,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } @Override - public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) { + public MessageReference removeWithSuppliedID(String serverID, long id, NodeStoreFactory<MessageReference> nodeStore) { return null; }