Author: orudyy
Date: Tue May  5 15:44:12 2015
New Revision: 1677842

URL: http://svn.apache.org/r1677842
Log:
QPID-6528: Always get the next available queue entry for the subscription first,
even if the subscription is suspended, in order to update the _lastSeen entry and
prevent holding a reference to an old, deleted queue entry

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1677842&r1=1677841&r2=1677842&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue May  5 15:44:12 2015
@@ -1264,8 +1264,10 @@ public abstract class AbstractQueue<X ex
         {
             try
             {
-                if (!sub.isSuspended()
-                    && consumerReadyAndHasInterest(sub, entry)
+                // get available queue entry first in order to avoid referring old deleted queue entry in sub._queueContext._lastSeen
+                if ((getNextAvailableEntry(sub) == entry)
+                    && !sub.isSuspended()
+                    && sub.hasInterest(entry)
                     && mightAssign(sub, entry)
                     && !sub.wouldSuspend(entry))
                 {
@@ -1361,11 +1363,6 @@ public abstract class AbstractQueue<X ex
         sub.send(entry, batch);
     }
 
-    private boolean consumerReadyAndHasInterest(final QueueConsumer<?> sub, final QueueEntry entry)
-    {
-        return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
-    }
-
 
     private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry entry)
     {
@@ -2008,6 +2005,7 @@ public abstract class AbstractQueue<X ex
         boolean atTail = false;
         final boolean keepSendLockHeld = iterations <=  getMaxAsyncDeliveries();
         boolean queueEmpty = false;
+        boolean deliveryAttempted = false;
 
         try
         {
@@ -2025,6 +2023,7 @@ public abstract class AbstractQueue<X ex
                     }
 
                     atTail = attemptDelivery(sub, true);
+                    deliveryAttempted = true;
                     if (atTail && getNextAvailableEntry(sub) == null)
                     {
                         queueEmpty = true;
@@ -2042,6 +2041,12 @@ public abstract class AbstractQueue<X ex
                     }
                 }
             }
+
+            if (!deliveryAttempted )
+            {
+                // avoid referring old deleted queue entry in sub._queueContext._lastSeen
+                getNextAvailableEntry(sub);
+            }
         }
         finally
         {
@@ -2084,14 +2089,13 @@ public abstract class AbstractQueue<X ex
     {
         boolean atTail = false;
 
+        // avoid referring old deleted queue entry in sub._queueContext._lastSeen
+        QueueEntry node  = getNextAvailableEntry(sub);
         boolean subActive = sub.isActive() && !sub.isSuspended();
 
         if (subActive)
         {
 
-            QueueEntry node  = getNextAvailableEntry(sub);
-
-
             if (_virtualHost.getState() != State.ACTIVE)
             {
                 throw new ConnectionScopedRuntimeException("Delivery halted owing to " +



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to