Author: robbie
Date: Tue May 11 17:46:05 2010
New Revision: 943206

URL: http://svn.apache.org/viewvc?rev=943206&view=rev
Log:
QPID-2568: revert changes from commits 941359, 941376 and 941792

Modified:
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=943206&r1=943205&r2=943206&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
 Tue May 11 17:46:05 2010
@@ -74,6 +74,8 @@ public interface UnacknowledgedMessageMa
      * @return a set of delivery tags
      */
     Set<Long> getDeliveryTags();
+
+    public long getUnacknowledgeBytes();
 }
 
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=943206&r1=943205&r2=943206&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
 Tue May 11 17:46:05 2010
@@ -24,10 +24,14 @@ import org.apache.qpid.server.store.Stor
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.txn.TransactionalContext;
 
@@ -35,6 +39,8 @@ public class UnacknowledgedMessageMapImp
 {
     private final Object _lock = new Object();
 
+    private long _unackedSize;
+
     private Map<Long, QueueEntry> _map;
 
     private long _lastDeliveryTag;
@@ -84,9 +90,14 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
 
-            QueueEntry entry = _map.remove(deliveryTag);
+            QueueEntry message = _map.remove(deliveryTag);
+            if(message != null)
+            {
+                _unackedSize -= message.getMessage().getSize();
+
+            }
 
-            return entry;
+            return message;
         }
     }
 
@@ -108,6 +119,7 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
             _map.put(deliveryTag, message);
+            _unackedSize += message.getMessage().getSize();
             _lastDeliveryTag = deliveryTag;
         }
     }
@@ -118,6 +130,7 @@ public class UnacknowledgedMessageMapImp
         {
             Collection<QueueEntry> currentEntries = _map.values();
             _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
+            _unackedSize = 0l;
             return currentEntries;
         }
     }
@@ -144,6 +157,7 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
             _map.clear();
+            _unackedSize = 0l;
         }
     }
 
@@ -169,6 +183,9 @@ public class UnacknowledgedMessageMapImp
 
                 it.remove();
 
+                _unackedSize -= unacked.getValue().getMessage().getSize();
+
+
                 if (unacked.getKey() == deliveryTag)
                 {
                     break;
@@ -208,4 +225,8 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
+    public long getUnacknowledgeBytes()
+    {
+        return _unackedSize;
+    }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=943206&r1=943205&r2=943206&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 Tue May 11 17:46:05 2010
@@ -137,6 +137,8 @@ public interface QueueEntry extends Comp
 
     long getSize();
 
+    boolean getDeliveredToConsumer();
+
     boolean expired() throws AMQException;
 
     boolean isAcquired();

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=943206&r1=943205&r2=943206&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 Tue May 11 17:46:05 2010
@@ -42,9 +42,7 @@ public class QueueEntryImpl implements Q
 
     private final SimpleQueueEntryList _queueEntryList;
 
-    private AMQMessage _message;
-    
-    private boolean _immediateAndNotDelivered = false;
+    private final AMQMessage _message;
 
     private Set<Subscription> _rejectedBy = null;
 
@@ -121,14 +119,17 @@ public class QueueEntryImpl implements Q
 
     public long getSize()
     {
-        AMQMessage message = getMessage();
-        return message == null ? 0 : message.getSize();
+        return getMessage().getSize();
+    }
+
+    public boolean getDeliveredToConsumer()
+    {
+        return getMessage().getDeliveredToConsumer();
     }
 
     public boolean expired() throws AMQException
     {
-        AMQMessage message = getMessage();
-        return message == null ? false : message.expired(getQueue());
+        return getMessage().expired(getQueue());
     }
 
     public boolean isAcquired()
@@ -166,16 +167,13 @@ public class QueueEntryImpl implements Q
 
     public boolean acquiredBySubscription()
     {
+
         return (_state instanceof SubscriptionAcquiredState);
     }
 
     public void setDeliveredToSubscription()
     {
-        AMQMessage message = getMessage();
-        if (message != null)
-        {
-            message.setDeliveredToConsumer();
-        }
+        getMessage().setDeliveredToConsumer();
     }
 
     public void release()
@@ -199,17 +197,12 @@ public class QueueEntryImpl implements Q
 
     public boolean immediateAndNotDelivered() 
     {
-        AMQMessage message = getMessage();
-        return message == null ? _immediateAndNotDelivered : 
message.immediateAndNotDelivered();
+        return getMessage().immediateAndNotDelivered();
     }
 
     public void setRedelivered(boolean b)
     {
-        AMQMessage message = getMessage();
-        if(message != null)
-        {
-            message.setRedelivered(b);
-        }
+        getMessage().setRedelivered(b);
     }
 
     public Subscription getDeliveredSubscription()
@@ -305,16 +298,7 @@ public class QueueEntryImpl implements Q
     {
         if(delete())
         {
-            AMQMessage msg = getMessage();
-            if(msg != null)
-            {
-                getMessage().decrementReference(storeContext);
-
-                _immediateAndNotDelivered = 
_message.immediateAndNotDelivered();
-
-                //Ensure we can't hang on to the message, release the ref;
-                _message = null;
-            }
+            getMessage().decrementReference(storeContext);
         }
     }
 
@@ -403,7 +387,6 @@ public class QueueEntryImpl implements Q
         if(state != DELETED_STATE && 
_stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
             _queueEntryList.advanceHead();            
-            
             return true;
         }
         else

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=943206&r1=943205&r2=943206&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Tue May 11 17:46:05 2010
@@ -452,7 +452,7 @@ public class SimpleAMQQueue implements A
             deliverAsync();
         }
 
-        _managedObject.checkForNotification(message);
+        _managedObject.checkForNotification(entry.getMessage());
 
         return entry;
     }
@@ -780,13 +780,7 @@ public class SimpleAMQQueue implements A
 
             public boolean accept(QueueEntry entry)
             {
-                AMQMessage message = entry.getMessage();
-                if(message == null)
-                {
-                    return false;
-                }
-                
-                final long messageId = message.getMessageId();
+                final long messageId = entry.getMessage().getMessageId();
                 return messageId >= fromMessageId && messageId <= toMessageId;
             }
 
@@ -805,13 +799,7 @@ public class SimpleAMQQueue implements A
 
             public boolean accept(QueueEntry entry)
             {
-                AMQMessage message = entry.getMessage();
-                if(message == null)
-                {
-                    return false;
-                }
-                
-                _complete = message.getMessageId() == messageId;
+                _complete = entry.getMessage().getMessageId() == messageId;
                 return _complete;
             }
 
@@ -883,13 +871,7 @@ public class SimpleAMQQueue implements A
 
             public boolean accept(QueueEntry entry)
             {
-                AMQMessage message = entry.getMessage();
-                if(message == null)
-                {
-                    return false;
-                }
-                
-                final long messageId = message.getMessageId();
+                final long messageId = entry.getMessage().getMessageId();
                 return (messageId >= fromMessageId)
                        && (messageId <= toMessageId)
                        && entry.acquire();
@@ -973,19 +955,13 @@ public class SimpleAMQQueue implements A
 
             public boolean accept(QueueEntry entry)
             {
-                AMQMessage message = entry.getMessage();
-                if(message == null)
-                {
-                    return false;
-                }
-                
-                final long messageId = message.getMessageId();
+                final long messageId = entry.getMessage().getMessageId();
                 if ((messageId >= fromMessageId)
                     && (messageId <= toMessageId))
                 {
                     if (!entry.isDeleted())
                     {
-                        return message.incrementReference();
+                        return entry.getMessage().incrementReference();
                     }
                 }
 
@@ -1006,10 +982,6 @@ public class SimpleAMQQueue implements A
             for (QueueEntry entry : entries)
             {
                 AMQMessage message = entry.getMessage();
-                if(message == null)
-                {
-                    continue;
-                }
 
                 if (message.isReferenced() && message.isPersistent())
                 {
@@ -1071,11 +1043,6 @@ public class SimpleAMQQueue implements A
             while (queueListIterator.advance())
             {
                 QueueEntry node = queueListIterator.getNode();
-                
-                if(node.isDeleted())
-                {
-                    continue;
-                }
 
                 final long messageId = node.getMessage().getMessageId();
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=943206&r1=943205&r2=943206&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
 Tue May 11 17:46:05 2010
@@ -98,7 +98,7 @@ public class NonTransactionalContext imp
         //required by the 'immediate' flag:
         if(entry.immediateAndNotDelivered())
         {
-            _returnMessages.add(new NoConsumersException(message));
+            _returnMessages.add(new NoConsumersException(entry.getMessage()));
         }
 
     }
@@ -180,17 +180,17 @@ public class NonTransactionalContext imp
                 beginTranIfNecessary();
             }
 
+            //Message has been ack so discard it. This will dequeue and 
decrement the reference.
+            msg.discard(_storeContext);
+
             unacknowledgedMessageMap.remove(deliveryTag);
 
+
             if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with 
delivery tag " + deliveryTag + " msg id " +
                            msg.getMessage().getMessageId());
             }
-
-            //Message has been ack so discard it. This will dequeue and 
decrement the reference.
-            msg.discard(_storeContext);
-
         }
         if(_inTran)
         {

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=943206&r1=943205&r2=943206&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
 Tue May 11 17:46:05 2010
@@ -123,7 +123,6 @@ public class TxAckTest extends TestCase
         private final List<Long> _unacked;
         private StoreContext _storeContext = new StoreContext();
                private AMQQueue _queue;
-               private Map<QueueEntry, TestMessage> _messages;
 
         Scenario(int messageCount, List<Long> acked, List<Long> unacked) 
throws Exception
         {
@@ -132,8 +131,6 @@ public class TxAckTest extends TestCase
                                                                           new 
LinkedList<RequiredDeliveryException>()
             );
 
-            _messages = new HashMap<QueueEntry, TestMessage>();
-
             VirtualHost virtualHost = 
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
 
             _queue = AMQQueueFactory.createAMQQueueImpl(new 
AMQShortString("test"), false, null, false,
@@ -174,9 +171,6 @@ public class TxAckTest extends TestCase
 
                 TestMessage message = new TestMessage(deliveryTag, i, info, 
txnContext.getStoreContext());
                 _map.add(deliveryTag, _queue.enqueue(new StoreContext(), 
message));
-                QueueEntry entry = _map.get(deliveryTag);
-
-                _messages.put(entry, (TestMessage) entry.getMessage());
             }
             _acked = acked;
             _unacked = unacked;
@@ -193,7 +187,7 @@ public class TxAckTest extends TestCase
             {
                 QueueEntry u = _map.get(tag);
                 assertTrue("Message not found for tag " + tag, u != null);
-                _messages.get(u).assertCountEquals(expected);
+                ((TestMessage) u.getMessage()).assertCountEquals(expected);
             }
         }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to