This is an automated email from the ASF dual-hosted git repository. michaelpearce pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new fafbd7e ARTEMIS-1604 Artemis deadlock using MQTT Protocol new 201d76b This closes #2580 fafbd7e is described below commit fafbd7e2e5953e03573088577be620828cd77bc5 Author: Michael André Pearce <michael.andre.pea...@me.com> AuthorDate: Tue Mar 12 19:53:07 2019 +0000 ARTEMIS-1604 Artemis deadlock using MQTT Protocol Address code review comment not address when PR was merged. --- .../artemis/core/server/impl/QueueImpl.java | 80 ++++++++++++---------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 7e736ca..47cd68e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3094,60 +3094,64 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return false; } try { - synchronized (this) { - if (!supportsDirectDeliver) { - return false; - } - if (paused || !canDispatch() && redistributor == null) { - return false; - } - - if (checkExpired(ref)) { - return true; - } + return deliver(ref); + } finally { + deliverLock.unlock(); + } + } - consumers.reset(); + private boolean deliver(final MessageReference ref) { + synchronized (this) { + if (!supportsDirectDeliver) { + return false; + } + if (paused || !canDispatch() && redistributor == null) { + return false; + } - while (consumers.hasNext() || redistributor != null) { + if (checkExpired(ref)) { + return true; + } - ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor; - Consumer consumer = holder.consumer; + consumers.reset(); - final SimpleString groupID = extractGroupID(ref); - Consumer groupConsumer = getGroupConsumer(groupID); + while (consumers.hasNext() || redistributor != null) { - if (groupConsumer != null) { - consumer = groupConsumer; - } + ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor; + Consumer consumer = holder.consumer; - HandleStatus status = handle(ref, consumer); + final SimpleString groupID = extractGroupID(ref); + Consumer groupConsumer = getGroupConsumer(groupID); - if (status == HandleStatus.HANDLED) { + if (groupConsumer != null) { + consumer = groupConsumer; + } - if (redistributor == null) { - handleMessageGroup(ref, consumer, groupConsumer, groupID); - } + HandleStatus status = handle(ref, consumer); - messagesAdded.incrementAndGet(); + if (status == HandleStatus.HANDLED) { - deliveriesInTransit.countUp(); - proceedDeliver(consumer, ref); - consumers.reset(); - return true; + if (redistributor == null) { + handleMessageGroup(ref, consumer, groupConsumer, groupID); } - if (redistributor != null || groupConsumer != null) { - break; - } + messagesAdded.incrementAndGet(); + + deliveriesInTransit.countUp(); + proceedDeliver(consumer, ref); + consumers.reset(); + return true; } - if (logger.isTraceEnabled()) { - logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery"); + if (redistributor != null || groupConsumer != null) { + break; } - return false; } - } finally { - deliverLock.unlock(); + + if (logger.isTraceEnabled()) { + logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery"); + } + return false; } }