Author: robbie
Date: Tue May 11 17:46:05 2010
New Revision: 943206

URL: http://svn.apache.org/viewvc?rev=943206&view=rev
Log:
QPID-2568: revert changes from commits 941359, 941376 and 941792

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=943206&r1=943205&r2=943206&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Tue May 11 17:46:05 2010 @@ -74,6 +74,8 @@ public interface UnacknowledgedMessageMa * @return a set of delivery tags */ Set<Long> getDeliveryTags(); + + public long getUnacknowledgeBytes(); } Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=943206&r1=943205&r2=943206&view=diff ============================================================================== --- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Tue May 11 17:46:05 2010 @@ -24,10 +24,14 @@ import org.apache.qpid.server.store.Stor import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.txn.TransactionalContext; @@ -35,6 +39,8 @@ public class UnacknowledgedMessageMapImp { private final Object _lock = new Object(); + private long _unackedSize; + private Map<Long, QueueEntry> _map; private long _lastDeliveryTag; @@ -84,9 +90,14 @@ public class UnacknowledgedMessageMapImp synchronized (_lock) { - QueueEntry entry = _map.remove(deliveryTag); + QueueEntry message = _map.remove(deliveryTag); + if(message != null) + { + _unackedSize -= message.getMessage().getSize(); + + } - return entry; + return message; } } @@ -108,6 +119,7 @@ public class UnacknowledgedMessageMapImp synchronized (_lock) { _map.put(deliveryTag, message); + _unackedSize += message.getMessage().getSize(); _lastDeliveryTag = deliveryTag; } } @@ -118,6 +130,7 @@ public class UnacknowledgedMessageMapImp { Collection<QueueEntry> currentEntries = _map.values(); _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit); + _unackedSize = 0l; return currentEntries; } } @@ -144,6 +157,7 @@ public class UnacknowledgedMessageMapImp synchronized (_lock) { _map.clear(); + _unackedSize = 0l; } } @@ -169,6 +183,9 @@ public class UnacknowledgedMessageMapImp it.remove(); + _unackedSize -= unacked.getValue().getMessage().getSize(); + + if (unacked.getKey() 
== deliveryTag) { break; @@ -208,4 +225,8 @@ public class UnacknowledgedMessageMapImp } } + public long getUnacknowledgeBytes() + { + return _unackedSize; + } } Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=943206&r1=943205&r2=943206&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Tue May 11 17:46:05 2010 @@ -137,6 +137,8 @@ public interface QueueEntry extends Comp long getSize(); + boolean getDeliveredToConsumer(); + boolean expired() throws AMQException; boolean isAcquired(); Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=943206&r1=943205&r2=943206&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue May 11 17:46:05 2010 @@ -42,9 +42,7 @@ public class QueueEntryImpl implements Q private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; - - private boolean _immediateAndNotDelivered = false; + private final AMQMessage _message; private Set<Subscription> _rejectedBy = null; @@ -121,14 +119,17 @@ public class QueueEntryImpl implements Q public long getSize() { - AMQMessage message = getMessage(); - return message == null ? 
0 : message.getSize(); + return getMessage().getSize(); + } + + public boolean getDeliveredToConsumer() + { + return getMessage().getDeliveredToConsumer(); } public boolean expired() throws AMQException { - AMQMessage message = getMessage(); - return message == null ? false : message.expired(getQueue()); + return getMessage().expired(getQueue()); } public boolean isAcquired() @@ -166,16 +167,13 @@ public class QueueEntryImpl implements Q public boolean acquiredBySubscription() { + return (_state instanceof SubscriptionAcquiredState); } public void setDeliveredToSubscription() { - AMQMessage message = getMessage(); - if (message != null) - { - message.setDeliveredToConsumer(); - } + getMessage().setDeliveredToConsumer(); } public void release() @@ -199,17 +197,12 @@ public class QueueEntryImpl implements Q public boolean immediateAndNotDelivered() { - AMQMessage message = getMessage(); - return message == null ? _immediateAndNotDelivered : message.immediateAndNotDelivered(); + return getMessage().immediateAndNotDelivered(); } public void setRedelivered(boolean b) { - AMQMessage message = getMessage(); - if(message != null) - { - message.setRedelivered(b); - } + getMessage().setRedelivered(b); } public Subscription getDeliveredSubscription() @@ -305,16 +298,7 @@ public class QueueEntryImpl implements Q { if(delete()) { - AMQMessage msg = getMessage(); - if(msg != null) - { - getMessage().decrementReference(storeContext); - - _immediateAndNotDelivered = _message.immediateAndNotDelivered(); - - //Ensure we can't hang on to the message, release the ref; - _message = null; - } + getMessage().decrementReference(storeContext); } } @@ -403,7 +387,6 @@ public class QueueEntryImpl implements Q if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE)) { _queueEntryList.advanceHead(); - return true; } else Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=943206&r1=943205&r2=943206&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue May 11 17:46:05 2010 @@ -452,7 +452,7 @@ public class SimpleAMQQueue implements A deliverAsync(); } - _managedObject.checkForNotification(message); + _managedObject.checkForNotification(entry.getMessage()); return entry; } @@ -780,13 +780,7 @@ public class SimpleAMQQueue implements A public boolean accept(QueueEntry entry) { - AMQMessage message = entry.getMessage(); - if(message == null) - { - return false; - } - - final long messageId = message.getMessageId(); + final long messageId = entry.getMessage().getMessageId(); return messageId >= fromMessageId && messageId <= toMessageId; } @@ -805,13 +799,7 @@ public class SimpleAMQQueue implements A public boolean accept(QueueEntry entry) { - AMQMessage message = entry.getMessage(); - if(message == null) - { - return false; - } - - _complete = message.getMessageId() == messageId; + _complete = entry.getMessage().getMessageId() == messageId; return _complete; } @@ -883,13 +871,7 @@ public class SimpleAMQQueue implements A public boolean accept(QueueEntry entry) { - AMQMessage message = entry.getMessage(); - if(message == null) - { - return false; - } - - final long messageId = message.getMessageId(); + final long messageId = entry.getMessage().getMessageId(); return (messageId >= fromMessageId) && (messageId <= toMessageId) && entry.acquire(); @@ -973,19 +955,13 @@ public class SimpleAMQQueue implements A public boolean accept(QueueEntry entry) { - AMQMessage message = entry.getMessage(); - if(message == null) - { - return false; - } - - final 
long messageId = message.getMessageId(); + final long messageId = entry.getMessage().getMessageId(); if ((messageId >= fromMessageId) && (messageId <= toMessageId)) { if (!entry.isDeleted()) { - return message.incrementReference(); + return entry.getMessage().incrementReference(); } } @@ -1006,10 +982,6 @@ public class SimpleAMQQueue implements A for (QueueEntry entry : entries) { AMQMessage message = entry.getMessage(); - if(message == null) - { - continue; - } if (message.isReferenced() && message.isPersistent()) { @@ -1071,11 +1043,6 @@ public class SimpleAMQQueue implements A while (queueListIterator.advance()) { QueueEntry node = queueListIterator.getNode(); - - if(node.isDeleted()) - { - continue; - } final long messageId = node.getMessage().getMessageId(); Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=943206&r1=943205&r2=943206&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue May 11 17:46:05 2010 @@ -98,7 +98,7 @@ public class NonTransactionalContext imp //required by the 'immediate' flag: if(entry.immediateAndNotDelivered()) { - _returnMessages.add(new NoConsumersException(message)); + _returnMessages.add(new NoConsumersException(entry.getMessage())); } } @@ -180,17 +180,17 @@ public class NonTransactionalContext imp beginTranIfNecessary(); } + //Message has been ack so discard it. This will dequeue and decrement the reference. 
+ msg.discard(_storeContext); + unacknowledgedMessageMap.remove(deliveryTag); + if (debug) { _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " + msg.getMessage().getMessageId()); } - - //Message has been ack so discard it. This will dequeue and decrement the reference. - msg.discard(_storeContext); - } if(_inTran) { Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=943206&r1=943205&r2=943206&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Tue May 11 17:46:05 2010 @@ -123,7 +123,6 @@ public class TxAckTest extends TestCase private final List<Long> _unacked; private StoreContext _storeContext = new StoreContext(); private AMQQueue _queue; - private Map<QueueEntry, TestMessage> _messages; Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws Exception { @@ -132,8 +131,6 @@ public class TxAckTest extends TestCase new LinkedList<RequiredDeliveryException>() ); - _messages = new HashMap<QueueEntry, TestMessage>(); - VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next(); _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false, null, false, @@ -174,9 +171,6 @@ public class TxAckTest extends TestCase TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext()); _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message)); - QueueEntry entry = _map.get(deliveryTag); - - _messages.put(entry, (TestMessage) entry.getMessage()); } _acked = acked; 
_unacked = unacked; @@ -193,7 +187,7 @@ public class TxAckTest extends TestCase { QueueEntry u = _map.get(tag); assertTrue("Message not found for tag " + tag, u != null); - _messages.get(u).assertCountEquals(expected); + ((TestMessage) u.getMessage()).assertCountEquals(expected); } } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org