This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 91debf25db ARTEMIS-4418 use consumer delivery sequence in messageId for openwire broker sequence id, makes delivery count calculation independent of message order
91debf25db is described below

commit 91debf25dbf0f3924d511ee615c6f9b0545e2a5f
Author: Gary Tully <gary.tu...@gmail.com>
AuthorDate: Wed Sep 6 17:51:47 2023 +0100

    ARTEMIS-4418 use consumer delivery sequence in messageId for openwire broker sequence id, makes delivery count calculation independent of message order
---
 .../openwire/OpenWireMessageConverter.java         |  9 +++++----
 .../core/protocol/openwire/amq/AMQConsumer.java    |  6 ++++--
 .../openwire/OpenWireMessageConverterTest.java     | 22 +++++++++++-----------
 .../core/server/impl/ServerConsumerImpl.java       |  1 +
 .../PrefetchRedeliveryCountOpenwireTest.java       | 14 ++++++++++++--
 5 files changed, 33 insertions(+), 19 deletions(-)

diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 915809d377..e6d69757d3 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -481,15 +481,16 @@ public final class OpenWireMessageConverter {
    public static MessageDispatch createMessageDispatch(MessageReference 
reference,
                                                        ICoreMessage message,
                                                        WireFormat marshaller,
-                                                       AMQConsumer consumer, 
UUID serverNodeUUID) throws IOException {
+                                                       AMQConsumer consumer,
+                                                       UUID serverNodeUUID,
+                                                       long 
consumerDeliverySequenceId) throws IOException {
       ActiveMQMessage amqMessage = toAMQMessage(reference, message, 
marshaller, consumer, serverNodeUUID);
 
-      //we can use core message id for sequenceId
-      amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
+      
amqMessage.getMessageId().setBrokerSequenceId(consumerDeliverySequenceId);
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(consumer.getId());
       md.setRedeliveryCounter(reference.getDeliveryCount() - 1);
-      
md.setDeliverySequenceId(amqMessage.getMessageId().getBrokerSequenceId());
+      md.setDeliverySequenceId(consumerDeliverySequenceId);
       md.setMessage(amqMessage);
       ActiveMQDestination destination = amqMessage.getDestination();
       md.setDestination(destination);
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 3165b14faf..38a249e04c 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
@@ -81,6 +82,7 @@ public class AMQConsumer {
    private boolean internalAddress = false;
    private volatile Set<MessageReference> rolledbackMessageRefs;
    private ScheduledFuture<?> delayedDispatchPrompter;
+   private AtomicLong deliveredSequenceId = new AtomicLong(0);
 
    public AMQConsumer(AMQSession amqSession,
                       org.apache.activemq.command.ActiveMQDestination d,
@@ -292,7 +294,7 @@ public class AMQConsumer {
             message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
          }
          //handleDeliver is performed by an executor (see JBPAPP-6030): any 
AMQConsumer can share the session.wireFormat()
-         dispatch = OpenWireMessageConverter.createMessageDispatch(reference, 
message, session.wireFormat(), this, 
session.getCoreServer().getNodeManager().getUUID());
+         dispatch = OpenWireMessageConverter.createMessageDispatch(reference, 
message, session.wireFormat(), this, 
session.getCoreServer().getNodeManager().getUUID(), 
deliveredSequenceId.getAndIncrement());
          int size = dispatch.getMessage().getSize();
          reference.setProtocolData(MessageId.class, 
dispatch.getMessage().getMessageId());
          session.deliverMessage(dispatch);
@@ -446,7 +448,7 @@ public class AMQConsumer {
          // treat as delivered
          return true;
       }
-      if (ref.getMessageID() <= info.getLastDeliveredSequenceId() && 
!isRolledBack(ref)) {
+      if (ref.getProtocolData(MessageId.class).getBrokerSequenceId() <= 
info.getLastDeliveredSequenceId() && !isRolledBack(ref)) {
          // treat as delivered
          return true;
       }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
 
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
index 92a1bfb3c1..0580f5cad0 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
@@ -73,7 +73,7 @@ public class OpenWireMessageConverterTest {
          AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
          
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
 
-         MessageDispatch dispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, msg, 
openWireFormat, amqConsumer, nodeUUID);
+         MessageDispatch dispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, msg, 
openWireFormat, amqConsumer, nodeUUID, i);
 
          MessageId messageId = dispatch.getMessage().getMessageId();
          assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
@@ -95,7 +95,7 @@ public class OpenWireMessageConverterTest {
          AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
          
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
 
-         MessageDispatch dispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, msg, 
openWireFormat, amqConsumer, nodeUUID);
+         MessageDispatch dispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, msg, 
openWireFormat, amqConsumer, nodeUUID, i);
 
          MessageId messageId = dispatch.getMessage().getMessageId();
          assertFalse(mqMessageAuditNoSync.isDuplicate(messageId));
@@ -114,7 +114,7 @@ public class OpenWireMessageConverterTest {
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
 
-      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
 
       assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) 
instanceof String);
    }
@@ -130,7 +130,7 @@ public class OpenWireMessageConverterTest {
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
 
-      MessageDispatch marshalled = (MessageDispatch) 
openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference,
 coreMessage, openWireFormat, amqConsumer, nodeUUID)));
+      MessageDispatch marshalled = (MessageDispatch) 
openWireFormat.unmarshal(openWireFormat.marshal(OpenWireMessageConverter.createMessageDispatch(messageReference,
 coreMessage, openWireFormat, amqConsumer, nodeUUID, 0)));
       assertEquals(5, marshalled.getMessage().getProperties().keySet().size());
       Message converted = 
OpenWireMessageConverter.inbound(marshalled.getMessage(), openWireFormat, null);
       for (int i = 0; i < 5; i++) {
@@ -161,7 +161,7 @@ public class OpenWireMessageConverterTest {
       MessageReference messageReference = new 
MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
-      MessageDispatch classicMessageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) 
artemisMessage, openWireFormat, amqConsumer, nodeUUID);
+      MessageDispatch classicMessageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) 
artemisMessage, openWireFormat, amqConsumer, nodeUUID, 0);
       assertEquals(PRODUCER_ID, 
classicMessageDispatch.getMessage().getProducerId().toString());
    }
 
@@ -178,7 +178,7 @@ public class OpenWireMessageConverterTest {
       MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
-      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
       assertEquals(PRODUCER_ID, 
messageDispatch.getMessage().getProducerId().toString());
    }
 
@@ -194,7 +194,7 @@ public class OpenWireMessageConverterTest {
       MessageReference messageReference = new 
MessageReferenceImpl(artemisMessage, Mockito.mock(Queue.class));
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
-      MessageDispatch classicMessageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) 
artemisMessage, openWireFormat, amqConsumer, nodeUUID);
+      MessageDispatch classicMessageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, (ICoreMessage) 
artemisMessage, openWireFormat, amqConsumer, nodeUUID, 0);
       assertEquals(MESSAGE_ID, 
classicMessageDispatch.getMessage().getMessageId().toString());
    }
 
@@ -211,7 +211,7 @@ public class OpenWireMessageConverterTest {
       MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
-      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
       assertEquals(MESSAGE_ID, 
messageDispatch.getMessage().getMessageId().toString());
    }
 
@@ -228,7 +228,7 @@ public class OpenWireMessageConverterTest {
       MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
-      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
       assertEquals("queue://" + DESTINATION, 
messageDispatch.getMessage().getOriginalDestination().toString());
    }
 
@@ -245,7 +245,7 @@ public class OpenWireMessageConverterTest {
       MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
-      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
       assertEquals("queue://" + DESTINATION, 
messageDispatch.getMessage().getReplyTo().toString());
    }
 
@@ -269,7 +269,7 @@ public class OpenWireMessageConverterTest {
       AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
       
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
 
-      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID);
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID, 0);
 
       assertNull(messageDispatch.getMessage().getProperty(hdrArrival));
       assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime));
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index fa389b0912..c9d1215ae1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -656,6 +656,7 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
          browserDeliverer.close();
       } else {
          messageQueue.removeConsumer(this);
+         messageQueue.deliverAsync();
       }
 
       session.removeConsumer(id);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
index b5dab5f344..9d086a1f54 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
@@ -55,11 +55,20 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
    }
 
    @Test(timeout = 60_000)
-   public void testConsumerSingleMessageLoop() throws Exception {
+   public void testConsumerSingleMessageLoopExclusive() throws Exception {
+      doTestConsumerSingleMessageLoop(true);
+   }
+
+   @Test(timeout = 60_000)
+   public void testConsumerSingleMessageLoopNonExclusive() throws Exception {
+      doTestConsumerSingleMessageLoop(false);
+   }
+
+   public void  doTestConsumerSingleMessageLoop(boolean exclusive) throws 
Exception {
       Connection exConn = null;
 
       SimpleString durableQueue = new SimpleString("exampleQueue");
-      this.server.createQueue(new 
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+      this.server.createQueue(new 
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(exclusive));
 
       try {
          ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
@@ -157,6 +166,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
             }
             session.commit();
 
+            // force a local socket close such that the broker sees an 
exception on the connection and fails the consumer via close
             
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
             exConn.close();
          }

Reply via email to