[CAMEL-6364] Improve processor wrapping for Try/Catch/Finally

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66939043
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66939043
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66939043

Branch: refs/heads/master
Commit: 66939043e3acc33f522440166c819cd62cf98c9b
Parents: d824bce
Author: Guillaume Nodet <[email protected]>
Authored: Fri May 17 11:33:17 2013 +0200
Committer: Guillaume Nodet <[email protected]>
Committed: Fri May 17 13:33:02 2013 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/camel/Exchange.java   |    1 +
 .../org/apache/camel/model/FinallyDefinition.java  |    3 +-
 .../java/org/apache/camel/model/TryDefinition.java |   16 +-
 .../org/apache/camel/processor/AOPProcessor.java   |    2 +-
 .../org/apache/camel/processor/CatchProcessor.java |   67 ++++-
 .../apache/camel/processor/FinallyProcessor.java   |   83 +++++
 .../org/apache/camel/processor/TryProcessor.java   |  282 ++-------------
 7 files changed, 187 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/Exchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index 3420e73..1c2e1bc 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -106,6 +106,7 @@ public interface Exchange {
     String DUPLICATE_MESSAGE         = "CamelDuplicateMessage";
 
     String EXCEPTION_CAUGHT           = "CamelExceptionCaught";
+    String EXCEPTION_HANDLED          = "CamelExceptionHandled";
     String EVALUATE_EXPRESSION_RESULT = "CamelEvaluateExpressionResult";
     String ERRORHANDLER_HANDLED       = "CamelErrorHandlerHandled";
     String EXTERNAL_REDELIVERED       = "CamelExternalRedelivered";

http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java
index defa7e3..836ef62 100644
--- a/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java
@@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
 
 import org.apache.camel.Processor;
+import org.apache.camel.processor.FinallyProcessor;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -55,6 +56,6 @@ public class FinallyDefinition extends OutputDefinition<FinallyDefinition> {
         }
 
         // do finally does mandate a child processor
-        return this.createChildProcessor(routeContext, true);
+        return new FinallyProcessor(this.createChildProcessor(routeContext, false));
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java b/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java
index d64a7e6..6f824fe 100644
--- a/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java
@@ -76,18 +76,20 @@ public class TryDefinition extends OutputDefinition<TryDefinition> {
             throw new IllegalArgumentException("Definition has no children on " + this);
         }
 
-        Processor finallyProcessor = null;
-        if (finallyClause != null) {
-            finallyProcessor = createProcessor(routeContext, finallyClause);
-        }
-
-        List<CatchProcessor> catchProcessors = new ArrayList<CatchProcessor>();
+        List<Processor> catchProcessors = new ArrayList<Processor>();
         if (catchClauses != null) {
             for (CatchDefinition catchClause : catchClauses) {
-                catchProcessors.add(catchClause.createProcessor(routeContext));
+                catchProcessors.add(createProcessor(routeContext, catchClause));
             }
         }
 
+        FinallyDefinition finallyDefinition = finallyClause;
+        if (finallyDefinition == null) {
+            finallyDefinition = new FinallyDefinition();
+            finallyDefinition.setParent(this);
+        }
+        Processor finallyProcessor = createProcessor(routeContext, finallyDefinition);
+
         // must have either a catch or finally
         if (finallyClause == null && catchClauses == null) {
             throw new IllegalArgumentException("doTry must have one or more catch or finally blocks on " + this);

http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java
index ceaf333..2ac5ca3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java
@@ -25,7 +25,7 @@ import org.apache.camel.Processor;
  */
 public class AOPProcessor extends TryProcessor {
 
-    public AOPProcessor(Processor tryProcessor, List<CatchProcessor> 
catchClauses, Processor finallyProcessor) {
+    public AOPProcessor(Processor tryProcessor, List<Processor> catchClauses, 
Processor finallyProcessor) {
         super(tryProcessor, catchClauses, finallyProcessor);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
index b897ace..20fcb29 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -19,11 +19,15 @@ package org.apache.camel.processor;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A processor which catches exceptions.
@@ -31,6 +35,8 @@ import org.apache.camel.util.ObjectHelper;
  * @version 
  */
 public class CatchProcessor extends DelegateAsyncProcessor implements 
Traceable {
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(CatchProcessor.class);
+
     private final List<Class<? extends Throwable>> exceptions;
     private final Predicate onWhen;
     private final Predicate handled;
@@ -51,9 +57,63 @@ public class CatchProcessor extends DelegateAsyncProcessor 
implements Traceable
         return "catch";
     }
 
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        Exception e = exchange.getException();
+        Throwable caught = catches(exchange, e);
+        // If a previous catch clause handled the exception or if this clause 
does not match, exit
+        if (exchange.getProperty(Exchange.EXCEPTION_HANDLED) != null || caught 
== null) {
+            callback.done(true);
+            return true;
+        }
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("This CatchProcessor catches the exception: {} caused 
by: {}", caught.getClass().getName(), e.getMessage());
+        }
+
+        // store the last to endpoint as the failure endpoint
+        if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+            exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
+        }
+        // give the rest of the pipeline another chance
+        exchange.setProperty(Exchange.EXCEPTION_HANDLED, true);
+        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+        exchange.setException(null);
+        // and we should not be regarded as exhausted as we are in a try .. 
catch block
+        exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+
+        // is the exception handled by the catch clause
+        final boolean handled = handles(exchange);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("The exception is handled: {} for the exception: {} 
caused by: {}",
+                    new Object[]{handled, e.getClass().getName(), 
e.getMessage()});
+        }
+
+        boolean sync = super.processNext(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                if (!handled) {
+                    if (exchange.getException() == null) {
+                        
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, 
Exception.class));
+                    }
+                }
+                // always clear redelivery exhausted in a catch clause
+                exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
+
+                if (!doneSync) {
+                    // signal callback to continue routing async
+                    ExchangeHelper.prepareOutToIn(exchange);
+                }
+
+                callback.done(doneSync);
+            }
+        });
+
+        return sync;
+    }
+
     /**
      * Returns with the exception that is caught by this processor.
-     * 
+     *
      * This method traverses exception causes, so sometimes the exception
      * returned from this method might be one of causes of the parameter
      * passed.
@@ -62,7 +122,7 @@ public class CatchProcessor extends DelegateAsyncProcessor 
implements Traceable
      * @param exception the thrown exception
      * @return Throwable that this processor catches. <tt>null</tt> if nothing 
matches.
      */
-    public Throwable catches(Exchange exchange, Throwable exception) {
+    protected Throwable catches(Exchange exchange, Throwable exception) {
         // use the exception iterator to walk the caused by hierarchy
         Iterator<Throwable> it = 
ObjectHelper.createExceptionIterator(exception);
         while (it.hasNext()) {
@@ -79,14 +139,13 @@ public class CatchProcessor extends DelegateAsyncProcessor 
implements Traceable
         return null;
     }
 
-
     /**
      * Whether this catch processor handles the exception it have caught
      *
      * @param exchange  the current exchange
      * @return <tt>true</tt> if this processor handles it, <tt>false</tt> 
otherwise.
      */
-    public boolean handles(Exchange exchange) {
+    protected boolean handles(Exchange exchange) {
         if (handled == null) {
             // handle by default
             return true;

http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
new file mode 100644
index 0000000..c88c7c9
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Traceable;
+import org.apache.camel.util.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processor to handle do finally supporting asynchronous routing engine
+ *
+ * @version
+ */
+public class FinallyProcessor extends DelegateAsyncProcessor implements 
Traceable {
+
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(FinallyProcessor.class);
+
+    public FinallyProcessor(Processor processor) {
+        super(processor);
+    }
+
+    @Override
+    protected boolean processNext(final Exchange exchange, final AsyncCallback 
callback) {
+        // clear exception so finally block can be executed
+        final Exception e = exchange.getException();
+        exchange.setException(null);
+        // but store the caught exception as a property
+        if (e != null) {
+            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+        }
+        // store the last to endpoint as the failure endpoint
+        if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
+            exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
+        }
+
+        boolean sync = super.processNext(exchange, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                if (e == null) {
+                    exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
+                } else {
+                    // set exception back on exchange
+                    exchange.setException(e);
+                    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
+                }
+
+                if (!doneSync) {
+                    // signal callback to continue routing async
+                    ExchangeHelper.prepareOutToIn(exchange);
+                    LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
+                }
+                callback.done(doneSync);
+            }
+        });
+        return sync;
+    }
+
+    @Override
+    public String toString() {
+        return "Finally{" + getProcessor() + "}";
+    }
+
+    public String getTraceLabel() {
+        return "finally";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
index 6cac402..bc8f98f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -17,7 +17,6 @@
 package org.apache.camel.processor;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -43,20 +42,20 @@ import org.slf4j.LoggerFactory;
 public class TryProcessor extends ServiceSupport implements AsyncProcessor, 
Navigate<Processor>, Traceable {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(TryProcessor.class);
 
-    protected final AsyncProcessor tryProcessor;
-    protected final DoCatchProcessor catchProcessor;
-    protected final DoFinallyProcessor finallyProcessor;
-    private List<AsyncProcessor> processors;
+    protected final Processor tryProcessor;
+    protected final List<Processor> catchClauses;
+    protected final Processor finallyProcessor;
 
-    public TryProcessor(Processor tryProcessor, List<CatchProcessor> 
catchClauses, Processor finallyProcessor) {
-        this.tryProcessor = 
AsyncProcessorConverterHelper.convert(tryProcessor);
-        this.catchProcessor = new DoCatchProcessor(catchClauses);
-        this.finallyProcessor = new DoFinallyProcessor(finallyProcessor);
+    public TryProcessor(Processor tryProcessor, List<Processor> catchClauses, 
Processor finallyProcessor) {
+        this.tryProcessor = tryProcessor;
+        this.catchClauses = catchClauses;
+        this.finallyProcessor = finallyProcessor;
     }
 
     public String toString() {
+        String catchText = catchClauses == null || catchClauses.isEmpty() ? 
"": " Catches {" + catchClauses + "}";
         String finallyText = (finallyProcessor == null) ? "" : " Finally {" + 
finallyProcessor + "}";
-        return "Try {" + tryProcessor + "} " + (catchProcessor != null ? 
catchProcessor : "") + finallyText;
+        return "Try {" + tryProcessor + "}" + catchText + finallyText;
     }
 
     public String getTraceLabel() {
@@ -68,14 +67,18 @@ public class TryProcessor extends ServiceSupport implements 
AsyncProcessor, Navi
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        Iterator<AsyncProcessor> processors = getProcessors().iterator();
+        Iterator<Processor> processors = next().iterator();
+
+        Object lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED);
+        exchange.setProperty(Exchange.EXCEPTION_HANDLED, null);
 
         while (continueRouting(processors, exchange)) {
             ExchangeHelper.prepareOutToIn(exchange);
 
             // process the next processor
-            AsyncProcessor processor = processors.next();
-            boolean sync = process(exchange, callback, processor, processors);
+            Processor processor = processors.next();
+            AsyncProcessor async = 
AsyncProcessorConverterHelper.convert(processor);
+            boolean sync = process(exchange, callback, processors, async, 
lastHandled);
 
             // continue as long its being processed synchronously
             if (!sync) {
@@ -89,13 +92,15 @@ public class TryProcessor extends ServiceSupport implements 
AsyncProcessor, Navi
         }
 
         ExchangeHelper.prepareOutToIn(exchange);
+        exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
         LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
         callback.done(true);
         return true;
     }
 
     protected boolean process(final Exchange exchange, final AsyncCallback 
callback,
-                              final AsyncProcessor processor, final 
Iterator<AsyncProcessor> processors) {
+                              final Iterator<Processor> processors, final 
AsyncProcessor processor,
+                              final Object lastHandled) {
         // this does the actual processing so log at trace level
         LOG.trace("Processing exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
 
@@ -113,8 +118,8 @@ public class TryProcessor extends ServiceSupport implements 
AsyncProcessor, Navi
                     ExchangeHelper.prepareOutToIn(exchange);
 
                     // process the next processor
-                    AsyncProcessor processor = processors.next();
-                    doneSync = process(exchange, callback, processor, 
processors);
+                    AsyncProcessor processor = 
AsyncProcessorConverterHelper.convert(processors.next());
+                    doneSync = process(exchange, callback, processors, 
processor, lastHandled);
 
                     if (!doneSync) {
                         LOG.trace("Processing exchangeId: {} is continued 
being processed asynchronously", exchange.getExchangeId());
@@ -125,6 +130,7 @@ public class TryProcessor extends ServiceSupport implements 
AsyncProcessor, Navi
                 }
 
                 ExchangeHelper.prepareOutToIn(exchange);
+                exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled);
                 LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
                 callback.done(false);
             }
@@ -133,11 +139,7 @@ public class TryProcessor extends ServiceSupport 
implements AsyncProcessor, Navi
         return sync;
     }
 
-    protected Collection<AsyncProcessor> getProcessors() {
-        return processors;
-    }
-
-    protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange 
exchange) {
+    protected boolean continueRouting(Iterator<Processor> it, Exchange 
exchange) {
         Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
         if (stop != null) {
             boolean doStop = 
exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
@@ -152,16 +154,11 @@ public class TryProcessor extends ServiceSupport 
implements AsyncProcessor, Navi
     }
 
     protected void doStart() throws Exception {
-        processors = new ArrayList<AsyncProcessor>();
-        processors.add(tryProcessor);
-        processors.add(catchProcessor);
-        processors.add(finallyProcessor);
-        ServiceHelper.startServices(tryProcessor, catchProcessor, 
finallyProcessor);
+        ServiceHelper.startServices(tryProcessor, catchClauses, 
finallyProcessor);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(finallyProcessor, catchProcessor, 
tryProcessor);
-        processors.clear();
+        ServiceHelper.stopServices(tryProcessor, catchClauses, 
finallyProcessor);
     }
 
     public List<Processor> next() {
@@ -172,8 +169,8 @@ public class TryProcessor extends ServiceSupport implements 
AsyncProcessor, Navi
         if (tryProcessor != null) {
             answer.add(tryProcessor);
         }
-        if (catchProcessor != null) {
-            answer.add(catchProcessor);
+        if (catchClauses != null) {
+            answer.addAll(catchClauses);
         }
         if (finallyProcessor != null) {
             answer.add(finallyProcessor);
@@ -182,230 +179,7 @@ public class TryProcessor extends ServiceSupport 
implements AsyncProcessor, Navi
     }
 
     public boolean hasNext() {
-        return tryProcessor != null;
-    }
-
-    /**
-     * Processor to handle do catch supporting asynchronous routing engine
-     */
-    private final class DoCatchProcessor extends ServiceSupport implements 
AsyncProcessor, Navigate<Processor>, Traceable {
-
-        private final List<CatchProcessor> catchClauses;
-
-        private DoCatchProcessor(List<CatchProcessor> catchClauses) {
-            this.catchClauses = catchClauses;
-        }
-
-        public void process(Exchange exchange) throws Exception {
-            AsyncProcessorHelper.process(this, exchange);
-        }
-
-        public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
-            Exception e = exchange.getException();
-
-            if (catchClauses == null || e == null) {
-                return true;
-            }
-
-            // find a catch clause to use
-            CatchProcessor processor = null;
-            for (CatchProcessor catchClause : catchClauses) {
-                Throwable caught = catchClause.catches(exchange, e);
-                if (caught != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This TryProcessor catches the exception: {} 
caused by: {}", caught.getClass().getName(), e.getMessage());
-                    }
-                    processor = catchClause;
-                    break;
-                }
-            }
-
-            if (processor != null) {
-                // create the handle processor which performs the actual logic
-                // this processor just lookup the right catch clause to use 
and then let the
-                // HandleDoCatchProcessor do all the hard work (separate of 
concerns)
-                HandleDoCatchProcessor cool = new 
HandleDoCatchProcessor(processor);
-                return AsyncProcessorHelper.process(cool, exchange, callback);
-            } else {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("This TryProcessor does not catch the exception: 
{} caused by: {}", e.getClass().getName(), e.getMessage());
-                }
-            }
-
-            return true;
-        }
-
-        @Override
-        protected void doStart() throws Exception {
-            ServiceHelper.startService(catchClauses);
-        }
-
-        @Override
-        protected void doStop() throws Exception {
-            ServiceHelper.stopServices(catchClauses);
-        }
-
-        @Override
-        public String toString() {
-            return "Catches{" + catchClauses + "}";
-        }
-
-        public String getTraceLabel() {
-            return "doCatch";
-        }
-
-        public List<Processor> next() {
-            List<Processor> answer = new ArrayList<Processor>();
-            if (catchProcessor != null) {
-                answer.addAll(catchClauses);
-            }
-            return answer;
-        }
-
-        public boolean hasNext() {
-            return catchClauses != null && catchClauses.size() > 0;
-        }
-    }
-
-    /**
-     * Processor to handle do finally supporting asynchronous routing engine
-     */
-    private final class DoFinallyProcessor extends DelegateAsyncProcessor 
implements Traceable {
-
-        private DoFinallyProcessor(Processor processor) {
-            super(processor);
-        }
-
-        @Override
-        protected boolean processNext(final Exchange exchange, final 
AsyncCallback callback) {
-            // clear exception so finally block can be executed
-            final Exception e = exchange.getException();
-            exchange.setException(null);
-            // but store the caught exception as a property
-            if (e != null) {
-                exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
-            }
-            // store the last to endpoint as the failure endpoint
-            if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
-                exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
-            }
-
-            boolean sync = super.processNext(exchange, new AsyncCallback() {
-                public void done(boolean doneSync) {
-                    // we only have to handle async completion of the pipeline
-                    if (doneSync) {
-                        return;
-                    }
-
-                    if (e == null) {
-                        exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
-                    } else {
-                        // set exception back on exchange
-                        exchange.setException(e);
-                        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
-                    }
-
-                    // signal callback to continue routing async
-                    ExchangeHelper.prepareOutToIn(exchange);
-                    LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
-                    callback.done(false);
-                }
-            });
-
-            if (sync) {
-                if (e == null) {
-                    exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
-                } else {
-                    // set exception back on exchange
-                    exchange.setException(e);
-                    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
-                }
-            }
-
-            return sync;
-        }
-
-        @Override
-        public String toString() {
-            return "Finally{" + getProcessor() + "}";
-        }
-
-        public String getTraceLabel() {
-            return "doFinally";
-        }
-    }
-
-    /**
-     * Processor to handle do catch supporting asynchronous routing engine
-     */
-    private final class HandleDoCatchProcessor extends DelegateAsyncProcessor {
-
-        private final CatchProcessor catchClause;
-
-        private HandleDoCatchProcessor(CatchProcessor processor) {
-            super(processor);
-            this.catchClause = processor;
-        }
-
-        @Override
-        protected boolean processNext(final Exchange exchange, final 
AsyncCallback callback) {
-            final Exception caught = exchange.getException();
-            if (caught == null) {
-                return true;
-            }
-
-            // store the last to endpoint as the failure endpoint
-            if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
-                exchange.setProperty(Exchange.FAILURE_ENDPOINT, 
exchange.getProperty(Exchange.TO_ENDPOINT));
-            }
-            // give the rest of the pipeline another chance
-            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, caught);
-            exchange.setException(null);
-            // and we should not be regarded as exhausted as we are in a try 
.. catch block
-            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-
-            // is the exception handled by the catch clause
-            final Boolean handled = catchClause.handles(exchange);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("The exception is handled: {} for the exception: {} 
caused by: {}",
-                        new Object[]{handled, caught.getClass().getName(), 
caught.getMessage()});
-            }
-
-            boolean sync = super.processNext(exchange, new AsyncCallback() {
-                public void done(boolean doneSync) {
-                    // we only have to handle async completion of the pipeline
-                    if (doneSync) {
-                        return;
-                    }
-
-                    if (!handled) {
-                        if (exchange.getException() == null) {
-                            
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, 
Exception.class));
-                        }
-                    }
-                    // always clear redelivery exhausted in a catch clause
-                    exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-
-                    // signal callback to continue routing async
-                    ExchangeHelper.prepareOutToIn(exchange);
-                    callback.done(false);
-                }
-            });
-
-            if (sync) {
-                // set exception back on exchange
-                if (!handled) {
-                    if (exchange.getException() == null) {
-                        
exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, 
Exception.class));
-                    }
-                }
-                // always clear redelivery exhausted in a catch clause
-                exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
-            }
-
-            return sync;
-        }
+        return tryProcessor != null || catchClauses != null && 
!catchClauses.isEmpty() || finallyProcessor != null;
     }
 
 }

Reply via email to