This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 29deb30c73 ARTEMIS-4366 Some Adjustment to class names on Mirror 29deb30c73 is described below commit 29deb30c738ea765fbed738aa3e36517c3c89ca4 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Tue Jul 18 09:30:45 2023 -0400 ARTEMIS-4366 Some Adjustment to class names on Mirror This is in relation to comments from PR #4555 --- .../protocol/amqp/broker/ProtonProtocolManager.java | 10 ++++------ .../connect/mirror/AMQPMirrorControllerSource.java | 6 +++--- .../connect/mirror/AMQPMirrorControllerTarget.java | 3 +-- ...eNodeStoreFactory.java => ReferenceIDSupplier.java} | 16 +++++++++------- .../amqp/connect/mirror/ReferenceNodeStore.java | 18 ++++++++---------- 5 files changed, 25 insertions(+), 28 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 9355bbdff1..c64d287dff 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; -import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStoreFactory; +import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler; @@ -73,9 +73,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, private final ActiveMQServer server; - // We must use one referenceIDSupplier per server. - // protocol manager is the perfect aggregation for that. - private ReferenceNodeStoreFactory referenceIDSupplier; + private ReferenceIDSupplier referenceIDSupplier; private final ProtonProtocolManagerFactory factory; @@ -125,11 +123,11 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, routingHandler = new AMQPRoutingHandler(server); } - public synchronized ReferenceNodeStoreFactory getReferenceIDSupplier() { + public synchronized ReferenceIDSupplier getReferenceIDSupplier() { if (referenceIDSupplier == null) { // we lazy start the instance. // only create it when needed - referenceIDSupplier = new ReferenceNodeStoreFactory(server); + referenceIDSupplier = new ReferenceIDSupplier(server); } return referenceIDSupplier; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java index 6d943bf2d6..144ec59a39 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java @@ -92,7 +92,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im final Queue snfQueue; final ActiveMQServer server; - final ReferenceNodeStoreFactory idSupplier; + final ReferenceIDSupplier idSupplier; final boolean acks; final boolean addQueues; final boolean deleteQueues; @@ -324,14 +324,14 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im } } - public static void validateProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { + public static void validateProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref, SimpleString snfAddress) { if (ref.getProtocolData(DeliveryAnnotations.class) == null && !ref.getMessage().getAddressSimpleString().equals(snfAddress)) { setProtocolData(referenceIDSupplier, ref); } } /** This method will return the brokerID used by the message */ - private static String setProtocolData(ReferenceNodeStoreFactory referenceIDSupplier, MessageReference ref) { + private static String setProtocolData(ReferenceIDSupplier referenceIDSupplier, MessageReference ref) { String brokerID = referenceIDSupplier.getServerID(ref); long id = referenceIDSupplier.getID(ref); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java index fa168005a0..47415cc049 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java @@ -162,7 +162,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement DuplicateIDCache lruduplicateIDCache; String lruDuplicateIDKey; - private final ReferenceNodeStoreFactory referenceNodeStore; + private final ReferenceIDSupplier referenceNodeStore; OperationContext mirrorContext; @@ -374,7 +374,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore); - if (reference == null) { if (logger.isDebugEnabled()) { logger.debug("Retrying Reference not found on messageID={}, nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java similarity index 87% rename from artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java rename to artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java index 2782f85175..6ae967c710 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceIDSupplier.java @@ -26,18 +26,23 @@ import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY; import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY; -public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageReference> { +/** + * Since Artemis 2.30.0 this is supplying a new NodeStore per queue. + * It is also parsing MessageReference and Message for the proper ID for the messages. + * @since 2.30.0 + */ +public class ReferenceIDSupplier implements NodeStoreFactory<MessageReference> { final ActiveMQServer server; private final String serverID; - public ReferenceNodeStoreFactory(ActiveMQServer server) { + public ReferenceIDSupplier(ActiveMQServer server) { this.server = server; this.serverID = server.getNodeID().toString(); - } + /** This will return the NodeStore that will be used by the Queue. */ @Override public NodeStore<MessageReference> newNodeStore() { return new ReferenceNodeStore(this); @@ -51,7 +56,6 @@ public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageRefere return getServerID(element.getMessage()); } - public String getServerID(Message message) { Object nodeID = message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY); if (nodeID != null) { @@ -77,6 +81,4 @@ public class ReferenceNodeStoreFactory implements NodeStoreFactory<MessageRefere private Long getID(Message message) { return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY); } - - -} +} \ No newline at end of file diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java index a9ae71a273..485051e8aa 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java @@ -26,10 +26,10 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl; public class ReferenceNodeStore implements NodeStore<MessageReference> { - private final ReferenceNodeStoreFactory factory; + private final ReferenceIDSupplier idSupplier; - public ReferenceNodeStore(ReferenceNodeStoreFactory factory) { - this.factory = factory; + public ReferenceNodeStore(ReferenceIDSupplier idSupplier) { + this.idSupplier = idSupplier; } // This is where the messages are stored by server id... @@ -67,7 +67,6 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> { } } - @Override public LinkedListImpl.Node<MessageReference> getNode(String serverID, long id) { LongObjectHashMap<LinkedListImpl.Node<MessageReference>> nodeMap = getMap(serverID); @@ -82,7 +81,7 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> { /** notice getMap should always return an instance. It should never return null. */ private synchronized LongObjectHashMap<LinkedListImpl.Node<MessageReference>> getMap(String serverID) { if (serverID == null) { - serverID = factory.getDefaultNodeID(); + serverID = idSupplier.getDefaultNodeID(); } if (lruListID != null && lruListID.equals(serverID)) { @@ -105,15 +104,15 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> { } public String getServerID(MessageReference element) { - return factory.getServerID(element); + return idSupplier.getServerID(element); } public String getServerID(Message message) { - return factory.getServerID(message); + return idSupplier.getServerID(message); } public long getID(MessageReference element) { - return factory.getID(element); + return idSupplier.getID(element); } @Override @@ -132,5 +131,4 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> { } return size; } - -} +} \ No newline at end of file