Author: orudyy
Date: Tue May 5 15:44:12 2015
New Revision: 1677842
URL: http://svn.apache.org/r1677842
Log:
QPID-6528: Always get next available queue entry for the subscription first
even if it is suspended in order to update _lastSeen entry and prevent holding
a reference to an old deleted queue entry
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1677842&r1=1677841&r2=1677842&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Tue May 5 15:44:12 2015
@@ -1264,8 +1264,10 @@ public abstract class AbstractQueue<X ex
{
try
{
- if (!sub.isSuspended()
- && consumerReadyAndHasInterest(sub, entry)
+ // get available queue entry first in order to avoid referring
old deleted queue entry in sub._queueContext._lastSeen
+ if ((getNextAvailableEntry(sub) == entry)
+ && !sub.isSuspended()
+ && sub.hasInterest(entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
@@ -1361,11 +1363,6 @@ public abstract class AbstractQueue<X ex
sub.send(entry, batch);
}
- private boolean consumerReadyAndHasInterest(final QueueConsumer<?> sub,
final QueueEntry entry)
- {
- return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
- }
-
private void setLastSeenEntry(final QueueConsumer<?> sub, final QueueEntry
entry)
{
@@ -2008,6 +2005,7 @@ public abstract class AbstractQueue<X ex
boolean atTail = false;
final boolean keepSendLockHeld = iterations <=
getMaxAsyncDeliveries();
boolean queueEmpty = false;
+ boolean deliveryAttempted = false;
try
{
@@ -2025,6 +2023,7 @@ public abstract class AbstractQueue<X ex
}
atTail = attemptDelivery(sub, true);
+ deliveryAttempted = true;
if (atTail && getNextAvailableEntry(sub) == null)
{
queueEmpty = true;
@@ -2042,6 +2041,12 @@ public abstract class AbstractQueue<X ex
}
}
}
+
+ if (!deliveryAttempted )
+ {
+ // avoid referring old deleted queue entry in
sub._queueContext._lastSeen
+ getNextAvailableEntry(sub);
+ }
}
finally
{
@@ -2084,14 +2089,13 @@ public abstract class AbstractQueue<X ex
{
boolean atTail = false;
+ // avoid referring old deleted queue entry in
sub._queueContext._lastSeen
+ QueueEntry node = getNextAvailableEntry(sub);
boolean subActive = sub.isActive() && !sub.isSuspended();
if (subActive)
{
- QueueEntry node = getNextAvailableEntry(sub);
-
-
if (_virtualHost.getState() != State.ACTIVE)
{
throw new ConnectionScopedRuntimeException("Delivery halted
owing to " +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]