This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new fafbd7e  ARTEMIS-1604 Artemis deadlock using MQTT Protocol
     new 201d76b  This closes #2580
fafbd7e is described below

commit fafbd7e2e5953e03573088577be620828cd77bc5
Author: Michael André Pearce <michael.andre.pea...@me.com>
AuthorDate: Tue Mar 12 19:53:07 2019 +0000

    ARTEMIS-1604 Artemis deadlock using MQTT Protocol
    
    Address code review comment not addressed when PR was merged.
---
 .../artemis/core/server/impl/QueueImpl.java        | 80 ++++++++++++----------
 1 file changed, 42 insertions(+), 38 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7e736ca..47cd68e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3094,60 +3094,64 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          return false;
       }
       try {
-         synchronized (this) {
-            if (!supportsDirectDeliver) {
-               return false;
-            }
-            if (paused || !canDispatch() && redistributor == null) {
-               return false;
-            }
-
-            if (checkExpired(ref)) {
-               return true;
-            }
+         return deliver(ref);
+      } finally {
+         deliverLock.unlock();
+      }
+   }
 
-            consumers.reset();
+   private boolean deliver(final MessageReference ref) {
+      synchronized (this) {
+         if (!supportsDirectDeliver) {
+            return false;
+         }
+         if (paused || !canDispatch() && redistributor == null) {
+            return false;
+         }
 
-            while (consumers.hasNext() || redistributor != null) {
+         if (checkExpired(ref)) {
+            return true;
+         }
 
-               ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
-               Consumer consumer = holder.consumer;
+         consumers.reset();
 
-               final SimpleString groupID = extractGroupID(ref);
-               Consumer groupConsumer = getGroupConsumer(groupID);
+         while (consumers.hasNext() || redistributor != null) {
 
-               if (groupConsumer != null) {
-                  consumer = groupConsumer;
-               }
+            ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
+            Consumer consumer = holder.consumer;
 
-               HandleStatus status = handle(ref, consumer);
+            final SimpleString groupID = extractGroupID(ref);
+            Consumer groupConsumer = getGroupConsumer(groupID);
 
-               if (status == HandleStatus.HANDLED) {
+            if (groupConsumer != null) {
+               consumer = groupConsumer;
+            }
 
-                  if (redistributor == null) {
-                     handleMessageGroup(ref, consumer, groupConsumer, groupID);
-                  }
+            HandleStatus status = handle(ref, consumer);
 
-                  messagesAdded.incrementAndGet();
+            if (status == HandleStatus.HANDLED) {
 
-                  deliveriesInTransit.countUp();
-                  proceedDeliver(consumer, ref);
-                  consumers.reset();
-                  return true;
+               if (redistributor == null) {
+                  handleMessageGroup(ref, consumer, groupConsumer, groupID);
                }
 
-               if (redistributor != null || groupConsumer != null) {
-                  break;
-               }
+               messagesAdded.incrementAndGet();
+
+               deliveriesInTransit.countUp();
+               proceedDeliver(consumer, ref);
+               consumers.reset();
+               return true;
             }
 
-            if (logger.isTraceEnabled()) {
-               logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
+            if (redistributor != null || groupConsumer != null) {
+               break;
             }
-            return false;
          }
-      } finally {
-         deliverLock.unlock();
+
+         if (logger.isTraceEnabled()) {
+            logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
+         }
+         return false;
       }
    }
 

Reply via email to