Author: davsclaus Date: Wed Oct 31 16:34:03 2012 New Revision: 1404233 URL: http://svn.apache.org/viewvc?rev=1404233&view=rev Log: CAMEL-5763: Added option includeSentJMSMessageID to camel-jms to allow enriching the Camel Message with the actual JMSMessageID of the sent message. Can be used by end users for logging purposes, etc.
Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/InOnlyMessageSentCallback.java (with props) camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyIncludeSentJMSMessageIDTest.java (with props) camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsNotInOnlyIncludeSentJMSMessageIDTest.java (with props) Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java Added: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/InOnlyMessageSentCallback.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/InOnlyMessageSentCallback.java?rev=1404233&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/InOnlyMessageSentCallback.java (added) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/InOnlyMessageSentCallback.java Wed Oct 31 16:34:03 2012 @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.camel.Exchange; + +import static org.apache.camel.component.jms.JmsMessageHelper.getJMSMessageID; + +/** + * {@link MessageSentCallback} used to enrich the Camel {@link Exchange} with + * the actual <tt>JMSMessageID</tt> after sending to a JMS Destination using + * {@link org.apache.camel.ExchangePattern#InOnly} style. 
+ */ +public class InOnlyMessageSentCallback implements MessageSentCallback { + + private final Exchange exchange; + + public InOnlyMessageSentCallback(Exchange exchange) { + this.exchange = exchange; + } + + @Override + public void sent(Session session, Message message, Destination destination) { + if (exchange != null) { + String id = getJMSMessageID(message); + if (id != null) { + if (exchange.hasOut()) { + exchange.getOut().setHeader("JMSMessageID", id); + } else { + exchange.getIn().setHeader("JMSMessageID", id); + } + } + } + } + +} Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/InOnlyMessageSentCallback.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/InOnlyMessageSentCallback.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java?rev=1404233&r1=1404232&r2=1404233&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java Wed Oct 31 16:34:03 2012 @@ -377,6 +377,10 @@ public class JmsComponent extends Defaul getConfiguration().setAllowNullBody(allowNullBody); } + public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) { + getConfiguration().setIncludeSentJMSMessageID(includeSentJMSMessageID); + } + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = 
applicationContext; } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1404233&r1=1404232&r2=1404233&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Wed Oct 31 16:34:03 2012 @@ -136,6 +136,7 @@ public class JmsConfiguration implements private String replyToCacheLevelName; private boolean allowNullBody = true; private MessageListenerContainerFactory messageListenerContainerFactory; + private boolean includeSentJMSMessageID; public JmsConfiguration() { } @@ -1295,4 +1296,21 @@ public class JmsConfiguration implements public void setMessageListenerContainerFactory(MessageListenerContainerFactory messageListenerContainerFactory) { this.messageListenerContainerFactory = messageListenerContainerFactory; } + + public boolean isIncludeSentJMSMessageID() { + return includeSentJMSMessageID; + } + + /** + * Whether to include the actual JMSMessageID set on the Message by the JMS vendor + * on the Camel Message as a header when sending InOnly messages. + * <p/> + * Can be enable to gather the actual JMSMessageID for InOnly messages, which allows to access + * the message id, which can be used for logging and tracing purposes. + * <p/> + * This option is default <tt>false</tt>. 
+ */ + public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) { + this.includeSentJMSMessageID = includeSentJMSMessageID; + } } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1404233&r1=1404232&r2=1404233&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Wed Oct 31 16:34:03 2012 @@ -1022,8 +1022,8 @@ public class JmsEndpoint extends Default } @ManagedAttribute - public void setAsyncStopListener(boolean asyncStoptListener) { - configuration.setAsyncStopListener(asyncStoptListener); + public void setAsyncStopListener(boolean asyncStopListener) { + configuration.setAsyncStopListener(asyncStopListener); } @ManagedAttribute @@ -1041,6 +1041,16 @@ public class JmsEndpoint extends Default configuration.setAllowNullBody(allowNullBody); } + @ManagedAttribute + public boolean isIncludeSentJMSMessageID() { + return configuration.isIncludeSentJMSMessageID(); + } + + @ManagedAttribute + public void setIncludeSentJMSMessageID(boolean includeSentJMSMessageID) { + configuration.setIncludeSentJMSMessageID(includeSentJMSMessageID); + } + public MessageListenerContainerFactory getMessageListenerContainerFactory() { return configuration.getMessageListenerContainerFactory(); } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java?rev=1404233&r1=1404232&r2=1404233&view=diff ============================================================================== 
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java Wed Oct 31 16:34:03 2012 @@ -234,6 +234,22 @@ public final class JmsMessageHelper { } /** + * Gets the JMSMessageID from the message. + * + * @param message the message + * @return the JMSMessageID, or <tt>null</tt> if not able to get + */ + public static String getJMSMessageID(Message message) { + try { + return message.getJMSMessageID(); + } catch (Exception e) { + // ignore if JMS broker do not support this + } + + return null; + } + + /** * Sets the JMSDeliveryMode on the message. * * @param exchange the exchange Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1404233&r1=1404232&r2=1404233&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Wed Oct 31 16:34:03 2012 @@ -38,7 +38,6 @@ import org.apache.camel.impl.DefaultAsyn import org.apache.camel.spi.UuidGenerator; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; -import org.apache.camel.util.ValueHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsOperations; @@ -173,7 +172,6 @@ public class JmsProducer extends Default if (msgIdAsCorrId) { messageSentCallback = new UseMessageIdAsCorrelationIdMessageSentCallback(replyManager, provisionalCorrelationId, endpoint.getRequestTimeout()); } - final ValueHolder<MessageSentCallback> sentCallback = new 
ValueHolder<MessageSentCallback>(messageSentCallback); final String originalCorrelationId = in.getHeader("JMSCorrelationID", String.class); boolean generateFreshCorrId = (ObjectHelper.isEmpty(originalCorrelationId) && !msgIdAsCorrId) @@ -205,7 +203,7 @@ public class JmsProducer extends Default } }; - doSend(true, destinationName, destination, messageCreator, sentCallback.get()); + doSend(true, destinationName, destination, messageCreator, messageSentCallback); // after sending then set the OUT message id to the JMSMessageID so its identical setMessageId(exchange); @@ -264,6 +262,8 @@ public class JmsProducer extends Default destinationName = null; } final String to = destinationName != null ? destinationName : "" + destination; + MessageSentCallback messageSentCallback = getEndpoint().getConfiguration().isIncludeSentJMSMessageID() + ? new InOnlyMessageSentCallback(exchange) : null; MessageCreator messageCreator = new MessageCreator() { public Message createMessage(Session session) throws JMSException { @@ -353,7 +353,7 @@ public class JmsProducer extends Default } }; - doSend(false, destinationName, destination, messageCreator, null); + doSend(false, destinationName, destination, messageCreator, messageSentCallback); // after sending then set the OUT message id to the JMSMessageID so its identical setMessageId(exchange); @@ -370,7 +370,7 @@ public class JmsProducer extends Default * @param destinationName the destination name * @param destination the destination (if no name provided) * @param messageCreator the creator to create the {@link Message} to send - * @param callback optional callback for inOut messages + * @param callback optional callback to invoke when message has been sent */ protected void doSend(boolean inOut, String destinationName, Destination destination, MessageCreator messageCreator, MessageSentCallback callback) { @@ -389,7 +389,7 @@ public class JmsProducer extends Default } } else { if (template != null) { - template.send(destination, 
messageCreator); + template.send(destination, messageCreator, callback); } } } else if (destinationName != null) { @@ -399,7 +399,7 @@ public class JmsProducer extends Default } } else { if (template != null) { - template.send(destinationName, messageCreator); + template.send(destinationName, messageCreator, callback); } } } else { Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java?rev=1404233&r1=1404232&r2=1404233&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java Wed Oct 31 16:34:03 2012 @@ -17,13 +17,12 @@ package org.apache.camel.component.jms.reply; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.apache.camel.component.jms.MessageSentCallback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static org.apache.camel.component.jms.JmsMessageHelper.getJMSMessageID; /** * Callback to be used when using the option <tt>useMessageIDAsCorrelationID</tt>. 
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory; */ public class UseMessageIdAsCorrelationIdMessageSentCallback implements MessageSentCallback { - private static final Logger LOG = LoggerFactory.getLogger(MessageSelectorCreator.class); private ReplyManager replyManager; private String correlationId; private long requestTimeout; @@ -47,13 +45,7 @@ public class UseMessageIdAsCorrelationId } public void sent(Session session, Message message, Destination destination) { - String newCorrelationID = null; - try { - newCorrelationID = message.getJMSMessageID(); - } catch (JMSException e) { - // ignore - LOG.warn("Cannot get JMSMessageID from message: " + message + ". This exception will be ignored.", e); - } + String newCorrelationID = getJMSMessageID(message); if (newCorrelationID != null) { replyManager.updateCorrelationId(correlationId, newCorrelationID, requestTimeout); } Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyIncludeSentJMSMessageIDTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyIncludeSentJMSMessageIDTest.java?rev=1404233&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyIncludeSentJMSMessageIDTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyIncludeSentJMSMessageIDTest.java Wed Oct 31 16:34:03 2012 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * + */ +public class JmsInOnlyIncludeSentJMSMessageIDTest extends CamelTestSupport { + + @Test + public void testJmsInOnlyIncludeSentJMSMessageID() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:done"); + mock.expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + Exchange done = mock.getReceivedExchanges().get(0); + assertNotNull(done); + + Object body = done.getIn().getBody(); + assertEquals("Hello World", body); + + String id = done.getIn().getHeader("JMSMessageID", String.class); + assertNotNull("Should have enriched with JMSMessageID", id); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + 
from("direct:start") + .to("activemq:queue:foo?includeSentJMSMessageID=true") + .to("mock:done"); + } + }; + } +} Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyIncludeSentJMSMessageIDTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyIncludeSentJMSMessageIDTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsNotInOnlyIncludeSentJMSMessageIDTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsNotInOnlyIncludeSentJMSMessageIDTest.java?rev=1404233&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsNotInOnlyIncludeSentJMSMessageIDTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsNotInOnlyIncludeSentJMSMessageIDTest.java Wed Oct 31 16:34:03 2012 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * + */ +public class JmsNotInOnlyIncludeSentJMSMessageIDTest extends CamelTestSupport { + + @Test + public void testJmsNotInOnlyIncludeSentJMSMessageID() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:done"); + mock.expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + Exchange done = mock.getReceivedExchanges().get(0); + assertNotNull(done); + + Object body = done.getIn().getBody(); + assertEquals("Hello World", body); + + String id = done.getIn().getHeader("JMSMessageID", String.class); + assertNull("Should not have enriched with JMSMessageID", id); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + return camelContext; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("activemq:queue:foo") + .to("mock:done"); + } + }; + } +} Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsNotInOnlyIncludeSentJMSMessageIDTest.java ------------------------------------------------------------------------------ svn:eol-style = 
native Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsNotInOnlyIncludeSentJMSMessageIDTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date