add support for using drain to stop links

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fdc8e54
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fdc8e54
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fdc8e54

Branch: refs/heads/master
Commit: 0fdc8e549b344e894e9ff5b54e9d09b31560afb3
Parents: f994b35
Author: Robert Gemmell <[email protected]>
Authored: Fri Dec 5 12:46:05 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Fri Dec 5 14:49:05 2014 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 61 +++++++++++++++++++-
 1 file changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fdc8e54/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index e252345..fc9ec43 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -82,6 +82,8 @@ import org.slf4j.LoggerFactory;
 // TODO should expectXXXYYYZZZ methods just be expect(matcher)?
 public class TestAmqpPeer implements AutoCloseable
 {
+    private static final int LINK_HANDLE_OFFSET = 100;
+
     private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeer.class.getName());
 
     private final TestAmqpPeerRunner _driverRunnable;
@@ -98,7 +100,7 @@ public class TestAmqpPeer implements AutoCloseable
      */
     private CountDownLatch _handlersCompletedLatch;
 
-    private volatile int _nextLinkHandle = 100;
+    private volatile int _nextLinkHandle = LINK_HANDLE_OFFSET;
 
     private byte[] _deferredBytes;
 
@@ -700,13 +702,68 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectLinkFlow()
     {
+        expectLinkFlow(false);
+    }
+
+    public void expectLinkFlow(boolean drain)
+    {
+        Matcher<Boolean> drainMatcher = null;
+        if(drain)
+        {
+            drainMatcher = equalTo(true);
+        }
+        else
+        {
+            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
+        }
+
         final FlowMatcher flowMatcher = new FlowMatcher()
                         .withLinkCredit(Matchers.greaterThan(UnsignedInteger.ZERO))
-                        .withHandle(Matchers.notNullValue());
+                        .withHandle(Matchers.notNullValue())
+                        .withDrain(drainMatcher);
+
+        if(drain)
+        {
+            final FlowFrame drainResponse = new FlowFrame();
+            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
+            drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
+            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+            drainResponse.setDrain(true);
+
+            // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+            flowResponseSender.setValueProvider(new ValueProvider()
+            {
+                @Override
+                public void setValues()
+                {
+                    flowResponseSender.setChannel(flowMatcher.getActualChannel());
+                    drainResponse.setHandle(calculateLinkHandle(flowMatcher));
+                    drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+                    drainResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); // Assuming no 'in-flight' messages.
+                    drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+                }
+            });
+
+            flowMatcher.onSuccess(flowResponseSender);
+        }
 
         addHandler(flowMatcher);
     }
 
+    private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) {
+        UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
+
+        return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
+    }
+
+    private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
+        UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
+        UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
+
+        return dc.add(lc);
+    }
+
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                                   final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                                   final PropertiesDescribedType propertiesDescribedType,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to