Author: davsclaus Date: Mon Dec 6 14:19:33 2010 New Revision: 1042656 URL: http://svn.apache.org/viewvc?rev=1042656&view=rev Log: CAMEL-3394: Fixed splitter not using error handling if exception was thrown during evaluation of next splitted piece to use.
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterThrowExceptionInExpressionTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1042656&r1=1042655&r2=1042656&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Dec 6 14:19:33 2010 @@ -174,10 +174,16 @@ public class MulticastProcessor extends // multicast uses fine grained error handling on the output processors // so use try .. catch to cater for this + boolean exhaust = false; try { boolean sync = true; pairs = createProcessorExchangePairs(exchange); + + // after we have created the processors we consider the exchange as exhausted if an unhandled + // exception was thrown, (used in the catch block) + exhaust = true; + if (isParallelProcessing()) { // ensure an executor is set when running in parallel ObjectHelper.notNull(executorService, "executorService", this); @@ -194,14 +200,14 @@ public class MulticastProcessor extends } catch (Throwable e) { exchange.setException(e); // and do the done work - doDone(exchange, null, callback, true); + doDone(exchange, null, callback, true, exhaust); return true; } // multicasting was processed successfully // and do the done work Exchange subExchange = result.get() != null ? result.get() : null; - doDone(exchange, subExchange, callback, true); + doDone(exchange, subExchange, callback, true, exhaust); return true; } @@ -455,7 +461,7 @@ public class MulticastProcessor extends result.set(subExchange); } // and do the done work - doDone(original, subExchange, callback, false); + doDone(original, subExchange, callback, false, true); return; } @@ -465,7 +471,7 @@ public class MulticastProcessor extends // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); // and do the done work - doDone(original, subExchange, callback, false); + doDone(original, subExchange, callback, false, true); return; } @@ -501,7 +507,7 @@ public class MulticastProcessor extends result.set(subExchange); } // and do the done work - doDone(original, subExchange, callback, false); + doDone(original, subExchange, callback, false, true); return; } @@ -511,7 +517,7 @@ public class MulticastProcessor extends // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); // and do the done work - doDone(original, subExchange, callback, false); + doDone(original, subExchange, callback, false, true); return; } @@ -520,7 +526,7 @@ public class MulticastProcessor extends // do the done work subExchange = result.get() != null ? result.get() : null; - doDone(original, subExchange, callback, false); + doDone(original, subExchange, callback, false, true); } }); } finally { @@ -589,15 +595,16 @@ public class MulticastProcessor extends * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part * @param callback the callback * @param doneSync the <tt>doneSync</tt> parameter to call on callback + * @param exhaust whether or not error handling is exhausted */ - protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync) { + protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync, boolean exhaust) { // cleanup any per exchange aggregation strategy removeAggregationStrategyFromExchange(original); if (original.getException() != null) { // multicast uses error handling on its output processors and they have tried to redeliver // so we shall signal back to the other error handlers that we are exhausted and they should not // also try to redeliver as we will then do that twice - original.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); + original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); } if (subExchange != null) { // and copy the current result to original so it will contain this exception Added: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterThrowExceptionInExpressionTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterThrowExceptionInExpressionTest.java?rev=1042656&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterThrowExceptionInExpressionTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/SplitterThrowExceptionInExpressionTest.java Mon Dec 6 14:19:33 2010 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.ExpressionEvaluationException; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version $Revision$ + */ +public class SplitterThrowExceptionInExpressionTest extends ContextTestSupport { + + public void testSplitterAndVerifyMock () throws Exception { + getMockEndpoint("mock:error").expectedMessageCount(0); + getMockEndpoint("mock:error2").expectedMessageCount(1); + getMockEndpoint("mock:split").expectedMessageCount(0); + getMockEndpoint("mock:result").expectedMessageCount(0); + + template.sendBody("direct:start", "A,B,C"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + onException(ExpressionEvaluationException.class) + .handled(true).to("mock://error"); + + from("direct://start") + .onException(ExpressionEvaluationException.class) + .handled(true).to("mock://error2") + .end() + .split(new MyExpression()) + .to("mock://split") + .end() + .to("log:result"); + } + }; + } + + private class MyExpression implements Expression { + + public <T> T evaluate(Exchange exchange, Class<T> type) { + // force an exception early, to test that the onException error handlers + // can kick in anyway + throw new ExpressionEvaluationException(null, exchange, null); + } + } +}