Hello Claus! As you know, the redelivery logic is part of Apache ActiveMQ, but we cannot use it in Camel in the way I would like it. To only have one redelivery policy in place is not sufficient for us (if you configure the redelivery policy in your ActiveMQConnectionFactory). We have multiple applications sharing the same broker with different requirements (online, batch, ...).
I know it's also possible to configure it on the ActiveMQConnection [1], but as I know we don't have this possibility in Camel right now. Correct me if I'm wrong. May be this is a better solution for my needs, when we have to possibilities to configure the redelivery policy as an option in the ActiveMqComponent: from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")... or from("activemq:queue:foo?initialRedeliveryDelay=500&maximumRedeliveries=5&...")... But for this, I think I have to open a JIRA for ActiveMQ, right? [1] http://activemq.apache.org/message-redelivery-and-dlq-handling.html Best, Christian On Sun, Jul 31, 2011 at 2:20 PM, Claus Ibsen <claus.ib...@gmail.com> wrote: > Frankly I thing the redelivery logic should be part of Apache ActiveMQ. > > Your solution is brittle in the fact that you consume the message, and > then send it back to the broker. What happens if you cannot send the > message back to the broker? > It would be better if the broker handled all this out of the box. > > ActiveMQ already has redelivery policy where you can configure various > delays options such as exponential backoff etc. > > And there is a JIRA ticket for ActiveMQ to support asynchronous > scheduled redeliveries. > https://issues.apache.org/jira/browse/AMQ-1853 > Which has the potential for consumers to pickup the next message, and > not block waiting to issue the redelivery. > > > > On Sun, Jul 31, 2011 at 1:49 PM, Christian Müller > <christian.muel...@gmail.com> wrote: > > Hello! > > > > I didn't get so much responses as I hoped. May it was not clear what I > try > > to do or what I miss at present in Camel. > > Therefore a build a small unit test (and attached it) to demonstrate what > > I'm looking for (for convenience I also paste the code into this mail at > the > > end). > > > > I build my own (simple) redelivery processor which I use to re-enqueue > > messages into ActiveMQ after the default Camel error handler was kicked > in > > and catched the exception. I use the same logic as Camel does (but much > more > > simpler for this test) to trace the redelivery delay and count. But > instead > > to wait for the retry, I put the message message into ActiveMQ for > > redelivery and set the "AMQ_SCHEDULED_DELAY" header which is used by > > ActiveMQ to delay the delivery of the message: > > > > public class RedeliveryTest extends CamelTestSupport { > > > > @EndpointInject(uri = "mock:intercept") > > private MockEndpoint intercept; > > > > @EndpointInject(uri = "mock:dlq") > > private MockEndpoint dlq; > > > > @Before > > public void setUp() throws Exception { > > disableJMX(); > > > > super.setUp(); > > } > > > > @Test > > public void redelieryTest() throws Exception { > > context.addRoutes(new RouteBuilder() { > > @Override > > public void configure() throws Exception { > > from("activemq:queue:dlq") > > .to("mock:dlq"); > > } > > }); > > > > intercept.expectedMessageCount(6); // 1 + 5 redeliveries > > dlq.expectedMessageCount(1); > > > > template.sendBody("activemq:queue:start", "Hello Camel!"); > > > > intercept.assertIsSatisfied(); > > dlq.assertIsSatisfied(); > > > > Exchange exchange = dlq.getExchanges().get(0); > > assertEquals(new Long(5), > > exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter")); > > assertEquals(new Long(3200l), > > exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay")); > > } > > > > @Override > > protected JndiRegistry createRegistry() throws Exception { > > JndiRegistry registry = super.createRegistry(); > > > > ActiveMQComponent activemq = > > > ActiveMQComponent.activeMQComponent("vm://localhost?broker.schedulerSupport=true&broker.persistent=false&broker.useJmx=false"); > > registry.bind("activemq", activemq); > > > > return registry; > > } > > > > @Override > > protected RouteBuilder createRouteBuilder() throws Exception { > > return new RouteBuilder() { > > @Override > > public void configure() throws Exception { > > context.addInterceptStrategy(new Tracer()); > > > > ActiveMqRedeliveryProcessor activeMqRedeliveryProcessor = > > new ActiveMqRedeliveryProcessor(); > > > > > activeMqRedeliveryProcessor.setRedeliveryEndpoint("activemq:queue:start"); > > > > activeMqRedeliveryProcessor.setDeadLetterEndpoint("activemq:queue:dlq"); > > activeMqRedeliveryProcessor.setRedeliveryDelay(200); > > activeMqRedeliveryProcessor.setBackOffMultiplier(2.0); > > activeMqRedeliveryProcessor.setMaximumRedeliveries(5); > > > > onException(Exception.class) > > .handled(true) > > .bean(activeMqRedeliveryProcessor) > > .end(); > > > > from("activemq:queue:start").routeId("main") > > .to("mock:intercept") > > .throwException(new Exception("forced Exception for > > test!")); > > } > > }; > > } > > } > > > > And the ActiveMqRedeliveryProcessor is: > > > > public class ActiveMqRedeliveryProcessor { > > > > private String redeliveryEndpoint; > > private String deadLetterEndpoint; > > > > private long redeliveryDelay = 0l; > > private double backOffMultiplier = 1; > > private int maximumRedeliveries = 0; > > > > public void process(Exchange exchange) throws Exception { > > JmsMessage message = exchange.getIn(JmsMessage.class); > > > > Long delay = message.getHeader("CamelActiveMqRedeliveryDelay", > > Long.class); > > Long redeliveryCount = > > message.getHeader("CamelActiveMqRedeliveryCounter", Long.class); > > > > if (redeliveryCount == null) { > > redeliveryCount = new Long(0); > > } > > > > ProducerTemplate template = new > > DefaultProducerTemplate(exchange.getContext()); > > template.start(); > > > > if (redeliveryCount < maximumRedeliveries) { > > redeliveryCount = new Long(redeliveryCount + 1); > > message.setHeader("CamelActiveMqRedeliveryCounter", > > redeliveryCount); > > > > if (delay == null) { > > delay = new Long(redeliveryDelay); > > } else { > > delay = new Long((long) (delay * backOffMultiplier)); > > } > > message.setHeader("scheduledJobId", null); > > message.setHeader("CamelActiveMqRedeliveryDelay", delay); > > message.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, > delay); > > > > template.send(redeliveryEndpoint, exchange); > > } else { > > template.send(deadLetterEndpoint, exchange); > > } > > template.stop(); > > } > > > > public void setRedeliveryDelay(long redeliveryDelay) { > > this.redeliveryDelay = redeliveryDelay; > > } > > > > public void setBackOffMultiplier(double backOffMultiplier) { > > this.backOffMultiplier = backOffMultiplier; > > } > > > > public void setMaximumRedeliveries(int maximumRedeliveries) { > > this.maximumRedeliveries = maximumRedeliveries; > > } > > > > public void setRedeliveryEndpoint(String redeliveryEndpoint) { > > this.redeliveryEndpoint = redeliveryEndpoint; > > } > > > > public void setDeadLetterEndpoint(String deadLetterEndpoint) { > > this.deadLetterEndpoint = deadLetterEndpoint; > > } > > } > > > > I'm looking for a better integration into Camel for this feature, if > other > > people also think this is a common use case (let ActiveMQ handle the > > redelivery instead of having long live inflight messages if the delay is > > more than a few minutes). This integration could looks like: > > > > errorHandler( > > activeMqDeadLetterChannel("activemq:queue:FOO", "activemq:queue:DLQ") > > .maximumRedeliveries(8) > > .deliveryDelay(60000) > > .useExponentialBackOff() > > .backOffMultiplier(2)); > > Where "activemq:queue:FOO" is the queue for redelivery and > > "activemq:queue:DLQ" is the dead letter queue. > > > > I'm really interested in your opinions whether this is also useful for > other > > people or is this only a special requirement from my site. > > > > Best, > > Christian > > > > > > -- > Claus Ibsen > ----------------- > FuseSource > Email: cib...@fusesource.com > Web: http://fusesource.com > Twitter: davsclaus, fusenews > Blog: http://davsclaus.blogspot.com/ > Author of Camel in Action: http://www.manning.com/ibsen/ >