This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit b4b66d5b9dc9ce44a5ed497076fd837c73b70b0c Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Tue May 14 14:39:08 2024 +0200 Streamline SharedCamelInternalProcessor which was copied from CamelInternalProcessor but has a more limited scope --- .../apache/camel/impl/engine/AdviceIterator.java | 16 ++-- .../impl/engine/SharedCamelInternalProcessor.java | 102 +++++---------------- 2 files changed, 33 insertions(+), 85 deletions(-) diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AdviceIterator.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AdviceIterator.java index 02461653431..d89fdcc92f3 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AdviceIterator.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AdviceIterator.java @@ -36,12 +36,16 @@ final class AdviceIterator { if (task.hasState()) { state = states[stateIndex--]; } - try { - task.after(exchange, state); - } catch (Exception e) { - exchange.setException(e); - // allow all advices to complete even if there was an exception - } + runAfterTask(task, state, exchange); + } + } + + static void runAfterTask(CamelInternalProcessorAdvice task, Object state, Exchange exchange) { + try { + task.after(exchange, state); + } catch (Exception e) { + exchange.setException(e); + // allow all advices to complete even if there was an exception } } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java index d64b2c174cb..5ab5b8ddf66 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java @@ -16,8 +16,7 @@ */ package org.apache.camel.impl.engine; -import java.util.ArrayList; -import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; @@ -25,7 +24,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; -import org.apache.camel.Ordered; import org.apache.camel.Processor; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelInternalProcessorAdvice; @@ -36,7 +34,6 @@ import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.Transformer; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter; -import org.apache.camel.support.OrderedComparator; import org.apache.camel.support.PluginHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,37 +63,20 @@ import org.slf4j.LoggerFactory; * <b>Debugging tips:</b> Camel end users whom want to debug their Camel applications with the Camel source code, then * make sure to read the source code of this class about the debugging tips, which you can find in the * {@link #process(Exchange, AsyncCallback, AsyncProcessor, Processor)} method. - * <p/> - * The added advices can implement {@link Ordered} to control in which order the advices are executed. */ public class SharedCamelInternalProcessor implements SharedInternalProcessor { private static final Logger LOG = LoggerFactory.getLogger(SharedCamelInternalProcessor.class); - private static final Object[] EMPTY_STATES = new Object[0]; private final ReactiveExecutor reactiveExecutor; private final AsyncProcessorAwaitManager awaitManager; private final ShutdownStrategy shutdownStrategy; - private final List<CamelInternalProcessorAdvice<?>> advices; - private byte statefulAdvices; + private final CamelInternalProcessorAdvice<?> advice; - public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice<?>... advices) { + public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice<?> advice) { this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor(); this.awaitManager = PluginHelper.getAsyncProcessorAwaitManager(camelContext); this.shutdownStrategy = camelContext.getShutdownStrategy(); - - if (advices != null) { - this.advices = new ArrayList<>(advices.length); - for (CamelInternalProcessorAdvice<?> advice : advices) { - this.advices.add(advice); - if (advice.hasState()) { - statefulAdvices++; - } - } - // ensure advices are sorted, so they are in the order we want - this.advices.sort(OrderedComparator.get()); - } else { - this.advices = null; - } + this.advice = Objects.requireNonNull(advice, "advice"); } /** @@ -129,43 +109,23 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor { */ public boolean process( Exchange exchange, AsyncCallback originalCallback, AsyncProcessor processor, Processor resultProcessor) { - // ---------------------------------------------------------- - // CAMEL END USER - READ ME FOR DEBUGGING TIPS - // ---------------------------------------------------------- - // If you want to debug the Camel routing engine, then there is a lot of internal functionality - // the routing engine executes during routing messages. You can skip debugging this internal - // functionality and instead debug where the routing engine continues routing to the next node - // in the routes. The CamelInternalProcessor is a vital part of the routing engine, as its - // being used in between the nodes. As an end user you can just debug the code in this class - // in between the: - // CAMEL END USER - DEBUG ME HERE +++ START +++ - // CAMEL END USER - DEBUG ME HERE +++ END +++ - // you can see in the code below. - // ---------------------------------------------------------- - - if (processor == null || !continueProcessing(exchange, processor)) { + if (processor == null || !continueProcessing(exchange)) { // no processor or we should not continue then we are done originalCallback.done(true); return true; } // optimise to use object array for states, and only for the number of advices that keep state - final Object[] states = statefulAdvices > 0 ? new Object[statefulAdvices] : EMPTY_STATES; + final Object state; // optimise for loop using index access to avoid creating iterator object - for (int i = 0, j = 0; i < advices.size(); i++) { - CamelInternalProcessorAdvice<?> task = advices.get(i); - try { - Object state = task.before(exchange); - if (task.hasState()) { - states[j++] = state; - } - } catch (Exception e) { - return handleException(exchange, originalCallback, e); - } + try { + state = advice.before(exchange); + } catch (Exception e) { + return handleException(exchange, originalCallback, e); } // create internal callback which will execute the advices in reverse order when done - AsyncCallback callback = new InternalCallback(states, exchange, originalCallback, resultProcessor); + AsyncCallback callback = new InternalCallback(state, exchange, originalCallback, resultProcessor); if (exchange.isTransacted()) { return processTransacted(exchange, processor, callback); @@ -190,16 +150,10 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor { async = uow.beforeProcess(processor, exchange, async); } - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ START +++ - // ---------------------------------------------------------- if (LOG.isTraceEnabled()) { LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); } boolean sync = processor.process(exchange, async); - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ END +++ - // ---------------------------------------------------------- // optimize to only do after uow processing if really needed if (beforeAndAfter) { @@ -226,17 +180,11 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor { exchange.getExchangeId(), exchange); } } - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ START +++ - // ---------------------------------------------------------- try { processor.process(exchange); } catch (Exception e) { exchange.setException(e); } - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ END +++ - // ---------------------------------------------------------- callback.done(true); return true; } @@ -246,13 +194,13 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor { */ private final class InternalCallback implements AsyncCallback { - private final Object[] states; + private final Object state; private final Exchange exchange; private final AsyncCallback callback; private final Processor resultProcessor; - private InternalCallback(Object[] states, Exchange exchange, AsyncCallback callback, Processor resultProcessor) { - this.states = states; + private InternalCallback(Object state, Exchange exchange, AsyncCallback callback, Processor resultProcessor) { + this.state = state; this.exchange = exchange; this.callback = callback; this.resultProcessor = resultProcessor; @@ -273,18 +221,12 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor { // we should call after in reverse order try { - AdviceIterator.runAfterTasks(advices, states, exchange); + AdviceIterator.runAfterTask(advice, state, exchange); } finally { - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ START +++ - // ---------------------------------------------------------- // callback must be called if (callback != null) { reactiveExecutor.schedule(callback); } - // ---------------------------------------------------------- - // CAMEL END USER - DEBUG ME HERE +++ END +++ - // ---------------------------------------------------------- } } } @@ -292,18 +234,20 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor { /** * Strategy to determine if we should continue processing the {@link Exchange}. */ - protected boolean continueProcessing(Exchange exchange, AsyncProcessor processor) { + protected boolean continueProcessing(Exchange exchange) { if (exchange.isRouteStop()) { LOG.debug("Exchange is marked to stop routing: {}", exchange); return false; } if (shutdownStrategy.isForceShutdown()) { - String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " - + exchange; - LOG.debug(msg); - if (exchange.getException() == null) { - exchange.setException(new RejectedExecutionException(msg)); + if (LOG.isDebugEnabled() || exchange.getException() == null) { + String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + + exchange; + LOG.debug(msg); + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException(msg)); + } } return false; }