This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 75a34636810791ed2407d2e6a687637e012aad4d
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Apr 7 14:34:09 2021 +0200

    CAMEL-16462: camel-core - Optimize Multicast EIP to reduce object 
allocations.
---
 .../apache/camel/processor/MulticastProcessor.java | 30 +++++-----------------
 1 file changed, 7 insertions(+), 23 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index cd84349..0d2eaba 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -71,7 +71,6 @@ import org.apache.camel.support.PatternHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AsyncCompletionService;
 import org.slf4j.Logger;
@@ -140,19 +139,6 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     }
 
-    /**
-     * Class that represents prepared fine grained error handlers when 
processing multicasted/splitted exchanges
-     * <p/>
-     * See the <tt>createProcessorExchangePair</tt> and 
<tt>createErrorHandler</tt> methods.
-     */
-    static final class ErrorHandlerKey extends KeyValueHolder<Route, 
Processor> {
-
-        ErrorHandlerKey(Route key, Processor value) {
-            super(key, value);
-        }
-
-    }
-
     private final class Scheduler implements Executor {
 
         @Override
@@ -183,7 +169,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
     private ExecutorService aggregateExecutorService;
     private boolean shutdownAggregateExecutorService;
     private final long timeout;
-    private final ConcurrentMap<ErrorHandlerKey, Processor> errorHandlers = 
new ConcurrentHashMap<>();
+    private final ConcurrentMap<Processor, Processor> errorHandlers = new 
ConcurrentHashMap<>();
     private final boolean shareUnitOfWork;
 
     public MulticastProcessor(CamelContext camelContext, Route route, 
Collection<Processor> processors) {
@@ -1013,32 +999,30 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     protected Processor wrapInErrorHandler(Route route, Exchange exchange, 
Processor processor) {
         Processor answer;
+        Processor key = processor;
 
         if (route != this.route && this.route != null) {
             throw new UnsupportedOperationException("Is this really correct 
?");
         }
-        boolean tryBlock = 
exchange.getProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK, boolean.class);
+        Boolean tryBlock = (Boolean) 
exchange.getProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK);
 
         // do not wrap in error handler if we are inside a try block
-        if (!tryBlock && route != null) {
+        if (route != null && (tryBlock == null || !tryBlock)) {
             // wrap the producer in error handler so we have fine grained 
error handling on
             // the output side instead of the input side
             // this is needed to support redelivery on that output alone and 
not doing redelivery
             // for the entire multicast block again which will start from 
scratch again
 
-            // create key for cache
-            final ErrorHandlerKey key = new ErrorHandlerKey(route, processor);
-
             // lookup cached first to reuse and preserve memory
             answer = errorHandlers.get(key);
             if (answer != null) {
-                LOG.trace("Using existing error handler for: {}", processor);
+                LOG.trace("Using existing error handler for: {}", key);
                 return answer;
             }
 
-            LOG.trace("Creating error handler for: {}", processor);
+            LOG.trace("Creating error handler for: {}", key);
             try {
-                processor = wrapInErrorHandler(route, processor);
+                processor = wrapInErrorHandler(route, key);
 
                 // and wrap in unit of work processor so the copy exchange 
also can run under UoW
                 answer = createUnitOfWorkProcessor(route, processor, exchange);

Reply via email to