Repository: qpid-jms
Updated Branches:
  refs/heads/master abc75a35e -> 24d03437f


QPIDJMS-420 Use local tracking in AmqpConsumer for prefetch refill

Track the number of messages that are currently in the prefetch buffer using
local counters that mesh well with the already added delivered message counts
to reduce access to synchronized code in the message queue.  Reduce code paths
where a message settlement might be missed or credit not replenished on error.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/24d03437
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/24d03437
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/24d03437

Branch: refs/heads/master
Commit: 24d03437fb1695b2096989c3561007312d4169ee
Parents: abc75a3
Author: Timothy Bish <tabish...@gmail.com>
Authored: Tue Oct 30 15:48:26 2018 -0400
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Tue Oct 30 15:48:26 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |   2 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java |   2 +-
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   |  11 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 156 +++++++++++--------
 .../qpid/jms/meta/JmsConsumerInfoTest.java      |  32 ++--
 .../jms/meta/JmsDefaultResourceVisitorTest.java |   2 +-
 .../amqp/AmqpSubscriptionTrackerTest.java       |   2 +-
 .../provider/amqp/message/AmqpCodecTest.java    |   2 +-
 .../message/AmqpJmsMessageTypesTestCase.java    |   2 +-
 .../failover/FailoverProviderTestSupport.java   |   2 +-
 10 files changed, 113 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index b4e2682..da5f704 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -471,7 +471,7 @@ public class JmsConnection implements AutoCloseable, 
Connection, TopicConnection
             messageQueue = new FifoMessageQueue(configuredPrefetch);
         }
 
-        JmsConsumerInfo consumerInfo = new 
JmsConsumerInfo(getNextConnectionConsumerId(), messageQueue, null);
+        JmsConsumerInfo consumerInfo = new 
JmsConsumerInfo(getNextConnectionConsumerId(), null);
         consumerInfo.setExplicitClientID(isExplicitClientID());
         consumerInfo.setSelector(messageSelector);
         consumerInfo.setDurable(durable);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 5662b2e..d616faf 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -100,7 +100,7 @@ public class JmsMessageConsumer implements AutoCloseable, 
MessageConsumer, JmsMe
             this.messageQueue = new FifoMessageQueue(configuredPrefetch);
         }
 
-        consumerInfo = new JmsConsumerInfo(consumerId, messageQueue, this);
+        consumerInfo = new JmsConsumerInfo(consumerId, this);
         consumerInfo.setExplicitClientID(connection.isExplicitClientID());
         consumerInfo.setSelector(selector);
         consumerInfo.setDurable(isDurableSubscription());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index 7addbb7..edb928a 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -22,7 +22,6 @@ import 
org.apache.qpid.jms.policy.JmsDefaultDeserializationPolicy;
 import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
 import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
 import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
-import org.apache.qpid.jms.util.MessageQueue;
 
 public final class JmsConsumerInfo extends JmsAbstractResource implements 
Comparable<JmsConsumerInfo> {
 
@@ -42,7 +41,6 @@ public final class JmsConsumerInfo extends 
JmsAbstractResource implements Compar
     private boolean connectionConsumer;
     private int maxMessages;
     private volatile boolean listener;
-    private final MessageQueue messageQueue;
 
     private JmsRedeliveryPolicy redeliveryPolicy;
     private JmsDeserializationPolicy deserializationPolicy;
@@ -52,17 +50,16 @@ public final class JmsConsumerInfo extends 
JmsAbstractResource implements Compar
 
     private final JmsMessageDispatcher dispatcher;
 
-    public JmsConsumerInfo(JmsConsumerId consumerId, MessageQueue 
messageQueue, JmsMessageDispatcher dispatcher) {
+    public JmsConsumerInfo(JmsConsumerId consumerId, JmsMessageDispatcher 
dispatcher) {
         if (consumerId == null) {
             throw new IllegalArgumentException("Consumer ID cannot be null");
         }
         this.consumerId = consumerId;
-        this.messageQueue = messageQueue;
         this.dispatcher = dispatcher;
     }
 
     public JmsConsumerInfo copy() {
-        JmsConsumerInfo info = new JmsConsumerInfo(consumerId, messageQueue, 
dispatcher);
+        JmsConsumerInfo info = new JmsConsumerInfo(consumerId, dispatcher);
         copy(info);
         return info;
     }
@@ -86,10 +83,6 @@ public final class JmsConsumerInfo extends 
JmsAbstractResource implements Compar
         info.maxMessages = maxMessages;
     }
 
-    public int getPrefetchedMessageCount() {
-        return messageQueue.size();
-    }
-
     @Override
     public JmsConsumerId getId() {
         return consumerId;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index d2d7c39..7edbaa7 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -58,7 +58,8 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
     protected AsyncResult stopRequest;
     protected AsyncResult pullRequest;
     protected long incomingSequence;
-    protected long deliveredCount;
+    protected int deliveredCount;
+    protected int dispatchedCount;
     protected boolean deferredClose;
 
     public AmqpConsumer(AmqpSession session, JmsConsumerInfo info, Receiver 
receiver) {
@@ -206,28 +207,29 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
 
             JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) 
current.getContext();
             if (envelope.isDelivered()) {
+                final DeliveryState disposition;
+
                 switch (ackType) {
                     case ACCEPTED:
-                        current.disposition(Accepted.getInstance());
+                        disposition = Accepted.getInstance();
                         break;
                     case RELEASED:
-                        current.disposition(Released.getInstance());
+                        disposition = Released.getInstance();
                         break;
                     case REJECTED:
-                        current.disposition(REJECTED);
+                        disposition = REJECTED;
                         break;
                     case MODIFIED_FAILED:
-                        current.disposition(MODIFIED_FAILED);
+                        disposition = MODIFIED_FAILED;
                         break;
                     case MODIFIED_FAILED_UNDELIVERABLE:
-                        current.disposition(MODIFIED_FAILED_UNDELIVERABLE);
+                        disposition = MODIFIED_FAILED_UNDELIVERABLE;
                         break;
                     default:
                         throw new IllegalArgumentException("Invalid 
acknowledgement type specified: " + ackType);
                 }
 
-                current.settle();
-                deliveredCount--;
+                handleDisposition(envelope, current, disposition);
             }
         }
 
@@ -252,60 +254,78 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             return;
         }
 
-        if (ackType.equals(ACK_TYPE.DELIVERED)) {
-            LOG.debug("Delivered Ack of message: {}", envelope);
-            deliveredCount++;
-            envelope.setDelivered(true);
-            delivery.setDefaultDeliveryState(MODIFIED_FAILED);
-            sendFlowIfNeeded();
-            return;
-        } else if (ackType.equals(ACK_TYPE.ACCEPTED)) {
-            // A Consumer may not always send a DELIVERED ack so we need to
-            // check to ensure we don't add too much credit to the link.
-            if (!envelope.isDelivered()) {
-                sendFlowIfNeeded();
-            }
-            LOG.debug("Accepted Ack of message: {}", envelope);
-            if (!delivery.remotelySettled()) {
-                if (session.isTransacted() && !getResourceInfo().isBrowser()) {
+        switch (ackType) {
+            case DELIVERED:
+                handleDelivered(envelope, delivery);
+                break;
+            case ACCEPTED:
+                handleAccepted(envelope, delivery);
+                break;
+            case REJECTED:
+                handleDisposition(envelope, delivery, REJECTED);
+                break;
+            case RELEASED:
+                handleDisposition(envelope, delivery, Released.getInstance());
+                break;
+            case MODIFIED_FAILED:
+                handleDisposition(envelope, delivery, MODIFIED_FAILED);
+                break;
+            case MODIFIED_FAILED_UNDELIVERABLE:
+                handleDisposition(envelope, delivery, 
MODIFIED_FAILED_UNDELIVERABLE);
+                break;
+            default:
+                LOG.warn("Unsupported Ack Type for message: {}", envelope);
+                throw new IllegalArgumentException("Unknown Acknowledgement 
type");
+        }
 
-                    if (session.isTransactionFailed()) {
-                        LOG.trace("Skipping ack of message {} in failed 
transaction.", envelope);
-                        return;
-                    }
+        sendFlowIfNeeded();
+        tryCompleteDeferredClose();
+    }
 
-                    Binary txnId = 
session.getTransactionContext().getAmqpTransactionId();
-                    if (txnId != null) {
-                        
delivery.disposition(session.getTransactionContext().getTxnAcceptState());
-                        delivery.settle();
-                        
session.getTransactionContext().registerTxConsumer(this);
-                    }
-                } else {
-                    delivery.disposition(Accepted.getInstance());
+    private void handleDelivered(JmsInboundMessageDispatch envelope, Delivery 
delivery) {
+        LOG.debug("Delivered Ack of message: {}", envelope);
+        deliveredCount++;
+        envelope.setDelivered(true);
+        delivery.setDefaultDeliveryState(MODIFIED_FAILED);
+    }
+
+    private void handleAccepted(JmsInboundMessageDispatch envelope, Delivery 
delivery) {
+        LOG.debug("Accepted Ack of message: {}", envelope);
+        if (!delivery.remotelySettled()) {
+            if (session.isTransacted() && !getResourceInfo().isBrowser()) {
+
+                if (session.isTransactionFailed()) {
+                    LOG.trace("Skipping ack of message {} in failed 
transaction.", envelope);
+                    return;
+                }
+
+                Binary txnId = 
session.getTransactionContext().getAmqpTransactionId();
+                if (txnId != null) {
+                    
delivery.disposition(session.getTransactionContext().getTxnAcceptState());
                     delivery.settle();
+                    session.getTransactionContext().registerTxConsumer(this);
                 }
             } else {
+                delivery.disposition(Accepted.getInstance());
                 delivery.settle();
             }
-        } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED)) {
-            settleDelivery(delivery, MODIFIED_FAILED);
-        } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) {
-            settleDelivery(delivery, MODIFIED_FAILED_UNDELIVERABLE);
-        } else if (ackType.equals(ACK_TYPE.REJECTED)) {
-            settleDelivery(delivery, REJECTED);
-        } else if (ackType.equals(ACK_TYPE.RELEASED)) {
-            delivery.disposition(Released.getInstance());
-            delivery.settle();
         } else {
-            LOG.warn("Unsupported Ack Type for message: {}", envelope);
-            return;
+            delivery.settle();
         }
 
         if (envelope.isDelivered()) {
             deliveredCount--;
         }
+        dispatchedCount--;
+    }
 
-        tryCompleteDeferredClose();
+    private void handleDisposition(JmsInboundMessageDispatch envelope, 
Delivery delivery, DeliveryState outcome) {
+        delivery.disposition(outcome);
+        delivery.settle();
+        if (envelope.isDelivered()) {
+            deliveredCount--;
+        }
+        dispatchedCount--;
     }
 
     /**
@@ -325,11 +345,10 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
 
         int currentCredit = getEndpoint().getCredit();
         if (currentCredit <= prefetchSize * 0.5) {
-            int prefetchedMessageCount = 
getResourceInfo().getPrefetchedMessageCount();
+            int potentialPrefetch = currentCredit + (dispatchedCount - 
deliveredCount);
 
-            int potentialPrefetch = currentCredit + prefetchedMessageCount;
             if (potentialPrefetch <= prefetchSize * 0.7) {
-                int additionalCredit = prefetchSize - currentCredit - 
prefetchedMessageCount;
+                int additionalCredit = prefetchSize - potentialPrefetch;
 
                 LOG.trace("Consumer {} granting additional credit: {}", 
getConsumerId(), additionalCredit);
                 getEndpoint().flow(additionalCredit);
@@ -377,6 +396,12 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
         }
 
+        // Previously delivered messages should be tagged as dispatched 
messages again so we
+        // can properly compute the next credit refresh, so subtract them from 
both the delivered
+        // and dispatched counts and then dispatch them again as a new message.
+        deliveredCount -= redispatchList.size();
+        dispatchedCount -= redispatchList.size();
+
         ListIterator<JmsInboundMessageDispatch> reverseIterator = 
redispatchList.listIterator(redispatchList.size());
         while (reverseIterator.hasPrevious()) {
             deliver(reverseIterator.previous());
@@ -481,7 +506,11 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             //        In the future once the JMS mapping is complete we should 
be
             //        able to convert everything to some message even if its 
just
             //        a bytes messages as a fall back.
-            settleDelivery(incoming, MODIFIED_FAILED_UNDELIVERABLE);
+            incoming.disposition(MODIFIED_FAILED_UNDELIVERABLE);
+            incoming.settle();
+            // TODO: this flows credit, which we might not want, e.g if
+            // a drain was issued to stop the link.
+            sendFlowIfNeeded();
             return false;
         }
 
@@ -552,19 +581,12 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
         return "AmqpConsumer { " + getResourceInfo().getId() + " }";
     }
 
-    protected void settleDelivery(Delivery incoming, DeliveryState state) {
-        incoming.disposition(state);
-        incoming.settle();
-        // TODO: this flows credit, which we might not want, e.g if
-        // a drain was issued to stop the link.
-        sendFlowIfNeeded();
-    }
-
     protected void deliver(JmsInboundMessageDispatch envelope) throws 
Exception {
         if (!deferredClose) {
             ProviderListener listener = 
session.getProvider().getProviderListener();
             if (listener != null) {
                 LOG.debug("Dispatching received message: {}", envelope);
+                dispatchedCount++;
                 listener.onInboundMessage(envelope);
             } else {
                 LOG.error("Provider listener is not set, message will be 
dropped: {}", envelope);
@@ -641,15 +663,13 @@ public class AmqpConsumer extends 
AmqpAbstractResource<JmsConsumerInfo, Receiver
             Delivery current = delivery;
             delivery = delivery.next();
 
-            if (!(current.getContext() instanceof JmsInboundMessageDispatch)) {
+            if (current.getContext() instanceof JmsInboundMessageDispatch) {
+                JmsInboundMessageDispatch envelope = 
(JmsInboundMessageDispatch) current.getContext();
+                if (!envelope.isDelivered()) {
+                    handleDisposition(envelope, current, 
Released.getInstance());
+                }
+            } else {
                 LOG.debug("{} Found incomplete delivery with no context during 
release processing", AmqpConsumer.this);
-                continue;
-            }
-
-            JmsInboundMessageDispatch envelope = (JmsInboundMessageDispatch) 
current.getContext();
-            if (!envelope.isDelivered()) {
-                current.disposition(Released.getInstance());
-                current.settle();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
index 0a2b62d..d7bc544 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
@@ -58,12 +58,12 @@ public class JmsConsumerInfoTest {
 
     @Test(expected=IllegalArgumentException.class)
     public void testExceptionWhenCreatedWithNullConnectionId() {
-        new JmsConsumerInfo(null, null, null);
+        new JmsConsumerInfo(null, null);
     }
 
     @Test
     public void testCreateFromConsumerId() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertSame(firstId, info.getId());
         assertSame(firstId.getParentId(), info.getParentId());
         assertNotNull(info.toString());
@@ -71,7 +71,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testCopy() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
 
         info.setAcknowledgementMode(1);
         info.setBrowser(true);
@@ -108,7 +108,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsDurable() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertFalse(info.isDurable());
         info.setDurable(true);
         assertTrue(info.isDurable());
@@ -116,7 +116,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsExplicitClientID() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertFalse(info.isExplicitClientID());
         info.setExplicitClientID(true);
         assertTrue(info.isExplicitClientID());
@@ -124,7 +124,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsShared() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertFalse(info.isShared());
         info.setShared(true);
         assertTrue(info.isShared());
@@ -134,7 +134,7 @@ public class JmsConsumerInfoTest {
     public void testGetSubscriptionName() {
         String subName = "name";
 
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertNull(info.getSubscriptionName());
         info.setSubscriptionName(subName);
         assertEquals(subName, info.getSubscriptionName());
@@ -142,8 +142,8 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testCompareTo() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
 
         assertEquals(-1, first.compareTo(second));
         assertEquals(0, first.compareTo(first));
@@ -152,8 +152,8 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testHashCode() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
 
         assertEquals(first.hashCode(), first.hashCode());
         assertEquals(second.hashCode(), second.hashCode());
@@ -164,8 +164,8 @@ public class JmsConsumerInfoTest {
     @SuppressWarnings("unlikely-arg-type")
     @Test
     public void testEqualsCode() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
 
         assertEquals(first, first);
         assertEquals(second, second);
@@ -179,7 +179,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testVisit() throws Exception {
-        final JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null);
+        final JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
 
         final AtomicBoolean visited = new AtomicBoolean();
 
@@ -197,7 +197,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testGetRedeliveryPolicyDefaults() {
-        final JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
+        final JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
 
         assertNotNull(info.getRedeliveryPolicy());
         info.setRedeliveryPolicy(null);
@@ -207,7 +207,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsListener() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
         assertFalse(info.isListener());
         info.setListener(true);
         assertTrue(info.isListener());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
index 39f5895..00bf2f4 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
@@ -48,7 +48,7 @@ public class JmsDefaultResourceVisitorTest {
         JmsDefaultResourceVisitor visitor = new JmsDefaultResourceVisitor();
         visitor.processConnectionInfo(new JmsConnectionInfo(connectionId));
         visitor.processSessionInfo(new JmsSessionInfo(sessionId));
-        visitor.processConsumerInfo(new JmsConsumerInfo(consumerId, null, 
null));
+        visitor.processConsumerInfo(new JmsConsumerInfo(consumerId, null));
         visitor.processProducerInfo(new JmsProducerInfo(producerId));
         visitor.processDestination(new JmsTemporaryTopic("Test"));
         visitor.processTransactionInfo(new JmsTransactionInfo(sessionId, 
transactionId));

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
index 755bd21..f60af09 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
@@ -45,7 +45,7 @@ public class AmqpSubscriptionTrackerTest {
         JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, 
consumerIdCounter.incrementAndGet());
         JmsTopic topic = new JmsTopic(topicName);
 
-        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId, null, 
null);
+        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId, null);
 
         consumerInfo.setSubscriptionName(subscriptionName);
         consumerInfo.setDestination(topic);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
index 858cc7f..472d756 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
@@ -84,7 +84,7 @@ public class AmqpCodecTest extends QpidJmsTestCase {
         JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, 1);
         mockConnection = Mockito.mock(AmqpConnection.class);
         mockConsumer = Mockito.mock(AmqpConsumer.class);
-        Mockito.when(mockConsumer.getResourceInfo()).thenReturn(new 
JmsConsumerInfo(consumerId, null, null));
+        Mockito.when(mockConsumer.getResourceInfo()).thenReturn(new 
JmsConsumerInfo(consumerId, null));
     }
 
     //----- AmqpHeader encode and decode 
-------------------------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
index 1db2744..dede595 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
@@ -131,7 +131,7 @@ public class AmqpJmsMessageTypesTestCase extends 
QpidJmsTestCase {
         AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class);
         Mockito.when(consumer.getConnection()).thenReturn(connection);
         
Mockito.when(consumer.getDestination()).thenReturn(consumerDestination);
-        Mockito.when(consumer.getResourceInfo()).thenReturn(new 
JmsConsumerInfo(consumerId, null, null));
+        Mockito.when(consumer.getResourceInfo()).thenReturn(new 
JmsConsumerInfo(consumerId, null));
         return consumer;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/24d03437/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
index ebce5d7..19f6d3e 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
@@ -78,7 +78,7 @@ public class FailoverProviderTestSupport extends 
QpidJmsTestCase {
 
     protected JmsConsumerInfo createConsumerInfo(JmsSessionInfo session) {
         JmsConsumerId id = new JmsConsumerId(session.getId(), 
nextConsumerId.incrementAndGet());
-        JmsConsumerInfo consumer = new JmsConsumerInfo(id, null, null);
+        JmsConsumerInfo consumer = new JmsConsumerInfo(id, null);
         return consumer;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to