This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 91debf25db ARTEMIS-4418 use consumer delivery sequence in messageId for openwire broker sequence id, makes delivery count calculation independent of message order 91debf25db is described below commit 91debf25dbf0f3924d511ee615c6f9b0545e2a5f Author: Gary Tully <gary.tu...@gmail.com> AuthorDate: Wed Sep 6 17:51:47 2023 +0100 ARTEMIS-4418 use consumer delivery sequence in messageId for openwire broker sequence id, makes delivery count calculation independent of message order --- .../openwire/OpenWireMessageConverter.java | 9 +++++---- .../core/protocol/openwire/amq/AMQConsumer.java | 6 ++++-- .../openwire/OpenWireMessageConverterTest.java | 22 +++++++++++----------- .../core/server/impl/ServerConsumerImpl.java | 1 + .../PrefetchRedeliveryCountOpenwireTest.java | 14 ++++++++++++-- 5 files changed, 33 insertions(+), 19 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 915809d377..e6d69757d3 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -481,15 +481,16 @@ public final class OpenWireMessageConverter { public static MessageDispatch createMessageDispatch(MessageReference reference, ICoreMessage message, WireFormat marshaller, - AMQConsumer consumer, UUID serverNodeUUID) throws IOException { + AMQConsumer consumer, + UUID serverNodeUUID, + long consumerDeliverySequenceId) throws IOException { ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer, serverNodeUUID); - //we 
can use core message id for sequenceId - amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID()); + amqMessage.getMessageId().setBrokerSequenceId(consumerDeliverySequenceId); MessageDispatch md = new MessageDispatch(); md.setConsumerId(consumer.getId()); md.setRedeliveryCounter(reference.getDeliveryCount() - 1); - md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId()); + md.setDeliverySequenceId(consumerDeliverySequenceId); md.setMessage(amqMessage); ActiveMQDestination destination = amqMessage.getDestination(); md.setDestination(destination); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 3165b14faf..38a249e04c 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -81,6 +82,7 @@ public class AMQConsumer { private boolean internalAddress = false; private volatile Set<MessageReference> rolledbackMessageRefs; private ScheduledFuture<?> delayedDispatchPrompter; + private AtomicLong deliveredSequenceId = new AtomicLong(0); public AMQConsumer(AMQSession amqSession, org.apache.activemq.command.ActiveMQDestination d, @@ -292,7 +294,7 @@ public class AMQConsumer { message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME); } //handleDeliver is 
performed by an executor (see JBPAPP-6030): any AMQConsumer can share the session.wireFormat() - dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this, session.getCoreServer().getNodeManager().getUUID()); + dispatch = OpenWireMessageConverter.createMessageDispatch(reference, message, session.wireFormat(), this, session.getCoreServer().getNodeManager().getUUID(), deliveredSequenceId.getAndIncrement()); int size = dispatch.getMessage().getSize(); reference.setProtocolData(MessageId.class, dispatch.getMessage().getMessageId()); session.deliverMessage(dispatch); @@ -446,7 +448,7 @@ public class AMQConsumer { // treat as delivered return true; } - if (ref.getMessageID() <= info.getLastDeliveredSequenceId() && !isRolledBack(ref)) { + if (ref.getProtocolData(MessageId.class).getBrokerSequenceId() <= info.getLastDeliveredSequenceId() && !isRolledBack(ref)) { // treat as delivered return true; } diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java index 92a1bfb3c1..0580f5cad0 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -73,7 +73,7 @@ public class OpenWireMessageConverterTest { AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch dispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID, i); MessageId messageId = dispatch.getMessage().getMessageId(); assertFalse(mqMessageAuditNoSync.isDuplicate(messageId)); @@ -95,7 +95,7 @@ public class OpenWireMessageConverterTest { AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch dispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, msg, openWireFormat, amqConsumer, nodeUUID, i); MessageId messageId = dispatch.getMessage().getMessageId(); assertFalse(mqMessageAuditNoSync.isDuplicate(messageId)); @@ -114,7 +114,7 @@ public class OpenWireMessageConverterTest { AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) instanceof String); } @@ -130,7 +130,7 @@ public class OpenWireMessageConverterTest { AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch marshalled = (MessageDispatch) openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID))); + MessageDispatch marshalled = (MessageDispatch) 
openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0))); assertEquals(5, marshalled.getMessage().getProperties().keySet().size()); Message converted = OpenWireMessageConverter.inbound(marshalled.getMessage(), openWireFormat, null); for (int i = 0; i < 5; i++) { @@ -161,7 +161,7 @@ public class OpenWireMessageConverterTest { MessageReference messageReference = new MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID, 0); assertEquals(PRODUCER_ID, classicMessageDispatch.getMessage().getProducerId().toString()); } @@ -178,7 +178,7 @@ public class OpenWireMessageConverterTest { MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); assertEquals(PRODUCER_ID, messageDispatch.getMessage().getProducerId().toString()); } @@ -194,7 +194,7 @@ public class OpenWireMessageConverterTest { MessageReference messageReference = new MessageReferenceImpl(artemisMessage, 
Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch classicMessageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) artemisMessage, openWireFormat, amqConsumer, nodeUUID, 0); assertEquals(MESSAGE_ID, classicMessageDispatch.getMessage().getMessageId().toString()); } @@ -211,7 +211,7 @@ public class OpenWireMessageConverterTest { MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); assertEquals(MESSAGE_ID, messageDispatch.getMessage().getMessageId().toString()); } @@ -228,7 +228,7 @@ public class OpenWireMessageConverterTest { MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); assertEquals("queue://" + DESTINATION, 
messageDispatch.getMessage().getOriginalDestination().toString()); } @@ -245,7 +245,7 @@ public class OpenWireMessageConverterTest { MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); assertEquals("queue://" + DESTINATION, messageDispatch.getMessage().getReplyTo().toString()); } @@ -269,7 +269,7 @@ public class OpenWireMessageConverterTest { AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); - MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); assertNull(messageDispatch.getMessage().getProperty(hdrArrival)); assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index fa389b0912..c9d1215ae1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -656,6 +656,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { browserDeliverer.close(); 
} else { messageQueue.removeConsumer(this); + messageQueue.deliverAsync(); } session.removeConsumer(id); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java index b5dab5f344..9d086a1f54 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java @@ -55,11 +55,20 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase { } @Test(timeout = 60_000) - public void testConsumerSingleMessageLoop() throws Exception { + public void testConsumerSingleMessageLoopExclusive() throws Exception { + doTestConsumerSingleMessageLoop(true); + } + + @Test(timeout = 60_000) + public void testConsumerSingleMessageLoopNonExclusive() throws Exception { + doTestConsumerSingleMessageLoop(false); + } + + public void doTestConsumerSingleMessageLoop(boolean exclusive) throws Exception { Connection exConn = null; SimpleString durableQueue = new SimpleString("exampleQueue"); - this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true)); + this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(exclusive)); try { ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); @@ -157,6 +166,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase { } session.commit(); + // force a local socket close such that the broker sees an exception on the connection and fails the consumer via close 
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop(); exConn.close(); }