This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8599917222 ARTEMIS-4141 Update credits even for expired messages
8599917222 is described below

commit 85999172227bff29e8c5ffeef9340132d1413a35
Author: Nicolas Filotto <nfilo...@talend.com>
AuthorDate: Fri Sep 22 09:06:11 2023 +0200

    ARTEMIS-4141 Update credits even for expired messages
    
    When big messages are produced if a consumer receives an expired message, 
the credits are not updated, so if the consumer is too slow and an expiry delay 
has been set, we can end up with a situation where there are no more credits 
which prevents the consumer from receiving any more messages.
---
 .../artemis/api/core/client/MessageHandler.java    |  9 +++
 .../core/client/impl/ClientConsumerImpl.java       | 11 ++-
 .../jms/client/JMSMessageListenerWrapper.java      |  9 +++
 .../artemis/ra/inflow/ActiveMQMessageHandler.java  | 11 +++
 .../jms/client/SlowLargeMessageConsumerTest.java   | 83 ++++++++++++++++++++++
 5 files changed, 120 insertions(+), 3 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java
index 4f39d1cc5b..64fa15d352 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java
@@ -32,4 +32,13 @@ public interface MessageHandler {
     * @param message a message
     */
    void onMessage(ClientMessage message);
+
+   /**
+    * Notifies the MessageHandler that an expired message has been received.
+    *
+    * @param message a message
+    */
+   default void onMessageExpired(ClientMessage message) {
+      // Do nothing by default
+   }
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 6a730a0972..75b5621843 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -992,13 +992,18 @@ public final class ClientConsumerImpl implements 
ClientConsumerInternal {
 
                logger.trace("{}::Handler.onMessage done", this);
 
-               if (message.isLargeMessage()) {
-                  message.discardBody();
-               }
             } else {
+               theHandler.onMessageExpired(message);
+
+               logger.trace("{}::Handler.onMessageExpired done", this);
+
                session.expire(this, message);
             }
 
+            if (message.isLargeMessage()) {
+               message.discardBody();
+            }
+
             // If slow consumer, we need to send 1 credit to make sure we get 
another message
             if (clientWindowSize == 0) {
                startSlowConsumer();
diff --git 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index f24e90db02..ec7bba23c2 100644
--- 
a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++ 
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -143,4 +143,13 @@ public class JMSMessageListenerWrapper implements 
MessageHandler {
 
       session.setRecoverCalled(false);
    }
+
+   @Override
+   public void onMessageExpired(ClientMessage message) {
+      try {
+         message.checkCompletion();
+      } catch (ActiveMQException e) {
+         ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
+      }
+   }
 }
diff --git 
a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
 
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index 8be00955ad..2a3cf24048 100644
--- 
a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ 
b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -42,6 +42,7 @@ import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSClientLogger;
 import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
 import 
org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage;
@@ -397,6 +398,16 @@ public class ActiveMQMessageHandler implements 
MessageHandler, FailoverEventList
 
    }
 
+
+   @Override
+   public void onMessageExpired(ClientMessage message) {
+      try {
+         message.checkCompletion();
+      } catch (ActiveMQException e) {
+         ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
+      }
+   }
+
    public void start() throws ActiveMQException {
       session.start();
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java
new file mode 100644
index 0000000000..7354edf5e7
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.Test;
+
+public class SlowLargeMessageConsumerTest extends JMSTestBase {
+
+   private static final String TOPIC = "SlowLargeMessageConsumerTopic";
+
+
+   @Override
+   protected void extraServerConfig(ActiveMQServer server) {
+      server.getConfiguration().getAddressSettings().put(TOPIC, new 
AddressSettings().setExpiryDelay(100L).setMaxSizeBytes(1024));
+   }
+
+   /**
+     * @see <a 
href="https://issues.apache.org/jira/browse/ARTEMIS-4141";>ARTEMIS-4141</a>
+     */
+   @Test
+   public void ensureSlowConsumerOfLargeMessageNeverGetsStuck() throws 
Exception {
+      try (Connection conn = cf.createConnection()) {
+         conn.start();
+         try (Session sessionConsumer = conn.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+              Session sessionProducer = conn.createSession(false, 
Session.CLIENT_ACKNOWLEDGE)) {
+            final Destination topic = sessionConsumer.createTopic(TOPIC);
+            final MessageConsumer consumer = 
sessionConsumer.createConsumer(topic);
+            final AtomicBoolean slow = new AtomicBoolean(true);
+            final CountDownLatch messageReceived = new CountDownLatch(1);
+            consumer.setMessageListener(message -> {
+               if (slow.get()) {
+                  try {
+                     TimeUnit.MILLISECONDS.sleep(50);
+                  } catch (InterruptedException ex) {
+                     Thread.currentThread().interrupt();
+                  }
+               } else {
+                  messageReceived.countDown();
+               }
+            });
+            final MessageProducer producer = 
sessionProducer.createProducer(topic);
+            int msgSize = 512 * 1024;
+            for (int i = 0; i < 100; i++) {
+               
producer.send(sessionProducer.createObjectMessage(RandomUtils.nextBytes(msgSize)));
+               TimeUnit.MILLISECONDS.sleep(25);
+            }
+            TimeUnit.MILLISECONDS.sleep(100);
+            slow.set(false);
+            
producer.send(sessionProducer.createObjectMessage(RandomUtils.nextBytes(msgSize)));
+            assertTrue(messageReceived.await(500, TimeUnit.MILLISECONDS));
+         }
+      }
+   }
+}

Reply via email to