Repository: activemq Updated Branches: refs/heads/master b5dd0a16f -> cc6213ebf
https://issues.apache.org/jira/browse/AMQ-5712

Switching addMessageLast to tryAddMessageLast when messages are added to a Queue pending cursor to allow a potential deadlock to be avoided. There is more work to be done here but this will at least prevent a deadlock from occurring.

Fix and test based on a patch created by Timothy Bish.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc6213eb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc6213eb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc6213eb

Branch: refs/heads/master
Commit: cc6213ebf25a129b278a2ff0d7c32c25edd71eaa
Parents: b5dd0a1
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Fri Nov 20 20:45:38 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Fri Nov 20 20:58:27 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java | 72 +++---
 .../cursors/AbstractPendingMessageCursor.java | 90 ++++++--
 .../region/cursors/AbstractStoreCursor.java | 4 +-
 .../cursors/FilePendingMessageCursor.java | 5 -
 .../region/cursors/PendingMessageCursor.java | 2 +
 .../cursors/StoreDurableSubscriberCursor.java | 2 +-
 .../broker/region/cursors/StoreQueueCursor.java | 4 +-
 .../region/cursors/VMPendingMessageCursor.java | 2 +-
 .../org/apache/activemq/bugs/AMQ5712Test.java | 231 +++++++++++++++++++
 9 files changed, 352 insertions(+), 60 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index e6c20de..adc3a53 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -827,33 +827,38 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index ListenableFuture<Object> result = null; producerExchange.incrementSend(); - checkUsage(context, producerExchange, message); - sendLock.lockInterruptibly(); - try { - message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); - if (store != null && message.isPersistent()) { - message.getMessageId().setFutureOrSequenceLong(null); - try { - if (messages.isCacheEnabled()) { - result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); - result.addListener(new PendingMarshalUsageTracker(message)); - } else { - store.addMessage(context, message); - } - if (isReduceMemoryFootprint()) { - message.clearMarshalledState(); + do { + checkUsage(context, producerExchange, message); + sendLock.lockInterruptibly(); + try { + message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); + if (store != null && message.isPersistent()) { + message.getMessageId().setFutureOrSequenceLong(null); + try { + if (messages.isCacheEnabled()) { + result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); + result.addListener(new PendingMarshalUsageTracker(message)); + } else { + store.addMessage(context, message); + } + if (isReduceMemoryFootprint()) { + message.clearMarshalledState(); + } + } catch (Exception e) { + // we may have a store in inconsistent state, so reset the cursor + // before restarting normal broker operations + resetNeeded = true; + throw e; } - } catch (Exception e) { - // we may have a store in inconsistent state, so reset the cursor - // before restarting normal broker operations - resetNeeded = true; - throw e; } + if(tryOrderedCursorAdd(message, 
context)) { + break; + } + } finally { + sendLock.unlock(); } - orderedCursorAdd(message, context); - } finally { - sendLock.unlock(); - } + } while (started.get()); + if (store == null || (!context.isInTransaction() && !message.isPersistent())) { messageSent(context, message); } @@ -867,15 +872,19 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } - private void orderedCursorAdd(Message message, ConnectionContext context) throws Exception { + private boolean tryOrderedCursorAdd(Message message, ConnectionContext context) throws Exception { + boolean result = true; + if (context.isInTransaction()) { context.getTransaction().addSynchronization(new CursorAddSync(new MessageContext(context, message, null))); } else if (store != null && message.isPersistent()) { doPendingCursorAdditions(); } else { // no ordering issue with non persistent messages - cursorAdd(message); + result = tryCursorAdd(message); } + + return result; } private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException { @@ -1813,7 +1822,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } - final boolean cursorAdd(final Message msg) throws Exception { + private final boolean cursorAdd(final Message msg) throws Exception { messagesLock.writeLock().lock(); try { return messages.addMessageLast(msg); @@ -1822,6 +1831,15 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } + private final boolean tryCursorAdd(final Message msg) throws Exception { + messagesLock.writeLock().lock(); + try { + return messages.tryAddMessageLast(msg, 50); + } finally { + messagesLock.writeLock().unlock(); + } + } + final void messageSent(final ConnectionContext context, final Message msg) throws Exception { destinationStatistics.getEnqueues().increment(); 
destinationStatistics.getMessages().increment(); http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 12ea104..0857482 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -33,8 +33,8 @@ import org.apache.activemq.usage.SystemUsage; /** * Abstract method holder for pending message (messages awaiting disptach to a * consumer) cursor - * - * + * + * */ public abstract class AbstractPendingMessageCursor implements PendingMessageCursor { protected int memoryUsageHighWaterMark = 70; @@ -49,12 +49,13 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs private boolean started=false; protected MessageReference last = null; protected final boolean prioritizedMessages; - + public AbstractPendingMessageCursor(boolean prioritizedMessages) { this.prioritizedMessages=prioritizedMessages; } - + + @Override public synchronized void start() throws Exception { if (!started && enableAudit && audit==null) { audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); @@ -62,71 +63,89 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs started=true; } + @Override public synchronized void stop() throws Exception { started=false; gc(); } + @Override public void add(ConnectionContext context, Destination destination) throws Exception { } + @Override @SuppressWarnings("unchecked") public List<MessageReference> 
remove(ConnectionContext context, Destination destination) throws Exception { return Collections.EMPTY_LIST; } + @Override public boolean isRecoveryRequired() { return true; } + @Override public void addMessageFirst(MessageReference node) throws Exception { } + @Override public boolean addMessageLast(MessageReference node) throws Exception { - return true; + return tryAddMessageLast(node, INFINITE_WAIT); } - + + @Override public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { - return addMessageLast(node); + return true; } + @Override public void addRecoveredMessage(MessageReference node) throws Exception { addMessageLast(node); } + @Override public void clear() { } + @Override public boolean hasNext() { return false; } + @Override public boolean isEmpty() { return false; } + @Override public boolean isEmpty(Destination destination) { return isEmpty(); } + @Override public MessageReference next() { return null; } + @Override public void remove() { } + @Override public void reset() { } + @Override public int size() { return 0; } + @Override public int getMaxBatchSize() { return maxBatchSize; } + @Override public void setMaxBatchSize(int maxBatchSize) { this.maxBatchSize = maxBatchSize; } @@ -134,31 +153,39 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs protected void fillBatch() throws Exception { } + @Override public void resetForGC() { reset(); } + @Override public void remove(MessageReference node) { } + @Override public void gc() { } + @Override public void setSystemUsage(SystemUsage usageManager) { this.systemUsage = usageManager; } + @Override public boolean hasSpace() { return systemUsage != null ? (!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true; } + @Override public boolean isFull() { return systemUsage != null ? 
systemUsage.getMemoryUsage().isFull() : false; } + @Override public void release() { } + @Override public boolean hasMessagesBufferedToDeliver() { return false; } @@ -166,6 +193,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs /** * @return the memoryUsageHighWaterMark */ + @Override public int getMemoryUsageHighWaterMark() { return memoryUsageHighWaterMark; } @@ -173,6 +201,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs /** * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set */ + @Override public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; } @@ -180,25 +209,28 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs /** * @return the usageManager */ + @Override public SystemUsage getSystemUsage() { return this.systemUsage; } /** * destroy the cursor - * + * * @throws Exception */ + @Override public void destroy() throws Exception { stop(); } /** * Page in a restricted number of messages - * + * * @param maxItems maximum number of messages to return * @return a list of paged in messages */ + @Override public LinkedList<MessageReference> pageInList(int maxItems) { throw new RuntimeException("Not supported"); } @@ -206,6 +238,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs /** * @return the maxProducersToAudit */ + @Override public int getMaxProducersToAudit() { return maxProducersToAudit; } @@ -213,6 +246,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs /** * @param maxProducersToAudit the maxProducersToAudit to set */ + @Override public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { this.maxProducersToAudit = maxProducersToAudit; if (audit != null) { @@ -223,25 +257,28 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs /** * @return the 
maxAuditDepth */ + @Override public int getMaxAuditDepth() { return maxAuditDepth; } - + /** * @param maxAuditDepth the maxAuditDepth to set */ + @Override public synchronized void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; if (audit != null) { audit.setAuditDepth(maxAuditDepth); } } - - + + /** * @return the enableAudit */ + @Override public boolean isEnableAudit() { return enableAudit; } @@ -249,38 +286,44 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs /** * @param enableAudit the enableAudit to set */ + @Override public synchronized void setEnableAudit(boolean enableAudit) { this.enableAudit = enableAudit; if (enableAudit && started && audit==null) { audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); } } - + + @Override public boolean isTransient() { return false; } - - + + /** * set the audit * @param audit new audit component */ + @Override public void setMessageAudit(ActiveMQMessageAudit audit) { this.audit=audit; } - - + + /** * @return the audit */ + @Override public ActiveMQMessageAudit getMessageAudit() { return audit; } - + + @Override public boolean isUseCache() { return useCache; } + @Override public void setUseCache(boolean useCache) { this.useCache = useCache; } @@ -290,7 +333,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs rollback(messageId); return !unique; } - + /** * records a message id and checks if it is a duplicate * @param messageId @@ -302,17 +345,18 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs } return !audit.isDuplicate(messageId); } - + + @Override public synchronized void rollback(MessageId id) { if (audit != null) { audit.rollback(id); } } - + public synchronized boolean isStarted() { return started; } - + public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) { boolean result = false; Set<Destination> destinations = 
broker.getDestinations(sub.getActiveMQDestination()); @@ -328,6 +372,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs } + @Override public synchronized boolean isCacheEnabled() { return cacheEnabled; } @@ -336,6 +381,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs cacheEnabled = val; } + @Override public void rebase() { } } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index 05e4b1f..c6cca59 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -44,7 +44,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i private Iterator<MessageReference> iterator = null; protected boolean batchResetNeeded = false; protected int size; - private LinkedList<MessageId> pendingCachedIds = new LinkedList<>(); + private final LinkedList<MessageId> pendingCachedIds = new LinkedList<>(); private static int SYNC_ADD = 0; private static int ASYNC_ADD = 1; final MessageId[] lastCachedIds = new MessageId[2]; @@ -210,7 +210,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i } @Override - public synchronized boolean addMessageLast(MessageReference node) throws Exception { + public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception { boolean disableCache = false; if (hasSpace()) { if (!isCacheEnabled() && size==0 && isStarted() && useCache) { 
http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 3f3f33b..9c9a8e7 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -203,11 +203,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple * @throws Exception */ @Override - public synchronized boolean addMessageLast(MessageReference node) throws Exception { - return tryAddMessageLast(node, 0); - } - - @Override public synchronized boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { if (!node.isExpired()) { try { http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index bf7fd7a..6359635 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -35,6 +35,8 @@ import org.apache.activemq.usage.SystemUsage; */ public interface PendingMessageCursor extends Service { + static final long INFINITE_WAIT = 0; + /** * Add a destination * 
http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 9d723b8..269bde3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -183,7 +183,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { } @Override - public synchronized boolean addMessageLast(MessageReference node) throws Exception { + public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws Exception { if (node != null) { Message msg = node.getMessage(); if (isStarted()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index caa93b6..7f26b43 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -90,14 +90,14 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { } @Override - public synchronized boolean addMessageLast(MessageReference node) throws Exception { + public synchronized boolean 
tryAddMessageLast(MessageReference node, long maxWait) throws Exception { boolean result = true; if (node != null) { Message msg = node.getMessage(); if (started) { pendingCount++; if (!msg.isPersistent()) { - nonPersistent.addMessageLast(node); + result = nonPersistent.tryAddMessageLast(node, maxWait); } } if (msg.isPersistent()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java index 75be766..cd4da9d 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java @@ -102,7 +102,7 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor { */ @Override - public synchronized boolean addMessageLast(MessageReference node) { + public synchronized boolean tryAddMessageLast(MessageReference node, long maxWait) { node.incrementReferenceCount(); list.addMessageLast(node); return true; http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5712Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5712Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5712Test.java new file mode 100644 index 0000000..4a396a4 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5712Test.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.store.kahadb.plist.PListStoreImpl; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; 
+import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test behavior of senders when broker side producer flow control kicks in. + */ +public class AMQ5712Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ5712Test.class); + + @Rule public TestName name = new TestName(); + + private BrokerService brokerService; + private Connection connection; + + @Before + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + try { + connection.close(); + } catch (Exception e) {} + } + + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + brokerService = null; + } + } + + private Connection createConnection() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false"); + factory.setAlwaysSyncSend(true); + return factory.createConnection(); + } + + @Test(timeout = 120000) + public void test() throws Exception { + connection = createConnection(); + connection.start(); + + final int MSG_COUNT = 100; + + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + final QueueViewMBean queueView = getProxyToQueue(name.getMethodName()); + + byte[] payload = new byte[65535]; + Arrays.fill(payload, (byte) 255); + final CountDownLatch done = new CountDownLatch(1); + final AtomicInteger counter = new AtomicInteger(); + + Thread purge = new Thread(new Runnable() { + + @Override + public void run() { + try { + while (!done.await(5, TimeUnit.SECONDS)) { + if 
(queueView.getBlockedSends() > 0 && queueView.getQueueSize() > 0) { + long queueSize = queueView.getQueueSize(); + LOG.info("Queue send blocked at {} messages", queueSize); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < queueSize; i++) { + Message message = consumer.receive(60000); + if (message != null) { + counter.incrementAndGet(); + message.acknowledge(); + } else { + LOG.warn("Got null message when none as expected."); + } + } + consumer.close(); + } + } + } catch (Exception ex) { + } + } + }); + purge.start(); + + for (int i = 0; i < MSG_COUNT; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(payload); + producer.send(message); + LOG.info("sent message: {}", i); + } + + done.countDown(); + purge.join(60000); + if (purge.isAlive()) { + fail("Consumer thread should have read initial batch and completed."); + } + + //wait for processed acked messages + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return queueView.getDequeueCount() == counter.get(); + } + })); + + long remainingQueued = queueView.getQueueSize(); + LOG.info("Remaining messages to consume: {}", remainingQueued); + assertEquals(remainingQueued, MSG_COUNT - counter.get()); + + MessageConsumer consumer = session.createConsumer(queue); + for (int i = counter.get(); i < MSG_COUNT; i++) { + Message message = consumer.receive(5000); + assertNotNull("Should not get null message", consumer); + counter.incrementAndGet(); + message.acknowledge(); + LOG.info("Read message: {}", i); + } + + assertEquals("Should consume all messages", MSG_COUNT, counter.get()); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + + KahaDBStore persistence = createStore(true); + persistence.setJournalMaxFileLength(1024 * 1024 * 1); + + answer.setPersistent(true); + answer.setPersistenceAdapter(persistence); + 
answer.setDeleteAllMessagesOnStartup(true); + answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 6); + answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 5); + answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 5); + answer.setUseJmx(true); + answer.getManagementContext().setCreateConnector(false); + answer.setSchedulerSupport(false); + answer.setAdvisorySupport(false); + + PListStoreImpl tempStore = ((PListStoreImpl)answer.getSystemUsage().getTempUsage().getStore()); + tempStore.setCleanupInterval(10000); + tempStore.setJournalMaxFileLength(1024 * 1024 * 2); + + PolicyEntry policy = new PolicyEntry(); + policy.setProducerFlowControl(false); + + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policy); + + answer.setDestinationPolicy(policyMap); + + return answer; + } + + private KahaDBStore createStore(boolean delete) throws IOException { + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb")); + if( delete ) { + kaha.deleteAllMessages(); + } + return kaha; + } + + protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); + QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } +} \ No newline at end of file