Author: rgodfrey Date: Mon Jul 15 14:40:07 2013 New Revision: 1503272 URL: http://svn.apache.org/r1503272 Log: QPID-4659 : [Java Broker] remove redundant code
Removed: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/HeaderPropertiesConverter.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1503272&r1=1503271&r2=1503272&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Mon Jul 15 14:40:07 2013 @@ -376,329 +376,6 @@ class Subscription_1_0 implements Subscr } - private StoredMessage<MessageMetaData_1_0> convert010Message(final MessageTransferMessage serverMessage) - { - final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); - - return convertServerMessage(metaData, serverMessage); - - } - - private MessageMetaData_1_0 convertMetaData(final MessageTransferMessage serverMessage) - { - List<Section> sections = new ArrayList<Section>(3); - final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties(); - final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties(); - - Header header = new Header(); - if(deliveryProps != null) - { - header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); - if(deliveryProps.hasPriority()) - { - header.setPriority(UnsignedByte.valueOf((byte)deliveryProps.getPriority().getValue())); - } - if(deliveryProps.hasTtl()) - { - header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl())); - } - sections.add(header); - } - - Properties props = new Properties(); - if(msgProps != null) - { - // props.setAbsoluteExpiryTime(); - if(msgProps.hasContentEncoding()) - { - props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding())); - } - - if(msgProps.hasCorrelationId()) - { - props.setCorrelationId(msgProps.getCorrelationId()); - } - // props.setCreationTime(); - // props.setGroupId(); - // props.setGroupSequence(); - if(msgProps.hasMessageId()) - { - props.setMessageId(msgProps.getMessageId()); - } - if(msgProps.hasReplyTo()) - { - props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey()); - } - if(msgProps.hasContentType()) - { - props.setContentType(Symbol.valueOf(msgProps.getContentType())); - - // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client - if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) - { - props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); - } - } - // props.setReplyToGroupId(); - props.setSubject(serverMessage.getRoutingKey()); - // props.setTo(); - if(msgProps.hasUserId()) - { - props.setUserId(new Binary(msgProps.getUserId())); - } - - sections.add(props); - - if(msgProps.getApplicationHeaders() != null) - { - sections.add(new ApplicationProperties(msgProps.getApplicationHeaders())); - } - } - return new MessageMetaData_1_0(sections, _sectionEncoder); - } - - private StoredMessage<MessageMetaData_1_0> convert08Message(final AMQMessage serverMessage) - { - final MessageMetaData_1_0 metaData = convertMetaData(serverMessage); - - return convertServerMessage(metaData, serverMessage); - - - } - - private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData, - final ServerMessage serverMessage) - { - final String mimeType = serverMessage.getMessageHeader().getMimeType(); - byte[] data = new byte[(int) serverMessage.getSize()]; - serverMessage.getContent(ByteBuffer.wrap(data), 0); - - Section bodySection = convertMessageBody(mimeType, data); - - final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection); - - return new StoredMessage<MessageMetaData_1_0>() - { - @Override - public MessageMetaData_1_0 getMetaData() - { - return metaData; - } - - @Override - public long getMessageNumber() - { - return serverMessage.getMessageNumber(); - } - - @Override - public void addContent(int offsetInMessage, ByteBuffer src) - { - throw new UnsupportedOperationException(); - } - - @Override - public int getContent(int offsetInMessage, ByteBuffer dst) - { - ByteBuffer buf = allData.duplicate(); - buf.position(offsetInMessage); - buf = buf.slice(); - int size; - if(dst.remaining()<buf.remaining()) - { - buf.limit(dst.remaining()); - size = dst.remaining(); - } - else - { - size = buf.remaining(); - } - dst.put(buf); - return size; - } - - @Override - public ByteBuffer getContent(int offsetInMessage, int size) - { - ByteBuffer buf = allData.duplicate(); - buf.position(offsetInMessage); - buf = buf.slice(); - if(size < buf.remaining()) - { - buf.limit(size); - } - return buf; - } - - @Override - public StoreFuture flushToStore() - { - throw new UnsupportedOperationException(); - } - - @Override - public void remove() - { - serverMessage.getStoredMessage().remove(); - } - }; - } - - private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection) - { - int headerSize = (int) metaData.getStorableSize(); - - _sectionEncoder.reset(); - _sectionEncoder.encodeObject(bodySection); - Binary dataEncoding = _sectionEncoder.getEncoding(); - - final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength()); - metaData.writeToBuffer(0,allData); - allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength()); - return allData; - } - - private static Section convertMessageBody(String mimeType, byte[] data) - { - if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) - { - String text = new String(data); - return new AmqpValue(text); - } - else if("jms/map-message".equals(mimeType)) - { - TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); - - LinkedHashMap map = new LinkedHashMap(); - final int entries = reader.readIntImpl(); - for (int i = 0; i < entries; i++) - { - try - { - String propName = reader.readStringImpl(); - Object value = reader.readObject(); - map.put(propName, value); - } - catch (EOFException e) - { - throw new IllegalArgumentException(e); - } - catch (TypedBytesFormatException e) - { - throw new IllegalArgumentException(e); - } - - } - - return new AmqpValue(map); - - } - else if("amqp/map".equals(mimeType)) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(ByteBuffer.wrap(data)); - return new AmqpValue(decoder.readMap()); - - } - else if("amqp/list".equals(mimeType)) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(ByteBuffer.wrap(data)); - return new AmqpValue(decoder.readList()); - } - else if("jms/stream-message".equals(mimeType)) - { - TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); - - List list = new ArrayList(); - while (reader.remaining() != 0) - { - try - { - list.add(reader.readObject()); - } - catch (TypedBytesFormatException e) - { - throw new RuntimeException(e); // TODO - Implement - } - catch (EOFException e) - { - throw new RuntimeException(e); // TODO - Implement - } - } - return new AmqpValue(list); - } - else - { - return new Data(new Binary(data)); - - } - } - - private MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage) - { - - List<Section> sections = new ArrayList<Section>(3); - - Header header = new Header(); - - header.setDurable(serverMessage.isPersistent()); - - BasicContentHeaderProperties contentHeader = - (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties(); - - header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority())); - final long expiration = serverMessage.getExpiration(); - final long arrivalTime = serverMessage.getArrivalTime(); - - if(expiration > arrivalTime) - { - header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime)); - } - sections.add(header); - - - Properties props = new Properties(); - - props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString())); - - props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString())); - - // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client - if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) - { - props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); - } - - final AMQShortString correlationId = contentHeader.getCorrelationId(); - if(correlationId != null) - { - props.setCorrelationId(new Binary(correlationId.getBytes())); - } - // props.setCreationTime(); - // props.setGroupId(); - // props.setGroupSequence(); - final AMQShortString messageId = contentHeader.getMessageId(); - if(messageId != null) - { - props.setMessageId(new Binary(messageId.getBytes())); - } - props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); - - // props.setReplyToGroupId(); - props.setSubject(serverMessage.getRoutingKey()); - // props.setTo(); - if(contentHeader.getUserId() != null) - { - props.setUserId(new Binary(contentHeader.getUserId().getBytes())); - } - sections.add(props); - - sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders()))); - - return new MessageMetaData_1_0(sections, _sectionEncoder); - } - public void queueDeleted(final AMQQueue queue) { //TODO --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org