This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 766e81d26e ARTEMIS-4505 Cleanup page transactions on startup of the broker 766e81d26e is described below commit 766e81d26e222466e810c3d9d0608c2a939fe298 Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Tue Nov 14 23:35:36 2023 -0500 ARTEMIS-4505 Cleanup page transactions on startup of the broker --- .../artemis/core/paging/PageTransactionInfo.java | 5 + .../core/paging/cursor/PageCursorProvider.java | 2 + .../cursor/impl/PageCounterRebuildManager.java | 7 +- .../paging/cursor/impl/PageCursorProviderImpl.java | 13 +- .../core/paging/impl/PageTransactionInfoImpl.java | 15 +++ .../core/paging/impl/PagingManagerImpl.java | 87 ++++++++++++-- .../artemis/core/server/ActiveMQServerLogger.java | 9 ++ .../core/server/impl/ActiveMQServerImpl.java | 2 +- .../core/server/files/FileMoveManagerTest.java | 2 +- .../paging/PageTransactionCleanupTest.java | 133 +++++++++++++++++++++ 10 files changed, 258 insertions(+), 17 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java index e977c00ed0..9692e2b7a6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java @@ -73,4 +73,9 @@ public interface PageTransactionInfo extends EncodingSupport { */ boolean deliverAfterCommit(PageIterator pageIterator, PageSubscription cursor, PagedReference pagedMessage); + /** Used on PageRebuildManager to cleanup orphaned Page Transactions */ + boolean isOrphaned(); + + PageTransactionInfo setOrphaned(boolean orphaned); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java index b42049fe55..526289aa00 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java @@ -67,4 +67,6 @@ public interface PageCursorProvider { void counterRebuildDone(); + boolean isRebuildDone(); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java index 62d40de398..e4cac16264 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java @@ -39,10 +39,12 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; /** this class will copy current data from the Subscriptions, count messages while the server is already active * performing other activity */ +// TODO: Rename this as RebuildManager in a future major version public class PageCounterRebuildManager implements Runnable { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -58,7 +60,7 @@ public class PageCounterRebuildManager implements Runnable { private final Set<Long> storedLargeMessages; - public PageCounterRebuildManager(PagingManager pagingManager, PagingStore store, Map<Long, PageTransactionInfo> transactions, Set<Long> storedLargeMessages) { + public PageCounterRebuildManager(PagingManager pagingManager, PagingStore store, Map<Long, PageTransactionInfo> transactions, Set<Long> storedLargeMessages, AtomicLong minPageTXIDFound) { // we make a copy of the data because we are allowing data to influx. We will consolidate the values at the end initialize(store); this.pagingManager = pagingManager; @@ -253,6 +255,9 @@ public class PageCounterRebuildManager implements Runnable { if (msg.getTransactionID() > 0) { txInfo = transactions.get(msg.getTransactionID()); + if (txInfo != null) { + txInfo.setOrphaned(false); + } } Transaction preparedTX = txInfo == null ? null : txInfo.getPreparedTransaction(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index dea4cd5fbc..1cf6c1cd47 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -61,7 +61,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { // We can't call cleanup before counters were rebuilt // as they will determine if a subscription is empty or not - protected volatile boolean countersRebuilt = true; + protected volatile boolean rebuildDone = true; protected final PagingStore pagingStore; @@ -268,7 +268,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { protected void cleanup() { - if (!countersRebuilt) { + if (!rebuildDone) { logger.debug("Counters were not rebuilt yet, cleanup has to be ignored on address {}", pagingStore != null ? pagingStore.getAddress() : "NULL"); return; } @@ -611,11 +611,16 @@ public class PageCursorProviderImpl implements PageCursorProvider { @Override public void counterRebuildStarted() { - this.countersRebuilt = false; + this.rebuildDone = false; } @Override public void counterRebuildDone() { - this.countersRebuilt = true; + this.rebuildDone = true; + } + + @Override + public boolean isRebuildDone() { + return this.rebuildDone; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java index 3bcdcf4d4f..2ec4d242a7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java @@ -67,11 +67,26 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo { private List<LateDelivery> lateDeliveries; + /** To be used during by the RebuildManager. + * When reading transactions not found transactions are marked as done. */ + private boolean orphaned; + public PageTransactionInfoImpl(final long transactionID) { this(); this.transactionID = transactionID; } + @Override + public boolean isOrphaned() { + return orphaned; + } + + @Override + public PageTransactionInfoImpl setOrphaned(boolean orphaned) { + this.orphaned = orphaned; + return this; + } + public PageTransactionInfoImpl() { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 41773760be..b97c9448b3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.paging.impl; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -27,6 +28,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import io.netty.util.collection.LongObjectHashMap; @@ -37,6 +40,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStoreFactory; import org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; @@ -48,6 +52,7 @@ import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.lang.invoke.MethodHandles; import java.util.function.BiConsumer; @@ -56,6 +61,8 @@ import static org.apache.activemq.artemis.core.server.files.FileStoreMonitor.Fil public final class PagingManagerImpl implements PagingManager { + private static final int PAGE_TX_CLEANUP_PRINT_LIMIT = 1000; + private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL = Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval", "60")); private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -76,6 +83,8 @@ public final class PagingManagerImpl implements PagingManager { private final HierarchicalRepository<AddressSettings> addressSettingsRepository; + private final ActiveMQServer server; + private PagingStoreFactory pagingStoreFactory; private volatile boolean globalFull; @@ -125,7 +134,8 @@ public final class PagingManagerImpl implements PagingManager { final HierarchicalRepository<AddressSettings> addressSettingsRepository, final long maxSize, final long maxMessages, - final SimpleString managementAddress) { + final SimpleString managementAddress, + final ActiveMQServer server) { pagingStoreFactory = pagingSPI; this.addressSettingsRepository = addressSettingsRepository; addressSettingsRepository.registerListener(this); @@ -138,14 +148,16 @@ public final class PagingManagerImpl implements PagingManager { globalSizeMetric.setUnderCallback(() -> setGlobalFull(false)); this.managerExecutor = pagingSPI.newExecutor(); this.managementAddress = managementAddress; + this.server = server; } SizeAwareMetric getSizeAwareMetric() { return globalSizeMetric; } - - /** To be used in tests only called through PagingManagerTestAccessor */ + /** + * To be used in tests only called through PagingManagerTestAccessor + */ void resetMaxSize(long maxSize, long maxMessages) { this.maxSize = maxSize; this.maxMessages = maxMessages; @@ -164,13 +176,13 @@ public final class PagingManagerImpl implements PagingManager { public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository<AddressSettings> addressSettingsRepository) { - this(pagingSPI, addressSettingsRepository, -1, -1, null); + this(pagingSPI, addressSettingsRepository, -1, -1, null, null); } public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository<AddressSettings> addressSettingsRepository, final SimpleString managementAddress) { - this(pagingSPI, addressSettingsRepository, -1, -1, managementAddress); + this(pagingSPI, addressSettingsRepository, -1, -1, managementAddress, null); } @Override @@ -325,7 +337,6 @@ public final class PagingManagerImpl implements PagingManager { } } - @Override public boolean isGlobalFull() { return diskFull || maxSize > 0 && globalFull; @@ -513,7 +524,6 @@ public final class PagingManagerImpl implements PagingManager { } } - @Override public synchronized void stop() throws Exception { if (!started) { @@ -583,22 +593,79 @@ public final class PagingManagerImpl implements PagingManager { public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) { Map<Long, PageTransactionInfo> transactionsSet = new LongObjectHashMap(); // making a copy - transactions.forEach(transactionsSet::put); + transactions.forEach((a, b) -> { + transactionsSet.put(a, b); + b.setOrphaned(true); + }); + AtomicLong minLargeMessageID = new AtomicLong(Long.MAX_VALUE); + + // make a copy of the stores + Map<SimpleString, PagingStore> currentStoreMap = new HashMap<>(); + stores.forEach(currentStoreMap::put); if (logger.isDebugEnabled()) { logger.debug("Page Transactions during rebuildCounters:"); transactionsSet.forEach((a, b) -> logger.debug("{} = {}", a, b)); } - stores.forEach((address, pgStore) -> { - PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages); + currentStoreMap.forEach((address, pgStore) -> { + PageCounterRebuildManager rebuildManager = new PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages, minLargeMessageID); logger.debug("Setting destination {} to rebuild counters", address); managerExecutor.execute(rebuildManager); }); + managerExecutor.execute(() -> cleanupPageTransactions(transactionsSet, currentStoreMap)); + FutureTask<Object> task = new FutureTask<>(() -> null); managerExecutor.execute(task); return task; } + + private void cleanupPageTransactions(Map<Long, PageTransactionInfo> transactionSet, Map<SimpleString, PagingStore> currentStoreMap) { + if (server == null) { + logger.warn("Server attribute was not set, cannot proceed with page transaction cleanup"); + } + AtomicBoolean proceed = new AtomicBoolean(true); + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // I'm now checking if all pagingStore have finished rebuilding the page counter. + // this would be only false if some exception happened on previous executions on the rebuild manager. + // so if any exception happened, the cleanup is not going to be done + currentStoreMap.forEach((a, b) -> { + if (!b.getCursorProvider().isRebuildDone()) { + logger.warn("cannot proceed on cleaning up page transactions as page cursor for {} is not done rebuilding it", b.getAddress()); + proceed.set(false); + } + }); + + if (!proceed.get()) { + return; + } + + AtomicLong txRemoved = new AtomicLong(0); + + transactionSet.forEach((a, b) -> { + if (b.isOrphaned()) { + b.onUpdate(b.getNumberOfMessages(), server.getStorageManager(), this); + txRemoved.incrementAndGet(); + + // I'm pringing up to 1000 records, id by ID.. + if (txRemoved.get() < PAGE_TX_CLEANUP_PRINT_LIMIT) { + ActiveMQServerLogger.LOGGER.removeOrphanedPageTransaction(a); + } else { + // after a while, I start just printing counters to speed up things a bit + if (txRemoved.get() % PAGE_TX_CLEANUP_PRINT_LIMIT == 0) { + ActiveMQServerLogger.LOGGER.cleaningOrphanedTXCleanup(txRemoved.get()); + } + + } + } + }); + + if (txRemoved.get() > 0) { + ActiveMQServerLogger.LOGGER.completeOrphanedTXCleanup(txRemoved.get()); + } else { + logger.debug("Complete cleanupPageTransactions with no orphaned records found"); + } + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 51b1580dac..966661365b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1593,4 +1593,13 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224130, value = "The {} value {} will override the {} value {} since both are set.", level = LogMessage.Level.INFO) void configParamOverride(String overridingParam, Object overridingParamValue, String overriddenParam, Object overriddenParamValue); + + @LogMessage(id = 224131, value = "Removing orphaned page transaction {}", level = LogMessage.Level.INFO) + void removeOrphanedPageTransaction(long transactionID); + + @LogMessage(id = 224132, value = "{} orphaned page transactions were removed", level = LogMessage.Level.INFO) + void completeOrphanedTXCleanup(long numberOfPageTx); + + @LogMessage(id = 224133, value = "{} orphaned page transactions have been removed", level = LogMessage.Level.INFO) + void cleaningOrphanedTXCleanup(long numberOfPageTx); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 82c88f4bf6..7f072eb1d4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -3048,7 +3048,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public PagingManager createPagingManager() throws Exception { - return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getGlobalMaxMessages(), configuration.getManagementAddress()); + return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getGlobalMaxMessages(), configuration.getManagementAddress(), this); } protected PagingStoreFactory getPagingStoreFactory() throws Exception { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java index 0991420ac2..4cd01b4bf8 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java @@ -307,7 +307,7 @@ public class FileMoveManagerTest { PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), new OrderedExecutorFactory(threadPool), true, null); - PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress()); + PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress(), null); managerImpl.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java new file mode 100644 index 0000000000..5cf6c8b591 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.paging; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A PagingOrderTest. + * <br> + * PagingTest has a lot of tests already. I decided to create a newer one more specialized on Ordering and counters + */ +public class PageTransactionCleanupTest extends ActiveMQTestBase { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final int PAGE_MAX = 100 * 1024; + + private static final int PAGE_SIZE = 10 * 1024; + + static final SimpleString ADDRESS = new SimpleString("TestQueue"); + + @Test + public void testPageTXCleanup() throws Throwable { + + Configuration config = createDefaultConfig(true).setJournalSyncNonTransactional(false); + + ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>()); + + server.start(); + + Queue queue1 = server.createQueue(new QueueConfiguration("test1").setRoutingType(RoutingType.ANYCAST)); + Queue queue2 = server.createQueue(new QueueConfiguration("test2").setRoutingType(RoutingType.ANYCAST)); + + queue1.getPagingStore().startPaging(); + queue2.getPagingStore().startPaging(); + + ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + + final int NUMBER_OF_MESSAGES = 30; + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + for (int producerID = 1; producerID <= 2; producerID++) { + MessageProducer producer = session.createProducer(session.createQueue("test" + producerID)); + for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { + producer.send(session.createTextMessage("hello " + i)); + session.commit(); + } + } + } + + PagingStoreImpl store = (PagingStoreImpl) queue1.getPagingStore(); + File folder = store.getFolder(); + + server.stop(); + + for (String fileName : folder.list((dir, f) -> f.endsWith(".page"))) { + File fileToRemove = new File(folder, fileName); + fileToRemove.delete(); + logger.debug("removing file {}", fileToRemove); + } + + try (AssertionLoggerHandler handler = new AssertionLoggerHandler()) { + server.start(); + Wait.assertTrue(() -> handler.findText("AMQ224132")); + } + + server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(60_000); + + HashMap<Integer, AtomicInteger> countedJournal = countJournal(server.getConfiguration()); + Assert.assertEquals(NUMBER_OF_MESSAGES, countedJournal.get(35).get()); + + server.stop(); + server.start(); + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + connection.start(); + for (int producerID = 1; producerID <= 2; producerID++) { + MessageConsumer consumer = session.createConsumer(session.createQueue("test" + producerID)); + for (int i = 0; i < (producerID == 1 ? 0 : NUMBER_OF_MESSAGES); i++) { + TextMessage message = (TextMessage) consumer.receive(5000); + Assert.assertNotNull("message not received on producer + " + producerID + ", message " + i, message); + Assert.assertEquals("could not find message " + i + " on producerID=" + producerID, "hello " + i, message.getText()); + session.commit(); + } + Assert.assertNull(consumer.receiveNoWait()); + consumer.close(); + } + } + } + +}