This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/artemis.git
commit 15657ce3a29cd0b22e132266637f9e0bc93a8100 Author: Clebert Suconic <[email protected]> AuthorDate: Wed Feb 25 13:09:36 2026 -0500 ARTEMIS-5376 Preventing OME from QueueImpl::iteration while paging --- .../paging/cursor/impl/PageSubscriptionImpl.java | 9 +- .../artemis/core/server/ActiveMQServerLogger.java | 3 + .../artemis/core/server/impl/QueueImpl.java | 131 +++++++++++++----- .../DualMirrorSingleAcceptorRunningTest.java | 1 - .../QueueMemoryFloodValidationTest.java | 153 +++++++++++++++++++++ 5 files changed, 262 insertions(+), 35 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 5adafe74b7..cd812b1a9f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -1431,9 +1431,14 @@ public final class PageSubscriptionImpl implements PageSubscription { @Override public synchronized NextResult tryNext() { - // if an unbehaved program called hasNext twice before next, we only cache it once. if (cachedNext != null) { - return NextResult.hasElements; + PageCursorInfo info = locatePageInfo(cachedNext.getPagedMessage().getPageNumber()); + if (info != null && info.isRemoved(cachedNext.getPagedMessage().getMessageNumber())) { + logger.debug("Reference {} has been removed from another iterator, moving it next", cachedNext); + cachedNext = null; + } else { + return NextResult.hasElements; + } } if (!pageStore.isStorePaging()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 04618745cb..95542dd4d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1526,4 +1526,7 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224157, value = "At least one of the components failed to start under the lockCoordinator {}. A retry will be executed", level = LogMessage.Level.INFO) void retryLockCoordinator(String name); + + @LogMessage(id = 224158, value = "The operation {} on queue {} cannot read more data from paging into memory and will be interrupted.", level = LogMessage.Level.INFO) + void preventQueueManagementToFloodMemory(String operation, String queue); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 32cd4bfa5e..944992df0e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1003,6 +1003,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void addTail(final MessageReference ref, final boolean direct) { + if (logger.isTraceEnabled()) { + logger.trace("AddTail on queue {}, reference={}", this.getName(), ref); + } try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_TAIL)) { if (scheduleIfPossible(ref)) { return; @@ -2025,7 +2028,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception { - return iterQueue(flushLimit, filter1, createDeleteMatchingAction(ackReason)); + return iterQueue("deleteMatchingReferences", flushLimit, filter1, createDeleteMatchingAction(ackReason)); } QueueIterateAction createDeleteMatchingAction(AckReason ackReason) { @@ -2039,13 +2042,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { }; } + private int iterQueue(final String operationName, + final int flushLimit, + final Filter filter1, + QueueIterateAction messageAction) throws Exception { + return iterQueue(operationName, flushLimit, filter1, messageAction, true); + } /** * This is a generic method for any method interacting on the Queue to move or delete messages Instead of duplicate * the feature we created an abstract class where you pass the logic for each message. */ - private int iterQueue(final int flushLimit, + private int iterQueue(final String operationName, + final int flushLimit, final Filter filter1, - QueueIterateAction messageAction) throws Exception { + QueueIterateAction messageAction, + boolean separatePageIterator) throws Exception { int count = 0; int txCount = 0; @@ -2059,6 +2070,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { depageLock.lock(); + if (logger.isDebugEnabled()) { + logger.debug("Executing iterQueue for operation {} on queue {}", operationName, getName()); + } + try { Transaction tx = new TransactionImpl(storageManager); @@ -2075,6 +2090,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (messageAction.actMessage(tx, ref)) { iter.remove(); refRemoved(ref); + if (logger.isTraceEnabled()) { + logger.trace("{} matched act=true on reference {}, during queue iteration", count, ref); + } + } else { + if (logger.isTraceEnabled()) { + logger.trace("{} matched act=false on reference {}, during queue iteration", count, ref); + } } txCount++; count++; @@ -2095,9 +2117,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(messageAction::match); for (MessageReference messageReference : cancelled) { - messageAction.actMessage(tx, messageReference); - count++; - txCount++; + if (messageAction.actMessage(tx, messageReference)) { + count++; + txCount++; + } if (messageAction.expectedHitsReached(count)) { break; } @@ -2112,24 +2135,60 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } if (pageIterator != null) { - while (pageIterator.hasNext() && !messageAction.expectedHitsReached(count)) { - PagedReference reference = pageIterator.next(); - pageIterator.remove(); + PageIterator theIterator; + if (separatePageIterator) { + theIterator = pageSubscription.iterator(); + } else { + theIterator = pageIterator; + } + + try { + while (theIterator.hasNext() && !messageAction.expectedHitsReached(count)) { + PagedReference reference = theIterator.next(); + boolean matched = messageAction.match(reference); + boolean acted = false; - if (messageAction.match(reference)) { - if (!messageAction.actMessage(tx, reference)) { - addTail(reference, false); + if (matched) { + acted = messageAction.actMessage(tx, reference); + } + + if (logger.isTraceEnabled()) { + logger.trace("{} matched={} act={} on reference {}, during queue iteration", count, matched, acted, reference); + } + + if (separatePageIterator) { + if (acted) { + theIterator.remove(); + } + } else { + theIterator.remove(); + + if (!acted) { + // Put non-matching or non-acted messages back to the queue tail + addTail(reference, false); + if (!needsDepage()) { + ActiveMQServerLogger.LOGGER.preventQueueManagementToFloodMemory(operationName, String.valueOf(QueueImpl.this.getName())); + break; + } + } + } + + if (matched) { + txCount++; + count++; + } + + if (txCount > 0 && txCount % flushLimit == 0) { + tx.commit(); + tx = new TransactionImpl(storageManager); + txCount = 0; } - txCount++; - count++; - } else { - addTail(reference, false); } - if (txCount > 0 && txCount % flushLimit == 0) { - tx.commit(); - tx = new TransactionImpl(storageManager); - txCount = 0; + } finally { + if (separatePageIterator) { + // close the iterator if not allowed to depage + theIterator.close(); } } } @@ -2441,7 +2500,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception { - return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { + return iterQueue("sendMessageToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -2457,7 +2516,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception { - return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { + return iterQueue("sendMessagesToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -2476,7 +2535,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final Binding binding, final boolean rejectDuplicate) throws Exception { - return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { + return iterQueue("moveReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -2513,7 +2572,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final Integer expectedHits = messageCount > 0 ? messageCount : null; final DuplicateIDCache targetDuplicateCache = postOffice.getDuplicateIDCache(toAddress); - return iterQueue(flushLimit, filter, new QueueIterateAction(expectedHits) { + return iterQueue("moveReferences", flushLimit, filter, new QueueIterateAction(expectedHits) { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { boolean ignored = false; @@ -2541,7 +2600,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception { - return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() { + return iterQueue("moveReferencesBetweenSnFQueues", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { return moveBetweenSnFQueues(queueSuffix, tx, ref, null); @@ -2554,7 +2613,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final SimpleString toQueue, final Binding binding) throws Exception { - return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { + return iterQueue("copyReference", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -2567,7 +2626,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } public int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception { - return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { + return iterQueue("rerouteMessages", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { RoutingContext routingContext = new RoutingContextImpl(tx); @@ -2592,7 +2651,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { final HashMap<String, Long> queues = new HashMap<>(); - return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) { + return iterQueue("retryMessages", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction(expectedHits) { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -2637,7 +2696,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception { - return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { + // changeReferences is changing the message directly in the queue. + // For that reason, iterQueue here needs to act as if it's depaging messages, + // and thus it needs to use the same iterator from depaging and keep the message at the tail of the + // queue after being read. + return iterQueue("changeReferencePriority", DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction(messageID) { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -2645,14 +2708,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return false; } - }) == 1; + }, false) == 1; } @Override public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception { - return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { + // changeReferences is changing the message directly in the queue. + // For that reason, iterQueue here needs to act as if it's depaging messages, + // and thus it needs to use the same iterator from depaging and keep the message at the tail of the + // queue after being read. + return iterQueue("changeReferencesPriority", DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() { @Override public boolean actMessage(Transaction tx, MessageReference ref) throws Exception { @@ -2660,7 +2727,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return false; } - }); + }, false); } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java index 8642bd7c4e..02c163a78d 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java @@ -197,7 +197,6 @@ public class DualMirrorSingleAcceptorRunningTest extends SmokeTestBase { } // Send messages through the shared acceptor - cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); sendMessages(cfX, MESSAGES_SENT_PER_ITERATION); // Consume some messages diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/QueueMemoryFloodValidationTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/QueueMemoryFloodValidationTest.java new file mode 100644 index 0000000000..a8bcf55d98 --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/QueueMemoryFloodValidationTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.memoryFlood; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.File; +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.api.core.management.SimpleManagement; +import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class QueueMemoryFloodValidationTest extends SoakTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String QUEUE_NAME = "simpleTest"; + public static final String SERVER_NAME = "validate-iterations"; + private static File serverLocation; + + private static final long MESSAGE_COUNT = 10_000; + private static final int MESSAGE_TO_REMOVE_START = 9_500; + private static final int MESSAGE_TO_REMOVE_END = 9_600; + private static final int MESSAGES_REMOVED = MESSAGE_TO_REMOVE_END - MESSAGE_TO_REMOVE_START + 1; + + private Process server; + + @BeforeAll + public static void createServers() throws Exception { + serverLocation = getFileServerLocation(SERVER_NAME); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = helperCreate(); + cliCreateServer.setUseAIO(false).setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation); + // to speedup producers + cliCreateServer.addArgs("--no-fsync"); + cliCreateServer.addArgs("--queues", QUEUE_NAME); + // limiting memory to make the test more predictable + cliCreateServer.addArgs("--java-memory", "512M"); + cliCreateServer.createServer(); + + FileUtil.findReplace(new File(serverLocation, "/etc/broker.xml"), "<max-size-messages>-1</max-size-messages>", " <max-size-messages>1000</max-size-messages>"); + } + + + /** It will call a few management operations making sure they will not flood the memory with the entire page dataset */ + @Test + public void testPreventMemoryFlood() throws Exception { + server = startServer(SERVER_NAME, 0, 5000); + + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + sendMessages(factory); + + SimpleManagement simpleManagement = new SimpleManagement("tcp://localhost:61616", null, null); + simpleManagement.simpleManagementInt(ResourceNames.QUEUE + QUEUE_NAME, "changeMessagesPriority", "", (int)3); + + File logLocation = new File(serverLocation, "log/artemis.log"); + assertTrue(FileUtil.find(logLocation, l -> l.contains("AMQ224158")), "Expected AMQ224158 warning log message from ActiveMQServerLogger.preventQueueManagementToFloodMemory"); + + Wait.assertEquals(MESSAGE_COUNT, () -> simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 5000, 100); + + int removed = simpleManagement.simpleManagementInt(ResourceNames.QUEUE + QUEUE_NAME, "removeMessages", "i >= " + MESSAGE_TO_REMOVE_START + " AND i <= " + MESSAGE_TO_REMOVE_END); + + assertEquals(MESSAGES_REMOVED, removed); + Wait.assertEquals(MESSAGE_COUNT - MESSAGES_REMOVED, () -> simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 5000, 100); + + int messagesRemoved = simpleManagement.simpleManagementInt(ResourceNames.QUEUE + QUEUE_NAME, "removeAllMessages"); + assertEquals(MESSAGE_COUNT - MESSAGES_REMOVED, messagesRemoved); + + try (Connection connection = factory.createConnection()) { + connection.start(); + Session session = connection.createSession(Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + Message message = consumer.receiveNoWait(); + assertNull(message); + } + + sendMessages(factory); + + removed = simpleManagement.simpleManagementInt(ResourceNames.QUEUE + QUEUE_NAME, "removeMessages", "i >= " + MESSAGE_TO_REMOVE_START + " AND i <= " + MESSAGE_TO_REMOVE_END); + assertEquals(MESSAGES_REMOVED, removed); + Wait.assertEquals(MESSAGE_COUNT - MESSAGES_REMOVED, () -> simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 5000, 100); + + try (Connection connection = factory.createConnection()) { + connection.start(); + Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + for (int i = 0; i < MESSAGE_COUNT - MESSAGES_REMOVED; i++) { + Message message = consumer.receive(5000); + assertNotNull(message); + assertTrue(message.getIntProperty("i") < MESSAGE_TO_REMOVE_START || message.getIntProperty("i") > MESSAGE_TO_REMOVE_END); + } + } + + Wait.assertEquals(0L, () -> simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 5000, 100); + + } + + private static void sendMessages(ConnectionFactory factory) throws JMSException { + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + TextMessage message = session.createTextMessage(RandomUtil.randomAlphaNumericString(1024 * 40)); + message.setIntProperty("i", i); + producer.send(message); + + if (i % 1000 == 0) { + logger.info("sent {} out of {}", i, MESSAGE_COUNT); + session.commit(); + } + } + session.commit(); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
