This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 8599917222 ARTEMIS-4141 Update credits even for expired messages 8599917222 is described below commit 85999172227bff29e8c5ffeef9340132d1413a35 Author: Nicolas Filotto <nfilo...@talend.com> AuthorDate: Fri Sep 22 09:06:11 2023 +0200 ARTEMIS-4141 Update credits even for expired messages When big messages are produced if a consumer receives an expired message, the credits are not updated, so if the consumer is too slow and an expiry delay has been set, we can end up with a situation where there are no more credits which prevents the consumer from receiving any more messages. --- .../artemis/api/core/client/MessageHandler.java | 9 +++ .../core/client/impl/ClientConsumerImpl.java | 11 ++- .../jms/client/JMSMessageListenerWrapper.java | 9 +++ .../artemis/ra/inflow/ActiveMQMessageHandler.java | 11 +++ .../jms/client/SlowLargeMessageConsumerTest.java | 83 ++++++++++++++++++++++ 5 files changed, 120 insertions(+), 3 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java index 4f39d1cc5b..64fa15d352 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/MessageHandler.java @@ -32,4 +32,13 @@ public interface MessageHandler { * @param message a message */ void onMessage(ClientMessage message); + + /** + * Notifies the MessageHandler that an expired message has been received. + * + * @param message a message + */ + default void onMessageExpired(ClientMessage message) { + // Do nothing by default + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 6a730a0972..75b5621843 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -992,13 +992,18 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { logger.trace("{}::Handler.onMessage done", this); - if (message.isLargeMessage()) { - message.discardBody(); - } } else { + theHandler.onMessageExpired(message); + + logger.trace("{}::Handler.onMessageExpired done", this); + session.expire(this, message); } + if (message.isLargeMessage()) { + message.discardBody(); + } + // If slow consumer, we need to send 1 credit to make sure we get another message if (clientWindowSize == 0) { startSlowConsumer(); diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java index f24e90db02..ec7bba23c2 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java @@ -143,4 +143,13 @@ public class JMSMessageListenerWrapper implements MessageHandler { session.setRecoverCalled(false); } + + @Override + public void onMessageExpired(ClientMessage message) { + try { + message.checkCompletion(); + } catch (ActiveMQException e) { + ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e); + } + } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index 8be00955ad..2a3cf24048 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -42,6 +42,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.jms.client.ActiveMQJMSClientLogger; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMessage; @@ -397,6 +398,16 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList } + + @Override + public void onMessageExpired(ClientMessage message) { + try { + message.checkCompletion(); + } catch (ActiveMQException e) { + ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e); + } + } + public void start() throws ActiveMQException { session.start(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java new file mode 100644 index 0000000000..7354edf5e7 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/SlowLargeMessageConsumerTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.jms.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.apache.commons.lang3.RandomUtils; +import org.junit.Test; + +public class SlowLargeMessageConsumerTest extends JMSTestBase { + + private static final String TOPIC = "SlowLargeMessageConsumerTopic"; + + + @Override + protected void extraServerConfig(ActiveMQServer server) { + server.getConfiguration().getAddressSettings().put(TOPIC, new AddressSettings().setExpiryDelay(100L).setMaxSizeBytes(1024)); + } + + /** + * @see <a href="https://issues.apache.org/jira/browse/ARTEMIS-4141">ARTEMIS-4141</a> + */ + @Test + public void ensureSlowConsumerOfLargeMessageNeverGetsStuck() throws Exception { + try (Connection conn = cf.createConnection()) { + conn.start(); + try (Session sessionConsumer = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Session sessionProducer = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE)) { + final Destination topic = sessionConsumer.createTopic(TOPIC); + final MessageConsumer consumer = sessionConsumer.createConsumer(topic); + final AtomicBoolean slow = new AtomicBoolean(true); + final CountDownLatch messageReceived = new CountDownLatch(1); + consumer.setMessageListener(message -> { + if (slow.get()) { + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } else { + messageReceived.countDown(); + } + }); + final MessageProducer producer = sessionProducer.createProducer(topic); + int msgSize = 512 * 1024; + for (int i = 0; i < 100; i++) { + producer.send(sessionProducer.createObjectMessage(RandomUtils.nextBytes(msgSize))); + TimeUnit.MILLISECONDS.sleep(25); + } + TimeUnit.MILLISECONDS.sleep(100); + slow.set(false); + producer.send(sessionProducer.createObjectMessage(RandomUtils.nextBytes(msgSize))); + assertTrue(messageReceived.await(500, TimeUnit.MILLISECONDS)); + } + } + } +}