Author: rgodfrey
Date: Wed Nov 16 21:36:27 2016
New Revision: 1770060

URL: http://svn.apache.org/viewvc?rev=1770060&view=rev
Log:
QPID-7514 : reduce the occurrences of consumers being removed and then 
immediately being reinserted into a list

Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Wed Nov 16 21:36:27 2016
@@ -1880,19 +1880,20 @@ public abstract class AbstractQueue<X ex
         boolean queueEmpty = false;
         MessageContainer messageContainer = null;
 
-        _queueConsumerManager.setNotified(consumer, false);
+        _queueConsumerManager.clearStateAffirmationFlag(consumer);
 
         try
         {
+
             if (!consumer.isSuspended())
             {
                 messageContainer = attemptDelivery(consumer);
-                if(messageContainer.getMessageInstance() != null)
-                {
-                    _queueConsumerManager.setNotified(consumer, true);
-                }
-                else
+
+
+                if(messageContainer.getMessageInstance() == null)
                 {
+                    _queueConsumerManager.setNotified(consumer, false, true);
+
                     if (messageContainer.hasNoAvailableMessages())
                     {
                         queueEmpty = true;
@@ -2243,7 +2244,7 @@ public abstract class AbstractQueue<X ex
 
     private boolean notifyConsumer(final QueueConsumer<?> consumer)
     {
-        if(consumerHasAvailableMessages(consumer) && 
_queueConsumerManager.setNotified(consumer, true))
+        if(consumerHasAvailableMessages(consumer) && 
_queueConsumerManager.setNotified(consumer, true, false))
         {
             consumer.notifyWork();
             return true;

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
 Wed Nov 16 21:36:27 2016
@@ -27,7 +27,7 @@ public interface QueueConsumerManager
     void addConsumer(QueueConsumer<?> consumer);
     boolean removeConsumer(QueueConsumer<?> consumer);
     /*public*/ boolean setInterest(QueueConsumer<?> consumer, boolean 
interested); // called from Consumer
-    /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean 
notified); // called from Queue
+    /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean 
notified, final boolean conditional); // called from Queue
 
     // should be priority and then insertion order
     Iterator<QueueConsumer<?>> getInterestedIterator();

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
 Wed Nov 16 21:36:27 2016
@@ -81,16 +81,16 @@ public class QueueConsumerManagerImpl im
         {
             if (consumer.acquires())
             {
-                node.moveFromTo(REMOVED, NodeState.INTERESTED);
+                node.moveFromTo(REMOVED, NodeState.INTERESTED, false);
             }
             else
             {
-                node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING);
+                node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING, false);
             }
         }
         else
         {
-            node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED);
+            node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED, false);
         }
         _count++;
     }
@@ -102,7 +102,7 @@ public class QueueConsumerManagerImpl im
         removeFromAll(consumer);
         QueueConsumerNode node = consumer.getQueueConsumerNode();
 
-        if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED))
+        if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED, 
false))
         {
             _count--;
             return true;
@@ -119,40 +119,46 @@ public class QueueConsumerManagerImpl im
         {
             if (consumer.acquires())
             {
-                return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED);
+                return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED, 
false);
             }
             else
             {
-                return node.moveFromTo(NOT_INTERESTED, 
NodeState.NON_ACQUIRING);
+                return node.moveFromTo(NOT_INTERESTED, 
NodeState.NON_ACQUIRING, false);
             }
         }
         else
         {
             if (consumer.acquires())
             {
-                return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, 
NodeState.NOT_INTERESTED);
+                return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, 
NodeState.NOT_INTERESTED, false);
             }
             else
             {
-                return node.moveFromTo(NON_ACQUIRING, 
NodeState.NOT_INTERESTED);
+                return node.moveFromTo(NON_ACQUIRING, 
NodeState.NOT_INTERESTED, false);
             }
         }
     }
 
+    public void clearStateAffirmationFlag(final QueueConsumer consumer)
+    {
+        QueueConsumerNode node = consumer.getQueueConsumerNode();
+        node.clearAffirmaion();
+    }
+
     // Set by the Queue any IO thread
     @Override
-    public boolean setNotified(final QueueConsumer consumer, final boolean 
notified)
+    public boolean setNotified(final QueueConsumer consumer, final boolean 
notified, final boolean conditional)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
         if (consumer.acquires())
         {
             if (notified)
             {
-                return node.moveFromTo(INTERESTED, NodeState.NOTIFIED);
+                return node.moveFromTo(INTERESTED, NodeState.NOTIFIED, 
conditional);
             }
             else
             {
-                return node.moveFromTo(NOTIFIED, NodeState.INTERESTED);
+                return node.moveFromTo(NOTIFIED, NodeState.INTERESTED, 
conditional);
             }
         }
         else

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
 Wed Nov 16 21:36:27 2016
@@ -29,6 +29,7 @@ final class QueueConsumerNode
     private QueueConsumerNodeListEntry _listEntry;
     private QueueConsumerManagerImpl.NodeState _state = 
QueueConsumerManagerImpl.NodeState.REMOVED;
     private QueueConsumerNodeListEntry _allEntry;
+    private boolean _affirmed;
 
     QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager, 
final QueueConsumer<?> queueConsumer)
     {
@@ -46,20 +47,37 @@ final class QueueConsumerNode
         return _state;
     }
 
-    public synchronized boolean 
moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates, 
QueueConsumerManagerImpl.NodeState toState)
+    public synchronized void clearAffirmaion()
+    {
+        _affirmed = false;
+    }
+
+
+    public synchronized boolean 
moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates,
+                                           QueueConsumerManagerImpl.NodeState 
toState,
+                                           final boolean conditional)
     {
         if (fromStates.contains(_state))
         {
-            if (_listEntry != null)
+            if(!conditional || !_affirmed)
+            {
+                if (_listEntry != null)
+                {
+                    _listEntry.remove();
+                }
+                _state = toState;
+                _listEntry = _queueConsumerManager.addNodeToInterestList(this);
+                return true;
+            }
+            else
             {
-                _listEntry.remove();
+                _affirmed = false;
+                return false;
             }
-            _state = toState;
-            _listEntry = _queueConsumerManager.addNodeToInterestList(this);
-            return true;
         }
         else
         {
+            _affirmed = _state == toState;
             return false;
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to