Repository: activemq
Updated Branches:
  refs/heads/master b5dd0a16f -> cc6213ebf


https://issues.apache.org/jira/browse/AMQ-5712

Switch from addMessageLast to tryAddMessageLast when messages are added
to a Queue pending cursor, so that a potential deadlock can be avoided.
There is more work to be done here, but this change will at least
prevent the deadlock from occurring.

The fix and test are based on a patch created by Timothy Bish.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc6213eb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc6213eb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc6213eb

Branch: refs/heads/master
Commit: cc6213ebf25a129b278a2ff0d7c32c25edd71eaa
Parents: b5dd0a1
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Fri Nov 20 20:45:38 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Fri Nov 20 20:58:27 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  72 +++---
 .../cursors/AbstractPendingMessageCursor.java   |  90 ++++++--
 .../region/cursors/AbstractStoreCursor.java     |   4 +-
 .../cursors/FilePendingMessageCursor.java       |   5 -
 .../region/cursors/PendingMessageCursor.java    |   2 +
 .../cursors/StoreDurableSubscriberCursor.java   |   2 +-
 .../broker/region/cursors/StoreQueueCursor.java |   4 +-
 .../region/cursors/VMPendingMessageCursor.java  |   2 +-
 .../org/apache/activemq/bugs/AMQ5712Test.java   | 231 +++++++++++++++++++
 9 files changed, 352 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index e6c20de..adc3a53 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -827,33 +827,38 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         ListenableFuture<Object> result = null;
 
         producerExchange.incrementSend();
-        checkUsage(context, producerExchange, message);
-        sendLock.lockInterruptibly();
-        try {
-            
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
-            if (store != null && message.isPersistent()) {
-                message.getMessageId().setFutureOrSequenceLong(null);
-                try {
-                    if (messages.isCacheEnabled()) {
-                        result = store.asyncAddQueueMessage(context, message, 
isOptimizeStorage());
-                        result.addListener(new 
PendingMarshalUsageTracker(message));
-                    } else {
-                        store.addMessage(context, message);
-                    }
-                    if (isReduceMemoryFootprint()) {
-                        message.clearMarshalledState();
+        do {
+            checkUsage(context, producerExchange, message);
+            sendLock.lockInterruptibly();
+            try {
+                
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
+                if (store != null && message.isPersistent()) {
+                    message.getMessageId().setFutureOrSequenceLong(null);
+                    try {
+                        if (messages.isCacheEnabled()) {
+                            result = store.asyncAddQueueMessage(context, 
message, isOptimizeStorage());
+                            result.addListener(new 
PendingMarshalUsageTracker(message));
+                        } else {
+                            store.addMessage(context, message);
+                        }
+                        if (isReduceMemoryFootprint()) {
+                            message.clearMarshalledState();
+                        }
+                    } catch (Exception e) {
+                        // we may have a store in inconsistent state, so reset 
the cursor
+                        // before restarting normal broker operations
+                        resetNeeded = true;
+                        throw e;
                     }
-                } catch (Exception e) {
-                    // we may have a store in inconsistent state, so reset the 
cursor
-                    // before restarting normal broker operations
-                    resetNeeded = true;
-                    throw e;
                 }
+                if(tryOrderedCursorAdd(message, context)) {
+                    break;
+                }
+            } finally {
+                sendLock.unlock();
             }
-            orderedCursorAdd(message, context);
-        } finally {
-            sendLock.unlock();
-        }
+        } while (started.get());
+
         if (store == null || (!context.isInTransaction() && 
!message.isPersistent())) {
             messageSent(context, message);
         }
@@ -867,15 +872,19 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         }
     }
 
-    private void orderedCursorAdd(Message message, ConnectionContext context) 
throws Exception {
+    private boolean tryOrderedCursorAdd(Message message, ConnectionContext 
context) throws Exception {
+        boolean result = true;
+
         if (context.isInTransaction()) {
             context.getTransaction().addSynchronization(new CursorAddSync(new 
MessageContext(context, message, null)));
         } else if (store != null && message.isPersistent()) {
             doPendingCursorAdditions();
         } else {
             // no ordering issue with non persistent messages
-            cursorAdd(message);
+            result = tryCursorAdd(message);
         }
+
+        return result;
     }
 
     private void checkUsage(ConnectionContext context,ProducerBrokerExchange 
producerBrokerExchange, Message message) throws ResourceAllocationException, 
IOException, InterruptedException {
@@ -1813,7 +1822,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         }
     }
 
-    final boolean cursorAdd(final Message msg) throws Exception {
+    private final boolean cursorAdd(final Message msg) throws Exception {
         messagesLock.writeLock().lock();
         try {
             return messages.addMessageLast(msg);
@@ -1822,6 +1831,15 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         }
     }
 
+    private final boolean tryCursorAdd(final Message msg) throws Exception {
+        messagesLock.writeLock().lock();
+        try {
+            return messages.tryAddMessageLast(msg, 50);
+        } finally {
+            messagesLock.writeLock().unlock();
+        }
+    }
+
     final void messageSent(final ConnectionContext context, final Message msg) 
throws Exception {
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
index 12ea104..0857482 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
@@ -33,8 +33,8 @@ import org.apache.activemq.usage.SystemUsage;
 /**
  * Abstract method holder for pending message (messages awaiting disptach to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 public abstract class AbstractPendingMessageCursor implements 
PendingMessageCursor {
     protected int memoryUsageHighWaterMark = 70;
@@ -49,12 +49,13 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     private boolean started=false;
     protected MessageReference last = null;
     protected final boolean prioritizedMessages;
-    
+
     public AbstractPendingMessageCursor(boolean prioritizedMessages) {
         this.prioritizedMessages=prioritizedMessages;
     }
-  
 
+
+    @Override
     public synchronized void start() throws Exception  {
         if (!started && enableAudit && audit==null) {
             audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
@@ -62,71 +63,89 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
         started=true;
     }
 
+    @Override
     public synchronized void stop() throws Exception  {
         started=false;
         gc();
     }
 
+    @Override
     public void add(ConnectionContext context, Destination destination) throws 
Exception {
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public List<MessageReference> remove(ConnectionContext context, 
Destination destination) throws Exception {
         return Collections.EMPTY_LIST;
     }
 
+    @Override
     public boolean isRecoveryRequired() {
         return true;
     }
 
+    @Override
     public void addMessageFirst(MessageReference node) throws Exception {
     }
 
+    @Override
     public boolean addMessageLast(MessageReference node) throws Exception {
-        return true;
+        return tryAddMessageLast(node, INFINITE_WAIT);
     }
-    
+
+    @Override
     public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) 
throws Exception {
-        return addMessageLast(node);
+        return true;
     }
 
+    @Override
     public void addRecoveredMessage(MessageReference node) throws Exception {
         addMessageLast(node);
     }
 
+    @Override
     public void clear() {
     }
 
+    @Override
     public boolean hasNext() {
         return false;
     }
 
+    @Override
     public boolean isEmpty() {
         return false;
     }
 
+    @Override
     public boolean isEmpty(Destination destination) {
         return isEmpty();
     }
 
+    @Override
     public MessageReference next() {
         return null;
     }
 
+    @Override
     public void remove() {
     }
 
+    @Override
     public void reset() {
     }
 
+    @Override
     public int size() {
         return 0;
     }
 
+    @Override
     public int getMaxBatchSize() {
         return maxBatchSize;
     }
 
+    @Override
     public void setMaxBatchSize(int maxBatchSize) {
         this.maxBatchSize = maxBatchSize;
     }
@@ -134,31 +153,39 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     protected void fillBatch() throws Exception {
     }
 
+    @Override
     public void resetForGC() {
         reset();
     }
 
+    @Override
     public void remove(MessageReference node) {
     }
 
+    @Override
     public void gc() {
     }
 
+    @Override
     public void setSystemUsage(SystemUsage usageManager) {
         this.systemUsage = usageManager;
     }
 
+    @Override
     public boolean hasSpace() {
         return systemUsage != null ? 
(!systemUsage.getMemoryUsage().isFull(memoryUsageHighWaterMark)) : true;
     }
 
+    @Override
     public boolean isFull() {
         return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : 
false;
     }
 
+    @Override
     public void release() {
     }
 
+    @Override
     public boolean hasMessagesBufferedToDeliver() {
         return false;
     }
@@ -166,6 +193,7 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     /**
      * @return the memoryUsageHighWaterMark
      */
+    @Override
     public int getMemoryUsageHighWaterMark() {
         return memoryUsageHighWaterMark;
     }
@@ -173,6 +201,7 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     /**
      * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
      */
+    @Override
     public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
         this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
     }
@@ -180,25 +209,28 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     /**
      * @return the usageManager
      */
+    @Override
     public SystemUsage getSystemUsage() {
         return this.systemUsage;
     }
 
     /**
      * destroy the cursor
-     * 
+     *
      * @throws Exception
      */
+    @Override
     public void destroy() throws Exception {
         stop();
     }
 
     /**
      * Page in a restricted number of messages
-     * 
+     *
      * @param maxItems maximum number of messages to return
      * @return a list of paged in messages
      */
+    @Override
     public LinkedList<MessageReference> pageInList(int maxItems) {
         throw new RuntimeException("Not supported");
     }
@@ -206,6 +238,7 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     /**
      * @return the maxProducersToAudit
      */
+    @Override
     public int getMaxProducersToAudit() {
         return maxProducersToAudit;
     }
@@ -213,6 +246,7 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     /**
      * @param maxProducersToAudit the maxProducersToAudit to set
      */
+    @Override
     public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
         this.maxProducersToAudit = maxProducersToAudit;
         if (audit != null) {
@@ -223,25 +257,28 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     /**
      * @return the maxAuditDepth
      */
+    @Override
     public int getMaxAuditDepth() {
         return maxAuditDepth;
     }
-    
+
 
     /**
      * @param maxAuditDepth the maxAuditDepth to set
      */
+    @Override
     public synchronized void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
         if (audit != null) {
             audit.setAuditDepth(maxAuditDepth);
         }
     }
-    
-    
+
+
     /**
      * @return the enableAudit
      */
+    @Override
     public boolean isEnableAudit() {
         return enableAudit;
     }
@@ -249,38 +286,44 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
     /**
      * @param enableAudit the enableAudit to set
      */
+    @Override
     public synchronized void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
         if (enableAudit && started && audit==null) {
             audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
         }
     }
-    
+
+    @Override
     public boolean isTransient() {
         return false;
     }
-    
-       
+
+
     /**
      * set the audit
      * @param audit new audit component
      */
+    @Override
     public void setMessageAudit(ActiveMQMessageAudit audit) {
        this.audit=audit;
     }
-    
-    
+
+
     /**
      * @return the audit
      */
+    @Override
     public ActiveMQMessageAudit getMessageAudit() {
        return audit;
     }
-    
+
+    @Override
     public boolean isUseCache() {
         return useCache;
     }
 
+    @Override
     public void setUseCache(boolean useCache) {
         this.useCache = useCache;
     }
@@ -290,7 +333,7 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
         rollback(messageId);
         return !unique;
     }
-    
+
     /**
      * records a message id and checks if it is a duplicate
      * @param messageId
@@ -302,17 +345,18 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
         }
         return !audit.isDuplicate(messageId);
     }
-    
+
+    @Override
     public synchronized void rollback(MessageId id) {
         if (audit != null) {
             audit.rollback(id);
         }
     }
-    
+
     public synchronized boolean isStarted() {
         return started;
     }
-    
+
     public static boolean isPrioritizedMessageSubscriber(Broker 
broker,Subscription sub) {
         boolean result = false;
         Set<Destination> destinations = 
broker.getDestinations(sub.getActiveMQDestination());
@@ -328,6 +372,7 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
 
     }
 
+    @Override
     public synchronized boolean isCacheEnabled() {
         return cacheEnabled;
     }
@@ -336,6 +381,7 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
         cacheEnabled = val;
     }
 
+    @Override
     public void rebase() {
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 05e4b1f..c6cca59 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -44,7 +44,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     private Iterator<MessageReference> iterator = null;
     protected boolean batchResetNeeded = false;
     protected int size;
-    private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
+    private final LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
     private static int SYNC_ADD = 0;
     private static int ASYNC_ADD = 1;
     final MessageId[] lastCachedIds = new MessageId[2];
@@ -210,7 +210,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
     @Override
-    public synchronized boolean addMessageLast(MessageReference node) throws 
Exception {
+    public synchronized boolean tryAddMessageLast(MessageReference node, long 
wait) throws Exception {
         boolean disableCache = false;
         if (hasSpace()) {
             if (!isCacheEnabled() && size==0 && isStarted() && useCache) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 3f3f33b..9c9a8e7 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -203,11 +203,6 @@ public class FilePendingMessageCursor extends 
AbstractPendingMessageCursor imple
      * @throws Exception
      */
     @Override
-    public synchronized boolean addMessageLast(MessageReference node) throws 
Exception {
-        return tryAddMessageLast(node, 0);
-    }
-
-    @Override
     public synchronized boolean tryAddMessageLast(MessageReference node, long 
maxWaitTime) throws Exception {
         if (!node.isExpired()) {
             try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
index bf7fd7a..6359635 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
@@ -35,6 +35,8 @@ import org.apache.activemq.usage.SystemUsage;
  */
 public interface PendingMessageCursor extends Service {
 
+    static final long INFINITE_WAIT = 0;
+
     /**
      * Add a destination
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 9d723b8..269bde3 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -183,7 +183,7 @@ public class StoreDurableSubscriberCursor extends 
AbstractPendingMessageCursor {
     }
 
     @Override
-    public synchronized boolean addMessageLast(MessageReference node) throws 
Exception {
+    public synchronized boolean tryAddMessageLast(MessageReference node, long 
wait) throws Exception {
         if (node != null) {
             Message msg = node.getMessage();
             if (isStarted()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index caa93b6..7f26b43 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -90,14 +90,14 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
     }
 
     @Override
-    public synchronized boolean addMessageLast(MessageReference node) throws 
Exception {
+    public synchronized boolean tryAddMessageLast(MessageReference node, long 
maxWait) throws Exception {
         boolean result = true;
         if (node != null) {
             Message msg = node.getMessage();
             if (started) {
                 pendingCount++;
                 if (!msg.isPersistent()) {
-                    nonPersistent.addMessageLast(node);
+                    result = nonPersistent.tryAddMessageLast(node, maxWait);
                 }
             }
             if (msg.isPersistent()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
index 75be766..cd4da9d 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
@@ -102,7 +102,7 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
      */
 
     @Override
-    public synchronized boolean addMessageLast(MessageReference node) {
+    public synchronized boolean tryAddMessageLast(MessageReference node, long 
maxWait) {
         node.incrementReferenceCount();
         list.addMessageLast(node);
         return true;

http://git-wip-us.apache.org/repos/asf/activemq/blob/cc6213eb/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5712Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5712Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5712Test.java
new file mode 100644
index 0000000..4a396a4
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5712Test.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test behavior of senders when broker side producer flow control kicks in.
+ */
+public class AMQ5712Test {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ5712Test.class);
+
+    @Rule public TestName name = new TestName();
+
+    private BrokerService brokerService;
+    private Connection connection;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {}
+        }
+
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+            brokerService = null;
+        }
+    }
+
+    private Connection createConnection() throws Exception {
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost?create=false");
+        factory.setAlwaysSyncSend(true);
+        return factory.createConnection();
+    }
+
+    @Test(timeout = 120000)
+    public void test() throws Exception {
+        connection = createConnection();
+        connection.start();
+
+        final int MSG_COUNT = 100;
+
+        final Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        final Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        final QueueViewMBean queueView = getProxyToQueue(name.getMethodName());
+
+        byte[] payload = new byte[65535];
+        Arrays.fill(payload, (byte) 255);
+        final CountDownLatch done = new CountDownLatch(1);
+        final AtomicInteger counter = new AtomicInteger();
+
+        Thread purge = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    while (!done.await(5, TimeUnit.SECONDS)) {
+                        if (queueView.getBlockedSends() > 0 && 
queueView.getQueueSize() > 0) {
+                            long queueSize = queueView.getQueueSize();
+                            LOG.info("Queue send blocked at {} messages", 
queueSize);
+                            MessageConsumer consumer = 
session.createConsumer(queue);
+                            for (int i = 0; i < queueSize; i++) {
+                                Message message = consumer.receive(60000);
+                                if (message != null) {
+                                    counter.incrementAndGet();
+                                    message.acknowledge();
+                                } else {
+                                    LOG.warn("Got null message when none as 
expected.");
+                                }
+                            }
+                            consumer.close();
+                        }
+                    }
+                } catch (Exception ex) {
+                }
+            }
+        });
+        purge.start();
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(payload);
+            producer.send(message);
+            LOG.info("sent message: {}", i);
+        }
+
+        done.countDown();
+        purge.join(60000);
+        if (purge.isAlive()) {
+            fail("Consumer thread should have read initial batch and 
completed.");
+        }
+
+        //wait for processed acked messages
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return queueView.getDequeueCount() == counter.get();
+            }
+        }));
+
+        long remainingQueued = queueView.getQueueSize();
+        LOG.info("Remaining messages to consume: {}", remainingQueued);
+        assertEquals(remainingQueued, MSG_COUNT - counter.get());
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        for (int i = counter.get(); i < MSG_COUNT; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull("Should not get null message", consumer);
+            counter.incrementAndGet();
+            message.acknowledge();
+            LOG.info("Read message: {}", i);
+        }
+
+        assertEquals("Should consume all messages", MSG_COUNT, counter.get());
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+
+        KahaDBStore persistence = createStore(true);
+        persistence.setJournalMaxFileLength(1024 * 1024 * 1);
+
+        answer.setPersistent(true);
+        answer.setPersistenceAdapter(persistence);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 6);
+        answer.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 5);
+        answer.getSystemUsage().getStoreUsage().setLimit(1024 * 1024 * 5);
+        answer.setUseJmx(true);
+        answer.getManagementContext().setCreateConnector(false);
+        answer.setSchedulerSupport(false);
+        answer.setAdvisorySupport(false);
+
+        PListStoreImpl tempStore = 
((PListStoreImpl)answer.getSystemUsage().getTempUsage().getStore());
+        tempStore.setCleanupInterval(10000);
+        tempStore.setJournalMaxFileLength(1024 * 1024 * 2);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setProducerFlowControl(false);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policy);
+
+        answer.setDestinationPolicy(policyMap);
+
+        return answer;
+    }
+
+    private KahaDBStore createStore(boolean delete) throws IOException {
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(new File("target/activemq-data/kahadb"));
+        if( delete ) {
+            kaha.deleteAllMessages();
+        }
+        return kaha;
+    }
+
+    protected QueueViewMBean getProxyToQueue(String name) throws 
MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) 
brokerService.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, 
true);
+        return proxy;
+    }
+}
\ No newline at end of file

Reply via email to