Hi guys,

Could you please explain how the asyncDelayedRedelivery option of the redelivery policy really works? I must not block the route when an exchange fails and has to be redelivered, so I'm a little confused about how to use asynchronous redelivery properly with different types of endpoints.
I have a route that has a timer endpoint as its source. Events are fired every second. I also configured an executor service with 10 threads for my error handler. The 1st and the 4th sent messages are marked so that they always fail and are redelivered. So I expected the 6th sent message (4th arrived) to arrive no later than 2 seconds (the timer fires every second) after the 3rd sent message (2nd arrived), because the redelivery is asynchronous. However, my test fails because the redelivery of the 1st message happens synchronously, and the 6th sent message arrives about 6 seconds after the 3rd sent message.

I also tried a seda endpoint. It seems to work fine until I set the waitForTaskToComplete parameter to Always. In that case the redelivery is synchronous too.

With quartz endpoints everything depends on the size of the quartz thread pool: if its size is greater than 1 and redelivery is synchronous, quartz continues to fire events until the thread pool is exhausted. With asynchronous redelivery the behavior is the same, except that the thread used for redelivery differs from the calling thread.

From all of the above I conclude that the calling thread always waits for the exchange to complete, even if the redelivery should happen asynchronously.
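To make my expectation concrete, here is a minimal plain-JDK sketch of the difference I have in mind. It is not Camel's actual implementation, and the names RedeliverySketch, retrySync and retryAsync are made up for illustration. With a synchronous delay the caller sleeps and then retries itself; with an asynchronous delay the retry is handed to a scheduled pool and the caller should be free again almost immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RedeliverySketch {

    /** Synchronous delayed retry: the calling thread sleeps, then retries itself. */
    public static long retrySync(Runnable retry, long delayMillis) throws InterruptedException {
        long start = System.nanoTime();
        Thread.sleep(delayMillis); // the caller is blocked for the whole delay
        retry.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Asynchronous delayed retry: the retry is scheduled on a pool, the caller returns at once. */
    public static long retryAsync(ScheduledExecutorService pool, Runnable retry, long delayMillis) {
        long start = System.nanoTime();
        pool.schedule(retry, delayMillis, TimeUnit.MILLISECONDS);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
        CountDownLatch retried = new CountDownLatch(2);
        Runnable retry = () -> {
            System.out.println("Redelivered on " + Thread.currentThread().getName());
            retried.countDown();
        };

        // Returns how long the calling thread was held up in each case.
        long syncBlocked = retrySync(retry, 500);
        long asyncBlocked = retryAsync(pool, retry, 500);
        System.out.println("sync caller blocked for ~" + syncBlocked + " ms");
        System.out.println("async caller blocked for ~" + asyncBlocked + " ms");

        // Wait for the scheduled retry to run before shutting the pool down.
        retried.await(5, TimeUnit.SECONDS);
        pool.shutdown();
    }
}
```

In this sketch the asynchronous caller is held up only for the time it takes to schedule the task, which is the behavior I expected from the timer consumer thread when asyncDelayedRedelivery is enabled.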
// Below is my unit test

package org.foo.bar;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.naming.Context;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.quartz.QuartzComponent;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class AsyncDelayedRedeliveryTest extends CamelTestSupport {

    private static final int MAXIMUM_REDELIVERIES = 1;

    @Test
    public void asyncRedeliveryTimer() throws Exception {
        context().addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // redeliveries should run on the 10-thread pool bound in createJndiContext()
                errorHandler(defaultErrorHandler().executorServiceRef("executorService"));

                onException(Exception.class)
                    .redeliveryPolicyRef("redeliveryPolicy")
                    .handled(true)
                    .onRedelivery(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            System.out.println("Redelivered : " + exchange.getIn().getBody()
                                    + " : " + Thread.currentThread().getName());
                        }
                    })
                    .to("mock:exception");

                from("timer:start?repeatCount=10&period=1000")
                // from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&trigger.misfireInstruction=2")
                // .to("seda:next?waitForTaskToComplete=Never&timeout=100");
                // from("seda:next?concurrentConsumers=2&size=2")
                    .process(new Processor() {
                        private final AtomicInteger counter = new AtomicInteger();

                        @Override
                        public void process(Exchange exchange) throws Exception {
                            // mark the 1st and the 4th sent messages so that they always fail
                            if (counter.compareAndSet(0, 1) || counter.compareAndSet(3, 4)) {
                                exchange.setProperty("ThrowException", Boolean.TRUE);
                                exchange.getIn().setBody(counter.get() - 1);
                            } else {
                                exchange.getIn().setBody(counter.getAndIncrement());
                            }
                        }
                    })
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            if (Boolean.TRUE.equals(exchange.getProperty("ThrowException", Boolean.class))) {
                                throw new RuntimeException("Test Exception!");
                            }
                        }
                    })
                    .to("mock:result");
            }
        });

        MockEndpoint result = getMockEndpoint("mock:result");
        result.whenAnyExchangeReceived(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("Message : " + exchange.getIn().getBody()
                        + " : " + Thread.currentThread().getName());
            }
        });
        result.expectedMessageCount(8);
        result.allMessages().property("ThrowException").isNull();

        // the 1st and 4th sent messages are always redelivered,
        // so the 4th delivered message is the 6th sent message
        result.message(3).body().isEqualTo(5);

        // as we're trying to use async redelivery, we expect that the 6th sent message
        // (4th arrived) arrives no later than 2 seconds after the 3rd sent message (2nd arrived)
        result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
        // the same as the previous assertion
        result.message(2).arrives().noLaterThan(2).seconds().beforeNext();

        MockEndpoint exception = getMockEndpoint("mock:exception");
        exception.expectedMessageCount(2);
        exception.allMessages().property("ThrowException").isNotNull();

        startCamelContext();
        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
    }

    @Override
    protected Context createJndiContext() throws Exception {
        Context jndiContext = super.createJndiContext();

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        // switch to true to test asynchronous delayed redelivery
        redeliveryPolicy.setAsyncDelayedRedelivery(false);
        redeliveryPolicy.setLogRetryAttempted(true);
        redeliveryPolicy.setLogExhausted(false);
        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
        redeliveryPolicy.setRedeliveryDelay(5000);
        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);

        Properties props = new Properties();
        props.setProperty("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
        props.setProperty("org.quartz.scheduler.rmi.export", "false");
        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
        props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransaction", "false");
        props.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        props.setProperty("org.quartz.threadPool.threadCount", "5");
        props.setProperty("org.quartz.threadPool.threadPriority", "5");
        props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true");
        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
        props.setProperty("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
        props.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");

        QuartzComponent quartz = new QuartzComponent();
        quartz.setProperties(props);
        jndiContext.bind("quartz", quartz);

        ExecutorService executors = Executors.newScheduledThreadPool(10);
        jndiContext.bind("executorService", executors);

        return jndiContext;
    }

    @Override
    public boolean isUseRouteBuilder() {
        return false;
    }
}

Best Regards,
Sergey