Author: rgodfrey Date: Wed Nov 18 09:51:00 2015 New Revision: 1714959 URL: http://svn.apache.org/viewvc?rev=1714959&view=rev Log: QPID-6865 : Avoid deadlock when calling setMessageAssignmentSuspended
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1714959&r1=1714958&r2=1714959&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Wed Nov 18 09:51:00 2015 @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.connection.ConnectionPrincipal; -import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; @@ -402,19 +401,21 @@ public abstract class AbstractAMQPConnec } @Override - public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended) + public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended, final boolean notifyConsumers) { _messageAssignmentSuspended.set(messageAssignmentSuspended); - - for(AMQSessionModel<?> session : getSessionModels()) + if(notifyConsumers) { - if (messageAssignmentSuspended) - { - session.ensureConsumersNoticedStateChange(); - } - else + for (AMQSessionModel<?> session : getSessionModels()) { - session.notifyConsumerTargetCurrentStates(); + if (messageAssignmentSuspended) + { + session.ensureConsumersNoticedStateChange(); + } + else + { + session.notifyConsumerTargetCurrentStates(); + } } } } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1714959&r1=1714958&r2=1714959&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Wed Nov 18 09:51:00 2015 @@ -92,9 +92,9 @@ public class MultiVersionProtocolEngine } @Override - public void setMessageAssignmentSuspended(final boolean value) + public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers) { - _delegate.setMessageAssignmentSuspended(value); + _delegate.setMessageAssignmentSuspended(value, notifyConsumers); } @Override @@ -239,7 +239,7 @@ public class MultiVersionProtocolEngine { @Override - public void setMessageAssignmentSuspended(final boolean value) + public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers) { } @@ -361,7 +361,7 @@ public class MultiVersionProtocolEngine private final AtomicBoolean _hasWork = new AtomicBoolean(); @Override - public void setMessageAssignmentSuspended(final boolean value) + public void setMessageAssignmentSuspended(final boolean value, final boolean notifyConsumers) { } Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1714959&r1=1714958&r2=1714959&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Wed Nov 18 09:51:00 2015 @@ -208,7 +208,7 @@ public class NonBlockingConnection imple { if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit) { - _protocolEngine.setMessageAssignmentSuspended(true); + _protocolEngine.setMessageAssignmentSuspended(true, false); } } @@ -242,7 +242,7 @@ public class NonBlockingConnection imple } _protocolEngine.setIOThread(Thread.currentThread()); - _protocolEngine.setMessageAssignmentSuspended(true); + _protocolEngine.setMessageAssignmentSuspended(true, true); boolean processPendingComplete = processPending(); @@ -260,7 +260,7 @@ public class NonBlockingConnection imple if (_fullyWritten) { - _protocolEngine.setMessageAssignmentSuspended(false); + _protocolEngine.setMessageAssignmentSuspended(false, true); } } else Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1714959&r1=1714958&r2=1714959&view=diff ============================================================================== --- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original) +++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Wed Nov 18 09:51:00 2015 @@ -54,7 +54,7 @@ public interface ProtocolEngine extends void setTransportBlockedForWriting(boolean blocked); - void setMessageAssignmentSuspended(boolean value); + void setMessageAssignmentSuspended(boolean value, final boolean notifyConsumers); boolean isMessageAssignmentSuspended(); Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714959&r1=1714958&r2=1714959&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original) +++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Wed Nov 18 09:51:00 2015 @@ -246,7 +246,7 @@ class WebSocketProvider implements Accep try { _protocolEngine.setIOThread(Thread.currentThread()); - _protocolEngine.setMessageAssignmentSuspended(true); + _protocolEngine.setMessageAssignmentSuspended(true, true); Iterator<Runnable> iter = _protocolEngine.processPendingIterator(); while(iter.hasNext()) { @@ -261,7 +261,7 @@ class WebSocketProvider implements Accep _connectionWrapper.doWrite(); - _protocolEngine.setMessageAssignmentSuspended(false); + _protocolEngine.setMessageAssignmentSuspended(false, true); } finally { @@ -418,7 +418,7 @@ class WebSocketProvider implements Accep { if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit) { - _protocolEngine.setMessageAssignmentSuspended(true); + _protocolEngine.setMessageAssignmentSuspended(true, false); } } @@ -469,7 +469,7 @@ class WebSocketProvider implements Accep try { _protocolEngine.setIOThread(Thread.currentThread()); - _protocolEngine.setMessageAssignmentSuspended(true); + _protocolEngine.setMessageAssignmentSuspended(true, true); Iterator<Runnable> iter = _protocolEngine.processPendingIterator(); while(iter.hasNext()) @@ -479,7 +479,7 @@ class WebSocketProvider implements Accep doWrite(); - _protocolEngine.setMessageAssignmentSuspended(false); + _protocolEngine.setMessageAssignmentSuspended(false, true); } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org