Implement get / set of JMS Destination from the AMQP message facade Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/eae45830 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/eae45830 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/eae45830
Branch: refs/heads/master Commit: eae45830c04e88823f6d3e38f1cc8ece891ae664 Parents: 8efad7c Author: Timothy Bish <[email protected]> Authored: Wed Sep 24 16:06:58 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Sep 24 16:06:58 2014 -0400 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 5 + .../amqp/message/AmqpDestinationHelper.java | 200 +++++++++++++++++++ .../amqp/message/AmqpJmsBytesMessageFacade.java | 10 +- .../amqp/message/AmqpJmsMapMessageFacade.java | 11 +- .../amqp/message/AmqpJmsMessageBuilder.java | 37 ++-- .../amqp/message/AmqpJmsMessageFacade.java | 38 ++-- .../message/AmqpJmsObjectMessageFacade.java | 15 +- .../message/AmqpJmsStreamMessageFacade.java | 11 +- .../amqp/message/AmqpJmsTextMessageFacade.java | 11 +- .../amqp/message/AmqpMessageSupport.java | 24 +++ .../amqp/message/AmqpJmsMessageFacadeTest.java | 22 +- 11 files changed, 315 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 28b989c..1f51b46 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -355,6 +355,11 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver protected void doClose() { } + + public AmqpConnection getConnection() { + return this.session.getConnection(); + } + public AmqpSession getSession() { return this.session; } 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java new file mode 100644 index 0000000..9d7445a --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.provider.amqp.message; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.JmsQueue; +import org.apache.qpid.jms.JmsTemporaryQueue; +import org.apache.qpid.jms.JmsTemporaryTopic; +import org.apache.qpid.jms.JmsTopic; + +/** + * A set of utility methods useful when mapping JmsDestination types to / from the AMQP + * destination fields in a Message that's being sent or received. 
+ */ +public class AmqpDestinationHelper { + + public static final AmqpDestinationHelper INSTANCE = new AmqpDestinationHelper(); + + public static final String TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-to-type"; + public static final String REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-reply-type"; + + static final String QUEUE_ATTRIBUTE = "queue"; + static final String TOPIC_ATTRIBUTE = "topic"; + static final String TEMPORARY_ATTRIBUTE = "temporary"; + + public static final String QUEUE_ATTRIBUTES_STRING = QUEUE_ATTRIBUTE; + public static final String TOPIC_ATTRIBUTES_STRING = TOPIC_ATTRIBUTE; + public static final String TEMP_QUEUE_ATTRIBUTES_STRING = QUEUE_ATTRIBUTE + "," + TEMPORARY_ATTRIBUTE; + public static final String TEMP_TOPIC_ATTRIBUTES_STRING = TOPIC_ATTRIBUTE + "," + TEMPORARY_ATTRIBUTE; + + // TODO - The Type Annotation seems like it could just be a byte value + // TODO - How do we deal with the case where no type is present? + + /** + * Decode the provided To address, type description, and consumer destination + * information such that an appropriate Destination object can be returned. + * + * If an address and type description is provided then this will be used to + * create the Destination. If the type information is missing, it will be + * derived from the consumer destination if present, or default to a generic + * destination if not. + * + * If the address is null then the consumer destination is returned, unless + * the useConsumerDestForTypeOnly flag is true, in which case null will be + * returned. 
+ */ + + public JmsDestination buildJmsDestination(AmqpJmsMessageFacade message, JmsDestination consumerDestination) { + String to = message.getAmqpMessage().getAddress(); + String toTypeString = (String) message.getAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME); + Set<String> typeSet = null; + + if (toTypeString != null) { + typeSet = splitAttributes(toTypeString); + } + + return createDestination(to, typeSet, consumerDestination, false); + } + + public JmsDestination buildJmsReplyTo(AmqpJmsMessageFacade message, JmsDestination consumerDestination) { + String replyTo = message.getAmqpMessage().getReplyTo(); + String replyToTypeString = (String) message.getAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME); + Set<String> typeSet = null; + + if (replyToTypeString != null) { + typeSet = splitAttributes(replyToTypeString); + } + + return createDestination(replyTo, typeSet, consumerDestination, true); + } + + private JmsDestination createDestination(String address, Set<String> typeSet, JmsDestination consumerDestination, boolean useConsumerDestForTypeOnly) { + if (address == null) { + return useConsumerDestForTypeOnly ? 
null : consumerDestination; + } + + if (typeSet != null && !typeSet.isEmpty()) { + if (typeSet.contains(QUEUE_ATTRIBUTE)) { + if (typeSet.contains(TEMPORARY_ATTRIBUTE)) { + return new JmsTemporaryQueue(address); + } else { + return new JmsQueue(address); + } + } else if (typeSet.contains(TOPIC_ATTRIBUTE)) { + if (typeSet.contains(TEMPORARY_ATTRIBUTE)) { + return new JmsTemporaryTopic(address); + } else { + return new JmsTopic(address); + } + } + } + + if (consumerDestination.isQueue()) { + if (consumerDestination.isTemporary()) { + return new JmsTemporaryQueue(address); + } else { + return new JmsQueue(address); + } + } else if (consumerDestination.isTopic()) { + if (consumerDestination.isTemporary()) { + return new JmsTemporaryTopic(address); + } else { + return new JmsTopic(address); + } + } + + // fall back to a straight Destination + // TODO - We don't have a non-abstract destination to create right now + // and JMS doesn't really define a true non Topic / Queue destination + // so how this would be handled elsewhere seems a mystery. 
+ return null; + } + + public void setToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) { + String address = destination.getName(); + String typeString = toTypeAnnotation(destination); + + message.getAmqpMessage().setAddress(address); + + if (address == null || typeString == null) { + message.removeAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME); + } else { + message.setAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString); + } + } + + public void setReplyToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) { + String replyToAddress = destination.getName(); + String typeString = toTypeAnnotation(destination); + + message.getAmqpMessage().setReplyTo(replyToAddress); + + if (replyToAddress == null || typeString == null) { + message.removeAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME); + } else { + message.setAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString); + } + } + + /** + * @return the annotation type string, or null if the supplied destination + * is null or can't be classified + */ + private String toTypeAnnotation(JmsDestination destination) { + if (destination == null) { + return null; + } + + if (destination.isQueue()) { + if (destination.isTemporary()) { + return TEMP_QUEUE_ATTRIBUTES_STRING; + } else { + return QUEUE_ATTRIBUTES_STRING; + } + } else if (destination.isTopic()) { + if (destination.isTemporary()) { + return TEMP_TOPIC_ATTRIBUTES_STRING; + } else { + return TOPIC_ATTRIBUTES_STRING; + } + } + + return null; + } + + public Set<String> splitAttributes(String typeString) { + if (typeString == null) { + return null; + } + + HashSet<String> typeSet = new HashSet<String>(); + + // Split string on commas and their surrounding whitespace + for (String attr : typeString.split("\\s*,\\s*")) { + // ignore empty values + if (!attr.equals("")) { + typeSet.add(attr); + } + } + + return typeSet; + } +} 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java index 1cefc2a..664d414 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java @@ -21,6 +21,7 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_M import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade; import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Data; @@ -42,6 +43,7 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J * Creates a new facade instance * * @param connection + * the AmqpConnection that under which this facade was created. */ public AmqpJmsBytesMessageFacade(AmqpConnection connection) { super(connection); @@ -52,13 +54,13 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J * Creates a new Facade around an incoming AMQP Message for dispatch to the * JMS Consumer instance. * - * @param connection - * the connection that created this Facade. + * @param consumer + * the consumer that received this message. * @param message * the incoming Message instance that is being wrapped. 
*/ - public AmqpJmsBytesMessageFacade(AmqpConnection connection, Message message) { - super(connection, message); + public AmqpJmsBytesMessageFacade(AmqpConsumer consumer, Message message) { + super(consumer, message); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java index 511d1e3..fdda9e4 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.qpid.jms.message.facade.JmsMapMessageFacade; import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Section; @@ -44,7 +45,7 @@ public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements Jms * Create a new facade ready for sending. * * @param connection - * the connection instance that created this facade. + * the AmqpConnection that under which this facade was created. */ public AmqpJmsMapMessageFacade(AmqpConnection connection) { super(connection); @@ -56,14 +57,14 @@ public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements Jms * Creates a new Facade around an incoming AMQP Message for dispatch to the * JMS Consumer instance. * - * @param connection - * the connection that created this Facade. + * @param consumer + * the consumer that received this message. 
* @param message * the incoming Message instance that is being wrapped. */ @SuppressWarnings("unchecked") - public AmqpJmsMapMessageFacade(AmqpConnection connection, Message message) { - super(connection, message); + public AmqpJmsMapMessageFacade(AmqpConsumer consumer, Message message) { + super(consumer, message); Section body = getAmqpMessage().getBody(); if (body == null) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java index c31c50b..a09cee2 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java @@ -25,7 +25,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_S import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE; import java.io.IOException; -import java.util.Map; import org.apache.qpid.jms.message.JmsBytesMessage; import org.apache.qpid.jms.message.JmsMapMessage; @@ -33,8 +32,7 @@ import org.apache.qpid.jms.message.JmsMessage; import org.apache.qpid.jms.message.JmsObjectMessage; import org.apache.qpid.jms.message.JmsStreamMessage; import org.apache.qpid.jms.message.JmsTextMessage; -import org.apache.qpid.jms.provider.amqp.AmqpConnection; -import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.message.Message; /** @@ -50,8 +48,8 @@ public class AmqpJmsMessageBuilder { * Create a new JmsMessage and underlying JmsMessageFacade that represents the proper * message type 
for the incoming AMQP message. * - * @param connection - * The provider AMQP Connection instance where this message arrived at. + * @param consumer + * The provider AMQP Consumer instance where this message arrived at. * @param message * The Proton Message object that will be wrapped. * @@ -59,10 +57,10 @@ public class AmqpJmsMessageBuilder { * * @throws IOException if an error occurs while creating the message objects. */ - public static JmsMessage createJmsMessage(AmqpConnection connection, Message message) throws IOException { + public static JmsMessage createJmsMessage(AmqpConsumer consumer, Message message) throws IOException { // First we try the easy way, if the annotation is there we don't have to work hard. - JmsMessage result = createFromMsgAnnotation(connection, message); + JmsMessage result = createFromMsgAnnotation(consumer, message); if (result != null) { return result; } @@ -71,23 +69,23 @@ public class AmqpJmsMessageBuilder { throw new IOException("Could not create a JMS message from incoming message"); } - private static JmsMessage createFromMsgAnnotation(AmqpConnection connection, Message message) throws IOException { - Object annotation = getMessageAnnotation(JMS_MSG_TYPE, message); + private static JmsMessage createFromMsgAnnotation(AmqpConsumer consumer, Message message) throws IOException { + Object annotation = AmqpMessageSupport.getMessageAnnotation(JMS_MSG_TYPE, message); if (annotation != null) { switch ((byte) annotation) { case JMS_MESSAGE: - return new JmsMessage(new AmqpJmsMessageFacade(connection, message)); + return new JmsMessage(new AmqpJmsMessageFacade(consumer, message)); case JMS_BYTES_MESSAGE: - return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection, message)); + return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(consumer, message)); case JMS_TEXT_MESSAGE: - return new JmsTextMessage(new AmqpJmsTextMessageFacade(connection, message)); + return new JmsTextMessage(new AmqpJmsTextMessageFacade(consumer, 
message)); case JMS_MAP_MESSAGE: - return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection, message)); + return new JmsMapMessage(new AmqpJmsMapMessageFacade(consumer, message)); case JMS_STREAM_MESSAGE: - return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection, message)); + return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(consumer, message)); case JMS_OBJECT_MESSAGE: - return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(connection, message)); + return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(consumer, message)); default: throw new IOException("Invalid JMS Message Type annotation found in message"); } @@ -95,13 +93,4 @@ public class AmqpJmsMessageBuilder { return null; } - - private static Object getMessageAnnotation(String key, Message message) { - if (message.getMessageAnnotations() != null) { - Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue(); - return annotations.get(AmqpMessageSupport.getSymbol(key)); - } - - return null; - } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java index 4937000..acd3012 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java @@ -37,6 +37,7 @@ import org.apache.qpid.jms.exceptions.IdConversionException; import org.apache.qpid.jms.message.facade.JmsMessageFacade; import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import 
org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; @@ -74,6 +75,9 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { /** * Create a new AMQP Message Facade with an empty message instance. + * + * @param connection + * the AmqpConnection that under which this facade was created. */ public AmqpJmsMessageFacade(AmqpConnection connection) { this.message = Proton.message(); @@ -87,15 +91,15 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { * Creates a new Facade around an incoming AMQP Message for dispatch to the * JMS Consumer instance. * - * @param connection - * the connection that created this Facade. + * @param consumer + * the consumer that received this message. * @param message * the incoming Message instance that is being wrapped. */ @SuppressWarnings("unchecked") - public AmqpJmsMessageFacade(AmqpConnection connection, Message message) { + public AmqpJmsMessageFacade(AmqpConsumer consumer, Message message) { this.message = message; - this.connection = connection; + this.connection = consumer.getConnection(); annotations = message.getMessageAnnotations(); if (annotations != null) { @@ -112,8 +116,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { syntheticTTL = System.currentTimeMillis() + ttl; } - // TODO - Set destination - // TODO - Set replyTo + this.destination = AmqpDestinationHelper.INSTANCE.buildJmsDestination(this, consumer.getDestination()); + this.replyTo = AmqpDestinationHelper.INSTANCE.buildJmsReplyTo(this, consumer.getDestination()); } /** @@ -223,7 +227,6 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { @Override public void clearProperties() { - clearProperties(); //_propJMS_AMQP_TTL = null; message.setReplyToGroupId(null); message.setUserId(null); @@ -235,7 +238,7 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { @Override public 
JmsMessageFacade copy() throws JMSException { - AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection, message); + AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection); copyInto(copy); return copy; } @@ -250,12 +253,12 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { AmqpMessageIdHelper helper = AmqpMessageIdHelper.INSTANCE; String baseStringId = helper.toBaseMessageIdString(underlying); - //Ensure the ID: prefix is present. - //TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix. - //TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities? - // I Ended up putting it in JmsMessage after the above comment, as a workaround for the current JmsDefaultMessageFacade usage. - if(baseStringId != null && !helper.hasMessageIdPrefix(baseStringId)) - { + // Ensure the ID: prefix is present. + // TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix. + // TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities? + // I Ended up putting it in JmsMessage after the above comment, as a workaround for the + // current JmsDefaultMessageFacade usage. 
+ if (baseStringId != null && !helper.hasMessageIdPrefix(baseStringId)) { baseStringId = AmqpMessageIdHelper.JMS_ID_PREFIX + baseStringId; } return new JmsMessageId(baseStringId); @@ -514,8 +517,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { @Override public void setDestination(JmsDestination destination) { this.destination = destination; - - // TODO + lazyCreateAnnotations(); + AmqpDestinationHelper.INSTANCE.setToAddressFromDestination(this, destination); } @Override @@ -526,7 +529,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { @Override public void setReplyTo(JmsDestination replyTo) { this.replyTo = replyTo; - // TODO Auto-generated method stub + lazyCreateAnnotations(); + AmqpDestinationHelper.INSTANCE.setReplyToAddressFromDestination(this, replyTo); } public void setReplyToGroupId(String replyToGroupId) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java index 3696653..1668a6c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java @@ -27,6 +27,7 @@ import javax.jms.JMSException; import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade; import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.message.Message; /** @@ -38,7 +39,10 @@ public class AmqpJmsObjectMessageFacade extends 
AmqpJmsMessageFacade implements private AmqpObjectTypeDelegate delegate; /** + * Creates a new facade instance + * * @param connection + * the AmqpConnection that under which this facade was created. */ public AmqpJmsObjectMessageFacade(AmqpConnection connection) { super(connection); @@ -49,11 +53,16 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements } /** - * @param connection + * Creates a new Facade around an incoming AMQP Message for dispatch to the + * JMS Consumer instance. + * + * @param consumer + * the consumer that received this message. * @param message + * the incoming Message instance that is being wrapped. */ - public AmqpJmsObjectMessageFacade(AmqpConnection connection, Message message) { - super(connection, message); + public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message) { + super(consumer, message); // TODO detect the content type and init the proper delegate. initDelegate(false); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java index 0999225..90cd97e 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java @@ -27,6 +27,7 @@ import javax.jms.MessageEOFException; import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade; import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.amqp.Binary; import 
org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Section; @@ -45,7 +46,7 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements * Create a new facade ready for sending. * * @param connection - * the connection instance that created this facade. + * the AmqpConnection that under which this facade was created. */ public AmqpJmsStreamMessageFacade(AmqpConnection connection) { super(connection); @@ -57,14 +58,14 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements * Creates a new Facade around an incoming AMQP Message for dispatch to the * JMS Consumer instance. * - * @param connection - * the connection that created this Facade. + * @param consumer + * the consumer that received this message. * @param message * the incoming Message instance that is being wrapped. */ @SuppressWarnings("unchecked") - public AmqpJmsStreamMessageFacade(AmqpConnection connection, Message message) { - super(connection, message); + public AmqpJmsStreamMessageFacade(AmqpConsumer consumer, Message message) { + super(consumer, message); Section body = getAmqpMessage().getBody(); if (body == null) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java index 6c2421b..c2d6fc2 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java @@ -30,6 +30,7 @@ import javax.jms.JMSException; import org.apache.qpid.jms.exceptions.JmsExceptionSupport; 
import org.apache.qpid.jms.message.facade.JmsTextMessageFacade; import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Data; @@ -56,7 +57,7 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm * Create a new AMQP Message facade ready for sending. * * @param connection - * The AMQP Connection that created this message. + * the AmqpConnection that under which this facade was created. */ public AmqpJmsTextMessageFacade(AmqpConnection connection) { super(connection); @@ -68,13 +69,13 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm * Creates a new Facade around an incoming AMQP Message for dispatch to the * JMS Consumer instance. * - * @param connection - * the connection that created this Facade. + * @param consumer + * the consumer that received this message. * @param message * the incoming Message instance that is being wrapped. 
*/ + public AmqpJmsTextMessageFacade(AmqpConsumer consumer, Message message) { + super(consumer, message); } /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java index a01d415..563352b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.jms.provider.amqp.message; +import java.util.Map; + import javax.jms.Destination; import javax.jms.Queue; import javax.jms.TemporaryQueue; @@ -23,6 +25,7 @@ import javax.jms.TemporaryTopic; import javax.jms.Topic; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.message.Message; /** * Support class containing constant values and static methods that are @@ -166,4 +169,25 @@ public final class AmqpMessageSupport { return null; } + + /** + * Safe way to access message annotations which will check internal structure and + * either return the annotation if it exists or null if the annotation or any annotations + * are not present. + * + * @param key + * the String key to use to lookup an annotation. + * @param message + * the AMQP message object that is being examined. + * + * @return the given annotation value or null if not present in the message. 
+ */ + public static Object getMessageAnnotation(String key, Message message) { + if (message != null && message.getMessageAnnotations() != null) { + Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue(); + return annotations.get(AmqpMessageSupport.getSymbol(key)); + } + + return null; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java index a276c7f..af68cf3 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java @@ -20,7 +20,10 @@ */ package org.apache.qpid.jms.provider.amqp.message; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; import java.util.HashMap; @@ -29,6 +32,7 @@ import java.util.UUID; import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.provider.amqp.AmqpConnection; +import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -36,7 +40,6 @@ import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.message.Message; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; @@ 
-46,8 +49,15 @@ public class AmqpJmsMessageFacadeTest { return new AmqpJmsMessageFacade(createMockAmqpConnection()); } - private AmqpJmsMessageFacade createReceivedMessageFacade(AmqpConnection amqpConnection, Message message) { - return new AmqpJmsMessageFacade(amqpConnection, message); + private AmqpJmsMessageFacade createReceivedMessageFacade(AmqpConsumer amqpConsumer, Message message) { + return new AmqpJmsMessageFacade(amqpConsumer, message); + } + + + private AmqpConsumer createMockAmqpConsumer() { + AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class); + Mockito.when(consumer.getConnection()).thenReturn(createMockAmqpConnection()); + return consumer; } private AmqpConnection createMockAmqpConnection() { @@ -225,7 +235,7 @@ public class AmqpJmsMessageFacadeTest { expected = AmqpMessageIdHelper.JMS_ID_PREFIX + expected; } - AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConnection(), message); + AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConsumer(), message); assertNotNull("Expected a correlationId on received message", amqpMessageFacade.getCorrelationId()); @@ -305,7 +315,7 @@ public class AmqpJmsMessageFacadeTest { props.setMessageId(underlyingIdObject); message.setProperties(props); - AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConnection(), message); + AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConsumer(), message); assertNotNull("Expected a messageId on received message", amqpMessageFacade.getMessageId()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
