Repository: qpid-jms Updated Branches: refs/heads/master 148e1b9e9 -> 2bd7b1cc4
use the message envelopes to track deliveries etc, remove JmsMessageId class and have the facades use String for messageid Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2bd7b1cc Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2bd7b1cc Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2bd7b1cc Branch: refs/heads/master Commit: 2bd7b1cc4a33f3f5d012ca89c855ae93414e665b Parents: 2728bf6 Author: Robert Gemmell <[email protected]> Authored: Thu Sep 25 14:34:10 2014 +0100 Committer: Robert Gemmell <[email protected]> Committed: Thu Sep 25 15:23:42 2014 +0100 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsSession.java | 18 ++-- .../jms/message/JmsInboundMessageDispatch.java | 21 +++++ .../org/apache/qpid/jms/message/JmsMessage.java | 27 ++---- .../message/JmsMessagePropertyIntercepter.java | 5 +- .../jms/message/JmsOutboundMessageDispatch.java | 18 ++++ .../jms/message/facade/JmsMessageFacade.java | 32 ++++--- .../defaults/JmsDefaultMessageFacade.java | 14 ++-- .../org/apache/qpid/jms/meta/JmsMessageId.java | 88 -------------------- .../qpid/jms/provider/amqp/AmqpConsumer.java | 32 ++++--- .../jms/provider/amqp/AmqpFixedProducer.java | 2 +- .../amqp/message/AmqpJmsMessageFacade.java | 14 ++-- .../amqp/message/AmqpJmsMessageFacadeTest.java | 12 ++- 12 files changed, 99 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 92aa83a..0c8343c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -67,7 +67,6 @@ import org.apache.qpid.jms.message.JmsMessageFactory; import org.apache.qpid.jms.message.JmsMessageTransformation; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsConsumerId; -import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.meta.JmsProducerId; import org.apache.qpid.jms.meta.JmsSessionId; import org.apache.qpid.jms.meta.JmsSessionInfo; @@ -649,28 +648,22 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa original.setJMSExpiration(timeStamp + timeToLive); } - JmsMessageId msgId = null; + String msgId = null; if (!disableMsgId) { msgId = getNextMessageId(producer); } + original.setJMSMessageID(msgId); boolean isJmsMessageType = original instanceof JmsMessage; if (isJmsMessageType) { ((JmsMessage) original).setConnection(connection); - if (!disableMsgId) { - ((JmsMessage) original).setJMSMessageID(msgId); - } original.setJMSDestination(destination); } JmsMessage copy = JmsMessageTransformation.transformMessage(connection, original); - // Ensure original message gets the destination and message ID as per spec. + // Ensure original message gets the destination as per spec. if (!isJmsMessageType) { - if (!disableMsgId) { - original.setJMSMessageID(msgId.toString()); - copy.setJMSMessageID(msgId); - } original.setJMSDestination(destination); copy.setJMSDestination(destination); } @@ -684,6 +677,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa envelope.setProducerId(producer.getProducerId()); envelope.setDestination(destination); envelope.setSendAsync(!sync); + envelope.setDispatchId(msgId); this.connection.send(envelope); } finally { @@ -858,8 +852,8 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa return new JmsProducerId(sessionInfo.getSessionId(), producerIdGenerator.incrementAndGet()); } - private JmsMessageId getNextMessageId(JmsMessageProducer producer) { - return new JmsMessageId(producer.getProducerId().toString() + "-" + producer.getNextMessageSequence()); + private String getNextMessageId(JmsMessageProducer producer) { + return producer.getProducerId().toString() + "-" + producer.getNextMessageSequence(); } private <T extends JmsMessage> T init(T message) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java index fd8b0b1..46bd4d5 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java @@ -26,6 +26,7 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId{ private JmsConsumerId consumerId; private JmsMessage message; + private String dispatchId; public JmsMessage getMessage() { return message; @@ -46,4 +47,24 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId{ public void onMessageRedelivered() { this.message.incrementRedeliveryCount(); } + + public void setDispatchId(String dispatchId) + { + this.dispatchId = dispatchId; + } + + @Override + public String toString() { + String result = "JmsInboundMessageDispatch {dispatchId = "; + String id = dispatchId; + if (id == null) { + result = result + "<null>}"; + } else { + result = result + id + "}"; + } + + return result; + } + + //TODO: equals and hashcode? } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java index df104de..eb06e21 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java @@ -34,7 +34,6 @@ import javax.jms.MessageNotWriteableException; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.message.facade.JmsMessageFacade; -import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.util.TypeConversionSupport; public class JmsMessage implements javax.jms.Message { @@ -89,12 +88,10 @@ public class JmsMessage implements javax.jms.Message { } JmsMessage msg = (JmsMessage) o; - JmsMessageId oMsg = null; - JmsMessageId thisMsg = null; - - thisMsg = facade.getMessageId(); - oMsg = msg.facade.getMessageId(); + String oMsg = msg.facade.getMessageId(); + String thisMsg = facade.getMessageId(); + //TODO: use super.equals if both id are null? return thisMsg != null && oMsg != null && oMsg.equals(thisMsg); } @@ -129,12 +126,7 @@ public class JmsMessage implements javax.jms.Message { @Override public String getJMSMessageID() throws JMSException { - JmsMessageId facadeId = facade.getMessageId(); - if (facadeId == null) { - return null; - } - - String value = facadeId.getValue(); + String value = facade.getMessageId(); if (value != null && !value.startsWith(ID_PREFIX)) { value = ID_PREFIX + value; } @@ -144,16 +136,7 @@ public class JmsMessage implements javax.jms.Message { @Override public void setJMSMessageID(String value) throws JMSException { - if (value != null) { - JmsMessageId id = new JmsMessageId(value); - facade.setMessageId(id); - } else { - facade.setMessageId(null); - } - } - - public void setJMSMessageID(JmsMessageId messageId) throws JMSException { - facade.setMessageId(messageId); + facade.setMessageId(value); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java index 863f420..22116ee 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java @@ -29,7 +29,6 @@ import javax.jms.JMSException; import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.message.facade.JmsMessageFacade; -import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.util.TypeConversionSupport; /** @@ -239,7 +238,7 @@ public class JmsMessagePropertyIntercepter { if (message.getMessageId() == null) { return null; } - return message.getMessageId().toString(); + return message.getMessageId(); } @Override @@ -248,7 +247,7 @@ public class JmsMessagePropertyIntercepter { if (rc == null) { throw new JMSException("Property JMSMessageID cannot be set from a " + value.getClass().getName() + "."); } - message.setMessageId(new JmsMessageId(rc)); + message.setMessageId(rc); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java index c77739b..44e211f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java @@ -28,6 +28,7 @@ public class JmsOutboundMessageDispatch { private JmsMessage message; private JmsDestination destination; private boolean sendAsync; + private String dispatchId; public JmsDestination getDestination() { return destination; @@ -60,4 +61,21 @@ public class JmsOutboundMessageDispatch { public boolean isSendAsync() { return sendAsync; } + + public void setDispatchId(String dispatchId) { + this.dispatchId = dispatchId; + } + + @Override + public String toString() { + String result = "JmsOutboundMessageDispatch {dispatchId = "; + String id = dispatchId; + if (id == null) { + result = result + "<null>}"; + } else { + result = result + id + "}"; + } + + return result; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java index c8ab9ce..c2d9e8a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java @@ -21,7 +21,6 @@ import java.util.Map; import javax.jms.JMSException; import org.apache.qpid.jms.JmsDestination; -import org.apache.qpid.jms.meta.JmsMessageId; /** * The Message Facade interface defines the required mapping between a Provider's @@ -114,21 +113,6 @@ public interface JmsMessageFacade { JmsMessageFacade copy() throws JMSException; /** - * Return the internal message Id as a JmsMessageId wrapped value. - * - * @return a JmsMessageId that wraps the internal message Id. - */ - JmsMessageId getMessageId(); - - /** - * Updates the message Id using the value of the given JmsMessageId. - * - * @param messageId - * the new JmsMessageId value to assign as the message Id. - */ - void setMessageId(JmsMessageId messageId); - - /** * Gets the timestamp assigned to the message when it was sent. * * @return the message timestamp value. @@ -180,6 +164,21 @@ public interface JmsMessageFacade { void setCorrelationIdBytes(byte[] correlationId); /** + * Returns the message ID set on this message if one exists, null otherwise. + * + * @return the set message ID or null if not set. + */ + String getMessageId(); + + /** + * Sets the message ID for this message. + * + * @param messageId + * The message ID to set on this message, or null to clear. + */ + void setMessageId(String messageId); + + /** * @return true if this message is tagged as being persistent. */ boolean isPersistent(); @@ -353,5 +352,4 @@ public interface JmsMessageFacade { * the group sequence value to assign this message. */ void setGroupSequence(int groupSequence); - } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java index 8a501cc..32e501a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java @@ -26,7 +26,6 @@ import javax.jms.JMSException; import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.message.facade.JmsMessageFacade; -import org.apache.qpid.jms.meta.JmsMessageId; import org.fusesource.hawtbuf.AsciiBuffer; /** @@ -60,7 +59,7 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade { protected byte priority = javax.jms.Message.DEFAULT_PRIORITY; protected String groupId; protected int groupSequence; - protected JmsMessageId messageId; + protected String messageId; protected long expiration; protected long timestamp; protected String correlationId; @@ -95,10 +94,7 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade { target.destination = this.destination; target.replyTo = this.replyTo; target.userId = this.userId; - - if (this.messageId != null) { - target.messageId = this.messageId.copy(); - } + target.messageId = this.messageId; if (this.properties != null) { target.properties = new HashMap<String, Object>(this.properties); @@ -147,12 +143,12 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade { } @Override - public JmsMessageId getMessageId() { - return this.messageId; + public String getMessageId() { + return messageId; } @Override - public void setMessageId(JmsMessageId messageId) { + public void setMessageId(String messageId) { this.messageId = messageId; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java deleted file mode 100644 index 9a75242..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.meta; - - -/** - * JMS Message Id class used to uniquely identify messages for the JMS Framework. - */ -public class JmsMessageId extends JmsAbstractResourceId implements Comparable<JmsMessageId> { - - protected String messageId; - - public JmsMessageId(String messageId) { - this.messageId = messageId; - } - - public JmsMessageId copy() { - JmsMessageId copy = new JmsMessageId(messageId); - return copy; - } - - /** - * @return the set message ID value. - */ - public String getValue() { - return messageId; - } - - @Override - public boolean equals(Object o) { - //TODO: handle messages with no messageId value - if (this == o) { - return true; - } - if (o == null || o.getClass() != getClass()) { - return false; - } - - JmsMessageId id = (JmsMessageId) o; - return id.messageId.equals(this.messageId); - } - - @Override - public int hashCode() { - //TODO: handle messages with no messageId value - if (hashCode == 0) { - hashCode = messageId.hashCode(); - } - return hashCode; - } - - @Override - public int compareTo(JmsMessageId other) { - //TODO: handle messages with no messageId value - int result = -1; - if (other != null) { - result = this.toString().compareTo(other.toString()); - } - return result; - } - - @Override - public String toString() { - String result = "JmsMessageId{messageId = "; - Object id = messageId; - if (id == null) { - result = result + "<null>}"; - } else { - result = result + String.valueOf(id) + "}"; - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 1f51b46..d827287 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -28,7 +28,6 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessage; import org.apache.qpid.jms.meta.JmsConsumerId; import org.apache.qpid.jms.meta.JmsConsumerInfo; -import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; import org.apache.qpid.jms.provider.ProviderListener; @@ -69,7 +68,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver protected final AmqpSession session; protected final InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(AmqpJMSVendor.INSTANCE);; - protected final Map<JmsMessageId, Delivery> delivered = new LinkedHashMap<JmsMessageId, Delivery>(); + protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>(); protected boolean presettle; private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream(); @@ -188,21 +187,20 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver * @throws JMSException if an error occurs accessing the Message properties. */ public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException { - JmsMessageId messageId = envelope.getMessage().getFacade().getMessageId(); Delivery delivery = null; - if (messageId.getProviderHint() instanceof Delivery) { - delivery = (Delivery) messageId.getProviderHint(); + if (envelope.getProviderHint() instanceof Delivery) { + delivery = (Delivery) envelope.getProviderHint(); } else { - delivery = delivered.get(messageId); + delivery = delivered.get(envelope); if (delivery == null) { - LOG.warn("Received Ack for unknown message: {}", messageId); + LOG.warn("Received Ack for unknown message: {}", envelope); return; } } if (ackType.equals(ACK_TYPE.DELIVERED)) { - LOG.debug("Delivered Ack of message: {}", messageId); + LOG.debug("Delivered Ack of message: {}", envelope); if (session.isTransacted()) { Binary txnId = session.getTransactionContext().getAmqpTransactionId(); if (txnId != null) { @@ -214,16 +212,16 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver } } if (!isPresettle()) { - delivered.put(messageId, delivery); + delivered.put(envelope, delivery); } sendFlowIfNeeded(); } else if (ackType.equals(ACK_TYPE.CONSUMED)) { // A Consumer may not always send a delivered ACK so we need to check to // ensure we don't add to much credit to the link. - if (isPresettle() || delivered.remove(messageId) == null) { + if (isPresettle() || delivered.remove(envelope) == null) { sendFlowIfNeeded(); } - LOG.debug("Consumed Ack of message: {}", messageId); + LOG.debug("Consumed Ack of message: {}", envelope); if (!delivery.isSettled()) { delivery.disposition(Accepted.getInstance()); delivery.settle(); @@ -237,7 +235,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver } else if (ackType.equals(ACK_TYPE.POISONED)) { deliveryFailed(delivery, false); } else { - LOG.warn("Unsupporeted Ack Type for message: {}", messageId); + LOG.warn("Unsupported Ack Type for message: {}", envelope); } } @@ -332,9 +330,6 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver return; } - // Store link to delivery in the hint for use in acknowledge requests. - message.getFacade().getMessageId().setProviderHint(incoming); - // We need to signal to the create message that it's being dispatched and for now // the transformer creates the message in write mode, onSend will reset it to read // mode and the consumer will see it as a normal received message. @@ -343,7 +338,10 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(); envelope.setMessage(message); envelope.setConsumerId(info.getConsumerId()); + // Store link to delivery in the hint for use in acknowledge requests. envelope.setProviderHint(incoming); + //TODO: the below messageId retrieval may result in type conversion costs + envelope.setDispatchId(message.getJMSMessageID()); // Store reference to envelope in delivery context for recovery incoming.setContext(envelope); @@ -408,13 +406,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver ProviderListener listener = session.getProvider().getProviderListener(); if (listener != null) { if (envelope.getMessage() != null) { - LOG.debug("Dispatching received message: {}", envelope.getMessage().getFacade().getMessageId()); + LOG.debug("Dispatching received message: {}", envelope); } else { LOG.debug("Dispatching end of browse to: {}", envelope.getConsumerId()); } listener.onMessage(envelope); } else { - LOG.error("Provider listener is not set, message will be dropped."); + LOG.error("Provider listener is not set, message will be dropped: {}", envelope); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index a36e419..dae51f3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -109,7 +109,7 @@ public class AmqpFixedProducer extends AmqpProducer { private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { JmsMessageFacade facade = envelope.getMessage().getFacade(); - LOG.trace("Producer sending message: {}", envelope.getMessage().getFacade().getMessageId()); + LOG.trace("Producer sending message: {}", envelope); byte[] tag = tagGenerator.getNextTag(); Delivery delivery = null; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java index d0ee5af..8afb6db 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java @@ -35,7 +35,6 @@ import javax.jms.MessageFormatException; import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.exceptions.IdConversionException; import org.apache.qpid.jms.message.facade.JmsMessageFacade; -import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.provider.amqp.AmqpConnection; import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.Proton; @@ -288,31 +287,30 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade { } @Override - public JmsMessageId getMessageId() { + public String getMessageId() { Object underlying = message.getMessageId(); AmqpMessageIdHelper helper = AmqpMessageIdHelper.INSTANCE; String baseStringId = helper.toBaseMessageIdString(underlying); // Ensure the ID: prefix is present. - // TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix. + // TODO: should we always do this when non-null? AMQP JMS Mapping says never to send the "ID:" prefix. // TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities? // I Ended up putting it in JmsMessage after the above comment, as a workaround for the // current JmsDefaultMessageFacade usage. if (baseStringId != null && !helper.hasMessageIdPrefix(baseStringId)) { baseStringId = AmqpMessageIdHelper.JMS_ID_PREFIX + baseStringId; } - return new JmsMessageId(baseStringId); + return baseStringId; } @Override - public void setMessageId(JmsMessageId messageId) { + public void setMessageId(String messageId) { if (messageId == null) { message.setMessageId(null); } else { - String value = messageId.getValue(); // Remove the first 'ID:' prefix if present - value = AmqpMessageIdHelper.INSTANCE.stripMessageIdPrefix(value); - message.setMessageId(value); + String stripped = AmqpMessageIdHelper.INSTANCE.stripMessageIdPrefix(messageId); + message.setMessageId(stripped); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java index c1200fb..5a14699 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java @@ -32,7 +32,6 @@ import java.util.UUID; import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.JmsTopic; -import org.apache.qpid.jms.meta.JmsMessageId; import org.apache.qpid.jms.provider.amqp.AmqpConnection; import org.apache.qpid.jms.provider.amqp.AmqpConsumer; import org.apache.qpid.proton.Proton; @@ -251,7 +250,7 @@ public class AmqpJmsMessageFacadeTest { public void testGetMessageIdIsNullOnNewMessage() { AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); - assertNull("Expected messageId value to be null on new message", amqpMessageFacade.getMessageId().getValue()); + assertNull("Expected messageId value to be null on new message", amqpMessageFacade.getMessageId()); } /** @@ -263,11 +262,10 @@ public class AmqpJmsMessageFacadeTest { AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade(); - JmsMessageId jmsMessageId = new JmsMessageId(testMessageId); - amqpMessageFacade.setMessageId(jmsMessageId); + amqpMessageFacade.setMessageId(testMessageId); - assertEquals("Expected messageId object not returned", jmsMessageId, amqpMessageFacade.getMessageId()); - assertEquals("ID strings were not equal", testMessageId, amqpMessageFacade.getMessageId().getValue()); + assertEquals("Expected messageId not returned", testMessageId, amqpMessageFacade.getMessageId()); + assertEquals("ID strings were not equal", testMessageId, amqpMessageFacade.getMessageId()); } /** @@ -326,7 +324,7 @@ public class AmqpJmsMessageFacadeTest { String expectedString = appendIdAndTypePrefix(testMessageId); - assertEquals("Incorrect messageId value received", new JmsMessageId(expectedString), amqpMessageFacade.getMessageId()); + assertEquals("Incorrect messageId value received", expectedString, amqpMessageFacade.getMessageId()); } private String appendIdAndTypePrefix(Object testMessageId) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
