Hi Hadrian This one must be a typo + public ExceptionType handled(boolean cond) { + ConstantLanguage constant = new ConstantLanguage(); + return handled(constant.createPredicate("true")); + }
The Boolean cond parameter is not used! Med venlig hilsen Claus Ibsen ...................................... Silverbullet Skovsgårdsvænget 21 8362 Hørning Tlf. +45 2962 7576 Web: www.silverbullet.dk -----Original Message----- From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED] Sent: 11. oktober 2008 03:50 To: [EMAIL PROTECTED] Subject: svn commit: r703622 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/builder/ Author: hadrian Date: Fri Oct 10 18:49:53 2008 New Revision: 703622 URL: http://svn.apache.org/viewvc?rev=703622&view=rev Log: CAMEL-960 Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=703622&r1=703621&r2=703622&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Fri Oct 10 18:49:53 2008 @@ -34,6 +34,7 @@ String AGGREGATED_COUNT = "org.apache.camel.Exchange.AggregatedCount"; + String EXCEPTION_HANDLED_PROPERTY = "CamelExceptionHandled"; /** * Returns the [EMAIL PROTECTED] ExchangePattern} (MEP) of this exchange. * Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java?rev=703622&r1=703621&r2=703622&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ExceptionType.java Fri Oct 10 18:49:53 2008 @@ -27,9 +27,11 @@ import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; +import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.builder.ErrorHandlerBuilder; +import org.apache.camel.language.constant.ConstantLanguage; import org.apache.camel.processor.CatchProcessor; import org.apache.camel.processor.RedeliveryPolicy; import org.apache.camel.spi.RouteContext; @@ -46,7 +48,7 @@ @XmlElement(name = "exception") private List<String> exceptions = new ArrayList<String>(); - @XmlElement(name = "redeliveryPolicy", required = false) + @XmlElement(name = "redeliveryPolicy", required = false) private RedeliveryPolicyType redeliveryPolicy; @XmlElementRef private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>(); @@ -54,6 +56,8 @@ private List<Class> exceptionClasses; @XmlTransient private Processor errorHandler; + @XmlTransient + private Predicate handledPolicy; public ExceptionType() { } @@ -115,6 +119,16 @@ // Fluent API //------------------------------------------------------------------------- + public ExceptionType handled(boolean cond) { + ConstantLanguage constant = new ConstantLanguage(); + return handled(constant.createPredicate("true")); + } + + public ExceptionType handled(Predicate cond) { + setHandledPolicy(cond); + return this; + } + public ExceptionType backOffMultiplier(double backOffMultiplier) { getOrCreateRedeliveryPolicy().backOffMultiplier(backOffMultiplier); return this; @@ -196,6 +210,14 @@ this.redeliveryPolicy = redeliveryPolicy; } + public Predicate getHandledPolicy() { + return handledPolicy; + } + + public void setHandledPolicy(Predicate handledPolicy) { + this.handledPolicy = handledPolicy; + } + // Implementation methods //------------------------------------------------------------------------- protected RedeliveryPolicyType getOrCreateRedeliveryPolicy() { Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=703622&r1=703621&r2=703622&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Fri Oct 10 18:49:53 2008 @@ -16,19 +16,14 @@ */ package org.apache.camel.processor; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; import java.util.concurrent.RejectedExecutionException; -import javax.xml.transform.Source; - import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.Predicate; import org.apache.camel.Processor; -import org.apache.camel.converter.stream.StreamCache; import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; import org.apache.camel.model.ExceptionType; import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; @@ -50,6 +45,15 @@ public static final String REDELIVERED = "org.apache.camel.Redelivered"; public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException"; + + private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class); + private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED"; + private Processor output; + private Processor deadLetter; + private AsyncProcessor outputAsync; + private RedeliveryPolicy redeliveryPolicy; + private Logger logger; + private class RedeliveryData { int redeliveryCounter; long redeliveryDelay; @@ -58,16 +62,9 @@ // default behaviour which can be overloaded on a per exception basis RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy; Processor failureProcessor = deadLetter; + Predicate handledPredicate = null; } - private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class); - private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED"; - private Processor output; - private Processor deadLetter; - private AsyncProcessor outputAsync; - private RedeliveryPolicy redeliveryPolicy; - private Logger logger; - public DeadLetterChannel(Processor output, Processor deadLetter) { this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(), ErrorHandlerSupport.createDefaultExceptionPolicyStrategy()); @@ -130,6 +127,7 @@ ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e); if (exceptionPolicy != null) { data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy); + data.handledPredicate = exceptionPolicy.getHandledPolicy(); Processor processor = exceptionPolicy.getErrorHandler(); if (processor != null) { data.failureProcessor = processor; @@ -140,7 +138,7 @@ // should we redeliver or not? if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) { // we did not success with the redelivery so now we let the failure processor handle it - setFailureHandled(exchange, true); + setFailureHandled(exchange); // must decrement the redelivery counter as we didn't process the redelivery but is // handling by the failure handler. So we must -1 to not let the counter be out-of-sync decrementRedeliveryCounter(exchange); @@ -148,12 +146,13 @@ AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor); boolean sync = afp.process(exchange, new AsyncCallback() { public void done(boolean sync) { - restoreExceptionOnExchange(exchange); + restoreExceptionOnExchange(exchange, data.handledPredicate); callback.done(data.sync); } }); - restoreExceptionOnExchange(exchange); + // The line below shouldn't be needed, it is invoked by the AsyncCallback above + //restoreExceptionOnExchange(exchange, data.handledPredicate); logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor); return sync; } @@ -169,7 +168,6 @@ data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); } - // process the exchange boolean sync = outputAsync.process(exchange, new AsyncCallback() { public void done(boolean sync) { @@ -203,19 +201,18 @@ return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null; } - public static void setFailureHandled(Exchange exchange, boolean isHandled) { - if (isHandled) { - exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException()); - exchange.setException(null); - } else { - exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class)); - exchange.removeProperty(FAILURE_HANDLED_PROPERTY); - } - + public static void setFailureHandled(Exchange exchange) { + exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException()); + exchange.setException(null); } - public static void restoreExceptionOnExchange(Exchange exchange) { - exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class)); + protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) { + if (handledPredicate == null || !handledPredicate.matches(exchange)) { + // exception not handled, put exception back in the exchange + exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class)); + } else { + exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE); + } } public void process(Exchange exchange) throws Exception { @@ -300,7 +297,6 @@ } } - @Override protected void doStart() throws Exception { ServiceHelper.startServices(output, deadLetter); @@ -310,5 +306,4 @@ protected void doStop() throws Exception { ServiceHelper.stopServices(deadLetter, output); } - } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=703622&r1=703621&r2=703622&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Fri Oct 10 18:49:53 2008 @@ -62,11 +62,16 @@ Exchange nextExchange = original; boolean first = true; while (true) { - if (nextExchange.isFailed()) { + boolean handledException = Boolean.TRUE.equals( + nextExchange.getProperty(Exchange.EXCEPTION_HANDLED_PROPERTY)); + if (nextExchange.isFailed() || handledException) { + // The Exchange.EXCEPTION_HANDLED_PROPERTY property is only set if satisfactory handling was done + // by the error handler. It's still an exception, the exchange still failed. if (LOG.isDebugEnabled()) { LOG.debug("Message exchange has failed so breaking out of pipeline: " + nextExchange + " exception: " + nextExchange.getException() + " fault: " - + nextExchange.getFault(false)); + + nextExchange.getFault(false) + + (handledException ? " handled by the error handler" : "")); } break; } Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java?rev=703622&r1=703621&r2=703622&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderTest.java Fri Oct 10 18:49:53 2008 @@ -35,11 +35,14 @@ public class ExceptionBuilderTest extends ContextTestSupport { private static final String MESSAGE_INFO = "messageInfo"; + private static final String RESULT_QUEUE = "mock:result"; private static final String ERROR_QUEUE = "mock:error"; private static final String BUSINESS_ERROR_QUEUE = "mock:badBusiness"; private static final String SECURITY_ERROR_QUEUE = "mock:securityError"; public void testNPE() throws Exception { + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); MockEndpoint mock = getMockEndpoint(ERROR_QUEUE); mock.expectedMessageCount(1); mock.expectedHeaderReceived(MESSAGE_INFO, "Damm a NPE"); @@ -52,10 +55,12 @@ // expected } - mock.assertIsSatisfied(); + MockEndpoint.assertIsSatisfied(result, mock); } public void testIOException() throws Exception { + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); MockEndpoint mock = getMockEndpoint(ERROR_QUEUE); mock.expectedMessageCount(1); mock.expectedHeaderReceived(MESSAGE_INFO, "Damm somekind of IO exception"); @@ -68,10 +73,12 @@ // expected } - mock.assertIsSatisfied(); + MockEndpoint.assertIsSatisfied(result, mock); } public void testException() throws Exception { + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); MockEndpoint mock = getMockEndpoint(ERROR_QUEUE); mock.expectedMessageCount(1); mock.expectedHeaderReceived(MESSAGE_INFO, "Damm just exception"); @@ -84,10 +91,12 @@ // expected } - mock.assertIsSatisfied(); + MockEndpoint.assertIsSatisfied(result, mock); } public void testMyBusinessException() throws Exception { + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); MockEndpoint mock = getMockEndpoint(BUSINESS_ERROR_QUEUE); mock.expectedMessageCount(1); mock.expectedHeaderReceived(MESSAGE_INFO, "Damm my business is not going to well"); @@ -100,11 +109,13 @@ // expected } - mock.assertIsSatisfied(); + MockEndpoint.assertIsSatisfied(result, mock); } public void testSecurityConfiguredWithTwoExceptions() throws Exception { // test that we also handles a configuration with 2 or more exceptions + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); MockEndpoint mock = getMockEndpoint(SECURITY_ERROR_QUEUE); mock.expectedMessageCount(1); mock.expectedHeaderReceived(MESSAGE_INFO, "Damm some security error"); @@ -117,11 +128,13 @@ // expected } - mock.assertIsSatisfied(); + MockEndpoint.assertIsSatisfied(result, mock); } public void testSecurityConfiguredWithExceptionList() throws Exception { // test that we also handles a configuration with a list of exceptions + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); MockEndpoint mock = getMockEndpoint(ERROR_QUEUE); mock.expectedMessageCount(1); mock.expectedHeaderReceived(MESSAGE_INFO, "Damm some access error"); @@ -134,7 +147,7 @@ // expected } - mock.assertIsSatisfied(); + MockEndpoint.assertIsSatisfied(result, mock); } public static class MyBaseBusinessException extends Exception { Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java?rev=703622&view=auto ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java (added) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ExceptionBuilderWithHandledExceptionTest.java Fri Oct 10 18:49:53 2008 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.builder; + +import java.io.IOException; +import java.net.ConnectException; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Unit test to test exception configuration + */ +public class ExceptionBuilderWithHandledExceptionTest extends ContextTestSupport { + + private static final String MESSAGE_INFO = "messageInfo"; + private static final String RESULT_QUEUE = "mock:result"; + private static final String ERROR_QUEUE = "mock:error"; + + public void testHandledException() throws Exception { + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); + MockEndpoint mock = getMockEndpoint(ERROR_QUEUE); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with NullPointerException"); + + template.sendBody("direct:a", "Hello NPE"); + MockEndpoint.assertIsSatisfied(result, mock); + } + + public void testHandledExceptionWithExpression() throws Exception { + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); + MockEndpoint mock = getMockEndpoint(ERROR_QUEUE); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with IOException"); + + template.sendBodyAndHeader("direct:a", "Hello IOE", "foo", "bar"); + MockEndpoint.assertIsSatisfied(result, mock); + } + + public void testUnhandledException() throws Exception { + MockEndpoint result = getMockEndpoint(RESULT_QUEUE); + result.expectedMessageCount(0); + MockEndpoint mock = getMockEndpoint(ERROR_QUEUE); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived(MESSAGE_INFO, "Handled exchange with IOException"); + + try { + template.sendBodyAndHeader("direct:a", "Hello IOE", "foo", "something that does not match"); + fail("Should have thrown a IOException"); + } catch (RuntimeCamelException e) { + assertTrue(e.getCause() instanceof IOException); + // expected, failure is not handled because predicate doesn't match + } + + MockEndpoint.assertIsSatisfied(result, mock); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // START SNIPPET: exceptionBuilder1 + onException(NullPointerException.class) + .maximumRedeliveries(0) + .handled(true) + .setHeader(MESSAGE_INFO, constant("Handled exchange with NullPointerException")) + .to(ERROR_QUEUE); + + onException(IOException.class) + .maximumRedeliveries(0) + .handled(header("foo").isEqualTo("bar")) + .setHeader(MESSAGE_INFO, constant("Handled exchange with IOException")) + .to(ERROR_QUEUE); + // END SNIPPET: exceptionBuilder1 + + from("direct:a").process(new Processor() { + public void process(Exchange exchange) throws Exception { + String s = exchange.getIn().getBody(String.class); + if ("Hello NPE".equals(s)) { + throw new NullPointerException(); + } else if ("Hello IOE".equals(s)) { + // specialized IOException + throw new ConnectException("Forced for testing - can not connect to remote server"); + } + exchange.getOut().setBody("Hello World"); + } + }).to("mock:result"); + } + }; + } +}