Repository: qpid-broker-j Updated Branches: refs/heads/master f218a1dd8 -> a94d6ffe8
QPID-7434: Add missing properties conversion for message conversion layer from 1.0 to 0-10 and add unit tests * also fix some 1.0 to 0-8 conversion issues Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a94d6ffe Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a94d6ffe Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a94d6ffe Branch: refs/heads/master Commit: a94d6ffe8e3afee7cf5e4db5ef061fbf432f8524 Parents: f218a1d Author: Lorenz Quack <lqu...@apache.org> Authored: Mon Jul 31 16:58:00 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Mon Jul 31 16:58:00 2017 +0100 ---------------------------------------------------------------------- .../v1_0/MessageConverter_from_1_0.java | 125 +++ .../MessageConverter_1_0_to_v0_10.java | 344 ++++++- .../PropertyConverter_1_0_to_0_10Test.java | 956 +++++++++++++++++++ .../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 195 ++-- .../PropertyConverter_1_0_to_0_8Test.java | 6 +- 5 files changed, 1450 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a94d6ffe/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java index a7b7a0a..cf35014 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java @@ -49,6 +49,10 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; +import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; public class MessageConverter_from_1_0 @@ -223,4 +227,125 @@ public class MessageConverter_from_1_0 } return result; } + + public static UnsignedInteger getGroupSequence(final Message_1_0 serverMsg) + { + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + return properties.getGroupSequence(); + } + } + return null; + } + + public static String getGroupId(final Message_1_0 serverMsg) + { + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + return properties.getGroupId(); + } + } + return null; + } + + public static Date getCreationTime(final Message_1_0 serverMsg) + { + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + return properties.getCreationTime(); + } + } + return null; + } + + public static Date getAbsoluteExpiryTime(final Message_1_0 serverMsg) + { + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + return properties.getAbsoluteExpiryTime(); + } + } + return null; + } + + public static Long getTtl(final Message_1_0 serverMsg) + { + HeaderSection headerSection = serverMsg.getHeaderSection(); + if (headerSection != null) + { + Header header = headerSection.getValue(); + if (header != null) + { + UnsignedInteger ttl = header.getTtl(); + if (ttl != null) + { + return ttl.longValue(); + } + } + } + return null; + } + + public static Binary getUserId(final Message_1_0 serverMsg) + { + Binary userId = null; + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + userId = properties.getUserId(); + } + } + return userId; + } + + public static Object getCorrelationId(final Message_1_0 serverMsg) + { + Object correlationIdObject = null; + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + correlationIdObject = properties.getCorrelationId(); + + } + } + return correlationIdObject; + } + + public static Object getMessageId(final Message_1_0 serverMsg) + { + Object messageId = null; + final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); + if (propertiesSection != null) + { + final Properties properties = propertiesSection.getValue(); + if (properties != null) + { + messageId = properties.getMessageId(); + + } + } + return messageId; + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a94d6ffe/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java index 851374a..3d67655 100644 --- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java +++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -20,33 +20,56 @@ */ package org.apache.qpid.server.protocol.converter.v0_10_v1_0; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.convertBodyToObject; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.convertValue; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getAbsoluteExpiryTime; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getCorrelationId; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getCreationTime; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getGroupId; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getGroupSequence; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getMessageId; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getTtl; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getUserId; + +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter; +import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry; +import org.apache.qpid.server.message.mimecontentconverter.ObjectToMimeContentConverter; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.PluggableService; +import org.apache.qpid.server.protocol.converter.MessageConversionException; import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; -import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0; -import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; -import org.apache.qpid.server.protocol.v1_0.Message_1_0; -import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; import org.apache.qpid.server.protocol.v0_10.transport.Header; +import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode; import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryPriority; import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; import org.apache.qpid.server.protocol.v0_10.transport.ReplyTo; +import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; +import org.apache.qpid.server.protocol.v1_0.Message_1_0; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; +import org.apache.qpid.server.store.StoredMessage; @PluggableService public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1_0, MessageTransferMessage> { + private static final int MAX_STR8_LENGTH = 0xFF; + private static final int MAX_VBIN16_LENGTH = 0xFFFF; @Override public Class<Message_1_0> getInputClass() @@ -72,10 +95,16 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 } + @Override + public String getType() + { + return "v1-0 to v0-10"; + } + private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg, final NamedAddressSpace addressSpace) { - Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg); + Object bodyObject = convertBodyToObject(serverMsg); final ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(bodyObject); final byte[] messageContent = converter == null ? new byte[] {} : converter.toMimeContent(bodyObject); @@ -153,66 +182,311 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1 DeliveryProperties deliveryProps = new DeliveryProperties(); MessageProperties messageProps = new MessageProperties(); - final MessageMetaData_1_0.MessageHeader_1_0 origHeader = serverMsg.getMessageHeader(); + final MessageMetaData_1_0.MessageHeader_1_0 origHeader = serverMsg.getMessageHeader(); + + setExchangeAndRoutingKeyOnDeliveryProperties(deliveryProps, origHeader, addressSpace); + deliveryProps.setDeliveryMode(serverMsg.isPersistent() + ? MessageDeliveryMode.PERSISTENT + : MessageDeliveryMode.NON_PERSISTENT); deliveryProps.setExpiration(serverMsg.getExpiration()); deliveryProps.setPriority(MessageDeliveryPriority.get(origHeader.getPriority())); - deliveryProps.setRoutingKey(serverMsg.getInitialRoutingAddress()); - deliveryProps.setTimestamp(origHeader.getTimestamp()); - messageProps.setContentEncoding(origHeader.getEncoding()); + Date creationTime = getCreationTime(serverMsg); + final long arrivalTime = serverMsg.getArrivalTime(); + if (creationTime != null) + { + deliveryProps.setTimestamp(creationTime.getTime()); + } + else + { + deliveryProps.setTimestamp(arrivalTime); + } + + Long ttl = getTtl(serverMsg); + Date absoluteExpiryTime = getAbsoluteExpiryTime(serverMsg); + if (ttl != null) + { + deliveryProps.setTtl(ttl); + deliveryProps.setExpiration(arrivalTime + ttl); + } + else if (absoluteExpiryTime != null) + { + final long time = absoluteExpiryTime.getTime(); + deliveryProps.setTtl(Math.max(0, time - arrivalTime)); + deliveryProps.setExpiration(time); + } + + UUID messageId = getMessageIdAsUUID(serverMsg); + if (messageId != null) + { + messageProps.setMessageId(messageId); + } + + byte[] correlationId = getCorrelationIdAsBytes(serverMsg); + if (correlationId != null) + { + messageProps.setCorrelationId(correlationId); + } + + final String encoding = origHeader.getEncoding(); + if (encoding != null) + { + messageProps.setContentEncoding(ensureStr8("content-encoding", encoding)); + } + messageProps.setContentLength(size); messageProps.setContentType(bodyMimeType); - if(origHeader.getCorrelationId() != null) + byte[] userId = getUserIdAsBytes(serverMsg); + if (userId != null && userId.length <= MAX_VBIN16_LENGTH) { - messageProps.setCorrelationId(origHeader.getCorrelationId().getBytes()); + messageProps.setUserId(ensureVBin16("user-id", userId)); } + final String origReplyTo = origHeader.getReplyTo(); - if(origReplyTo != null && !origReplyTo.equals("")) + if (origReplyTo != null && !origReplyTo.equals("")) { - ReplyTo replyTo; - if(origReplyTo.startsWith("/")) + messageProps.setReplyTo(getReplyTo(addressSpace, origReplyTo)); + } + + Map<String, Object> appHeaders = getApplicationHeaders(serverMsg); + + messageProps.setApplicationHeaders(appHeaders); + + Header header = new Header(deliveryProps, messageProps, null); + return new MessageMetaData_0_10(header, size, arrivalTime); + } + + private Map<String, Object> getApplicationHeaders(final Message_1_0 serverMsg) + { + final MessageMetaData_1_0.MessageHeader_1_0 origHeader = serverMsg.getMessageHeader(); + final Map<String, Object> applicationProperties = serverMsg.getMessageHeader().getHeadersAsMap(); + for(String key: applicationProperties.keySet()) + { + ensureStr8("application-properties[\"" + key + "\"]", key); + } + + Map<String, Object> appHeaders = + new LinkedHashMap((Map<String, Object>) convertValue(applicationProperties)); + if (origHeader.getSubject() != null) + { + if (!appHeaders.containsKey("qpid.subject")) { - replyTo = new ReplyTo("",origReplyTo); + appHeaders.put("qpid.subject", origHeader.getSubject()); } - else if(origReplyTo.contains("/")) + + if (!appHeaders.containsKey("x-jms-type")) + { + appHeaders.put("x-jms-type", origHeader.getSubject()); + } + } + + String groupId = getGroupId(serverMsg); + if (groupId != null && !appHeaders.containsKey("JMSXGroupID")) + { + appHeaders.put("JMSXGroupID", groupId); + } + + UnsignedInteger groupSequence = getGroupSequence(serverMsg); + if (groupSequence != null && !appHeaders.containsKey("JMSXGroupSeq")) + { + appHeaders.put("JMSXGroupSeq", groupSequence.intValue()); + } + + return appHeaders; + } + + private ReplyTo getReplyTo(final NamedAddressSpace addressSpace, final String origReplyTo) + { + String exchange, routingKey; + if (origReplyTo.startsWith("/")) + { + exchange = ""; + routingKey = origReplyTo; + } + else if (origReplyTo.contains("/")) + { + String[] parts = origReplyTo.split("/", 2); + exchange = parts[0]; + routingKey = parts[1]; + } + else if (addressSpace.getAttainedMessageDestination(origReplyTo) instanceof Exchange) + { + exchange = origReplyTo; + routingKey = ""; + } + else + { + exchange = ""; + routingKey = origReplyTo; + } + return new ReplyTo(ensureStr8("reply-to[\"exchange\"]", exchange), + ensureStr8("reply-to[\"routing-key\"]", routingKey)); + } + + private void setExchangeAndRoutingKeyOnDeliveryProperties(final DeliveryProperties deliveryProps, + final MessageMetaData_1_0.MessageHeader_1_0 origHeader, + final NamedAddressSpace addressSpace) + { + final String to = origHeader.getTo(); + final String subject = origHeader.getSubject() == null ? "" : origHeader.getSubject(); + + final String exchangeName; + final String routingKey; + + if (to != null) + { + if (to.startsWith("/")) { - String[] parts = origReplyTo.split("/",2); - replyTo = new ReplyTo(parts[0],parts[1]); + //TODO: get local address from global + throw new MessageConversionException("Could not convert message from 1.0 to 0-10 because conversion of 'to' failed. Global addresses cannot be converted."); } - else if(addressSpace.getAttainedMessageDestination(origReplyTo) instanceof Exchange) + + int separatorPosition = to.indexOf('/'); + if (separatorPosition != -1) { - replyTo = new ReplyTo(origReplyTo,""); + exchangeName = to.substring(0, separatorPosition); + routingKey = to.substring(separatorPosition + 1); } else { - replyTo = new ReplyTo("",origReplyTo); + MessageDestination destination = addressSpace.getAttainedMessageDestination(to); + if (destination instanceof Queue) + { + exchangeName = ""; + routingKey = to; + } + else + { + exchangeName = to; + routingKey = subject; + } } - messageProps.setReplyTo(replyTo); } + else + { + exchangeName = ""; + routingKey = subject; + } + deliveryProps.setRoutingKey(ensureStr8("to' or 'subject", routingKey)); + deliveryProps.setExchange(ensureStr8("to", exchangeName)); + } - Map<String, Object> appHeaders = - (Map<String, Object>) MessageConverter_from_1_0.convertValue(serverMsg.getMessageHeader() - .getHeadersAsMap()); - if(origHeader.getSubject() != null && !appHeaders.containsKey("qpid.subject")) + private byte[] getUserIdAsBytes(final Message_1_0 serverMsg) + { + Binary userId = getUserId(serverMsg); + if (userId != null) { - appHeaders = new LinkedHashMap<>(appHeaders); - appHeaders.put("qpid.subject", origHeader.getSubject()); + return userId.getArray(); } - messageProps.setApplicationHeaders(appHeaders); + return null; + } - Header header = new Header(deliveryProps, messageProps, null); - return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); + private UUID getMessageIdAsUUID(final Message_1_0 serverMsg) + { + Object messageId = getMessageId(serverMsg); + if (messageId == null) + { + return null; + } + else if (messageId instanceof UUID) + { + return (UUID)messageId; + } + else if (messageId instanceof String) + { + try + { + return UUID.fromString(((String) messageId)); + } + catch (IllegalArgumentException e) + { + return UUID.nameUUIDFromBytes(((String) messageId).getBytes(UTF_8)); + } + } + else if (messageId instanceof Binary) + { + return UUID.nameUUIDFromBytes(((Binary) messageId).getArray()); + } + else if (messageId instanceof byte[]) + { + return UUID.nameUUIDFromBytes((byte[]) messageId); + } + else if (messageId instanceof UnsignedLong) + { + return UUID.nameUUIDFromBytes(longToBytes(((UnsignedLong) messageId).longValue())); + } + else + { + return UUID.nameUUIDFromBytes(String.valueOf(messageId).getBytes(UTF_8)); + } } + private byte[] getCorrelationIdAsBytes(final Message_1_0 serverMsg) + { + final Object correlationIdObject = getCorrelationId(serverMsg); + final byte[] correlationId; + if (correlationIdObject == null) + { + correlationId = null; + } + else if (correlationIdObject instanceof Binary) + { + correlationId = ((Binary) correlationIdObject).getArray(); + } + else if (correlationIdObject instanceof byte[]) + { + correlationId = (byte[]) correlationIdObject; + } + else if (correlationIdObject instanceof UUID) + { + UUID uuid = (UUID)correlationIdObject; + correlationId = longToBytes(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + } + else if (correlationIdObject instanceof UnsignedLong) + { + correlationId = longToBytes(((UnsignedLong) correlationIdObject).longValue()); + } + else + { + correlationId = String.valueOf(correlationIdObject).getBytes(UTF_8); + } + return ensureVBin16("correlation-id", correlationId); + } + private String ensureStr8(final String propertyName, String string) + { + if (string != null && string.length() > MAX_STR8_LENGTH) + { + throw new MessageConversionException(String.format( + "Could not convert message from 1.0 to 0-10 because conversion of '%s' failed." + + " The string length exceeds allowed maximum.", + propertyName)); + } + return string; + } - @Override - public String getType() + private byte[] ensureVBin16(final String propertyName, final byte[] result) { - return "v1-0 to v0-10"; + if (result != null && result.length > MAX_VBIN16_LENGTH) + { + throw new MessageConversionException(String.format( + "Could not convert message from 1.0 to 0-10 because conversion of '%s' failed." + + " The array length exceeds allowed maximum.", + propertyName)); + } + return result; } + private byte[] longToBytes(long... x) + { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * x.length); + for (long l : x) + { + buffer.putLong(l); + } + return buffer.array(); + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a94d6ffe/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java new file mode 100644 index 0000000..a2515c2 --- /dev/null +++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/PropertyConverter_1_0_to_0_10Test.java @@ -0,0 +1,956 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.converter.v0_10_v1_0; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.converter.MessageConversionException; +import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; +import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties; +import org.apache.qpid.server.protocol.v0_10.transport.MessageDeliveryMode; +import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties; +import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; +import org.apache.qpid.server.protocol.v1_0.Message_1_0; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; +import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; +import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.test.utils.QpidTestCase; + +public class PropertyConverter_1_0_to_0_10Test extends QpidTestCase +{ + private NamedAddressSpace _namedAddressSpace; + private MessageConverter_1_0_to_v0_10 _messageConverter; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _namedAddressSpace = mock(NamedAddressSpace.class); + _messageConverter = new MessageConverter_1_0_to_v0_10(); + } + + public void testContentEncodingConversion() + { + + String contentEncoding = "my-test-encoding"; + final Properties properties = new Properties(); + properties.setContentEncoding(Symbol.valueOf(contentEncoding)); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected content encoding", contentEncoding, messageProperties.getContentEncoding()); + } + + public void testContentEncodingConversionWhenLengthExceeds255() + { + String contentEncoding = generateLongString(); + final Properties properties = new Properties(); + properties.setContentEncoding(Symbol.valueOf(contentEncoding)); + Message_1_0 message = createTestMessage(properties); + + try + { + _messageConverter.convert(message, _namedAddressSpace); + fail("expected exception not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testApplicationPropertiesConversion() + { + Map<String, Object> properties = new HashMap<>(); + properties.put("testProperty1", "testProperty1Value"); + properties.put("intProperty", 1); + ApplicationProperties applicationProperties = new ApplicationProperties(properties); + Message_1_0 message = createTestMessage(applicationProperties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final Map<String, Object> headers = messageProperties.getApplicationHeaders(); + assertEquals("Unexpected headers", properties, new HashMap<>(headers)); + } + + public void testSubjectConversion() + { + final String subject = "testSubject"; + Properties properties = new Properties(); + properties.setSubject(subject); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final Map<String, Object> headers = messageProperties.getApplicationHeaders(); + assertEquals("Unexpected qpid.subject is missing from headers", subject, headers.get("qpid.subject")); + assertEquals("Unexpected type", subject, headers.get("x-jms-type")); + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected routing-key", subject, deliveryProperties.getRoutingKey()); + } + + public void testSubjectDoesNoReplaceApplicationPropertyQpidSubject() + { + final String subject = "testSubject"; + Properties properties = new Properties(); + properties.setSubject(subject); + final String qpidSubject = "testSubject2"; + Map<String, Object> applicationPropertiesMap = Collections.singletonMap("qpid.subject", qpidSubject); + ApplicationProperties applicationProperties = new ApplicationProperties(applicationPropertiesMap); + + Message_1_0 message = createTestMessage(new Header(), + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + properties, + applicationProperties, + 0, + null); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final Map<String, Object> headers = messageProperties.getApplicationHeaders(); + assertEquals("Unexpected qpid.subject is missing from headers", qpidSubject, headers.get("qpid.subject")); + assertEquals("Unexpected type", subject, headers.get("x-jms-type")); + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected routing-key", subject, deliveryProperties.getRoutingKey()); + } + + public void testSubjectDoesNoReplaceApplicationPropertyXJMSType() + { + final String subject = "testSubject"; + Properties properties = new Properties(); + properties.setSubject(subject); + final String jmsType = "testJmsType"; + Map<String, Object> applicationPropertiesMap = Collections.singletonMap("x-jms-type", jmsType); + ApplicationProperties applicationProperties = new ApplicationProperties(applicationPropertiesMap); + + Message_1_0 message = createTestMessage(new Header(), + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + properties, + applicationProperties, + 0, + null); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final Map<String, Object> headers = messageProperties.getApplicationHeaders(); + assertEquals("Unexpected qpid.subject is missing from headers", subject, headers.get("qpid.subject")); + assertEquals("Unexpected type", jmsType, headers.get("x-jms-type")); + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected routing-key", subject, deliveryProperties.getRoutingKey()); + } + + public void testSubjectConversionWhenSubjectExceeds255() + { + final String subject = generateLongString(); + Properties properties = new Properties(); + properties.setSubject(subject); + Message_1_0 message = createTestMessage(properties); + + try + { + _messageConverter.convert(message, _namedAddressSpace); + fail("Expected conversion exception"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testDurableConversion() + { + final Header header = new Header(); + header.setDurable(true); + Message_1_0 message = createTestMessage(header); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected deliveryMode", MessageDeliveryMode.PERSISTENT, deliveryProperties.getDeliveryMode()); + } + + public void testNonDurableConversion() + { + final Header header = new Header(); + header.setDurable(false); + Message_1_0 message = createTestMessage(header); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected deliveryMode", + MessageDeliveryMode.NON_PERSISTENT, + deliveryProperties.getDeliveryMode()); + } + + public void testPriorityConversion() + { + final Header header = new Header(); + final byte priority = (byte) 7; + header.setPriority(UnsignedByte.valueOf(priority)); + Message_1_0 message = createTestMessage(header); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected priority", priority, deliveryProperties.getPriority().getValue()); + } + + public void testCorrelationIdStringConversion() + { + final String correlationId = "testCorrelationId"; + Properties properties = new Properties(); + properties.setCorrelationId(correlationId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertTrue("Unexpected correlationId", + Arrays.equals(correlationId.getBytes(), messageProperties.getCorrelationId())); + } + + public void testCorrelationIdLongStringConversion() + { + final String correlationId = generateLongLongString(); + Properties properties = new Properties(); + properties.setCorrelationId(correlationId); + Message_1_0 message = createTestMessage(properties); + + try + { + _messageConverter.convert(message, _namedAddressSpace); + fail("Expected exception not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testCorrelationIdULongConversion() + { + final UnsignedLong correlationId = UnsignedLong.valueOf(-1); + Properties properties = new Properties(); + properties.setCorrelationId(correlationId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertTrue("Unexpected correlationId", + Arrays.equals(longToBytes(correlationId.longValue()), messageProperties.getCorrelationId())); + } + + public void testCorrelationIdUUIDConversion() + { + final UUID correlationId = UUID.randomUUID(); + Properties properties = new Properties(); + properties.setCorrelationId(correlationId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final byte[] expectedBytes = + longToBytes(correlationId.getMostSignificantBits(), correlationId.getLeastSignificantBits()); + assertTrue("Unexpected correlationId", Arrays.equals(expectedBytes, messageProperties.getCorrelationId())); + } + + public void testCorrelationIdBinaryConversion() + { + final String testCorrelationId = "testCorrelationId"; + final Binary correlationId = new Binary(testCorrelationId.getBytes(UTF_8)); + Properties properties = new Properties(); + properties.setCorrelationId(correlationId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertTrue("Unexpected correlationId", + Arrays.equals(testCorrelationId.getBytes(), messageProperties.getCorrelationId())); + } + + public void testReplyToConversionWhenResultExceeds255() + { + final String replyTo = generateLongString() + "/" + generateLongString(); + Properties properties = new Properties(); + properties.setReplyTo(replyTo); + Message_1_0 message = createTestMessage(properties); + + try + { + _messageConverter.convert(message, _namedAddressSpace); + fail("expected exception not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testTTLConversion() + { + long ttl = 10000; + long arrivalTime = System.currentTimeMillis(); + long expectedExpiration = arrivalTime + ttl; + Header header = new Header(); + header.setTtl(UnsignedInteger.valueOf(ttl)); + Message_1_0 message = createTestMessage(header, arrivalTime); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected ttl", ttl, deliveryProperties.getTtl()); + assertEquals("Unexpected expiration", expectedExpiration, deliveryProperties.getExpiration()); + } + + public void testAbsoluteExpiryTimeConversion() + { + long ttl = 10000; + long arrivalTime = System.currentTimeMillis(); + long expiryTime = arrivalTime + ttl; + Properties properties = new Properties(); + properties.setAbsoluteExpiryTime(new Date(expiryTime)); + Message_1_0 message = createTestMessage(properties, arrivalTime); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected ttl", ttl, deliveryProperties.getTtl()); + assertEquals("Unexpected expiration", expiryTime, deliveryProperties.getExpiration()); + } + + public void testConversionOfTtlTakesPrecedenceOverAbsoluteExpiryTime() + { + long ttl = 10000; + final long time = System.currentTimeMillis(); + long absoluteExpiryTime = time + ttl; + long arrivalTime = time + 1; + + Header header = new Header(); + header.setTtl(UnsignedInteger.valueOf(ttl)); + + Properties properties = new Properties(); + properties.setAbsoluteExpiryTime(new Date(absoluteExpiryTime)); + + Message_1_0 message = createTestMessage(header, + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + properties, + new ApplicationProperties(Collections.emptyMap()), + arrivalTime, + null); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected ttl", ttl, deliveryProperties.getTtl()); + assertEquals("Unexpected expiration", arrivalTime + ttl, deliveryProperties.getExpiration()); + } + + public void testMessageIdStringConversion() + { + final String messageId = "testMessageId"; + Properties properties = new Properties(); + properties.setMessageId(messageId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", + UUID.nameUUIDFromBytes(messageId.getBytes(UTF_8)), + messageProperties.getMessageId()); + } + + public void testMessageIdUuidAsStringConversion() + { + final UUID messageId = UUID.randomUUID(); + Properties properties = new Properties(); + properties.setMessageId(messageId.toString()); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", messageId, messageProperties.getMessageId()); + } + + public void testMessageIdUUIDConversion() + { + final UUID messageId = UUID.randomUUID(); + Properties properties = new Properties(); + properties.setMessageId(messageId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", messageId, messageProperties.getMessageId()); + } + + public void testMessageIdUnsignedLongConversion() + { + final UnsignedLong messageId = UnsignedLong.valueOf(-1); + Properties properties = new Properties(); + properties.setMessageId(messageId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", + UUID.nameUUIDFromBytes(longToBytes(messageId.longValue())), + messageProperties.getMessageId()); + } + + public void testMessageIdBinaryConversion() + { + final String messageId = "testMessageId"; + Properties properties = new Properties(); + properties.setMessageId(new Binary(messageId.getBytes(UTF_8))); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", + UUID.nameUUIDFromBytes(messageId.getBytes(UTF_8)), + messageProperties.getMessageId()); + } + + public void testMessageIdByteArrayConversion() + { + final byte[] messageId = "testMessageId".getBytes(UTF_8); + Properties properties = new Properties(); + properties.setMessageId(messageId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", UUID.nameUUIDFromBytes(messageId), messageProperties.getMessageId()); + } + + public void testMessageIdBinaryConversionWhenNonUtf8() + { + final byte[] messageId = new byte[]{(byte) 0xc3, 0x28}; + Properties properties = new Properties(); + properties.setMessageId(new Binary(messageId)); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected messageId", UUID.nameUUIDFromBytes(messageId), messageProperties.getMessageId()); + } + + public void testCreationTimeConversion() + { + final long timestamp = System.currentTimeMillis() - 10000; + final long arrivalTime = timestamp + 1; + Properties properties = new Properties(); + properties.setCreationTime(new Date(timestamp)); + Message_1_0 message = createTestMessage(new Header(), + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + properties, + new ApplicationProperties(Collections.emptyMap()), + arrivalTime, + null); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected timestamp", timestamp, deliveryProperties.getTimestamp()); + } + + public void testArrivalTimeConversion() + { + final long arrivalTime = System.currentTimeMillis() - 10000; + Message_1_0 message = createTestMessage(new Header(), arrivalTime); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected timestamp", arrivalTime, deliveryProperties.getTimestamp()); + } + + public void testUserIdConversion() + { + final String userId = "test-userId"; + Properties properties = new Properties(); + properties.setUserId(new Binary(userId.getBytes())); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertTrue("Unexpected user-id", Arrays.equals(userId.getBytes(UTF_8), messageProperties.getUserId())); + } + + public void testUserIdConversionWhenLengthExceeds16Bit() + { + final String userId = generateLongLongString(); + Properties properties = new Properties(); + properties.setUserId(new Binary(userId.getBytes())); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertNull("Unexpected user-id", messageProperties.getUserId()); + } + + public void testGroupIdConversion() + { + String testGroupId = generateLongString(); + Properties properties = new Properties(); + properties.setGroupId(testGroupId); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final Map<String, Object> applicationHeaders = messageProperties.getApplicationHeaders(); + assertEquals("Unexpected group-id", testGroupId, applicationHeaders.get("JMSXGroupID")); + } + + public void testGroupIdDoesNotReplaceApplicationPropertiesJMSXGroupID() + { + String testGroupId = "group1"; + Properties properties = new Properties(); + properties.setGroupId(testGroupId); + final String JMSXGroupID = "group2"; + Map<String, Object> applicationPropertiesMap = Collections.singletonMap("JMSXGroupID", JMSXGroupID); + ApplicationProperties applicationProperties = new ApplicationProperties(applicationPropertiesMap); + + Message_1_0 message = createTestMessage(new Header(), + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + properties, + applicationProperties, + 0, + null); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final Map<String, Object> applicationHeaders = messageProperties.getApplicationHeaders(); + assertEquals("Unexpected group-id", JMSXGroupID, applicationHeaders.get("JMSXGroupID")); + } + + public void testGroupSequenceConversion() + { + int testGroupSequence = 1; + Properties properties = new Properties(); + properties.setGroupSequence(UnsignedInteger.valueOf(testGroupSequence)); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final Map<String, Object> applicationHeaders = messageProperties.getApplicationHeaders(); + assertEquals("Unexpected group-id", testGroupSequence, applicationHeaders.get("JMSXGroupSeq")); + } + + public void testGroupSequenceDoesNotReplaceApplicationPropertiesJMSXGroupSeq() + { + int testGroupSequence = 1; + Properties properties = new Properties(); + properties.setGroupSequence(UnsignedInteger.valueOf(testGroupSequence)); + + final int JMSXGroupSeq = 2; + Map<String, Object> applicationPropertiesMap = Collections.singletonMap("JMSXGroupSeq", JMSXGroupSeq); + ApplicationProperties applicationProperties = new ApplicationProperties(applicationPropertiesMap); + + Message_1_0 message = createTestMessage(new Header(), + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + properties, + applicationProperties, + 0, + null); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + final Map<String, Object> applicationHeaders = messageProperties.getApplicationHeaders(); + assertEquals("Unexpected JMSXGroupSeq", JMSXGroupSeq, applicationHeaders.get("JMSXGroupSeq")); + } + + + public void testApplicationPropertiesConversionWhenKeyLengthExceeds255() + { + Map<String, Object> properties = Collections.singletonMap("testProperty-" + generateLongString(), "testValue"); + ApplicationProperties applicationProperties = new ApplicationProperties(properties); + Message_1_0 message = createTestMessage(applicationProperties); + + try + { + _messageConverter.convert(message, _namedAddressSpace); + fail("Exception is expected"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testToConversionWhenExchangeAndRoutingKeyIsSpecified() + { + final String testExchange = "testExchange"; + final String testRoutingKey = "testRoutingKey"; + + String to = testExchange + "/" + testRoutingKey; + Properties properties = new Properties(); + properties.setTo(to); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", testExchange, deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", testRoutingKey, deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenExchangeIsSpecified() + { + final String testExchange = "testExchange"; + Properties properties = new Properties(); + properties.setTo(testExchange); + Message_1_0 message = createTestMessage(properties); + + when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(mock(Exchange.class)); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", testExchange, deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", "", deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenExchangeIsSpecifiedAndSubjectIsSet() + { + final String testExchange = "testExchange"; + final String testRoutingKey = "testRoutingKey"; + Properties properties = new Properties(); + properties.setTo(testExchange); + properties.setSubject(testRoutingKey); + Message_1_0 message = createTestMessage(properties); + + when(_namedAddressSpace.getAttainedMessageDestination(testExchange)).thenReturn(mock(Exchange.class)); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", testExchange, deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", testRoutingKey, deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenQueueIsSpecified() + { + final String testQueue = "testQueue"; + Properties properties = new Properties(); + properties.setTo(testQueue); + Message_1_0 message = createTestMessage(properties); + + when(_namedAddressSpace.getAttainedMessageDestination(testQueue)).thenReturn(mock(Queue.class)); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", "", deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", testQueue, deliveryProperties.getRoutingKey()); + } + + public void testToConversionWhenGlobalAddress() + { + final String globalAddress = "/testQueue"; + Properties properties = new Properties(); + properties.setTo(globalAddress); + Message_1_0 message = createTestMessage(properties); + + try + { + _messageConverter.convert(message, _namedAddressSpace); + fail("Exception is not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testToConversionWhenExchangeLengthExceeds255() + { + final String testExchange = generateLongString(); + final String testRoutingKey = "testRoutingKey"; + + String to = testExchange + "/" + testRoutingKey; + Properties properties = new Properties(); + properties.setTo(to); + Message_1_0 message = createTestMessage(properties); + + try + { + _messageConverter.convert(message, _namedAddressSpace); + fail("Exception is not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testToConversionWhenRoutingKeyLengthExceeds255() + { + final String testExchange = "testExchange"; + final String testRoutingKey = generateLongString(); + + String to = testExchange + "/" + testRoutingKey; + Properties properties = new Properties(); + properties.setTo(to); + Message_1_0 message = createTestMessage(properties); + + try + { + _messageConverter.convert(message, _namedAddressSpace); + fail("Exception is not thrown"); + } + catch (MessageConversionException e) + { + // pass + } + } + + public void testToConversionWhenDestinationIsSpecifiedButDoesNotExists() + { + final String testDestination = "testDestination"; + Properties properties = new Properties(); + properties.setTo(testDestination); + Message_1_0 message = createTestMessage(properties); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final DeliveryProperties deliveryProperties = + convertedMessage.getStoredMessage().getMetaData().getDeliveryProperties(); + assertEquals("Unexpected exchange", testDestination, deliveryProperties.getExchange()); + assertEquals("Unexpected routing key", "", deliveryProperties.getRoutingKey()); + } + + public void testContentToContentLengthConversion() + { + final byte[] content = new byte[]{0x31, 0x00, 0x10}; + Message_1_0 message = createTestMessage(new Header(), + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + new Properties(), + new ApplicationProperties(Collections.emptyMap()), + 0, + content); + + final MessageTransferMessage convertedMessage = _messageConverter.convert(message, _namedAddressSpace); + + final MessageProperties messageProperties = + convertedMessage.getStoredMessage().getMetaData().getMessageProperties(); + assertEquals("Unexpected content length", content.length, messageProperties.getContentLength()); + } + + private Message_1_0 createTestMessage(final Header header) + { + return createTestMessage(header, 0); + } + + private Message_1_0 createTestMessage(final Header header, long arrivalTime) + { + return createTestMessage(header, + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + new Properties(), + new ApplicationProperties(Collections.emptyMap()), + arrivalTime, + null); + } + + private Message_1_0 createTestMessage(final Properties properties) + { + return createTestMessage(properties, 0L); + } + + private Message_1_0 createTestMessage(final Properties properties, final long arrivalTime) + { + return createTestMessage(new Header(), + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + properties, + new ApplicationProperties(Collections.emptyMap()), + arrivalTime, + null); + } + + private Message_1_0 createTestMessage(final ApplicationProperties applicationProperties) + { + return createTestMessage(new Header(), + new DeliveryAnnotations(Collections.emptyMap()), + new MessageAnnotations(Collections.emptyMap()), + new Properties(), + applicationProperties, + 0, + null); + } + + private Message_1_0 createTestMessage(final Header header, + final DeliveryAnnotations deliveryAnnotations, + final MessageAnnotations messageAnnotations, + final Properties properties, + final ApplicationProperties applicationProperties, + final long arrivalTime, + final byte[] content) + { + final StoredMessage<MessageMetaData_1_0> storedMessage = mock(StoredMessage.class); + MessageMetaData_1_0 metaData = new MessageMetaData_1_0(header.createEncodingRetainingSection(), + deliveryAnnotations.createEncodingRetainingSection(), + messageAnnotations.createEncodingRetainingSection(), + properties.createEncodingRetainingSection(), + applicationProperties.createEncodingRetainingSection(), + new Footer(Collections.emptyMap()).createEncodingRetainingSection(), + arrivalTime, + content == null ? 0 : content.length); + when(storedMessage.getMetaData()).thenReturn(metaData); + + if (content != null) + { + Binary binary = new Binary(content); + DataSection dataSection = new Data(binary).createEncodingRetainingSection(); + List<QpidByteBuffer> qbbList = dataSection.getEncodedForm(); + int length = (int) QpidByteBufferUtils.remaining(qbbList); + when(storedMessage.getContentSize()).thenReturn(length); + when(storedMessage.getContent(0, length)).thenReturn(qbbList); + } + return new Message_1_0(storedMessage); + } + + private String generateLongString() + { + StringBuilder buffer = new StringBuilder(); + for (int i = 0; i < 256; i++) + { + buffer.append('x'); + } + + return buffer.toString(); + } + + private String generateLongLongString() + { + StringBuilder buffer = new StringBuilder(); + for (int i = 0; i < 1 << 16; i++) + { + buffer.append('x'); + } + + return buffer.toString(); + } + + private byte[] longToBytes(long... x) + { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * x.length); + for (long l : x) + { + buffer.putLong(l); + } + return buffer.array(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a94d6ffe/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java index 2fb9243..8177444 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -20,6 +20,17 @@ */ package org.apache.qpid.server.protocol.converter.v0_8_v1_0; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.convertBodyToObject; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.convertValue; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getAbsoluteExpiryTime; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getCorrelationId; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getCreationTime; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getGroupId; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getGroupSequence; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getMessageId; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getTtl; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getUserId; + import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -42,22 +53,15 @@ import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; -import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0; import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; -import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection; -import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; -import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection; import org.apache.qpid.server.store.StoredMessage; @PluggableService public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_0, AMQMessage> { - private static final int BASIC_CLASS_ID = 60; - @Override public Class<Message_1_0> getInputClass() @@ -86,7 +90,7 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ private StoredMessage<MessageMetaData> convertToStoredMessage(final Message_1_0 serverMsg, final NamedAddressSpace addressSpace) { - Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg); + Object bodyObject = convertBodyToObject(serverMsg); final ObjectToMimeContentConverter converter = MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter(bodyObject); @@ -169,7 +173,7 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ props.setContentType(bodyMimeType); props.setEncoding(convertToShortStringForProperty("content-encoding", serverMsg.getMessageHeader().getEncoding())); - props.setCorrelationId(getCorrelationId(serverMsg)); + props.setCorrelationId(getCorrelationIdAsShortString(serverMsg)); props.setDeliveryMode(serverMsg.isPersistent() ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT); @@ -187,21 +191,21 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ } } - props.setMessageId(getMessageId(serverMsg)); + props.setMessageId(getMessageIdAsShortString(serverMsg)); props.setPriority(serverMsg.getMessageHeader().getPriority()); props.setReplyTo(getReplyTo(serverMsg, addressSpace)); - final long timestamp = serverMsg.getMessageHeader().getTimestamp(); - if (timestamp > 0) + Date timestamp = getCreationTime(serverMsg); + if (timestamp != null) { - props.setTimestamp(timestamp); + props.setTimestamp(timestamp.getTime()); } else { props.setTimestamp(serverMsg.getArrivalTime()); } - props.setUserId(getUserId(serverMsg)); + props.setUserId(getUserIdAsShortString(serverMsg)); - Map<String,Object> headerProps = new LinkedHashMap<String, Object>(); + Map<String,Object> headerProps = new LinkedHashMap<>(); if(header.getSubject() != null) { @@ -223,8 +227,7 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ for (String headerName : serverMsg.getMessageHeader().getHeaderNames()) { - headerProps.put(headerName, - MessageConverter_from_1_0.convertValue(serverMsg.getMessageHeader().getHeader(headerName))); + headerProps.put(headerName, convertValue(serverMsg.getMessageHeader().getHeader(headerName))); } final FieldTable headers; @@ -298,123 +301,47 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ routingKey)); } - private UnsignedInteger getGroupSequence(final Message_1_0 serverMsg) + private AMQShortString getUserIdAsShortString(final Message_1_0 serverMsg) { - final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); - if (propertiesSection != null) + Binary userId = getUserId(serverMsg); + if (userId != null) { - final Properties properties = propertiesSection.getValue(); - if (properties != null) + try { - return properties.getGroupSequence(); + return new AMQShortString(userId.getArray()); } - } - return null; - } - - private String getGroupId(final Message_1_0 serverMsg) - { - final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); - if (propertiesSection != null) - { - final Properties properties = propertiesSection.getValue(); - if (properties != null) + catch (IllegalArgumentException e) { - return properties.getGroupId(); + return null; } } return null; } - private AMQShortString getUserId(final Message_1_0 serverMsg) + private AMQShortString getMessageIdAsShortString(final Message_1_0 serverMsg) { - final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); - if (propertiesSection != null) + Object messageId = getMessageId(serverMsg); + try { - final Properties properties = propertiesSection.getValue(); - if (properties != null) + if (messageId instanceof Binary) { - Binary userId = properties.getUserId(); - if (userId != null) - { - try - { - return new AMQShortString(userId.getArray()); - } - catch (IllegalArgumentException e) - { - return null; - } - } + return new AMQShortString(((Binary) messageId).getArray()); } - } - return null; - } - - private AMQShortString getMessageId(final Message_1_0 serverMsg) - { - final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); - if (propertiesSection != null) - { - final Properties properties = propertiesSection.getValue(); - if (properties != null) + else if (messageId instanceof byte[]) { - Object messageId = properties.getMessageId(); - try - { - if (messageId instanceof Binary) - { - return new AMQShortString(((Binary) messageId).getArray()); - } - else if (messageId instanceof byte[]) - { - return new AMQShortString(((byte[]) messageId)); - } - else - { - return AMQShortString.valueOf(messageId); - } - } - catch (IllegalArgumentException e) - { - // pass - } + return new AMQShortString(((byte[]) messageId)); } - } - return null; - - } - - private Date getAbsoluteExpiryTime(final Message_1_0 serverMsg) - { - final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); - if (propertiesSection != null) - { - final Properties properties = propertiesSection.getValue(); - if (properties != null) + else { - return properties.getAbsoluteExpiryTime(); + return AMQShortString.valueOf(messageId); } } - return null; - } - - private Long getTtl(final Message_1_0 serverMsg) - { - HeaderSection headerSection = serverMsg.getHeaderSection(); - if (headerSection != null) + catch (IllegalArgumentException e) { - Header header = headerSection.getValue(); - if (header != null) - { - UnsignedInteger ttl = header.getTtl(); - if (ttl != null) - { - return ttl.longValue(); - } - } + // pass } return null; + } private AMQShortString getReplyTo(final Message_1_0 serverMsg, final NamedAddressSpace addressSpace) @@ -431,36 +358,28 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_ } } - private AMQShortString getCorrelationId(final Message_1_0 serverMsg) + private AMQShortString getCorrelationIdAsShortString(final Message_1_0 serverMsg) { - AMQShortString correlationId = null; - final PropertiesSection propertiesSection = serverMsg.getPropertiesSection(); - if (propertiesSection != null) + Object correlationIdObject = getCorrelationId(serverMsg); + final AMQShortString correlationId; + try { - final Properties properties = propertiesSection.getValue(); - if (properties != null) + if (correlationIdObject instanceof Binary) { - final Object correlationIdObject = properties.getCorrelationId(); - try - { - if (correlationIdObject instanceof Binary) - { - correlationId = new AMQShortString(((Binary) correlationIdObject).getArray()); - } - else if (correlationIdObject instanceof byte[]) - { - correlationId = new AMQShortString(((byte[]) correlationIdObject)); - } - else - { - correlationId = AMQShortString.valueOf(correlationIdObject); - } - } - catch (IllegalArgumentException e) - { - throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'correlation-id' failed.", e); - } + correlationId = new AMQShortString(((Binary) correlationIdObject).getArray()); + } + else if (correlationIdObject instanceof byte[]) + { + correlationId = new AMQShortString(((byte[]) correlationIdObject)); } + else + { + correlationId = AMQShortString.valueOf(correlationIdObject); + } + } + catch (IllegalArgumentException e) + { + throw new MessageConversionException("Could not convert message from 1.0 to 0-8 because conversion of 'correlation-id' failed.", e); } return correlationId; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a94d6ffe/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java index ecf85ac..067fe35 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/PropertyConverter_1_0_to_0_8Test.java @@ -479,7 +479,7 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase assertEquals("Unexpected timestamp", arrivalTime, convertedProperties.getTimestamp()); } - public void testUserIdyConversion() + public void testUserIdConversion() { final String userId = "test-userId"; Properties properties = new Properties(); @@ -492,7 +492,7 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase assertEquals("Unexpected user-id", userId, convertedProperties.getUserIdAsString()); } - public void testUserIdyConversionWhenLengthExceeds255() + public void testUserIdConversionWhenLengthExceeds255() { final String userId = generateLongString(); Properties properties = new Properties(); @@ -505,7 +505,7 @@ public class PropertyConverter_1_0_to_0_8Test extends QpidTestCase assertNull("Unexpected user-id", convertedProperties.getUserId()); } - public void testUserIdyConversionWhenNonUtf8() + public void testUserIdConversionWhenNonUtf8() { final byte[] userId = new byte[]{(byte) 0xc3, 0x28}; Properties properties = new Properties(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org