This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git

commit 15657ce3a29cd0b22e132266637f9e0bc93a8100
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Feb 25 13:09:36 2026 -0500

    ARTEMIS-5376 Preventing OME from QueueImpl::iteration while paging
---
 .../paging/cursor/impl/PageSubscriptionImpl.java   |   9 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   3 +
 .../artemis/core/server/impl/QueueImpl.java        | 131 +++++++++++++-----
 .../DualMirrorSingleAcceptorRunningTest.java       |   1 -
 .../QueueMemoryFloodValidationTest.java            | 153 +++++++++++++++++++++
 5 files changed, 262 insertions(+), 35 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 5adafe74b7..cd812b1a9f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -1431,9 +1431,14 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
 
       @Override
       public synchronized NextResult tryNext() {
-         // if an unbehaved program called hasNext twice before next, we only 
cache it once.
          if (cachedNext != null) {
-            return NextResult.hasElements;
+            PageCursorInfo info = 
locatePageInfo(cachedNext.getPagedMessage().getPageNumber());
+            if (info != null && 
info.isRemoved(cachedNext.getPagedMessage().getMessageNumber())) {
+               logger.debug("Reference {} has been removed from another 
iterator, moving it next", cachedNext);
+               cachedNext = null;
+            } else {
+               return NextResult.hasElements;
+            }
          }
 
          if (!pageStore.isStorePaging()) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 04618745cb..95542dd4d9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1526,4 +1526,7 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224157, value = "At least one of the components failed to 
start under the lockCoordinator {}. A retry will be executed", level = 
LogMessage.Level.INFO)
    void retryLockCoordinator(String name);
+
+   @LogMessage(id = 224158, value = "The operation {} on queue {} cannot read 
more data from paging into memory and will be interrupted.", level = 
LogMessage.Level.INFO)
+   void preventQueueManagementToFloodMemory(String operation, String queue);
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 32cd4bfa5e..944992df0e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1003,6 +1003,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    @Override
    public void addTail(final MessageReference ref, final boolean direct) {
+      if (logger.isTraceEnabled()) {
+         logger.trace("AddTail on queue {}, reference={}", this.getName(), 
ref);
+      }
       try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_TAIL)) {
          if (scheduleIfPossible(ref)) {
             return;
@@ -2025,7 +2028,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    @Override
    public int deleteMatchingReferences(final int flushLimit, final Filter 
filter1, AckReason ackReason) throws Exception {
-      return iterQueue(flushLimit, filter1, 
createDeleteMatchingAction(ackReason));
+      return iterQueue("deleteMatchingReferences", flushLimit, filter1, 
createDeleteMatchingAction(ackReason));
    }
 
    QueueIterateAction createDeleteMatchingAction(AckReason ackReason) {
@@ -2039,13 +2042,21 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       };
    }
 
+   private int iterQueue(final String operationName,
+                         final int flushLimit,
+                         final Filter filter1,
+                         QueueIterateAction messageAction) throws Exception {
+      return iterQueue(operationName, flushLimit, filter1, messageAction, 
true);
+   }
    /**
     * This is a generic method for any method interacting on the Queue to move 
or delete messages Instead of duplicate
     * the feature we created an abstract class where you pass the logic for 
each message.
     */
-   private int iterQueue(final int flushLimit,
+   private int iterQueue(final String operationName,
+                         final int flushLimit,
                          final Filter filter1,
-                         QueueIterateAction messageAction) throws Exception {
+                         QueueIterateAction messageAction,
+                         boolean separatePageIterator) throws Exception {
       int count = 0;
       int txCount = 0;
 
@@ -2059,6 +2070,10 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       depageLock.lock();
 
+      if (logger.isDebugEnabled()) {
+         logger.debug("Executing iterQueue for operation {} on queue {}", 
operationName, getName());
+      }
+
       try {
          Transaction tx = new TransactionImpl(storageManager);
 
@@ -2075,6 +2090,13 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                      if (messageAction.actMessage(tx, ref)) {
                         iter.remove();
                         refRemoved(ref);
+                        if (logger.isTraceEnabled()) {
+                           logger.trace("{} matched act=true on reference {}, 
during queue iteration", count, ref);
+                        }
+                     } else {
+                        if (logger.isTraceEnabled()) {
+                           logger.trace("{} matched act=false on reference {}, 
during queue iteration", count, ref);
+                        }
                      }
                      txCount++;
                      count++;
@@ -2095,9 +2117,10 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
                List<MessageReference> cancelled = 
scheduledDeliveryHandler.cancel(messageAction::match);
                for (MessageReference messageReference : cancelled) {
-                  messageAction.actMessage(tx, messageReference);
-                  count++;
-                  txCount++;
+                  if (messageAction.actMessage(tx, messageReference)) {
+                     count++;
+                     txCount++;
+                  }
                   if (messageAction.expectedHitsReached(count)) {
                      break;
                   }
@@ -2112,24 +2135,60 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
          }
 
          if (pageIterator != null) {
-            while (pageIterator.hasNext() && 
!messageAction.expectedHitsReached(count)) {
-               PagedReference reference = pageIterator.next();
-               pageIterator.remove();
+            PageIterator theIterator;
+            if (separatePageIterator) {
+               theIterator = pageSubscription.iterator();
+            } else {
+               theIterator = pageIterator;
+            }
+
+            try {
+               while (theIterator.hasNext() && 
!messageAction.expectedHitsReached(count)) {
+                  PagedReference reference = theIterator.next();
+                  boolean matched = messageAction.match(reference);
+                  boolean acted = false;
 
-               if (messageAction.match(reference)) {
-                  if (!messageAction.actMessage(tx, reference)) {
-                     addTail(reference, false);
+                  if (matched) {
+                     acted = messageAction.actMessage(tx, reference);
+                  }
+
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("{} matched={} act={} on reference {}, 
during queue iteration", count, matched, acted, reference);
+                  }
+
+                  if (separatePageIterator) {
+                     if (acted) {
+                        theIterator.remove();
+                     }
+                  } else {
+                     theIterator.remove();
+
+                     if (!acted) {
+                        // Put non-matching or non-acted messages back to the 
queue tail
+                        addTail(reference, false);
+                        if (!needsDepage()) {
+                           
ActiveMQServerLogger.LOGGER.preventQueueManagementToFloodMemory(operationName, 
String.valueOf(QueueImpl.this.getName()));
+                           break;
+                        }
+                     }
+                  }
+
+                  if (matched) {
+                     txCount++;
+                     count++;
+                  }
+
+                  if (txCount > 0 && txCount % flushLimit == 0) {
+                     tx.commit();
+                     tx = new TransactionImpl(storageManager);
+                     txCount = 0;
                   }
-                  txCount++;
-                  count++;
-               } else {
-                  addTail(reference, false);
                }
 
-               if (txCount > 0 && txCount % flushLimit == 0) {
-                  tx.commit();
-                  tx = new TransactionImpl(storageManager);
-                  txCount = 0;
+            } finally {
+               if (separatePageIterator) {
+                  // the separate iterator was created only for this operation, so close it here
+                  theIterator.close();
                }
             }
          }
@@ -2441,7 +2500,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    @Override
    public boolean sendMessageToDeadLetterAddress(final long messageID) throws 
Exception {
 
-      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
+      return iterQueue("sendMessageToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, 
null, new QueueIterateAction(messageID) {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -2457,7 +2516,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    @Override
    public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
 
-      return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
+      return iterQueue("sendMessagesToDeadLetterAddress", DEFAULT_FLUSH_LIMIT, 
filter, new QueueIterateAction() {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -2476,7 +2535,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                                 final Binding binding,
                                 final boolean rejectDuplicate) throws 
Exception {
 
-      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
+      return iterQueue("moveReference", DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -2513,7 +2572,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       final Integer expectedHits = messageCount > 0 ? messageCount : null;
       final DuplicateIDCache targetDuplicateCache = 
postOffice.getDuplicateIDCache(toAddress);
 
-      return iterQueue(flushLimit, filter, new 
QueueIterateAction(expectedHits) {
+      return iterQueue("moveReferences", flushLimit, filter, new 
QueueIterateAction(expectedHits) {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
             boolean ignored = false;
@@ -2541,7 +2600,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) 
throws Exception {
-      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
+      return iterQueue("moveReferencesBetweenSnFQueues", DEFAULT_FLUSH_LIMIT, 
null, new QueueIterateAction() {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
             return moveBetweenSnFQueues(queueSuffix, tx, ref, null);
@@ -2554,7 +2613,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                                 final SimpleString toQueue,
                                 final Binding binding) throws Exception {
 
-      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
+      return iterQueue("copyReference", DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -2567,7 +2626,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    public int rerouteMessages(final SimpleString queueName, final Filter 
filter) throws Exception {
-      return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
+      return iterQueue("rerouteMessages", DEFAULT_FLUSH_LIMIT, filter, new 
QueueIterateAction() {
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
             RoutingContext routingContext = new RoutingContextImpl(tx);
@@ -2592,7 +2651,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       final HashMap<String, Long> queues = new HashMap<>();
 
-      return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new 
QueueIterateAction(expectedHits) {
+      return iterQueue("retryMessages", DEFAULT_FLUSH_LIMIT, filter, new 
QueueIterateAction(expectedHits) {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -2637,7 +2696,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    @Override
    public boolean changeReferencePriority(final long messageID, final byte 
newPriority) throws Exception {
 
-      return iterQueue(DEFAULT_FLUSH_LIMIT, null, new 
QueueIterateAction(messageID) {
+      // changeReferencePriority changes the message directly in the queue.
+      // For that reason, iterQueue here needs to act as if it were depaging 
messages:
+      // it needs to use the same iterator as depaging and keep the 
message at the tail of the
+      // queue after it has been read.
+      return iterQueue("changeReferencePriority", DEFAULT_FLUSH_LIMIT, null, 
new QueueIterateAction(messageID) {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -2645,14 +2708,18 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             return false;
          }
 
-      }) == 1;
+      }, false) == 1;
 
    }
 
    @Override
    public int changeReferencesPriority(final Filter filter, final byte 
newPriority) throws Exception {
 
-      return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
+      // changeReferencesPriority changes the messages directly in the queue.
+      // For that reason, iterQueue here needs to act as if it were depaging 
messages:
+      // it needs to use the same iterator as depaging and keep each 
message at the tail of the
+      // queue after it has been read.
+      return iterQueue("changeReferencesPriority", DEFAULT_FLUSH_LIMIT, 
filter, new QueueIterateAction() {
 
          @Override
          public boolean actMessage(Transaction tx, MessageReference ref) 
throws Exception {
@@ -2660,7 +2727,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             return false;
          }
 
-      });
+      }, false);
 
    }
 
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
index 8642bd7c4e..02c163a78d 100644
--- 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/DualMirrorSingleAcceptorRunningTest.java
@@ -197,7 +197,6 @@ public class DualMirrorSingleAcceptorRunningTest extends 
SmokeTestBase {
          }
 
          // Send messages through the shared acceptor
-         cfX = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
          sendMessages(cfX, MESSAGES_SENT_PER_ITERATION);
 
          // Consume some messages
diff --git 
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/QueueMemoryFloodValidationTest.java
 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/QueueMemoryFloodValidationTest.java
new file mode 100644
index 0000000000..a8bcf55d98
--- /dev/null
+++ 
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/QueueMemoryFloodValidationTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.memoryFlood;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class QueueMemoryFloodValidationTest extends SoakTestBase {
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   public static final String QUEUE_NAME = "simpleTest";
+   public static final String SERVER_NAME = "validate-iterations";
+   private static File serverLocation;
+
+   private static final long MESSAGE_COUNT = 10_000;
+   private static final int MESSAGE_TO_REMOVE_START = 9_500;
+   private static final int MESSAGE_TO_REMOVE_END = 9_600;
+   private static final int MESSAGES_REMOVED = MESSAGE_TO_REMOVE_END - 
MESSAGE_TO_REMOVE_START + 1;
+
+   private Process server;
+
+   @BeforeAll
+   public static void createServers() throws Exception {
+      serverLocation = getFileServerLocation(SERVER_NAME);
+      deleteDirectory(serverLocation);
+
+      HelperCreate cliCreateServer = helperCreate();
+      
cliCreateServer.setUseAIO(false).setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
+      // to speedup producers
+      cliCreateServer.addArgs("--no-fsync");
+      cliCreateServer.addArgs("--queues", QUEUE_NAME);
+      // limiting memory to make the test more predictable
+      cliCreateServer.addArgs("--java-memory", "512M");
+      cliCreateServer.createServer();
+
+      FileUtil.findReplace(new File(serverLocation, "/etc/broker.xml"), 
"<max-size-messages>-1</max-size-messages>", " 
<max-size-messages>1000</max-size-messages>");
+   }
+
+
+   /** Calls a few management operations and makes sure they do not 
flood the memory with the entire paged dataset. */
+   @Test
+   public void testPreventMemoryFlood() throws Exception {
+      server = startServer(SERVER_NAME, 0, 5000);
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("core", 
"tcp://localhost:61616");
+      sendMessages(factory);
+
+      SimpleManagement simpleManagement = new 
SimpleManagement("tcp://localhost:61616", null, null);
+      simpleManagement.simpleManagementInt(ResourceNames.QUEUE + QUEUE_NAME, 
"changeMessagesPriority", "", (int)3);
+
+      File logLocation = new File(serverLocation, "log/artemis.log");
+      assertTrue(FileUtil.find(logLocation, l -> l.contains("AMQ224158")), 
"Expected AMQ224158 warning log message from 
ActiveMQServerLogger.preventQueueManagementToFloodMemory");
+
+      Wait.assertEquals(MESSAGE_COUNT, () -> 
simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
+
+      int removed = simpleManagement.simpleManagementInt(ResourceNames.QUEUE + 
QUEUE_NAME, "removeMessages", "i >= " + MESSAGE_TO_REMOVE_START + " AND i <= " 
+ MESSAGE_TO_REMOVE_END);
+
+      assertEquals(MESSAGES_REMOVED, removed);
+      Wait.assertEquals(MESSAGE_COUNT - MESSAGES_REMOVED, () -> 
simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
+
+      int messagesRemoved = 
simpleManagement.simpleManagementInt(ResourceNames.QUEUE + QUEUE_NAME, 
"removeAllMessages");
+      assertEquals(MESSAGE_COUNT - MESSAGES_REMOVED, messagesRemoved);
+
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = 
connection.createSession(Session.SESSION_TRANSACTED);
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+         Message message = consumer.receiveNoWait();
+         assertNull(message);
+      }
+
+      sendMessages(factory);
+
+      removed = simpleManagement.simpleManagementInt(ResourceNames.QUEUE + 
QUEUE_NAME, "removeMessages", "i >= " + MESSAGE_TO_REMOVE_START + " AND i <= " 
+ MESSAGE_TO_REMOVE_END);
+      assertEquals(MESSAGES_REMOVED, removed);
+      Wait.assertEquals(MESSAGE_COUNT - MESSAGES_REMOVED, () -> 
simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
+
+      try (Connection connection = factory.createConnection()) {
+         connection.start();
+         Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < MESSAGE_COUNT - MESSAGES_REMOVED; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertTrue(message.getIntProperty("i") < MESSAGE_TO_REMOVE_START 
|| message.getIntProperty("i") > MESSAGE_TO_REMOVE_END);
+         }
+      }
+
+      Wait.assertEquals(0L, () -> 
simpleManagement.getMessageCountOnQueue(QUEUE_NAME), 5000, 100);
+
+   }
+
+   private static void sendMessages(ConnectionFactory factory) throws 
JMSException {
+      try (Connection connection = factory.createConnection()) {
+         Session session = 
connection.createSession(Session.SESSION_TRANSACTED);
+         MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+
+         for (int i = 0; i < MESSAGE_COUNT; i++) {
+            TextMessage message = 
session.createTextMessage(RandomUtil.randomAlphaNumericString(1024 * 40));
+            message.setIntProperty("i", i);
+            producer.send(message);
+
+            if (i % 1000 == 0) {
+               logger.info("sent {} out of {}", i, MESSAGE_COUNT);
+               session.commit();
+            }
+         }
+         session.commit();
+      }
+   }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to