add support for using drain to stop links
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0fdc8e54
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0fdc8e54
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0fdc8e54
Branch: refs/heads/master
Commit: 0fdc8e549b344e894e9ff5b54e9d09b31560afb3
Parents: f994b35
Author: Robert Gemmell <[email protected]>
Authored: Fri Dec 5 12:46:05 2014 +0000
Committer: Robert Gemmell <[email protected]>
Committed: Fri Dec 5 14:49:05 2014 +0000
----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java | 61 +++++++++++++++++++-
 1 file changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0fdc8e54/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index e252345..fc9ec43 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -82,6 +82,8 @@ import org.slf4j.LoggerFactory;
 // TODO should expectXXXYYYZZZ methods just be expect(matcher)?
 public class TestAmqpPeer implements AutoCloseable
 {
+    private static final int LINK_HANDLE_OFFSET = 100;
+
     private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeer.class.getName());
 
     private final TestAmqpPeerRunner _driverRunnable;
@@ -98,7 +100,7 @@ public class TestAmqpPeer implements AutoCloseable
      */
     private CountDownLatch _handlersCompletedLatch;
 
-    private volatile int _nextLinkHandle = 100;
+    private volatile int _nextLinkHandle = LINK_HANDLE_OFFSET;
 
     private byte[] _deferredBytes;
 
@@ -700,13 +702,68 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectLinkFlow()
     {
+        expectLinkFlow(false);
+    }
+
+    public void expectLinkFlow(boolean drain)
+    {
+        Matcher<Boolean> drainMatcher = null;
+        if(drain)
+        {
+            drainMatcher = equalTo(true);
+        }
+        else
+        {
+            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
+        }
+
         final FlowMatcher flowMatcher = new FlowMatcher()
                         .withLinkCredit(Matchers.greaterThan(UnsignedInteger.ZERO))
-                        .withHandle(Matchers.notNullValue());
+                        .withHandle(Matchers.notNullValue())
+                        .withDrain(drainMatcher);
+
+        if(drain)
+        {
+            final FlowFrame drainResponse = new FlowFrame();
+            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
+            drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
+            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+            drainResponse.setDrain(true);
+
+            // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+            flowResponseSender.setValueProvider(new ValueProvider()
+            {
+                @Override
+                public void setValues()
+                {
+                    flowResponseSender.setChannel(flowMatcher.getActualChannel());
+                    drainResponse.setHandle(calculateLinkHandle(flowMatcher));
+                    drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+                    drainResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); // Assuming no 'in-flight' messages.
+                    drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+                }
+            });
+
+            flowMatcher.onSuccess(flowResponseSender);
+        }
 
         addHandler(flowMatcher);
     }
 
+    private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher) {
+        UnsignedInteger h = (UnsignedInteger) flowMatcher.getReceivedHandle();
+
+        return h.add(UnsignedInteger.valueOf(LINK_HANDLE_OFFSET));
+    }
+
+    private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
+        UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
+        UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
+
+        return dc.add(lc);
+    }
+
     public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                                   final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                                   final PropertiesDescribedType propertiesDescribedType,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
