This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit c83fce8db44881ee982e704b53fcc9565a1490ad Author: Francesco Nigro <nigro....@gmail.com> AuthorDate: Mon Mar 11 14:04:35 2019 +0100 ARTEMIS-1604 Artemis deadlock using MQTT Protocol Direct and async deliveries lock QueueImpl::this and ServerConsumerImpl::this in different orders, causing a deadlock: a deliverLock has been introduced to prevent both types of delivery from happening concurrently, making the lock ordering irrelevant. --- .../artemis/core/server/impl/QueueImpl.java | 100 +++++++++++++-------- 1 file changed, 62 insertions(+), 38 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 57c33ad..ee76362 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.locks.ReentrantLock; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -210,6 +211,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final Runnable deliverRunner = new DeliverRunner(); + //This lock is used to prevent deadlocks between direct and async deliveries + private final ReentrantLock deliverLock = new ReentrantLock(); + private volatile boolean depagePending = false; private final StorageManager storageManager; @@ -881,7 +885,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return; } - if (supportsDirectDeliver && !directDeliver && direct && System.currentTimeMillis() - 
lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) { + if (direct && supportsDirectDeliver && !directDeliver && System.currentTimeMillis() - lastDirectDeliveryCheck > CHECK_QUEUE_SIZE_PERIOD) { if (logger.isTraceEnabled()) { logger.trace("Checking to re-enable direct deliver on queue " + this.getName()); } @@ -3069,57 +3073,74 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { * This method delivers the reference on the callers thread - this can give us better latency in the case there is nothing in the queue */ private boolean deliverDirect(final MessageReference ref) { - synchronized (this) { - if (!supportsDirectDeliver) { - return false; - } - if (paused || !canDispatch() && redistributor == null) { - return false; - } + //The order to enter the deliverLock re QueueImpl::this lock is very important: + //- acquire deliverLock::lock + //- acquire QueueImpl::this lock + //DeliverRunner::run is doing the same to avoid deadlocks. + //Without deliverLock, a directDeliver happening while a DeliverRunner::run + //could cause a deadlock. + //Both DeliverRunner::run and deliverDirect could trigger a ServerConsumerImpl::individualAcknowledge: + //- deliverDirect first acquire QueueImpl::this, then ServerConsumerImpl::this + //- DeliverRunner::run first acquire ServerConsumerImpl::this then QueueImpl::this + if (!deliverLock.tryLock()) { + logger.tracef("Cannot perform a directDelivery because there is a running async deliver"); + return false; + } + try { + synchronized (this) { + if (!supportsDirectDeliver) { + return false; + } + if (paused || !canDispatch() && redistributor == null) { + return false; + } - if (checkExpired(ref)) { - return true; - } + if (checkExpired(ref)) { + return true; + } - consumers.reset(); + consumers.reset(); - while (consumers.hasNext() || redistributor != null) { + while (consumers.hasNext() || redistributor != null) { - ConsumerHolder<? extends Consumer> holder = redistributor == null ? 
consumers.next() : redistributor; - Consumer consumer = holder.consumer; + ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor; + Consumer consumer = holder.consumer; - final SimpleString groupID = extractGroupID(ref); - Consumer groupConsumer = getGroupConsumer(groupID); + final SimpleString groupID = extractGroupID(ref); + Consumer groupConsumer = getGroupConsumer(groupID); - if (groupConsumer != null) { - consumer = groupConsumer; - } + if (groupConsumer != null) { + consumer = groupConsumer; + } - HandleStatus status = handle(ref, consumer); + HandleStatus status = handle(ref, consumer); - if (status == HandleStatus.HANDLED) { + if (status == HandleStatus.HANDLED) { - if (redistributor == null) { - handleMessageGroup(ref, consumer, groupConsumer, groupID); - } + if (redistributor == null) { + handleMessageGroup(ref, consumer, groupConsumer, groupID); + } - messagesAdded.incrementAndGet(); + messagesAdded.incrementAndGet(); - deliveriesInTransit.countUp(); - proceedDeliver(consumer, ref); - consumers.reset(); - return true; - } + deliveriesInTransit.countUp(); + proceedDeliver(consumer, ref); + consumers.reset(); + return true; + } - if (redistributor != null || groupConsumer != null) { - break; + if (redistributor != null || groupConsumer != null) { + break; + } } - } - if (logger.isTraceEnabled()) { - logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery"); + if (logger.isTraceEnabled()) { + logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery"); + } + return false; } - return false; + } finally { + deliverLock.unlock(); } } @@ -3464,8 +3485,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { enterCritical(CRITICAL_DELIVER); boolean needCheckDepage = false; try { - synchronized (QueueImpl.this.deliverRunner) { + deliverLock.lock(); + try { needCheckDepage = deliver(); + } finally { + 
deliverLock.unlock(); } } finally { leaveCritical(CRITICAL_DELIVER);