Author: joed Date: Wed Oct 19 00:20:09 2011 New Revision: 1185928 URL: http://svn.apache.org/viewvc?rev=1185928&view=rev Log: Dynamic redelivery delays via a message header. Full test suite done in core.
https://issues.apache.org/jira/browse/CAMEL-4558 Thanks Rich! Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1185928&r1=1185927&r2=1185928&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Oct 19 00:20:09 2011 @@ -16,12 +16,12 @@ */ package org.apache.camel; -import java.util.List; -import java.util.Map; - import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.UnitOfWork; +import java.util.List; +import java.util.Map; + /** * An Exchange is the message container holding the information during the entire routing of * a {@link Message} received by a {@link Consumer}. 
@@ -159,6 +159,7 @@ public interface Exchange { String REDELIVERY_COUNTER = "CamelRedeliveryCounter"; String REDELIVERY_MAX_COUNTER = "CamelRedeliveryMaxCounter"; String REDELIVERY_EXHAUSTED = "CamelRedeliveryExhausted"; + String REDELIVERY_DELAY = "CamelRedeliveryDelay"; String ROLLBACK_ONLY = "CamelRollbackOnly"; String ROLLBACK_ONLY_LAST = "CamelRollbackOnlyLast"; String ROUTE_STOP = "CamelRouteStop"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1185928&r1=1185927&r2=1185928&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Wed Oct 19 00:20:09 2011 @@ -16,32 +16,18 @@ */ package org.apache.camel.processor; -import java.util.concurrent.Callable; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.camel.AsyncCallback; -import org.apache.camel.AsyncProcessor; -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.LoggingLevel; -import org.apache.camel.Message; -import org.apache.camel.Predicate; -import org.apache.camel.Processor; +import org.apache.camel.*; import org.apache.camel.model.OnExceptionDefinition; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.SubUnitOfWorkCallback; import org.apache.camel.spi.ThreadPoolProfile; -import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; -import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.*; import 
org.apache.camel.util.CamelLogger; -import org.apache.camel.util.EventHelper; -import org.apache.camel.util.ExchangeHelper; -import org.apache.camel.util.MessageHelper; -import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ServiceHelper; + +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Base redeliverable error handler that also supports a final dead letter queue in case @@ -268,7 +254,7 @@ public abstract class RedeliveryErrorHan if (data.redeliveryCounter > 0) { // calculate delay - data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter); + data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); if (data.redeliveryDelay > 0) { // okay there is a delay so create a scheduled task to have it executed in the future @@ -358,6 +344,32 @@ public abstract class RedeliveryErrorHan } /** + * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY} + * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p> + * + * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay} + * and {@link RedeliveryData#redeliveryCounter} are copied in.</p> + * + * @param exchange The current exchange in question. + * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation. + * @param redeliveryDelay The default redelivery delay from RedeliveryData + * @param redeliveryCounter The redeliveryCounter + * @return The time to wait before the next redelivery. 
+ */ + protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter){ + Message message = exchange.getIn(); + Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class); + if (delay == null) { + delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter); + }else{ + if (log.isDebugEnabled()) { + log.debug("Redelivery delay is {} from Message.getHeader(Exchange.REDELIVERY_DELAY)", new Object[]{delay}); + } + } + return delay; + } + + /** * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback. * <p/> * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java?rev=1185928&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryWithExceptionAndFaultDelayInHeader.java Wed Oct 19 00:20:09 2011 @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +public class RedeliveryWithExceptionAndFaultDelayInHeader extends ContextTestSupport { + + private static int counter; + + public void testOk() throws Exception { + counter = 0; + + getMockEndpoint("mock:result").expectedMessageCount(1); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("Bye World", out); + + assertMockEndpointsSatisfied(); + } + + public void testTransientAndPersistentError() throws Exception { + counter = 0; + + getMockEndpoint("mock:result").expectedMessageCount(0); + + String out = template.requestBody("direct:start", "Boom", String.class); + assertEquals("Persistent error", out); + + assertMockEndpointsSatisfied(); + } + + public void testTransientAndPersistentErrorWithExchange() throws Exception { + counter = 0; + + getMockEndpoint("mock:result").expectedMessageCount(0); + + Exchange out = template.request("direct:start", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Boom"); + } + }); + assertTrue("Should be failed", out.isFailed()); + assertNull("No exception", out.getException()); + assertTrue(out.getOut() != null && out.getOut().isFault()); + assertEquals("Persistent error", out.getOut().getBody()); + + assertMockEndpointsSatisfied(); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws 
Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(defaultErrorHandler().maximumRedeliveries(5)); + + from("direct:start") + + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader(Exchange.REDELIVERY_DELAY, 100); + counter++; + if (counter < 3) { + throw new IllegalArgumentException("Try again"); + } + + if (exchange.getIn().getBody().equals("Boom")) { + exchange.getOut().setFault(true); + exchange.getOut().setBody("Persistent error"); + } else { + exchange.getOut().setBody("Bye World"); + } + } + }).to("mock:result"); + } + }; + } +}
