use AmqpSequence body to send StreamMessages
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2e050059 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2e050059 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2e050059 Branch: refs/heads/master Commit: 2e05005947c842d05291aadf06ce960a5bb8e7a4 Parents: 026d83a Author: Robert Gemmell <rob...@apache.org> Authored: Mon Oct 6 14:05:05 2014 +0100 Committer: Robert Gemmell <rob...@apache.org> Committed: Mon Oct 6 14:05:05 2014 +0100 ---------------------------------------------------------------------- .../message/AmqpJmsStreamMessageFacade.java | 4 +- .../StreamMessageIntegrationTest.java | 4 +- .../message/AmqpJmsStreamMessageFacadeTest.java | 12 ++++ .../types/EncodedAmqpSequenceMatcher.java | 59 ++++++++++++++++++++ .../matchers/types/EncodedAmqpValueMatcher.java | 1 + 5 files changed, 76 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2e050059/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java index fae76e6..f0355bf 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java @@ -51,7 +51,7 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements */ public AmqpJmsStreamMessageFacade(AmqpConnection connection) { super(connection); - list = initializeEmptyBodyList(false); + list = initializeEmptyBodyList(true); setAnnotation(JMS_MSG_TYPE, JMS_STREAM_MESSAGE); } @@ -70,7 +70,7 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements Section body = getAmqpMessage().getBody(); if (body == null) { - list = initializeEmptyBodyList(false); + list = initializeEmptyBodyList(true); } else if (body instanceof AmqpValue) { Object value = ((AmqpValue) body).getValue(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2e050059/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java index a9e860a..b337f8d 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java @@ -44,7 +44,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSec import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; -import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpSequenceMatcher; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; @@ -200,7 +200,7 @@ public class StreamMessageIntegrationTest extends QpidJmsTestCase { messageMatcher.setHeadersMatcher(headersMatcher); messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); messageMatcher.setPropertiesMatcher(propertiesMatcher); - messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(list)); + messageMatcher.setMessageContentMatcher(new EncodedAmqpSequenceMatcher(list)); //send the message testPeer.expectTransfer(messageMatcher); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2e050059/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacadeTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacadeTest.java index fd823af..ec0e3bd 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacadeTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacadeTest.java @@ -43,6 +43,7 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.message.Message; import org.junit.Before; import org.junit.Test; @@ -75,6 +76,17 @@ public class AmqpJmsStreamMessageFacadeTest extends QpidJmsTestCase { assertEquals("unexpected value for message type annotation value", AmqpMessageSupport.JMS_STREAM_MESSAGE, annotationsMap.get(AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_MSG_TYPE))); } + @Test + public void testNewMessageToSendContainsAmqpSequenceBody() throws Exception { + AmqpJmsStreamMessageFacade amqpStreamMessageFacade = createNewStreamMessageFacade(); + + Message protonMessage = amqpStreamMessageFacade.getAmqpMessage(); + Section body = protonMessage.getBody(); + + assertNotNull("Body section was not present", body); + assertTrue("Body section was not of expected type: " + body.getClass(), body instanceof AmqpSequence); + } + @Test(expected = MessageEOFException.class) public void testPeekWithNewMessageToSendThrowsMEOFE() throws Exception { AmqpJmsStreamMessageFacade amqpStreamMessageFacade = createNewStreamMessageFacade(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2e050059/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpSequenceMatcher.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpSequenceMatcher.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpSequenceMatcher.java new file mode 100644 index 0000000..f718f98 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpSequenceMatcher.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.jms.test.testpeer.matchers.types; + +import java.util.List; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.hamcrest.Description; + +public class EncodedAmqpSequenceMatcher extends EncodedAmqpTypeMatcher +{ + private static final Symbol DESCRIPTOR_SYMBOL = Symbol.valueOf("amqp:amqp-sequence:list"); + private static final UnsignedLong DESCRIPTOR_CODE = UnsignedLong.valueOf(0x0000000000000076L); + + /** + * @param expectedValue the value that is expected to be IN the + * received {@link AmqpSequence} + */ + public EncodedAmqpSequenceMatcher(List<Object> expectedValue) + { + this(expectedValue,false); + } + + /** + * @param expectedValue the value that is expected to be IN the + * received {@link AmqpSequence} + * @param permitTrailingBytes if it is permitted for bytes to be left in the Binary after consuming the {@link AmqpSequence} + */ + public EncodedAmqpSequenceMatcher(Object expectedValue, boolean permitTrailingBytes) + { + super(DESCRIPTOR_SYMBOL, DESCRIPTOR_CODE, expectedValue, permitTrailingBytes); + } + + @Override + public void describeTo(Description description) + { + description + .appendText("a Binary encoding of an AmqpSequence that wraps: ") + .appendValue(getExpectedValue()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2e050059/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpValueMatcher.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpValueMatcher.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpValueMatcher.java index 93dcc36..26f9a3d 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpValueMatcher.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpValueMatcher.java @@ -20,6 +20,7 @@ package org.apache.qpid.jms.test.testpeer.matchers.types; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.hamcrest.Description; public class EncodedAmqpValueMatcher extends EncodedAmqpTypeMatcher --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org