Author: robbie Date: Mon Aug 8 22:56:17 2011 New Revision: 1155138 URL: http://svn.apache.org/viewvc?rev=1155138&view=rev Log: QPID-3387: use the subscription ID to track rejection rather than the subscription itself
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1155138&r1=1155137&r2=1155138&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Aug 8 22:56:17 2011 @@ -202,9 +202,7 @@ public interface QueueEntry extends Comp void reject(); - void reject(Subscription subscription); - - boolean isRejectedBy(Subscription subscription); + boolean isRejectedBy(long subscriptionId); void dequeue(); Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1155138&r1=1155137&r2=1155138&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Aug 8 22:56:17 2011 @@ -51,7 +51,7 @@ public class QueueEntryImpl implements Q private MessageReference _message; - private Set<Subscription> _rejectedBy = null; + private Set<Long> _rejectedBy = null; private volatile EntryState _state = AVAILABLE_STATE; @@ -325,19 +325,16 @@ public class QueueEntryImpl implements Q public void reject() { - reject(getDeliveredSubscription()); - } + Subscription subscription = getDeliveredSubscription(); - public void reject(Subscription subscription) - { if (subscription != null) { if (_rejectedBy == null) { - _rejectedBy = new HashSet<Subscription>(); + _rejectedBy = new HashSet<Long>(); } - _rejectedBy.add(subscription); + _rejectedBy.add(subscription.getSubscriptionID()); } else { @@ -345,12 +342,12 @@ public class QueueEntryImpl implements Q } } - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { if (_rejectedBy != null) // We have subscriptions that rejected this message { - return _rejectedBy.contains(subscription); + return _rejectedBy.contains(subscriptionId); } else // This messasge hasn't been rejected yet. { Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1155138&r1=1155137&r2=1155138&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Mon Aug 8 22:56:17 2011 @@ -475,7 +475,7 @@ public abstract class SubscriptionImpl i //check that the message hasn't been rejected - if (entry.isRejectedBy(this)) + if (entry.isRejectedBy(getSubscriptionID())) { if (_logger.isDebugEnabled()) { Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1155138&r1=1155137&r2=1155138&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Mon Aug 8 22:56:17 2011 @@ -203,7 +203,7 @@ public class Subscription_0_10 implement //check that the message hasn't been rejected - if (entry.isRejectedBy(this)) + if (entry.isRejectedBy(getSubscriptionID())) { return false; Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1155138&r1=1155137&r2=1155138&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Mon Aug 8 22:56:17 2011 @@ -428,21 +428,11 @@ public class AbstractHeadersExchangeTest //To change body of implemented methods use File | Settings | File Templates. } - public void reject(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { return false; //To change body of implemented methods use File | Settings | File Templates. } - public void requeue(Subscription subscription) - { - //To change body of implemented methods use File | Settings | File Templates. - } - public void dequeue() { //To change body of implemented methods use File | Settings | File Templates. Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=1155138&r1=1155137&r2=1155138&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java Mon Aug 8 22:56:17 2011 @@ -139,7 +139,7 @@ public class MockQueueEntry implements Q } - public boolean isRejectedBy(Subscription subscription) + public boolean isRejectedBy(long subscriptionId) { return false; @@ -153,13 +153,6 @@ public class MockQueueEntry implements Q } - public void reject(Subscription subscription) - { - - - } - - public void release() { Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java?rev=1155138&r1=1155137&r2=1155138&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java Mon Aug 8 22:56:17 2011 @@ -26,6 +26,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.queue.QueueEntry.EntryState; import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.subscription.Subscription; /** * Tests for {@link QueueEntryImpl} @@ -210,4 +211,37 @@ public class QueueEntryImplTest extends } return state; } + + /** + * Tests rejecting a queue entry records the Subscription ID + * for later verification by isRejectedBy(subscriptionId). + */ + public void testRejectAndRejectedBy() + { + Subscription sub = new MockSubscription(); + long subId = sub.getSubscriptionID(); + + assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + assertFalse("Queue entry should not yet have been acquired by a subscription", _queueEntry.isAcquired()); + + //acquire, reject, and release the message using the subscription + assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub)); + _queueEntry.reject(); + _queueEntry.release(); + + //verify the rejection is recorded + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + + //repeat rejection using a second subscription + Subscription sub2 = new MockSubscription(); + long sub2Id = sub2.getSubscriptionID(); + + assertFalse("Queue entry should not yet have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); + assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2)); + _queueEntry.reject(); + + //verify it still records being rejected by both subscriptions + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(subId)); + assertTrue("Queue entry should have been rejected by the subscription", _queueEntry.isRejectedBy(sub2Id)); + } } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org