Raul's first commit! Sent from a mobile device Am 20.09.2012 01:03 schrieb <ra...@apache.org>:
> Author: raulk > Date: Wed Sep 19 23:02:40 2012 > New Revision: 1387808 > > URL: http://svn.apache.org/viewvc?rev=1387808&view=rev > Log: > CAMEL-5390: Option to assign unique correlation ID to JMS messages. Fixed. > > Added: > > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutParallelTest.java > > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutRepeatedInvocationsTest.java > Modified: > > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java > > Modified: > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1387808&r1=1387807&r2=1387808&view=diff > > ============================================================================== > --- > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java > (original) > +++ > camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java > Wed Sep 19 23:02:40 2012 > @@ -52,6 +52,7 @@ import static org.apache.camel.component > */ > public class JmsProducer extends DefaultAsyncProducer { > private static final transient Logger LOG = > LoggerFactory.getLogger(JmsProducer.class); > + private static final String GENERATED_CORRELATION_ID_PREFIX = > "Camel-"; > private final JmsEndpoint endpoint; > private final AtomicBoolean started = new AtomicBoolean(false); > private JmsOperations inOnlyTemplate; > @@ -175,10 +176,13 @@ public class JmsProducer extends Default > final ValueHolder<MessageSentCallback> sentCallback = new > ValueHolder<MessageSentCallback>(messageSentCallback); > > final String originalCorrelationId = > in.getHeader("JMSCorrelationID", String.class); > - if (ObjectHelper.isEmpty(originalCorrelationId) && > !msgIdAsCorrId) { > - 
in.setHeader("JMSCorrelationID", > getUuidGenerator().generateUuid()); > + boolean generateFreshCorrId = > (ObjectHelper.isEmpty(originalCorrelationId) && !msgIdAsCorrId) > + || (originalCorrelationId != null && > originalCorrelationId.startsWith(GENERATED_CORRELATION_ID_PREFIX)); > + if (generateFreshCorrId) { > + // we append the 'Camel-' prefix to know it was generated by > us > + in.setHeader("JMSCorrelationID", > GENERATED_CORRELATION_ID_PREFIX + getUuidGenerator().generateUuid()); > } > - > + > MessageCreator messageCreator = new MessageCreator() { > public Message createMessage(Session session) throws > JMSException { > Message answer = > endpoint.getBinding().makeJmsMessage(exchange, in, session, null); > > Added: > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutParallelTest.java > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutParallelTest.java?rev=1387808&view=auto > > ============================================================================== > --- > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutParallelTest.java > (added) > +++ > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutParallelTest.java > Wed Sep 19 23:02:40 2012 > @@ -0,0 +1,78 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. 
You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.camel.component.jms.issues; > + > +import javax.jms.ConnectionFactory; > + > +import org.apache.camel.CamelContext; > +import org.apache.camel.builder.RouteBuilder; > +import org.apache.camel.component.jms.CamelJmsTestHelper; > +import org.apache.camel.component.mock.MockEndpoint; > +import org.apache.camel.test.junit4.CamelTestSupport; > +import org.junit.Test; > + > +import static > org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; > + > +/** > + * @version > + */ > +public class JmsInOutParallelTest extends CamelTestSupport { > + > + @Test > + public void testInOutParallel() throws Exception { > + MockEndpoint mock = getMockEndpoint("mock:received"); > + mock.setAssertPeriod(2000); > + mock.expectedMessageCount(5); > + String outPayload = template.requestBody("direct:test", "test", > String.class); > + assertEquals("Fully done", outPayload); > + mock.assertIsSatisfied(); > + } > + > + protected CamelContext createCamelContext() throws Exception { > + CamelContext camelContext = super.createCamelContext(); > + ConnectionFactory connectionFactory = > CamelJmsTestHelper.createConnectionFactory(); > + camelContext.addComponent("activemq", > jmsComponentAutoAcknowledge(connectionFactory)); > + return camelContext; > + } > + > + protected RouteBuilder createRouteBuilder() throws Exception { > + return new RouteBuilder() { > + public void configure() throws Exception { > + > + from("direct:test") > + .setBody(constant("1,2,3,4,5")) > + .inOut("activemq:queue:test1?requestTimeout=2000") > 
+ .split().tokenize(",").parallelProcessing() > + .inOut("activemq:queue:test2?requestTimeout=2000") > + .to("mock:received") > + .end() > + .setBody(constant("Fully done")) > + .log("Finished"); > + > + from("activemq:queue:test1") > + .log("Received on queue test1"); > + > + from("activemq:queue:test2") > + .log("Received on queue test2") > + .setBody(constant("Some reply")) > + .delay(constant(100)); > + > + } > + }; > + } > + > +} > > Added: > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutRepeatedInvocationsTest.java > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutRepeatedInvocationsTest.java?rev=1387808&view=auto > > ============================================================================== > --- > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutRepeatedInvocationsTest.java > (added) > +++ > camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutRepeatedInvocationsTest.java > Wed Sep 19 23:02:40 2012 > @@ -0,0 +1,70 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.camel.component.jms.issues; > + > +import javax.jms.ConnectionFactory; > + > +import org.apache.camel.CamelContext; > +import org.apache.camel.builder.RouteBuilder; > +import org.apache.camel.component.jms.CamelJmsTestHelper; > +import org.apache.camel.component.mock.MockEndpoint; > +import org.apache.camel.test.junit4.CamelTestSupport; > +import org.junit.Test; > + > +import static > org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; > + > +/** > + * @version > + */ > +public class JmsInOutRepeatedInvocationsTest extends CamelTestSupport { > + > + @Test > + public void testInOutRepeatSequentialInvocations() throws Exception { > + MockEndpoint mock = getMockEndpoint("mock:finished"); > + mock.setAssertPeriod(2000); > + mock.expectedMessageCount(1); > + String outPayload = template.requestBody("direct:test", "test", > String.class); > + assertEquals("Some reply", outPayload); > + mock.assertIsSatisfied(); > + } > + > + protected CamelContext createCamelContext() throws Exception { > + CamelContext camelContext = super.createCamelContext(); > + ConnectionFactory connectionFactory = > CamelJmsTestHelper.createConnectionFactory(); > + camelContext.addComponent("activemq", > jmsComponentAutoAcknowledge(connectionFactory)); > + return camelContext; > + } > + > + protected RouteBuilder createRouteBuilder() throws Exception { > + return new RouteBuilder() { > + public void configure() throws Exception { > + > + from("direct:test") > + .inOut("activemq:queue:test1?requestTimeout=200") > + .inOut("activemq:queue:test1?requestTimeout=200") > + .inOut("activemq:queue:test1?requestTimeout=200") > + .to("mock:finished"); > + > + from("activemq:queue:test1") > + .log("Received on queue test1") > + .setBody().constant("Some reply"); > + > + } > + }; > + } > + > +} > > >