This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a65583  CAMEL-17660: amel-core - Error handler OnRedeliveryProcessor 
should be able to control to stop routing
2a65583 is described below

commit 2a655836767de6f7d8e9db10ccf6dffda2c3bee1
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Feb 16 15:41:49 2022 +0100

    CAMEL-17660: amel-core - Error handler OnRedeliveryProcessor should be able 
to control to stop routing
---
 .../errorhandler/RedeliveryErrorHandler.java       |  9 +++
 .../issues/ErrorHandlerOnRedeliveryStopTest.java   | 84 ++++++++++++++++++++++
 2 files changed, 93 insertions(+)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 218556b..2bdff91 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -864,6 +864,15 @@ public abstract class RedeliveryErrorHandler extends 
ErrorHandlerSupport
             // letting onRedeliver be executed at first
             deliverToOnRedeliveryProcessor();
 
+            if (exchange.isRouteStop()) {
+                // the on redelivery can mark that the exchange should stop 
and therefore not perform a redelivery
+                // and if so then we are done so continue callback
+                AsyncCallback cb = callback;
+                taskFactory.release(this);
+                reactiveExecutor.schedule(cb);
+                return;
+            }
+
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Redelivering exchangeId: {} -> {} for Exchange: 
{}", exchange.getExchangeId(), outputAsync,
                         exchange);
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/issues/ErrorHandlerOnRedeliveryStopTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/issues/ErrorHandlerOnRedeliveryStopTest.java
new file mode 100644
index 0000000..a3b1ee9
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/issues/ErrorHandlerOnRedeliveryStopTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.ExpressionAdapter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class ErrorHandlerOnRedeliveryStopTest extends ContextTestSupport {
+
+    private final AtomicInteger counter = new AtomicInteger(5);
+
+    @Test
+    public void testRetryWhile() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:start", "Hello World");
+            fail("Should throw an exception");
+        } catch (Exception e) {
+            RejectedExecutionException ree = 
assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+            Assertions.assertEquals("I do not want to do this anymore", 
ree.getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+
+                errorHandler(defaultErrorHandler()
+                        .retryWhile(new ExpressionAdapter() {
+                            @Override
+                            public Object evaluate(Exchange exchange) {
+                                return counter.getAndDecrement() > 0;
+                            }
+                        })
+                        .onRedelivery(new MyRedeliveryProcessor())
+                        .redeliveryDelay(0)); // run fast
+
+                from("direct:start")
+                        .throwException(new IllegalArgumentException("Forced"))
+                        .to("mock:result");
+            }
+        };
+    }
+
+    private class MyRedeliveryProcessor implements Processor {
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            if (counter.get() == 0) {
+                exchange.setException(new RejectedExecutionException("I do not 
want to do this anymore"));
+                exchange.setRouteStop(true); // stop redelivery
+            }
+        }
+    }
+}

Reply via email to