Author: davsclaus Date: Mon Dec 6 15:28:52 2010 New Revision: 1042676 URL: http://svn.apache.org/viewvc?rev=1042676&view=rev Log: CAMEL-3395: Fixed Splitter not setting correltion correlation id from splitted exchange to their parent exchange.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterCorrelationIdIssueTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1042676&r1=1042675&r2=1042676&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Mon Dec 6 15:28:52 2010 @@ -137,7 +137,7 @@ public class Splitter extends MulticastP public Object next() { Object part = iterator.next(); - Exchange newExchange = exchange.copy(); + Exchange newExchange = ExchangeHelper.createCopy(exchange, true); if (part instanceof Message) { newExchange.setIn((Message)part); } else { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=1042676&r1=1042675&r2=1042676&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Mon Dec 6 15:28:52 2010 @@ -168,6 +168,8 @@ public final class ExchangeHelper { * @param handover whether the on completion callbacks should be handed over to the new copy. */ public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) { + String id = exchange.getExchangeId(); + Exchange copy = exchange.copy(); // do not share the unit of work copy.setUnitOfWork(null); @@ -177,7 +179,23 @@ public final class ExchangeHelper { uow.handoverSynchronization(copy); } // set a correlation id so we can track back the original exchange - copy.setProperty(Exchange.CORRELATION_ID, exchange.getExchangeId()); + copy.setProperty(Exchange.CORRELATION_ID, id); + return copy; + } + + /** + * Creates a new instance and copies from the current message exchange so that it can be + * forwarded to another destination as a new instance. + * + * @param exchange original copy of the exchange + * @param preserveExchangeId whether or not the exchange id should be preserved + * @return the copy + */ + public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) { + Exchange copy = exchange.copy(); + if (preserveExchangeId) { + copy.setExchangeId(exchange.getExchangeId()); + } return copy; } Added: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterCorrelationIdIssueTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterCorrelationIdIssueTest.java?rev=1042676&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterCorrelationIdIssueTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterCorrelationIdIssueTest.java Mon Dec 6 15:28:52 2010 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version $Revision$ + */ +public class SplitterCorrelationIdIssueTest extends ContextTestSupport { + + public void testSplitCorrelationId() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:split"); + mock.expectedMessageCount(3); + + Exchange exchange = template.send("direct:start", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("A,B,C"); + } + }); + + assertMockEndpointsSatisfied(); + + // match that all exchange id is unique + String parent = exchange.getExchangeId(); + String split1 = mock.getReceivedExchanges().get(0).getExchangeId(); + String split2 = mock.getReceivedExchanges().get(1).getExchangeId(); + String split3 = mock.getReceivedExchanges().get(2).getExchangeId(); + assertNotSame(parent, split1); + assertNotSame(parent, split2); + assertNotSame(parent, split3); + assertNotSame(split1, split2); + assertNotSame(split2, split3); + assertNotSame(split3, split1); + + // match correlation id from split -> parent + String corr1 = mock.getReceivedExchanges().get(0).getProperty(Exchange.CORRELATION_ID, String.class); + String corr2 = mock.getReceivedExchanges().get(1).getProperty(Exchange.CORRELATION_ID, String.class); + String corr3 = mock.getReceivedExchanges().get(2).getProperty(Exchange.CORRELATION_ID, String.class); + assertEquals(parent, corr1); + assertEquals(parent, corr2); + assertEquals(parent, corr3); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .split(body().tokenize(",")) + .to("mock:split"); + } + }; + } +}