Repository: camel Updated Branches: refs/heads/master 90ed5549e -> 1d3ffffd3
CAMEL-9791: Threads EIP now allow error handler to perform redelivery if adding task to queue is rejected. Thanks to Thibaut Robert for the unit test. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1d3ffffd Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1d3ffffd Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1d3ffffd Branch: refs/heads/master Commit: 1d3ffffd371c93dc7acc07405ad6eac4490bcb0d Parents: 90ed554 Author: Claus Ibsen <[email protected]> Authored: Mon Apr 4 16:55:54 2016 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Apr 4 16:57:31 2016 +0200 ---------------------------------------------------------------------- .../camel/processor/ThreadsProcessor.java | 4 - ...eadsRejectedExecutionWithDeadLetterTest.java | 91 ++++++++++++++++++++ 2 files changed, 91 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1d3ffffd/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java index c71821f..2f97943 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ThreadsProcessor.java @@ -92,11 +92,7 @@ public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor, if (abort) { exchange.setException(new RejectedExecutionException()); } - LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange); - // we should not continue routing, and no redelivery should be performed - exchange.setProperty(Exchange.ROUTE_STOP, true); - exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true); if (shutdown.get()) { exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); http://git-wip-us.apache.org/repos/asf/camel/blob/1d3ffffd/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java new file mode 100644 index 0000000..9f32e5f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ThreadPoolRejectedPolicy; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class ThreadsRejectedExecutionWithDeadLetterTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + public void testThreadsRejectedExecution() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:start").errorHandler(deadLetterChannel("mock:failed")) + .to("log:before") + // will use our custom pool + .threads() + .maxPoolSize(1).poolSize(1) // 1 thread max + .maxQueueSize(1) // 1 queued task + //(Test fails whatever the chosen policy below) + .rejectedPolicy(ThreadPoolRejectedPolicy.Abort) + .delay(1000) + .to("log:after") + .to("mock:result"); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(2); + getMockEndpoint("mock:failed").expectedMessageCount(1); + + template.sendBody("seda:start", "Hello World"); // will block + template.sendBody("seda:start", "Hi World"); // will be queued + template.sendBody("seda:start", "Bye World"); // will be rejected + + assertMockEndpointsSatisfied(); + } + + public void testThreadsRejectedExecutionWithRedelivery() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("seda:start").errorHandler(deadLetterChannel("mock:failed").maximumRedeliveries(5)) + .to("log:before") + // will use our custom pool + .threads() + .maxPoolSize(1).poolSize(1) // 1 thread max + .maxQueueSize(1) // 1 queued task + //(Test fails whatever the chosen policy below) + .rejectedPolicy(ThreadPoolRejectedPolicy.Abort) + .delay(1000) + .to("log:after") + .to("mock:result"); + } + }); + context.start(); + + getMockEndpoint("mock:result").expectedMessageCount(3); + getMockEndpoint("mock:failed").expectedMessageCount(0); + + template.sendBody("seda:start", "Hello World"); // will block + template.sendBody("seda:start", "Hi World"); // will be queued + template.sendBody("seda:start", "Bye World"); // will be rejected and queued on redelivery later + + assertMockEndpointsSatisfied(); + } + +} \ No newline at end of file
