This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b4b66d5b9dc9ce44a5ed497076fd837c73b70b0c
Author: Guillaume Nodet <gno...@gmail.com>
AuthorDate: Tue May 14 14:39:08 2024 +0200

    Streamline SharedCamelInternalProcessor which was copied from 
CamelInternalProcessor but has a more limited scope
---
 .../apache/camel/impl/engine/AdviceIterator.java   |  16 ++--
 .../impl/engine/SharedCamelInternalProcessor.java  | 102 +++++----------------
 2 files changed, 33 insertions(+), 85 deletions(-)

diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AdviceIterator.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AdviceIterator.java
index 02461653431..d89fdcc92f3 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AdviceIterator.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AdviceIterator.java
@@ -36,12 +36,16 @@ final class AdviceIterator {
             if (task.hasState()) {
                 state = states[stateIndex--];
             }
-            try {
-                task.after(exchange, state);
-            } catch (Exception e) {
-                exchange.setException(e);
-                // allow all advices to complete even if there was an exception
-            }
+            runAfterTask(task, state, exchange);
+        }
+    }
+
+    static void runAfterTask(CamelInternalProcessorAdvice task, Object state, 
Exchange exchange) {
+        try {
+            task.after(exchange, state);
+        } catch (Exception e) {
+            exchange.setException(e);
+            // allow all advices to complete even if there was an exception
         }
     }
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
index d64b2c174cb..5ab5b8ddf66 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
@@ -16,8 +16,7 @@
  */
 package org.apache.camel.impl.engine;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 
@@ -25,7 +24,6 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.Ordered;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelInternalProcessorAdvice;
@@ -36,7 +34,6 @@ import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
-import org.apache.camel.support.OrderedComparator;
 import org.apache.camel.support.PluginHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,37 +63,20 @@ import org.slf4j.LoggerFactory;
  * <b>Debugging tips:</b> Camel end users whom want to debug their Camel 
applications with the Camel source code, then
  * make sure to read the source code of this class about the debugging tips, 
which you can find in the
  * {@link #process(Exchange, AsyncCallback, AsyncProcessor, Processor)} method.
- * <p/>
- * The added advices can implement {@link Ordered} to control in which order 
the advices are executed.
  */
 public class SharedCamelInternalProcessor implements SharedInternalProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SharedCamelInternalProcessor.class);
-    private static final Object[] EMPTY_STATES = new Object[0];
     private final ReactiveExecutor reactiveExecutor;
     private final AsyncProcessorAwaitManager awaitManager;
     private final ShutdownStrategy shutdownStrategy;
-    private final List<CamelInternalProcessorAdvice<?>> advices;
-    private byte statefulAdvices;
+    private final CamelInternalProcessorAdvice<?> advice;
 
-    public SharedCamelInternalProcessor(CamelContext camelContext, 
CamelInternalProcessorAdvice<?>... advices) {
+    public SharedCamelInternalProcessor(CamelContext camelContext, 
CamelInternalProcessorAdvice<?> advice) {
         this.reactiveExecutor = 
camelContext.getCamelContextExtension().getReactiveExecutor();
         this.awaitManager = 
PluginHelper.getAsyncProcessorAwaitManager(camelContext);
         this.shutdownStrategy = camelContext.getShutdownStrategy();
-
-        if (advices != null) {
-            this.advices = new ArrayList<>(advices.length);
-            for (CamelInternalProcessorAdvice<?> advice : advices) {
-                this.advices.add(advice);
-                if (advice.hasState()) {
-                    statefulAdvices++;
-                }
-            }
-            // ensure advices are sorted, so they are in the order we want
-            this.advices.sort(OrderedComparator.get());
-        } else {
-            this.advices = null;
-        }
+        this.advice = Objects.requireNonNull(advice, "advice");
     }
 
     /**
@@ -129,43 +109,23 @@ public class SharedCamelInternalProcessor implements 
SharedInternalProcessor {
      */
     public boolean process(
             Exchange exchange, AsyncCallback originalCallback, AsyncProcessor 
processor, Processor resultProcessor) {
-        // ----------------------------------------------------------
-        // CAMEL END USER - READ ME FOR DEBUGGING TIPS
-        // ----------------------------------------------------------
-        // If you want to debug the Camel routing engine, then there is a lot 
of internal functionality
-        // the routing engine executes during routing messages. You can skip 
debugging this internal
-        // functionality and instead debug where the routing engine continues 
routing to the next node
-        // in the routes. The CamelInternalProcessor is a vital part of the 
routing engine, as its
-        // being used in between the nodes. As an end user you can just debug 
the code in this class
-        // in between the:
-        //   CAMEL END USER - DEBUG ME HERE +++ START +++
-        //   CAMEL END USER - DEBUG ME HERE +++ END +++
-        // you can see in the code below.
-        // ----------------------------------------------------------
-
-        if (processor == null || !continueProcessing(exchange, processor)) {
+        if (processor == null || !continueProcessing(exchange)) {
             // no processor or we should not continue then we are done
             originalCallback.done(true);
             return true;
         }
 
         // optimise to use object array for states, and only for the number of 
advices that keep state
-        final Object[] states = statefulAdvices > 0 ? new 
Object[statefulAdvices] : EMPTY_STATES;
+        final Object state;
         // optimise for loop using index access to avoid creating iterator 
object
-        for (int i = 0, j = 0; i < advices.size(); i++) {
-            CamelInternalProcessorAdvice<?> task = advices.get(i);
-            try {
-                Object state = task.before(exchange);
-                if (task.hasState()) {
-                    states[j++] = state;
-                }
-            } catch (Exception e) {
-                return handleException(exchange, originalCallback, e);
-            }
+        try {
+            state = advice.before(exchange);
+        } catch (Exception e) {
+            return handleException(exchange, originalCallback, e);
         }
 
         // create internal callback which will execute the advices in reverse 
order when done
-        AsyncCallback callback = new InternalCallback(states, exchange, 
originalCallback, resultProcessor);
+        AsyncCallback callback = new InternalCallback(state, exchange, 
originalCallback, resultProcessor);
 
         if (exchange.isTransacted()) {
             return processTransacted(exchange, processor, callback);
@@ -190,16 +150,10 @@ public class SharedCamelInternalProcessor implements 
SharedInternalProcessor {
             async = uow.beforeProcess(processor, exchange, async);
         }
 
-        // ----------------------------------------------------------
-        // CAMEL END USER - DEBUG ME HERE +++ START +++
-        // ----------------------------------------------------------
         if (LOG.isTraceEnabled()) {
             LOG.trace("Processing exchange for exchangeId: {} -> {}", 
exchange.getExchangeId(), exchange);
         }
         boolean sync = processor.process(exchange, async);
-        // ----------------------------------------------------------
-        // CAMEL END USER - DEBUG ME HERE +++ END +++
-        // ----------------------------------------------------------
 
         // optimize to only do after uow processing if really needed
         if (beforeAndAfter) {
@@ -226,17 +180,11 @@ public class SharedCamelInternalProcessor implements 
SharedInternalProcessor {
                         exchange.getExchangeId(), exchange);
             }
         }
-        // ----------------------------------------------------------
-        // CAMEL END USER - DEBUG ME HERE +++ START +++
-        // ----------------------------------------------------------
         try {
             processor.process(exchange);
         } catch (Exception e) {
             exchange.setException(e);
         }
-        // ----------------------------------------------------------
-        // CAMEL END USER - DEBUG ME HERE +++ END +++
-        // ----------------------------------------------------------
         callback.done(true);
         return true;
     }
@@ -246,13 +194,13 @@ public class SharedCamelInternalProcessor implements 
SharedInternalProcessor {
      */
     private final class InternalCallback implements AsyncCallback {
 
-        private final Object[] states;
+        private final Object state;
         private final Exchange exchange;
         private final AsyncCallback callback;
         private final Processor resultProcessor;
 
-        private InternalCallback(Object[] states, Exchange exchange, 
AsyncCallback callback, Processor resultProcessor) {
-            this.states = states;
+        private InternalCallback(Object state, Exchange exchange, 
AsyncCallback callback, Processor resultProcessor) {
+            this.state = state;
             this.exchange = exchange;
             this.callback = callback;
             this.resultProcessor = resultProcessor;
@@ -273,18 +221,12 @@ public class SharedCamelInternalProcessor implements 
SharedInternalProcessor {
 
             // we should call after in reverse order
             try {
-                AdviceIterator.runAfterTasks(advices, states, exchange);
+                AdviceIterator.runAfterTask(advice, state, exchange);
             } finally {
-                // ----------------------------------------------------------
-                // CAMEL END USER - DEBUG ME HERE +++ START +++
-                // ----------------------------------------------------------
                 // callback must be called
                 if (callback != null) {
                     reactiveExecutor.schedule(callback);
                 }
-                // ----------------------------------------------------------
-                // CAMEL END USER - DEBUG ME HERE +++ END +++
-                // ----------------------------------------------------------
             }
         }
     }
@@ -292,18 +234,20 @@ public class SharedCamelInternalProcessor implements 
SharedInternalProcessor {
     /**
      * Strategy to determine if we should continue processing the {@link 
Exchange}.
      */
-    protected boolean continueProcessing(Exchange exchange, AsyncProcessor 
processor) {
+    protected boolean continueProcessing(Exchange exchange) {
         if (exchange.isRouteStop()) {
             LOG.debug("Exchange is marked to stop routing: {}", exchange);
             return false;
         }
 
         if (shutdownStrategy.isForceShutdown()) {
-            String msg = "Run not allowed as ShutdownStrategy is forcing 
shutting down, will reject executing exchange: "
-                         + exchange;
-            LOG.debug(msg);
-            if (exchange.getException() == null) {
-                exchange.setException(new RejectedExecutionException(msg));
+            if (LOG.isDebugEnabled() || exchange.getException() == null) {
+                String msg = "Run not allowed as ShutdownStrategy is forcing 
shutting down, will reject executing exchange: "
+                             + exchange;
+                LOG.debug(msg);
+                if (exchange.getException() == null) {
+                    exchange.setException(new RejectedExecutionException(msg));
+                }
             }
             return false;
         }

Reply via email to