Author: rgodfrey
Date: Sat Nov 26 07:07:39 2016
New Revision: 1771422

URL: http://svn.apache.org/viewvc?rev=1771422&view=rev
Log:
QPID-7425 : Credit should be restored only when the client explicitly 
acknowledges the delivery

Added:
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
   (with props)
Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
 Sat Nov 26 07:07:39 2016
@@ -42,6 +42,7 @@ public abstract class AbstractServerMess
 
     private static final 
AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection> 
_resourcesUpdater =
             
AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class, 
Collection.class,"_resources");
+    private final long _size;
 
 
     private volatile int _referenceCount = 0;
@@ -50,10 +51,17 @@ public abstract class AbstractServerMess
     private volatile Collection<UUID> _resources;
 
 
-    public AbstractServerMessageImpl(StoredMessage<T> handle, Object 
connectionReference)
+    public AbstractServerMessageImpl(StoredMessage<T> handle, Object 
connectionReference, final long size)
     {
         _handle = handle;
         _connectionReference = connectionReference;
+        _size = size;
+    }
+
+    @Override
+    public long getSize()
+    {
+        return _size;
     }
 
     public StoredMessage<T> getStoredMessage()
@@ -188,7 +196,9 @@ public abstract class AbstractServerMess
     final public Object getConnectionReference()
     {
         return _connectionReference;
-    }public String toString()
+    }
+
+    public String toString()
     {
         return "Message[" + debugIdentity() + "]";
     }
@@ -200,7 +210,7 @@ public abstract class AbstractServerMess
         private static final AtomicIntegerFieldUpdater<Reference> 
_releasedUpdater =
                 AtomicIntegerFieldUpdater.newUpdater(Reference.class, 
"_released");
 
-        private AbstractServerMessageImpl<X, T> _message;
+        private final AbstractServerMessageImpl<X, T> _message;
         private final UUID _resourceId;
         private volatile int _released;
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
 Sat Nov 26 07:07:39 2016
@@ -45,7 +45,6 @@ import org.apache.qpid.util.ByteBufferUt
 public class InternalMessage extends 
AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
 {
     private final Object _messageBody;
-    private final int _contentSize;
     private InternalMessageHeader _header;
     private String _initialRoutingAddress;
 
@@ -54,18 +53,16 @@ public class InternalMessage extends Abs
                 final InternalMessageHeader header,
                 final Object messageBody)
     {
-        super(handle, null);
+        super(handle, null, handle.getMetaData().getContentSize());
         _header = header;
         _messageBody = messageBody;
-        _contentSize = handle.getMetaData().getContentSize();
     }
 
     InternalMessage(final StoredMessage<InternalMessageMetaData> msg)
     {
-        super(msg, null);
-        _contentSize = msg.getMetaData().getContentSize();
+        super(msg, null, msg.getMetaData().getContentSize());
         _header = msg.getMetaData().getHeader();
-        Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize);
+        Collection<QpidByteBuffer> bufs = msg.getContent(0, (int)getSize());
 
         try(ObjectInputStream is = new ObjectInputStream(new 
ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
         {
@@ -96,12 +93,6 @@ public class InternalMessage extends Abs
     }
 
     @Override
-    public long getSize()
-    {
-        return _contentSize;
-    }
-
-    @Override
     public long getExpiration()
     {
         return _header.getExpiration();

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
 Sat Nov 26 07:07:39 2016
@@ -38,7 +38,7 @@ public class AbstractServerMessageTest e
         public TestMessage(final StoredMessage<T> handle,
                            final Object connectionReference)
         {
-            super(handle, connectionReference);
+            super(handle, connectionReference, 0);
         }
 
         @Override
@@ -54,12 +54,6 @@ public class AbstractServerMessageTest e
         }
 
         @Override
-        public long getSize()
-        {
-            return 0;
-        }
-
-        @Override
         public long getExpiration()
         {
             return 0;

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
 Sat Nov 26 07:07:39 2016
@@ -33,12 +33,10 @@ public class MessageTransferMessage exte
 {
 
     private final static MessageMetaData_0_10 DELETED_MESSAGE_METADATA = new 
MessageMetaData_0_10(null, 0, 0);
-    private final long _size;
 
     public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> 
storeMessage, Object connectionRef)
     {
-        super(storeMessage, connectionRef);
-        _size = storeMessage.getMetaData().getSize();
+        super(storeMessage, connectionRef, 
storeMessage.getMetaData().getSize());
     }
 
     private MessageMetaData_0_10 getMetaData()
@@ -58,10 +56,6 @@ public class MessageTransferMessage exte
         return getMetaData().getMessageHeader();
     }
 
-    public long getSize()
-    {
-        return _size;
-    }
 
     public boolean isImmediate()
     {

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Sat Nov 26 07:07:39 2016
@@ -106,7 +106,7 @@ public class AMQChannel
         implements AMQSessionModel<AMQChannel>,
                    AsyncAutoCommitTransaction.FutureRecorder,
                    ServerChannelMethodProcessor,
-                   EventLoggerProvider
+                   EventLoggerProvider, CreditRestorer
 {
     public static final int DEFAULT_PREFETCH = 4096;
 
@@ -156,7 +156,7 @@ public class AMQChannel
 
     private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new 
LinkedList<AsyncCommand>();
 
-    private UnacknowledgedMessageMap _unacknowledgedMessageMap = new 
UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
+    private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
 
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
 
@@ -224,7 +224,7 @@ public class AMQChannel
         _creditManager = new Pre0_10CreditManager(0L, 0L,
                                                   
connection.getContextValue(Long.class, AMQPConnection_0_8.HIGH_PREFETCH_LIMIT),
                                                   
connection.getContextValue(Long.class, AMQPConnection_0_8.BATCH_LIMIT));
-
+        _unacknowledgedMessageMap = new 
UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH, this);
         _connection = connection;
         _channelId = channelId;
 
@@ -949,7 +949,7 @@ public class AMQChannel
                                + ") for " + consumer + " on " + 
entry.getOwningResource().getName());
         }
 
-        _unacknowledgedMessageMap.add(deliveryTag, entry);
+        _unacknowledgedMessageMap.add(deliveryTag, entry, (ConsumerTarget_0_8) 
consumer.getTarget());
 
     }
 
@@ -967,23 +967,40 @@ public class AMQChannel
      */
     private void requeue()
     {
+        final Map<Long, MessageInstance> unackedMapCopy = new 
LinkedHashMap<>();
         // we must create a new map since all the messages will get a new 
delivery tag when they are redelivered
-        Collection<MessageInstance> messagesToBeDelivered = 
_unacknowledgedMessageMap.cancelAllMessages();
+        _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+        {
+            @Override
+            public boolean callback(final long deliveryTag, final 
MessageInstance message)
+            {
+                unackedMapCopy.put(deliveryTag, message);
+                return false;
+            }
+
+            @Override
+            public void visitComplete()
+            {
 
-        if (!messagesToBeDelivered.isEmpty())
+            }
+        });
+
+        if (!unackedMapCopy.isEmpty())
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Requeuing " + messagesToBeDelivered.size() + " 
unacked messages. for " + toString());
+                _logger.debug("Requeuing " + unackedMapCopy.size() + " unacked 
messages. for " + toString());
             }
 
         }
 
-        for (MessageInstance unacked : messagesToBeDelivered)
+        for (Map.Entry<Long,MessageInstance> entry : unackedMapCopy.entrySet())
         {
+            MessageInstance unacked = entry.getValue();
             // Mark message redelivered
             unacked.setRedelivered();
-
+            // here we wish to restore credit
+            _unacknowledgedMessageMap.remove(entry.getKey(), true);
             // Ensure message is released for redelivery
             unacked.release(unacked.getAcquiringConsumer());
         }
@@ -998,7 +1015,7 @@ public class AMQChannel
      */
     public void requeue(long deliveryTag)
     {
-        MessageInstance unacked = 
_unacknowledgedMessageMap.remove(deliveryTag);
+        MessageInstance unacked = 
_unacknowledgedMessageMap.remove(deliveryTag, true);
 
         if (unacked != null)
         {
@@ -1108,7 +1125,12 @@ public class AMQChannel
             // all messages in the unacked map as redelivered.
             message.setRedelivered();
 
-            if (!resendInternal(message))
+            if (resendInternal(message))
+            {
+                // remove from unacked map - don't want to restore credit 
though(!)
+                _unacknowledgedMessageMap.remove(deliveryTag, false);
+            }
+            else
             {
                 msgToRequeue.put(deliveryTag, message);
             }
@@ -1132,7 +1154,8 @@ public class AMQChannel
             //Amend the delivery counter as the client hasn't seen these 
messages yet.
             message.decrementDeliveryCount();
 
-            _unacknowledgedMessageMap.remove(deliveryTag);
+            // here we do wish to restore credit
+            _unacknowledgedMessageMap.remove(deliveryTag, true);
 
             message.setRedelivered();
             message.release(message.getAcquiringConsumer());
@@ -1742,7 +1765,7 @@ public class AMQChannel
     private void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = 
getUnacknowledgedMessageMap();
-        final MessageInstance rejectedQueueEntry = 
unackedMap.remove(deliveryTag);
+        final MessageInstance rejectedQueueEntry = 
unackedMap.remove(deliveryTag, true);
 
         if (rejectedQueueEntry == null)
         {
@@ -3761,6 +3784,34 @@ public class AMQChannel
         }
     }
 
+    @Override
+    public void restoreCredit(final ConsumerTarget target, final int count, 
final long size)
+    {
+        boolean hasCredit = _creditManager.hasCredit();
+        _creditManager.restoreCredit(count, size);
+        if(_creditManager.hasCredit() != hasCredit)
+        {
+            if (hasCredit || 
!_creditManager.isNotBytesLimitedAndHighPrefetch())
+            {
+                updateAllConsumerNotifyWorkDesired();
+            }
+        }
+        else if (hasCredit)
+        {
+            if (_creditManager.isNotBytesLimitedAndHighPrefetch())
+            {
+                if (_creditManager.isCreditOverBatchLimit())
+                {
+                    updateAllConsumerNotifyWorkDesired();
+                }
+            }
+            else if(_creditManager.isBytesLimited())
+            {
+                target.notifyWork();
+            }
+        }
+    }
+
     private Collection<ConsumerTarget_0_8> getConsumerTargets()
     {
         return _tag2SubscriptionTargetMap.values();

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
 Sat Nov 26 07:07:39 2016
@@ -34,7 +34,6 @@ import org.apache.qpid.server.store.Stor
 public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, 
MessageMetaData>
 {
     private static final MessageMetaData DELETED_MESSAGE_METADATA = new 
MessageMetaData(new MessagePublishInfo(), new ContentHeaderBody(new 
BasicContentHeaderProperties()), 0);
-    private final long _size;
 
     public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
@@ -43,8 +42,8 @@ public class AMQMessage extends Abstract
 
     public AMQMessage(StoredMessage<MessageMetaData> handle, Object 
connectionReference)
     {
-        super(handle, connectionReference);
-        _size = handle.getMetaData().getContentSize();
+        super(handle, connectionReference, 
handle.getMetaData().getContentSize());
+        ;
     }
 
     public MessageMetaData getMessageMetaData()
@@ -88,11 +87,6 @@ public class AMQMessage extends Abstract
         return getMessageMetaData().getArrivalTime();
     }
 
-    public long getSize()
-    {
-        return _size;
-    }
-
     public boolean isImmediate()
     {
         return getMessagePublishInfo().isImmediate();

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
 Sat Nov 26 07:07:39 2016
@@ -388,29 +388,9 @@ public abstract class ConsumerTarget_0_8
 
     public void restoreCredit(final ServerMessage message)
     {
-        boolean hasCredit = _creditManager.hasCredit();
+        // This method is only called when the queue is restoring credit it 
allocated and then could not use
         _creditManager.restoreCredit(1, message.getSize());
-        if(_creditManager.hasCredit() != hasCredit)
-        {
-            if (hasCredit || 
!_creditManager.isNotBytesLimitedAndHighPrefetch())
-            {
-                _channel.updateAllConsumerNotifyWorkDesired();
-            }
-        }
-        else if (hasCredit)
-        {
-            if (_creditManager.isNotBytesLimitedAndHighPrefetch())
-            {
-                if (_creditManager.isCreditOverBatchLimit())
-                {
-                    _channel.updateAllConsumerNotifyWorkDesired();
-                }
-            }
-            else
-            {
-                notifyWork();
-            }
-        }
+        updateNotifyWorkDesired();
     }
 
     protected long sendToClient(final ConsumerImpl consumer, final 
ServerMessage message,
@@ -454,8 +434,6 @@ public abstract class ConsumerTarget_0_8
     {
         _unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
         _unacknowledgedCount.decrementAndGet();
-
-        restoreCredit(entry.getMessage());
     }
 
     @Override

Added: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java?rev=1771422&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
 (added)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
 Sat Nov 26 07:07:39 2016
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.consumer.ConsumerTarget;
+
+public interface CreditRestorer
+{
+    void restoreCredit(final ConsumerTarget target, final int count, final 
long size);
+
+}

Propchange: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
 Sat Nov 26 07:07:39 2016
@@ -69,7 +69,7 @@ public class ExtractResendAndRequeue imp
 
     public void visitComplete()
     {
-        _unacknowledgedMessageMap.clear();
+
     }
 
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
 Sat Nov 26 07:07:39 2016
@@ -26,5 +26,7 @@ public interface FlowCreditManager_0_8 e
 
     boolean isNotBytesLimitedAndHighPrefetch();
 
+    boolean isBytesLimited();
+
     boolean isCreditOverBatchLimit();
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
 Sat Nov 26 07:07:39 2016
@@ -49,6 +49,12 @@ public class InfiniteCreditCreditManager
     }
 
     @Override
+    public boolean isBytesLimited()
+    {
+        return false;
+    }
+
+    @Override
     public boolean isCreditOverBatchLimit()
     {
         return false;

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
 Sat Nov 26 07:07:39 2016
@@ -121,6 +121,12 @@ class Pre0_10CreditManager implements Fl
     }
 
     @Override
+    public boolean isBytesLimited()
+    {
+        return _bytesCredit != 0;
+    }
+
+    @Override
     public boolean isCreditOverBatchLimit()
     {
         return _messageCredit > _batchLimit;

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
 Sat Nov 26 07:07:39 2016
@@ -22,8 +22,8 @@ package org.apache.qpid.server.protocol.
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
 
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 
 
@@ -42,16 +42,12 @@ public interface UnacknowledgedMessageMa
 
     void visit(Visitor visitor);
 
-    void add(long deliveryTag, MessageInstance message);
+    void add(long deliveryTag, MessageInstance message, final ConsumerTarget 
target);
 
-    MessageInstance remove(long deliveryTag);
-
-    Collection<MessageInstance> cancelAllMessages();
+    MessageInstance remove(long deliveryTag, final boolean restoreCredit);
 
     int size();
 
-    void clear();
-
     MessageInstance get(long deliveryTag);
 
     Collection<MessageInstance> acknowledge(long deliveryTag, boolean 
multiple);

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
 Sat Nov 26 07:07:39 2016
@@ -27,19 +27,48 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
 {
-    private Map<Long, MessageInstance> _map;
+    private static final class MessageTargetPair
+    {
+        private final MessageInstance _messageInstance;
+        private final ConsumerTarget _target;
+
+        private MessageTargetPair(final MessageInstance messageInstance, final 
ConsumerTarget target)
+        {
+            _messageInstance = messageInstance;
+            _target = target;
+        }
+
+        public MessageInstance getMessageInstance()
+        {
+            return _messageInstance;
+        }
+
+        public ConsumerTarget getTarget()
+        {
+            return _target;
+        }
+
+        public long getSize()
+        {
+            return _messageInstance.getMessage().getSize();
+        }
+    }
+    private final Map<Long, MessageTargetPair> _map;
+    // we keep this separately as it is accessed by the management thread
     private volatile int _size;
 
-    private final int _prefetchLimit;
+    private CreditRestorer _creditRestorer;
 
-    UnacknowledgedMessageMapImpl(int prefetchLimit)
+    UnacknowledgedMessageMapImpl(int prefetchLimit, CreditRestorer 
creditRestorer)
     {
-        _prefetchLimit = prefetchLimit;
         _map = new LinkedHashMap<>(prefetchLimit);
+        _creditRestorer = creditRestorer;
     }
 
     public void collect(long deliveryTag, boolean multiple, Map<Long, 
MessageInstance> msgs)
@@ -63,43 +92,43 @@ class UnacknowledgedMessageMapImpl imple
     {
         for (Long deliveryTag : msgs.keySet())
         {
-            remove(deliveryTag);
+            remove(deliveryTag, true);
         }
     }
 
-    public MessageInstance remove(long deliveryTag)
+    public MessageInstance remove(long deliveryTag, final boolean 
restoreCredit)
     {
-        MessageInstance message = _map.remove(deliveryTag);
+        MessageTargetPair message = _map.remove(deliveryTag);
         if(message != null)
         {
             _size--;
+            if(restoreCredit)
+            {
+                _creditRestorer.restoreCredit(message.getTarget(), 1, 
message.getSize());
+            }
         }
-        return message;
+        return message.getMessageInstance();
     }
 
     public void visit(Visitor visitor)
     {
-        for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
+        for (Map.Entry<Long, MessageTargetPair> entry : _map.entrySet())
         {
-            visitor.callback(entry.getKey(), entry.getValue());
+            visitor.callback(entry.getKey(), 
entry.getValue().getMessageInstance());
         }
         visitor.visitComplete();
     }
 
-    public void add(long deliveryTag, MessageInstance message)
+    public void add(long deliveryTag, MessageInstance message, final 
ConsumerTarget target)
     {
-        if(_map.put(deliveryTag, message) == null)
+        if(_map.put(deliveryTag, new MessageTargetPair(message,target)) == 
null)
         {
             _size++;
         }
-    }
-
-    public Collection<MessageInstance> cancelAllMessages()
-    {
-        Collection<MessageInstance> currentEntries = _map.values();
-        _map = new LinkedHashMap<>(_prefetchLimit);
-        _size = 0;
-        return currentEntries;
+        else
+        {
+            throw new ConnectionScopedRuntimeException("Unexpected duplicate 
delivery tag created");
+        }
     }
 
     public int size()
@@ -107,15 +136,10 @@ class UnacknowledgedMessageMapImpl imple
         return _size;
     }
 
-    public void clear()
-    {
-        _map.clear();
-        _size = 0;
-    }
-
     public MessageInstance get(long key)
     {
-        return _map.get(key);
+        MessageTargetPair messageTargetPair = _map.get(key);
+        return messageTargetPair == null ? null : 
messageTargetPair.getMessageInstance();
     }
 
     public Collection<MessageInstance> acknowledge(long deliveryTag, boolean 
multiple)
@@ -138,7 +162,7 @@ class UnacknowledgedMessageMapImpl imple
         else
         {
             MessageInstance instance;
-            instance = remove(deliveryTag);
+            instance = remove(deliveryTag, true);
             if(instance != null && 
instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
             {
                 return Collections.singleton(instance);
@@ -153,9 +177,9 @@ class UnacknowledgedMessageMapImpl imple
 
     private void collect(long key, Map<Long, MessageInstance> msgs)
     {
-        for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
+        for (Map.Entry<Long, MessageTargetPair> entry : _map.entrySet())
         {
-            msgs.put(entry.getKey(), entry.getValue());
+            msgs.put(entry.getKey(), entry.getValue().getMessageInstance());
             if (entry.getKey() == key)
             {
                 break;

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
 Sat Nov 26 07:07:39 2016
@@ -20,23 +20,25 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import org.apache.qpid.QpidException;
 import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due 
to a rollback.
@@ -69,13 +71,13 @@ public class ExtractResendAndRequeueTest
     public void setUp() throws QpidException
     {
         _queueDeleted = false;
-        _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+        _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100, 
mock(CreditRestorer.class));
         _queue = mock(Queue.class);
         when(_queue.getName()).thenReturn(getName());
         when(_queue.isDeleted()).thenReturn(_queueDeleted);
         _consumer = mock(ConsumerImpl.class);
         
when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
-
+        ConsumerTarget target = mock(ConsumerTarget.class);
 
         long id = 0;
 
@@ -98,7 +100,7 @@ public class ExtractResendAndRequeueTest
                 }
             }).when(entry).delete();
 
-            _unacknowledgedMessageMap.add(id, entry);
+            _unacknowledgedMessageMap.add(id, entry, target);
             _referenceList.add(entry);
             //Increment ID;
             id++;
@@ -147,7 +149,7 @@ public class ExtractResendAndRequeueTest
 
         assertEquals("Message count for resend not correct.", 
INITIAL_MSG_COUNT, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, 
msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, 
_unacknowledgedMessageMap.size());
+
     }
 
     /**
@@ -176,7 +178,6 @@ public class ExtractResendAndRequeueTest
 
         assertEquals("Message count for resend not correct.", 0, 
msgToResend.size());
         assertEquals("Message count for requeue not correct.", 
INITIAL_MSG_COUNT, msgToRequeue.size());
-        assertEquals("Map was not emptied", 0, 
_unacknowledgedMessageMap.size());
     }
 
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
 Sat Nov 26 07:07:39 2016
@@ -27,6 +27,7 @@ import java.util.Collection;
 
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class UnacknowledgedMessageMapTest extends QpidTestCase
@@ -35,7 +36,7 @@ public class UnacknowledgedMessageMapTes
 
     public void testDeletedMessagesCantBeAcknowledged()
     {
-        UnacknowledgedMessageMap map = new UnacknowledgedMessageMapImpl(100);
+        UnacknowledgedMessageMap map = new UnacknowledgedMessageMapImpl(100, 
mock(CreditRestorer.class));
         final int expectedSize = 5;
         MessageInstance[] msgs = populateMap(map,expectedSize);
         assertEquals(expectedSize,map.size());
@@ -47,7 +48,7 @@ public class UnacknowledgedMessageMapTes
             assertTrue("Message " + i + " is missing", 
acknowledged.contains(msgs[i]));
         }
 
-        map = new UnacknowledgedMessageMapImpl(100);
+        map = new UnacknowledgedMessageMapImpl(100, 
mock(CreditRestorer.class));
         msgs = populateMap(map,expectedSize);
         // simulate some messages being ttl expired
         
when(msgs[2].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE);
@@ -72,7 +73,7 @@ public class UnacknowledgedMessageMapTes
         for(int i = 0; i < size; i++)
         {
             msgs[i] = createMessageInstance(i);
-            map.add((long)i,msgs[i]);
+            map.add((long)i, msgs[i], null);
         }
         return msgs;
     }
@@ -82,6 +83,9 @@ public class UnacknowledgedMessageMapTes
         MessageInstance instance = mock(MessageInstance.class);
         
when(instance.makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.TRUE);
         when(instance.getAcquiringConsumer()).thenReturn(_consumer);
+        ServerMessage message = mock(ServerMessage.class);
+        when(message.getSize()).thenReturn(0L);
+        when(instance.getMessage()).thenReturn(message);
         return instance;
     }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
 Sat Nov 26 07:07:39 2016
@@ -42,20 +42,16 @@ public class Message_1_0 extends Abstrac
     public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new 
MessageMetaData_1_0(Collections.<Section>emptyList(), new 
SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY));
 
     private long _arrivalTime;
-    private final long _size;
-
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
     {
-        super(storedMessage, null);
-        _size = storedMessage.getMetaData().getContentSize();
+        super(storedMessage, null, 
storedMessage.getMetaData().getContentSize());
     }
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
                        final Object connectionReference)
     {
-        super(storedMessage, connectionReference);
-        _size = storedMessage.getMetaData().getContentSize();
+        super(storedMessage, connectionReference, 
storedMessage.getMetaData().getContentSize());
         _arrivalTime = System.currentTimeMillis();
     }
 
@@ -83,11 +79,6 @@ public class Message_1_0 extends Abstrac
         return getMessageMetaData().getMessageHeader();
     }
 
-    public long getSize()
-    {
-        return _size;
-    }
-
     public long getExpiration()
     {
         return getMessageHeader().getExpiration();

Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
 (original)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
 Sat Nov 26 07:07:39 2016
@@ -438,8 +438,12 @@ public class LastValueQueueTest extends
         int numberOfMessagesReceived = 0;
         while(numberOfShutdownsReceived < 2)
         {
-            message = _consumer.receive(10000);
-            assertNotNull(message);
+            message = _consumer.receive(5000);
+            if(message == null)
+            {
+                System.err.println("here's a good place for a breakpoint");
+            }
+            assertNotNull("null recieved after " + numberOfMessagesReceived + 
" messages and " + numberOfShutdownsReceived + " shutdowns", message);
 
             if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
             {




---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to