This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit c83fce8db44881ee982e704b53fcc9565a1490ad
Author: Francesco Nigro <nigro....@gmail.com>
AuthorDate: Mon Mar 11 14:04:35 2019 +0100

    ARTEMIS-1604 Artemis deadlock using MQTT Protocol
    
    Direct and async deliveries lock QueueImpl::this and
    ServerConsumerImpl::this in different order causing deadlock:
    has been introduced a deliverLock to prevent both type of delivers
    to concurrently happen, making irrelevant the lock ordering.
---
 .../artemis/core/server/impl/QueueImpl.java        | 100 +++++++++++++--------
 1 file changed, 62 insertions(+), 38 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 57c33ad..ee76362 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -210,6 +211,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private final Runnable deliverRunner = new DeliverRunner();
 
+   //This lock is used to prevent deadlocks between direct and async deliveries
+   private final ReentrantLock deliverLock = new ReentrantLock();
+
    private volatile boolean depagePending = false;
 
    private final StorageManager storageManager;
@@ -881,7 +885,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             return;
          }
 
-         if (supportsDirectDeliver && !directDeliver && direct && 
System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) 
{
+         if (direct && supportsDirectDeliver && !directDeliver && 
System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) 
{
             if (logger.isTraceEnabled()) {
                logger.trace("Checking to re-enable direct deliver on queue " + 
this.getName());
             }
@@ -3069,57 +3073,74 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
     * This method delivers the reference on the callers thread - this can give 
us better latency in the case there is nothing in the queue
     */
    private boolean deliverDirect(final MessageReference ref) {
-      synchronized (this) {
-         if (!supportsDirectDeliver) {
-            return false;
-         }
-         if (paused || !canDispatch() && redistributor == null) {
-            return false;
-         }
+      //The order to enter the deliverLock re QueueImpl::this lock is very 
important:
+      //- acquire deliverLock::lock
+      //- acquire QueueImpl::this lock
+      //DeliverRunner::run is doing the same to avoid deadlocks.
+      //Without deliverLock, a directDeliver happening while a 
DeliverRunner::run
+      //could cause a deadlock.
+      //Both DeliverRunner::run and deliverDirect could trigger a 
ServerConsumerImpl::individualAcknowledge:
+      //- deliverDirect first acquire QueueImpl::this, then 
ServerConsumerImpl::this
+      //- DeliverRunner::run first acquire ServerConsumerImpl::this then 
QueueImpl::this
+      if (!deliverLock.tryLock()) {
+         logger.tracef("Cannot perform a directDelivery because there is a 
running async deliver");
+         return false;
+      }
+      try {
+         synchronized (this) {
+            if (!supportsDirectDeliver) {
+               return false;
+            }
+            if (paused || !canDispatch() && redistributor == null) {
+               return false;
+            }
 
-         if (checkExpired(ref)) {
-            return true;
-         }
+            if (checkExpired(ref)) {
+               return true;
+            }
 
-         consumers.reset();
+            consumers.reset();
 
-         while (consumers.hasNext() || redistributor != null) {
+            while (consumers.hasNext() || redistributor != null) {
 
-            ConsumerHolder<? extends Consumer> holder = redistributor == null 
? consumers.next() : redistributor;
-            Consumer consumer = holder.consumer;
+               ConsumerHolder<? extends Consumer> holder = redistributor == 
null ? consumers.next() : redistributor;
+               Consumer consumer = holder.consumer;
 
-            final SimpleString groupID = extractGroupID(ref);
-            Consumer groupConsumer = getGroupConsumer(groupID);
+               final SimpleString groupID = extractGroupID(ref);
+               Consumer groupConsumer = getGroupConsumer(groupID);
 
-            if (groupConsumer != null) {
-               consumer = groupConsumer;
-            }
+               if (groupConsumer != null) {
+                  consumer = groupConsumer;
+               }
 
-            HandleStatus status = handle(ref, consumer);
+               HandleStatus status = handle(ref, consumer);
 
-            if (status == HandleStatus.HANDLED) {
+               if (status == HandleStatus.HANDLED) {
 
-               if (redistributor == null) {
-                  handleMessageGroup(ref, consumer, groupConsumer, groupID);
-               }
+                  if (redistributor == null) {
+                     handleMessageGroup(ref, consumer, groupConsumer, groupID);
+                  }
 
-               messagesAdded.incrementAndGet();
+                  messagesAdded.incrementAndGet();
 
-               deliveriesInTransit.countUp();
-               proceedDeliver(consumer, ref);
-               consumers.reset();
-               return true;
-            }
+                  deliveriesInTransit.countUp();
+                  proceedDeliver(consumer, ref);
+                  consumers.reset();
+                  return true;
+               }
 
-            if (redistributor != null || groupConsumer != null) {
-               break;
+               if (redistributor != null || groupConsumer != null) {
+                  break;
+               }
             }
-         }
 
-         if (logger.isTraceEnabled()) {
-            logger.tracef("Queue " + getName() + " is out of direct delivery 
as no consumers handled a delivery");
+            if (logger.isTraceEnabled()) {
+               logger.tracef("Queue " + getName() + " is out of direct 
delivery as no consumers handled a delivery");
+            }
+            return false;
          }
-         return false;
+      } finally {
+         deliverLock.unlock();
       }
    }
 
@@ -3464,8 +3485,11 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             enterCritical(CRITICAL_DELIVER);
             boolean needCheckDepage = false;
             try {
-               synchronized (QueueImpl.this.deliverRunner) {
+               deliverLock.lock();
+               try {
                   needCheckDepage = deliver();
+               } finally {
+                  deliverLock.unlock();
                }
             } finally {
                leaveCritical(CRITICAL_DELIVER);

Reply via email to