Hi guys,

Could you please explain how asyncDelayedRedelivery of redelivery policies 
really works, because I have not to block the route when some exchange fails 
and it should be redelivered. So I’m a little bit confused of how to use async. 
redelivery properly with different types of endpoints.

I have a route that that have timer endpoint as its source. Events are fired 
every second.
I also configured executor service with 10 threads for my error handler.

The 1st  and the 4th sent messages are marked to be always redelivered.
So I expected the 6th sent message (4th arrived) to be arrived no later than 2 
seconds (time fires every second) after the 3rd sent message (2nd arrived), 
because the redelivery is asynchronous. However my test fails because the 
redelivery of the 1st message happens synchronously and the 6th sent message 
arrives about 6 seconds  after the 3rd sent message.

I also tried to use seda endpoint. It seems to work fine until I set 
waitForTaskToComplete parameter to Always. In that case the redelivery is 
synchronous too.

With quartz endpoints everything depends on the size of the quartz thread pool, 
so if its size is more than 1 and redelivery is synchronous quartz continues to 
fire events until thread pool is exhausted. With async. redelivery the behavior 
is the same except that the thread that is used  for redelivery differs from 
the calling thread.

From all described above I understood that calling thread always waits for the 
exchange to complete even if the redelivery is should happen asynchronously.


// BELOW IS MY UNIT TEST

package org.foo.bar;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.naming.Context;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.quartz.QuartzComponent;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class AsyncDelayedRedeliveryTest extends CamelTestSupport {

    private static final int MAXIMUM_REDELIVERIES = 1;

    @Test
    public void asyncRedeliveryTimer() throws Exception {
        context().addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                
errorHandler(defaultErrorHandler().executorServiceRef("executorService"));

                onException(Exception.class)
                    .redeliveryPolicyRef("redeliveryPolicy")
                    .handled(true)
                    .onRedelivery(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception 
{
                            System.out.println("Redelivered : " + 
exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
                        }
                    })
                    .to("mock:exception");

                from("timer:start?repeatCount=10&period=1000")
//                
from("quartz:start?trigger.repeatCount=9&trigger.repeatInterval=1000&trigger.misfireInstruction=2")
//                    .to("seda:next?waitForTaskToComplete=Never&timeout=100");

//                from("seda:next?concurrentConsumers=2&size=2")
                    .process(new Processor() {
                        private AtomicInteger counter = new AtomicInteger();

                        @Override
                        public void process(Exchange exchange) throws Exception 
{
                            if (counter.compareAndSet(0, 1) || 
counter.compareAndSet(3, 4)) {
                                exchange.setProperty("ThrowException", 
Boolean.TRUE);
                                exchange.getIn().setBody(counter.get() - 1);
                            } else {
                                
exchange.getIn().setBody(counter.getAndIncrement());
                            }
                        }
                    })
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception 
{
                            
if(Boolean.TRUE.equals(exchange.getProperty("ThrowException", Boolean.class))) {
                                throw new RuntimeException("Test Exception!");
                            }
                        }
                    })
                    .to("mock:result");
            }
        });

        MockEndpoint result = getMockEndpoint("mock:result");
        result.whenAnyExchangeReceived(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("Message :     " + 
exchange.getIn().getBody() + " : " +Thread.currentThread().getName());
            }
        });
        result.expectedMessageCount(8);
        result.allMessages().property("ThrowException").isNull();

        // the 1th and 4th sent messages are always redelivered,
        // so the 4th delivered message is the 6th sent message
        result.message(3).body().isEqualTo(5);
        // as we're trying to use async redelivery we're expecting that
        // the 6th sent message (4th arrived) must arrive no later than 3rd 
send message (2nd arrived)
        result.message(3).arrives().noLaterThan(2).seconds().afterPrevious();
        // the same as the prev. assertion
        result.message(2).arrives().noLaterThan(2).seconds().beforeNext();

        MockEndpoint exception = getMockEndpoint("mock:exception");
        exception.expectedMessageCount(2);
        exception.allMessages().property("ThrowException").isNotNull();

        startCamelContext();

        assertMockEndpointsSatisfied(20, TimeUnit.SECONDS);
    }

    @Override
    protected Context createJndiContext() throws Exception {
        Context jndiContext = super.createJndiContext();

        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setAsyncDelayedRedelivery(false);
        redeliveryPolicy.setLogRetryAttempted(true);
        redeliveryPolicy.setLogExhausted(false);
        redeliveryPolicy.setMaximumRedeliveries(MAXIMUM_REDELIVERIES);
        redeliveryPolicy.setRedeliveryDelay(5000);
        jndiContext.bind("redeliveryPolicy", redeliveryPolicy);

        Properties props = new Properties();
        props.setProperty("org.quartz.scheduler.instanceName", 
"DefaultQuartzScheduler");
        props.setProperty("org.quartz.scheduler.rmi.export", "false");
        props.setProperty("org.quartz.scheduler.rmi.proxy", "false");
        
props.setProperty("org.quartz.scheduler.wrapJobExecutionInUserTransaction", 
"false");
        props.setProperty("org.quartz.threadPool.class", 
"org.quartz.simpl.SimpleThreadPool");
        props.setProperty("org.quartz.threadPool.threadCount", "5");
        props.setProperty("org.quartz.threadPool.threadPriority", "5");
        
props.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread",
 "true");
        props.setProperty("org.quartz.jobStore.misfireThreshold", "1");
        props.setProperty("org.quartz.jobStore.class", 
"org.quartz.simpl.RAMJobStore");
        props.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");

        QuartzComponent quartz = new QuartzComponent();
        quartz.setProperties(props);
        jndiContext.bind("quartz", quartz);

        ExecutorService executors = Executors.newScheduledThreadPool(10);
        jndiContext.bind("executorService", executors);

        return jndiContext;
    }

    @Override
    public boolean isUseRouteBuilder() {
        return false;
    }

}

Best Regards,
Sergey

_______________________________________________________

The information contained in this message may be privileged and conf idential 
and protected from disclosure. If you are not the original intended recipient, 
you are hereby notified that any review, retransmission, dissemination, or 
other use of, or taking of any action in reliance upon, this information is 
prohibited. If you have received this communication in error, please notify the 
sender immediately by replying to this message and delete it from your 
computer. Thank you for your cooperation. Troika Dialog, Russia. 
If you need assistance please contact our Contact Center  (+7495) 258 0500 or 
go to www.troika.ru/eng/Contacts/system.wbp  

Reply via email to