Author: rgodfrey
Date: Sat Nov 26 07:07:39 2016
New Revision: 1771422
URL: http://svn.apache.org/viewvc?rev=1771422&view=rev
Log:
QPID-7425: Credit should be restored only when the client explicitly
acknowledges the delivery
Added:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
(with props)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
Sat Nov 26 07:07:39 2016
@@ -42,6 +42,7 @@ public abstract class AbstractServerMess
private static final
AtomicReferenceFieldUpdater<AbstractServerMessageImpl, Collection>
_resourcesUpdater =
AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
Collection.class,"_resources");
+ private final long _size;
private volatile int _referenceCount = 0;
@@ -50,10 +51,17 @@ public abstract class AbstractServerMess
private volatile Collection<UUID> _resources;
- public AbstractServerMessageImpl(StoredMessage<T> handle, Object
connectionReference)
+ public AbstractServerMessageImpl(StoredMessage<T> handle, Object
connectionReference, final long size)
{
_handle = handle;
_connectionReference = connectionReference;
+ _size = size;
+ }
+
+ @Override
+ public long getSize()
+ {
+ return _size;
}
public StoredMessage<T> getStoredMessage()
@@ -188,7 +196,9 @@ public abstract class AbstractServerMess
final public Object getConnectionReference()
{
return _connectionReference;
- }public String toString()
+ }
+
+ public String toString()
{
return "Message[" + debugIdentity() + "]";
}
@@ -200,7 +210,7 @@ public abstract class AbstractServerMess
private static final AtomicIntegerFieldUpdater<Reference>
_releasedUpdater =
AtomicIntegerFieldUpdater.newUpdater(Reference.class,
"_released");
- private AbstractServerMessageImpl<X, T> _message;
+ private final AbstractServerMessageImpl<X, T> _message;
private final UUID _resourceId;
private volatile int _released;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
Sat Nov 26 07:07:39 2016
@@ -45,7 +45,6 @@ import org.apache.qpid.util.ByteBufferUt
public class InternalMessage extends
AbstractServerMessageImpl<InternalMessage, InternalMessageMetaData>
{
private final Object _messageBody;
- private final int _contentSize;
private InternalMessageHeader _header;
private String _initialRoutingAddress;
@@ -54,18 +53,16 @@ public class InternalMessage extends Abs
final InternalMessageHeader header,
final Object messageBody)
{
- super(handle, null);
+ super(handle, null, handle.getMetaData().getContentSize());
_header = header;
_messageBody = messageBody;
- _contentSize = handle.getMetaData().getContentSize();
}
InternalMessage(final StoredMessage<InternalMessageMetaData> msg)
{
- super(msg, null);
- _contentSize = msg.getMetaData().getContentSize();
+ super(msg, null, msg.getMetaData().getContentSize());
_header = msg.getMetaData().getHeader();
- Collection<QpidByteBuffer> bufs = msg.getContent(0, _contentSize);
+ Collection<QpidByteBuffer> bufs = msg.getContent(0, (int)getSize());
try(ObjectInputStream is = new ObjectInputStream(new
ByteBufferInputStream(ByteBufferUtils.combine(bufs))))
{
@@ -96,12 +93,6 @@ public class InternalMessage extends Abs
}
@Override
- public long getSize()
- {
- return _contentSize;
- }
-
- @Override
public long getExpiration()
{
return _header.getExpiration();
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/message/AbstractServerMessageTest.java
Sat Nov 26 07:07:39 2016
@@ -38,7 +38,7 @@ public class AbstractServerMessageTest e
public TestMessage(final StoredMessage<T> handle,
final Object connectionReference)
{
- super(handle, connectionReference);
+ super(handle, connectionReference, 0);
}
@Override
@@ -54,12 +54,6 @@ public class AbstractServerMessageTest e
}
@Override
- public long getSize()
- {
- return 0;
- }
-
- @Override
public long getExpiration()
{
return 0;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
Sat Nov 26 07:07:39 2016
@@ -33,12 +33,10 @@ public class MessageTransferMessage exte
{
private final static MessageMetaData_0_10 DELETED_MESSAGE_METADATA = new
MessageMetaData_0_10(null, 0, 0);
- private final long _size;
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10>
storeMessage, Object connectionRef)
{
- super(storeMessage, connectionRef);
- _size = storeMessage.getMetaData().getSize();
+ super(storeMessage, connectionRef,
storeMessage.getMetaData().getSize());
}
private MessageMetaData_0_10 getMetaData()
@@ -58,10 +56,6 @@ public class MessageTransferMessage exte
return getMetaData().getMessageHeader();
}
- public long getSize()
- {
- return _size;
- }
public boolean isImmediate()
{
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Sat Nov 26 07:07:39 2016
@@ -106,7 +106,7 @@ public class AMQChannel
implements AMQSessionModel<AMQChannel>,
AsyncAutoCommitTransaction.FutureRecorder,
ServerChannelMethodProcessor,
- EventLoggerProvider
+ EventLoggerProvider, CreditRestorer
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -156,7 +156,7 @@ public class AMQChannel
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new
LinkedList<AsyncCommand>();
- private UnacknowledgedMessageMap _unacknowledgedMessageMap = new
UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
+ private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -224,7 +224,7 @@ public class AMQChannel
_creditManager = new Pre0_10CreditManager(0L, 0L,
connection.getContextValue(Long.class, AMQPConnection_0_8.HIGH_PREFETCH_LIMIT),
connection.getContextValue(Long.class, AMQPConnection_0_8.BATCH_LIMIT));
-
+ _unacknowledgedMessageMap = new
UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH, this);
_connection = connection;
_channelId = channelId;
@@ -949,7 +949,7 @@ public class AMQChannel
+ ") for " + consumer + " on " +
entry.getOwningResource().getName());
}
- _unacknowledgedMessageMap.add(deliveryTag, entry);
+ _unacknowledgedMessageMap.add(deliveryTag, entry, (ConsumerTarget_0_8)
consumer.getTarget());
}
@@ -967,23 +967,40 @@ public class AMQChannel
*/
private void requeue()
{
+ final Map<Long, MessageInstance> unackedMapCopy = new
LinkedHashMap<>();
// we must create a new map since all the messages will get a new
delivery tag when they are redelivered
- Collection<MessageInstance> messagesToBeDelivered =
_unacknowledgedMessageMap.cancelAllMessages();
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ @Override
+ public boolean callback(final long deliveryTag, final
MessageInstance message)
+ {
+ unackedMapCopy.put(deliveryTag, message);
+ return false;
+ }
+
+ @Override
+ public void visitComplete()
+ {
- if (!messagesToBeDelivered.isEmpty())
+ }
+ });
+
+ if (!unackedMapCopy.isEmpty())
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Requeuing " + messagesToBeDelivered.size() + "
unacked messages. for " + toString());
+ _logger.debug("Requeuing " + unackedMapCopy.size() + " unacked
messages. for " + toString());
}
}
- for (MessageInstance unacked : messagesToBeDelivered)
+ for (Map.Entry<Long,MessageInstance> entry : unackedMapCopy.entrySet())
{
+ MessageInstance unacked = entry.getValue();
// Mark message redelivered
unacked.setRedelivered();
-
+ // here we wish to restore credit
+ _unacknowledgedMessageMap.remove(entry.getKey(), true);
// Ensure message is released for redelivery
unacked.release(unacked.getAcquiringConsumer());
}
@@ -998,7 +1015,7 @@ public class AMQChannel
*/
public void requeue(long deliveryTag)
{
- MessageInstance unacked =
_unacknowledgedMessageMap.remove(deliveryTag);
+ MessageInstance unacked =
_unacknowledgedMessageMap.remove(deliveryTag, true);
if (unacked != null)
{
@@ -1108,7 +1125,12 @@ public class AMQChannel
// all messages in the unacked map as redelivered.
message.setRedelivered();
- if (!resendInternal(message))
+ if (resendInternal(message))
+ {
+ // remove from unacked map - don't want to restore credit
though(!)
+ _unacknowledgedMessageMap.remove(deliveryTag, false);
+ }
+ else
{
msgToRequeue.put(deliveryTag, message);
}
@@ -1132,7 +1154,8 @@ public class AMQChannel
//Amend the delivery counter as the client hasn't seen these
messages yet.
message.decrementDeliveryCount();
- _unacknowledgedMessageMap.remove(deliveryTag);
+ // here we do wish to restore credit
+ _unacknowledgedMessageMap.remove(deliveryTag, true);
message.setRedelivered();
message.release(message.getAcquiringConsumer());
@@ -1742,7 +1765,7 @@ public class AMQChannel
private void deadLetter(long deliveryTag)
{
final UnacknowledgedMessageMap unackedMap =
getUnacknowledgedMessageMap();
- final MessageInstance rejectedQueueEntry =
unackedMap.remove(deliveryTag);
+ final MessageInstance rejectedQueueEntry =
unackedMap.remove(deliveryTag, true);
if (rejectedQueueEntry == null)
{
@@ -3761,6 +3784,34 @@ public class AMQChannel
}
}
+ @Override
+ public void restoreCredit(final ConsumerTarget target, final int count,
final long size)
+ {
+ boolean hasCredit = _creditManager.hasCredit();
+ _creditManager.restoreCredit(count, size);
+ if(_creditManager.hasCredit() != hasCredit)
+ {
+ if (hasCredit ||
!_creditManager.isNotBytesLimitedAndHighPrefetch())
+ {
+ updateAllConsumerNotifyWorkDesired();
+ }
+ }
+ else if (hasCredit)
+ {
+ if (_creditManager.isNotBytesLimitedAndHighPrefetch())
+ {
+ if (_creditManager.isCreditOverBatchLimit())
+ {
+ updateAllConsumerNotifyWorkDesired();
+ }
+ }
+ else if(_creditManager.isBytesLimited())
+ {
+ target.notifyWork();
+ }
+ }
+ }
+
private Collection<ConsumerTarget_0_8> getConsumerTargets()
{
return _tag2SubscriptionTargetMap.values();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
Sat Nov 26 07:07:39 2016
@@ -34,7 +34,6 @@ import org.apache.qpid.server.store.Stor
public class AMQMessage extends AbstractServerMessageImpl<AMQMessage,
MessageMetaData>
{
private static final MessageMetaData DELETED_MESSAGE_METADATA = new
MessageMetaData(new MessagePublishInfo(), new ContentHeaderBody(new
BasicContentHeaderProperties()), 0);
- private final long _size;
public AMQMessage(StoredMessage<MessageMetaData> handle)
{
@@ -43,8 +42,8 @@ public class AMQMessage extends Abstract
public AMQMessage(StoredMessage<MessageMetaData> handle, Object
connectionReference)
{
- super(handle, connectionReference);
- _size = handle.getMetaData().getContentSize();
+ super(handle, connectionReference,
handle.getMetaData().getContentSize());
+ ;
}
public MessageMetaData getMessageMetaData()
@@ -88,11 +87,6 @@ public class AMQMessage extends Abstract
return getMessageMetaData().getArrivalTime();
}
- public long getSize()
- {
- return _size;
- }
-
public boolean isImmediate()
{
return getMessagePublishInfo().isImmediate();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
Sat Nov 26 07:07:39 2016
@@ -388,29 +388,9 @@ public abstract class ConsumerTarget_0_8
public void restoreCredit(final ServerMessage message)
{
- boolean hasCredit = _creditManager.hasCredit();
+ // This method is only called when the queue is restoring credit it
allocated and then could not use
_creditManager.restoreCredit(1, message.getSize());
- if(_creditManager.hasCredit() != hasCredit)
- {
- if (hasCredit ||
!_creditManager.isNotBytesLimitedAndHighPrefetch())
- {
- _channel.updateAllConsumerNotifyWorkDesired();
- }
- }
- else if (hasCredit)
- {
- if (_creditManager.isNotBytesLimitedAndHighPrefetch())
- {
- if (_creditManager.isCreditOverBatchLimit())
- {
- _channel.updateAllConsumerNotifyWorkDesired();
- }
- }
- else
- {
- notifyWork();
- }
- }
+ updateNotifyWorkDesired();
}
protected long sendToClient(final ConsumerImpl consumer, final
ServerMessage message,
@@ -454,8 +434,6 @@ public abstract class ConsumerTarget_0_8
{
_unacknowledgedBytes.addAndGet(-entry.getMessage().getSize());
_unacknowledgedCount.decrementAndGet();
-
- restoreCredit(entry.getMessage());
}
@Override
Added:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java?rev=1771422&view=auto
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
(added)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
Sat Nov 26 07:07:39 2016
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import org.apache.qpid.server.consumer.ConsumerTarget;
+
+public interface CreditRestorer
+{
+ void restoreCredit(final ConsumerTarget target, final int count, final
long size);
+
+}
Propchange:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/CreditRestorer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
Sat Nov 26 07:07:39 2016
@@ -69,7 +69,7 @@ public class ExtractResendAndRequeue imp
public void visitComplete()
{
- _unacknowledgedMessageMap.clear();
+
}
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/FlowCreditManager_0_8.java
Sat Nov 26 07:07:39 2016
@@ -26,5 +26,7 @@ public interface FlowCreditManager_0_8 e
boolean isNotBytesLimitedAndHighPrefetch();
+ boolean isBytesLimited();
+
boolean isCreditOverBatchLimit();
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/InfiniteCreditCreditManager.java
Sat Nov 26 07:07:39 2016
@@ -49,6 +49,12 @@ public class InfiniteCreditCreditManager
}
@Override
+ public boolean isBytesLimited()
+ {
+ return false;
+ }
+
+ @Override
public boolean isCreditOverBatchLimit()
{
return false;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
Sat Nov 26 07:07:39 2016
@@ -121,6 +121,12 @@ class Pre0_10CreditManager implements Fl
}
@Override
+ public boolean isBytesLimited()
+ {
+ return _bytesCredit != 0;
+ }
+
+ @Override
public boolean isCreditOverBatchLimit()
{
return _messageCredit > _batchLimit;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
Sat Nov 26 07:07:39 2016
@@ -22,8 +22,8 @@ package org.apache.qpid.server.protocol.
import java.util.Collection;
import java.util.Map;
-import java.util.Set;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
@@ -42,16 +42,12 @@ public interface UnacknowledgedMessageMa
void visit(Visitor visitor);
- void add(long deliveryTag, MessageInstance message);
+ void add(long deliveryTag, MessageInstance message, final ConsumerTarget
target);
- MessageInstance remove(long deliveryTag);
-
- Collection<MessageInstance> cancelAllMessages();
+ MessageInstance remove(long deliveryTag, final boolean restoreCredit);
int size();
- void clear();
-
MessageInstance get(long deliveryTag);
Collection<MessageInstance> acknowledge(long deliveryTag, boolean
multiple);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
Sat Nov 26 07:07:39 2016
@@ -27,19 +27,48 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
- private Map<Long, MessageInstance> _map;
+ private static final class MessageTargetPair
+ {
+ private final MessageInstance _messageInstance;
+ private final ConsumerTarget _target;
+
+ private MessageTargetPair(final MessageInstance messageInstance, final
ConsumerTarget target)
+ {
+ _messageInstance = messageInstance;
+ _target = target;
+ }
+
+ public MessageInstance getMessageInstance()
+ {
+ return _messageInstance;
+ }
+
+ public ConsumerTarget getTarget()
+ {
+ return _target;
+ }
+
+ public long getSize()
+ {
+ return _messageInstance.getMessage().getSize();
+ }
+ }
+ private final Map<Long, MessageTargetPair> _map;
+ // we keep this separately as it is accessed by the management thread
private volatile int _size;
- private final int _prefetchLimit;
+ private CreditRestorer _creditRestorer;
- UnacknowledgedMessageMapImpl(int prefetchLimit)
+ UnacknowledgedMessageMapImpl(int prefetchLimit, CreditRestorer
creditRestorer)
{
- _prefetchLimit = prefetchLimit;
_map = new LinkedHashMap<>(prefetchLimit);
+ _creditRestorer = creditRestorer;
}
public void collect(long deliveryTag, boolean multiple, Map<Long,
MessageInstance> msgs)
@@ -63,43 +92,43 @@ class UnacknowledgedMessageMapImpl imple
{
for (Long deliveryTag : msgs.keySet())
{
- remove(deliveryTag);
+ remove(deliveryTag, true);
}
}
- public MessageInstance remove(long deliveryTag)
+ public MessageInstance remove(long deliveryTag, final boolean
restoreCredit)
{
- MessageInstance message = _map.remove(deliveryTag);
+ MessageTargetPair message = _map.remove(deliveryTag);
if(message != null)
{
_size--;
+ if(restoreCredit)
+ {
+ _creditRestorer.restoreCredit(message.getTarget(), 1,
message.getSize());
+ }
}
- return message;
+ return message.getMessageInstance();
}
public void visit(Visitor visitor)
{
- for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
+ for (Map.Entry<Long, MessageTargetPair> entry : _map.entrySet())
{
- visitor.callback(entry.getKey(), entry.getValue());
+ visitor.callback(entry.getKey(),
entry.getValue().getMessageInstance());
}
visitor.visitComplete();
}
- public void add(long deliveryTag, MessageInstance message)
+ public void add(long deliveryTag, MessageInstance message, final
ConsumerTarget target)
{
- if(_map.put(deliveryTag, message) == null)
+ if(_map.put(deliveryTag, new MessageTargetPair(message,target)) ==
null)
{
_size++;
}
- }
-
- public Collection<MessageInstance> cancelAllMessages()
- {
- Collection<MessageInstance> currentEntries = _map.values();
- _map = new LinkedHashMap<>(_prefetchLimit);
- _size = 0;
- return currentEntries;
+ else
+ {
+ throw new ConnectionScopedRuntimeException("Unexpected duplicate
delivery tag created");
+ }
}
public int size()
@@ -107,15 +136,10 @@ class UnacknowledgedMessageMapImpl imple
return _size;
}
- public void clear()
- {
- _map.clear();
- _size = 0;
- }
-
public MessageInstance get(long key)
{
- return _map.get(key);
+ MessageTargetPair messageTargetPair = _map.get(key);
+ return messageTargetPair == null ? null :
messageTargetPair.getMessageInstance();
}
public Collection<MessageInstance> acknowledge(long deliveryTag, boolean
multiple)
@@ -138,7 +162,7 @@ class UnacknowledgedMessageMapImpl imple
else
{
MessageInstance instance;
- instance = remove(deliveryTag);
+ instance = remove(deliveryTag, true);
if(instance != null &&
instance.makeAcquisitionUnstealable(instance.getAcquiringConsumer()))
{
return Collections.singleton(instance);
@@ -153,9 +177,9 @@ class UnacknowledgedMessageMapImpl imple
private void collect(long key, Map<Long, MessageInstance> msgs)
{
- for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
+ for (Map.Entry<Long, MessageTargetPair> entry : _map.entrySet())
{
- msgs.put(entry.getKey(), entry.getValue());
+ msgs.put(entry.getKey(), entry.getValue().getMessageInstance());
if (entry.getKey() == key)
{
break;
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
Sat Nov 26 07:07:39 2016
@@ -20,23 +20,25 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.apache.qpid.QpidException;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
/**
* QPID-1385 : Race condition between added to unacked map and resending due
to a rollback.
@@ -69,13 +71,13 @@ public class ExtractResendAndRequeueTest
public void setUp() throws QpidException
{
_queueDeleted = false;
- _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);
+ _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100, mock(CreditRestorer.class));
_queue = mock(Queue.class);
when(_queue.getName()).thenReturn(getName());
when(_queue.isDeleted()).thenReturn(_queueDeleted);
_consumer = mock(ConsumerImpl.class);
when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement());
-
+ ConsumerTarget target = mock(ConsumerTarget.class);
long id = 0;
@@ -98,7 +100,7 @@ public class ExtractResendAndRequeueTest
}
}).when(entry).delete();
- _unacknowledgedMessageMap.add(id, entry);
+ _unacknowledgedMessageMap.add(id, entry, target);
_referenceList.add(entry);
//Increment ID;
id++;
@@ -147,7 +149,7 @@ public class ExtractResendAndRequeueTest
assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
+
}
/**
@@ -176,7 +178,6 @@ public class ExtractResendAndRequeueTest
assertEquals("Message count for resend not correct.", 0, msgToResend.size());
assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
- assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapTest.java Sat Nov 26 07:07:39 2016
@@ -27,6 +27,7 @@ import java.util.Collection;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.test.utils.QpidTestCase;
public class UnacknowledgedMessageMapTest extends QpidTestCase
@@ -35,7 +36,7 @@ public class UnacknowledgedMessageMapTes
public void testDeletedMessagesCantBeAcknowledged()
{
- UnacknowledgedMessageMap map = new UnacknowledgedMessageMapImpl(100);
+ UnacknowledgedMessageMap map = new UnacknowledgedMessageMapImpl(100, mock(CreditRestorer.class));
final int expectedSize = 5;
MessageInstance[] msgs = populateMap(map,expectedSize);
assertEquals(expectedSize,map.size());
@@ -47,7 +48,7 @@ public class UnacknowledgedMessageMapTes
assertTrue("Message " + i + " is missing",
acknowledged.contains(msgs[i]));
}
- map = new UnacknowledgedMessageMapImpl(100);
+ map = new UnacknowledgedMessageMapImpl(100, mock(CreditRestorer.class));
msgs = populateMap(map,expectedSize);
// simulate some messages being ttl expired
when(msgs[2].makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.FALSE);
@@ -72,7 +73,7 @@ public class UnacknowledgedMessageMapTes
for(int i = 0; i < size; i++)
{
msgs[i] = createMessageInstance(i);
- map.add((long)i,msgs[i]);
+ map.add((long)i, msgs[i], null);
}
return msgs;
}
@@ -82,6 +83,9 @@ public class UnacknowledgedMessageMapTes
MessageInstance instance = mock(MessageInstance.class);
when(instance.makeAcquisitionUnstealable(_consumer)).thenReturn(Boolean.TRUE);
when(instance.getAcquiringConsumer()).thenReturn(_consumer);
+ ServerMessage message = mock(ServerMessage.class);
+ when(message.getSize()).thenReturn(0L);
+ when(instance.getMessage()).thenReturn(message);
return instance;
}
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Sat Nov 26 07:07:39 2016
@@ -42,20 +42,16 @@ public class Message_1_0 extends Abstrac
public static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new MessageMetaData_1_0(Collections.<Section>emptyList(), new SectionEncoderImpl(DESCRIBED_TYPE_REGISTRY));
private long _arrivalTime;
- private final long _size;
-
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
- super(storedMessage, null);
- _size = storedMessage.getMetaData().getContentSize();
+ super(storedMessage, null, storedMessage.getMetaData().getContentSize());
}
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage, final Object connectionReference)
{
- super(storedMessage, connectionReference);
- _size = storedMessage.getMetaData().getContentSize();
+ super(storedMessage, connectionReference, storedMessage.getMetaData().getContentSize());
_arrivalTime = System.currentTimeMillis();
}
@@ -83,11 +79,6 @@ public class Message_1_0 extends Abstrac
return getMessageMetaData().getMessageHeader();
}
- public long getSize()
- {
- return _size;
- }
-
public long getExpiration()
{
return getMessageHeader().getExpiration();
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java?rev=1771422&r1=1771421&r2=1771422&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java Sat Nov 26 07:07:39 2016
@@ -438,8 +438,12 @@ public class LastValueQueueTest extends
int numberOfMessagesReceived = 0;
while(numberOfShutdownsReceived < 2)
{
- message = _consumer.receive(10000);
- assertNotNull(message);
+ message = _consumer.receive(5000);
+ if(message == null)
+ {
+ System.err.println("here's a good place for a breakpoint");
+ }
+ assertNotNull("null recieved after " + numberOfMessagesReceived + " messages and " + numberOfShutdownsReceived + " shutdowns", message);
if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]