Author: davsclaus
Date: Tue Aug 26 23:24:27 2008
New Revision: 689379
URL: http://svn.apache.org/viewvc?rev=689379&view=rev
Log:
CAMEL-857: DLC and maximum redelivery fixed
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
(with props)
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
(with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=689379&r1=689378&r2=689379&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Tue Aug 26 23:24:27 2008
@@ -21,7 +21,6 @@
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
@@ -130,7 +129,11 @@
}
if
(!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+ // we did not success with the redelivery so now we let the
failure processor handle it
setFailureHandled(exchange, true);
+ // must decrement the redelivery counter as we didn't process
the redelivery but is
+ // handling by the failure handler. So we must -1 to not let
the counter be out-of-sync
+ decrementRedeliveryCounter(exchange);
AsyncProcessor afp =
AsyncProcessorTypeConverter.convert(data.failureProcessor);
boolean sync = afp.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
@@ -151,7 +154,7 @@
exchange.setProperty(EXCEPTION_CAUSE_PROPERTY,
exchange.getException());
exchange.setException(null);
-
+
boolean sync = outputAsync.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
// Only handle the async case...
@@ -264,6 +267,24 @@
return next;
}
+ /**
+ * Prepares the redelivery counter and boolean flag for the failure handle
processor
+ */
+ private void decrementRedeliveryCounter(Exchange exchange) {
+ Message in = exchange.getIn();
+ Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
+ if (counter != null) {
+ int prev = counter - 1;
+ in.setHeader(REDELIVERY_COUNTER, prev);
+ // set boolean flag according to counter
+ in.setHeader(REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
+ } else {
+ // not redelivered
+ in.setHeader(REDELIVERY_COUNTER, 0);
+ in.setHeader(REDELIVERED, Boolean.FALSE);
+ }
+ }
+
@Override
protected void doStart() throws Exception {
ServiceHelper.startServices(output, deadLetter);
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?rev=689379&r1=689378&r2=689379&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
Tue Aug 26 23:24:27 2008
@@ -32,7 +32,7 @@
* <p>
* The default values is:
* <ul>
- * <li>maximumRedeliveries = 6</li>
+ * <li>maximumRedeliveries = 5</li>
* <li>initialRedeliveryDelay = 1000L</li>
* <li>maximumRedeliveryDelay = 60 * 1000L</li>
* <li>backOffMultiplier = 2</li>
@@ -40,6 +40,9 @@
* <li>collisionAvoidanceFactor = 0.15d</li>
* <li>useCollisionAvoidance = false</li>
* </ul>
+ * <p/>
+ * Setting the maximumRedeliveries to a negative value such as -1 will then
always redeliver (unlimited).
+ * Setting the maximumRedeliveries to 0 will disable redelivery.
*
* @version $Revision$
*/
@@ -47,7 +50,7 @@
protected static transient Random randomNumberGenerator;
private static final transient Log LOG =
LogFactory.getLog(RedeliveryPolicy.class);
- protected int maximumRedeliveries = 6;
+ protected int maximumRedeliveries = 5;
protected long initialRedeliveryDelay = 1000L;
protected long maximumRedeliveryDelay = 60 * 1000L;
protected double backOffMultiplier = 2;
@@ -80,7 +83,8 @@
if (getMaximumRedeliveries() < 0) {
return true;
}
- return redeliveryCounter < getMaximumRedeliveries();
+ // redeliver until we hitted the max
+ return redeliveryCounter <= getMaximumRedeliveries();
}
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java?rev=689379&r1=689378&r2=689379&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java
Tue Aug 26 23:24:27 2008
@@ -291,7 +291,7 @@
protected RouteBuilder createTryCatchFinallyNoEnd() {
return new RouteBuilder() {
public void configure() {
- errorHandler(deadLetterChannel().maximumRedeliveries(2));
+ errorHandler(deadLetterChannel().maximumRedeliveries(1));
from("direct:a").tryBlock().process(validator).process(toProcessor)
.handle(ValidationException.class).process(orderProcessor).finallyBlock()
.process(orderProcessor2).process(orderProcessor3); //
continuation of the finallyBlock clause
@@ -324,7 +324,7 @@
expected.add("VALIDATE");
expected.add("INVOKED2");
expected.add("INVOKED3");
- // exchange should be processed twice for an uncaught exception and
maximumRedeliveries(2)
+ // exchange should be processed twice for an uncaught exception and
maximumRedeliveries(1)
expected.add("VALIDATE");
expected.add("INVOKED2");
expected.add("INVOKED3");
@@ -335,7 +335,7 @@
protected RouteBuilder createTryCatchFinallyEnd() {
return new RouteBuilder() {
public void configure() {
- errorHandler(deadLetterChannel().maximumRedeliveries(2));
+ errorHandler(deadLetterChannel().maximumRedeliveries(1));
from("direct:a").tryBlock().process(validator).process(toProcessor)
.handle(ValidationException.class).process(orderProcessor).finallyBlock()
.process(orderProcessor2).end().process(orderProcessor3);
@@ -367,7 +367,7 @@
ArrayList<String> expected = new ArrayList<String>();
expected.add("VALIDATE");
expected.add("INVOKED2");
- // exchange should be processed twice for an uncaught exception and
maximumRedeliveries(2)
+ // exchange should be processed twice for an uncaught exception and
maximumRedeliveries(1)
expected.add("VALIDATE");
expected.add("INVOKED2");
// orderProcessor3 will not be invoked past end() with an uncaught
exception
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java?rev=689379&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
Tue Aug 26 23:24:27 2008
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test to verify that error handling using thread() pool also works as
expected.
+ */
+public class BelasThreadErrorHandlerIssue794Test extends ContextTestSupport {
+ private static int counter;
+
+ public void testThreadErrorHandlerRedeliveryNoThread() throws Exception {
+ counter = 0;
+
+ // We expect the exchange here after 1 delivery and 2 re-deliveries
+ MockEndpoint mock= getMockEndpoint("mock:noThread");
+ mock.expectedMessageCount(1);
+
mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE);
+
mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(2);
+
+ template.sendBody("direct:inNoThread", "Hello World");
+
+ mock.assertIsSatisfied();
+ assertEquals(3, counter); // One call + 2 re-deliveries
+ }
+
+ // TODO: Look into these unit tests
+
+/* public void testThreadErrorHandlerRedeliveryBeforeThread() throws
Exception {
+ counter = 0;
+
+ // We expect the exchange here after 1 delivery and 2 re-deliveries
+ MockEndpoint mock= getMockEndpoint("mock:beforeThread");
+ mock.expectedMessageCount(1);
+
mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE);
+
mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(2);
+
+ template.sendBody("direct:inBeforeThread", "Hello World");
+
+ mock.assertIsSatisfied();
+ }*/
+
+/* public void testThreadErrorHandlerCallBeforeThread() throws Exception {
+ counter = 0;
+
+ template.sendBody("direct:inBeforeThread", "Hello World");
+
+ assertEquals(3, counter); // One call + 2 re-deliveries
+ }
+
+ public void testThreadErrorHandlerRedeliveryAfterThread() throws Exception
{
+ counter = 0;
+
+ // We expect the exchange here after 1 delivery and 2 re-deliveries
+ MockEndpoint mock= getMockEndpoint("mock:afterThread");
+ mock.expectedMessageCount(1);
+
mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE);
+
mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(2);
+
+ template.sendBody("direct:inAfterThread", "Hello World");
+
+ mock.assertIsSatisfied();
+ }
+
+ public void testThreadErrorHandlerCallAfterThread() throws Exception {
+ counter = 0;
+
+ template.sendBody("direct:inAfterThread", "Hello World");
+
+ assertEquals(3, counter); // One call + 2 re-deliveries
+ }*/
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:inNoThread")
+
.errorHandler(deadLetterChannel("mock:noThread").maximumRedeliveries(2))
+ .process(new Processor() {
+ public void process(Exchange exchange)
throws Exception {
+ counter++;
+ throw new Exception("Forced
exception by unit test");
+ }
+ });
+
+ from("direct:inBeforeThread")
+
.errorHandler(deadLetterChannel("mock:beforeThread").maximumRedeliveries(2))
+ .thread(2)
+ .process(new Processor() {
+ public void process(Exchange exchange)
throws Exception {
+ counter++;
+ throw new Exception("Forced
exception by unit test");
+ }
+ });
+
+ from("direct:inAfterThread")
+ .thread(2)
+
.errorHandler(deadLetterChannel("mock:afterThread").maximumRedeliveries(2))
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ counter++;
+ throw new Exception("Forced exception
by unit test");
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue794Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java?rev=689379&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
Tue Aug 26 23:24:27 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test to verift that redelivery counters is working as expected.
+ */
+public class DeadLetterChannelRedeliveryTest extends ContextTestSupport {
+
+ private static int counter;
+
+ public void testRedeliveryTest() throws Exception {
+ counter = 0;
+
+ // We expect the exchange here after 1 delivery and 2 re-deliveries
+ MockEndpoint mock = getMockEndpoint("mock:error");
+ mock.expectedMessageCount(1);
+
mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE);
+
mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(2);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisifed();
+
+ assertEquals(3, counter); // One call + 2 re-deliveries
+ }
+
+ public void testNoRedeliveriesTest() throws Exception {
+ counter = 0;
+
+ // We expect the exchange here after 1 delivery
+ MockEndpoint mock = getMockEndpoint("mock:no");
+ mock.expectedMessageCount(1);
+
mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.FALSE);
+
mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(0);
+
+ template.sendBody("direct:no", "Hello World");
+
+ assertMockEndpointsSatisifed();
+
+ assertEquals(1, counter); // One call
+ }
+
+ public void testOneRedeliveryTest() throws Exception {
+ counter = 0;
+
+ // We expect the exchange here after 1 delivery and 1 re delivery
+ MockEndpoint mock = getMockEndpoint("mock:one");
+ mock.expectedMessageCount(1);
+
mock.message(0).header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE);
+
mock.message(0).header("org.apache.camel.RedeliveryCounter").isEqualTo(1);
+
+ template.sendBody("direct:one", "Hello World");
+
+ assertMockEndpointsSatisifed();
+
+ assertEquals(2, counter); // One call + 1 re-delivery
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:start")
+
.errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(2))
+ .process(new Processor() {
+ public void process(Exchange exchange)
throws Exception {
+ counter++;
+ throw new Exception("Forced
exception by unit test");
+ }
+ });
+
+ from("direct:no")
+
.errorHandler(deadLetterChannel("mock:no").maximumRedeliveries(0))
+ .process(new Processor() {
+ public void process(Exchange exchange)
throws Exception {
+ counter++;
+ throw new Exception("Forced
exception by unit test");
+ }
+ });
+
+ from("direct:one")
+
.errorHandler(deadLetterChannel("mock:one").maximumRedeliveries(1))
+ .process(new Processor() {
+ public void process(Exchange exchange)
throws Exception {
+ counter++;
+ throw new Exception("Forced
exception by unit test");
+ }
+ });
+ }
+ };
+ }
+
+}
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelRedeliveryTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java?rev=689379&r1=689378&r2=689379&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryPolicyPerExceptionTest.java
Tue Aug 26 23:24:27 2008
@@ -56,15 +56,14 @@
MockEndpoint.assertIsSatisfied(a, b);
-
List<Exchange> list = b.getReceivedExchanges();
assertTrue("List should not be empty!", !list.isEmpty());
Exchange exchange = list.get(0);
Message in = exchange.getIn();
log.info("Found message with headers: " + in.getHeaders());
- assertMessageHeader(in, DeadLetterChannel.REDELIVERY_COUNTER, 1);
- assertMessageHeader(in, DeadLetterChannel.REDELIVERED, true);
+ assertMessageHeader(in, DeadLetterChannel.REDELIVERY_COUNTER, 0);
+ assertMessageHeader(in, DeadLetterChannel.REDELIVERED, false);
}
@Override
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java?rev=689379&r1=689378&r2=689379&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java
Tue Aug 26 23:24:27 2008
@@ -44,8 +44,8 @@
public void testInvalidMessage() throws Exception {
validEndpoint.expectedMessageCount(0);
- // allEndpoint receives 6 messages, as redelivery is involved
- allEndpoint.expectedMessageCount(6);
+ // allEndpoint receives 1 + 5 messages, ordinary (1 attempt) and
redelivery (5 attempts) is involved
+ allEndpoint.expectedMessageCount(1 + 5);
template.sendBodyAndHeader("direct:start", "<invalid/>", "foo",
"notMatchedHeaderValue");