This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1e4b501bb36ecdc289aef1e0ea7c84027d1fc281
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Apr 7 14:11:04 2021 +0200

    CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object 
allocations.
---
 .../apache/camel/processor/MulticastProcessor.java |  10 +-
 .../org/apache/camel/processor/RecipientList.java  |  43 +------
 .../camel/processor/RecipientListProcessor.java    | 142 ++++++++++++++-------
 .../java/org/apache/camel/processor/Splitter.java  |   2 +-
 4 files changed, 105 insertions(+), 92 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index a7e913b..cd84349 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -320,13 +320,11 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        return process(exchange, callback, null, 0);
-    }
-
-    protected boolean process(Exchange exchange, AsyncCallback callback, 
Iterator iter, int size) {
         Iterable<ProcessorExchangePair> pairs;
+        // TODO: optimize size
+        int size = 0;
         try {
-            pairs = createProcessorExchangePairs(exchange, iter, size);
+            pairs = createProcessorExchangePairs(exchange);
         } catch (Throwable e) {
             exchange.setException(e);
             // unexpected exception was thrown, maybe from iterator etc. so do 
not regard as exhausted
@@ -925,7 +923,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         return exchange.getProperty(ExchangePropertyKey.MULTICAST_INDEX, 
Integer.class);
     }
 
-    protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
+    protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange)
             throws Exception {
         List<ProcessorExchangePair> result = new 
ArrayList<>(processors.size());
 
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index 4a6c551..c8a36de 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,16 +16,13 @@
  */
 package org.apache.camel.processor;
 
-import java.lang.reflect.Array;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
@@ -36,7 +33,6 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.ProducerCache;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
-import org.apache.camel.support.ObjectHelper;
 import org.apache.camel.support.cache.DefaultProducerCache;
 import org.apache.camel.support.cache.EmptyProducerCache;
 import org.apache.camel.support.service.ServiceHelper;
@@ -54,13 +50,12 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RecipientList.class);
 
-    private static final String IGNORE_DELIMITER_MARKER = "false";
     private final CamelContext camelContext;
     private String id;
     private String routeId;
     private Processor errorHandler;
     private ProducerCache producerCache;
-    private Expression expression;
+    private final Expression expression;
     private final String delimiter;
     private boolean parallelProcessing;
     private boolean parallelAggregate;
@@ -88,6 +83,7 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
         StringHelper.notEmpty(delimiter, "delimiter");
         this.camelContext = camelContext;
         this.delimiter = delimiter;
+        this.expression = null;
     }
 
     public RecipientList(CamelContext camelContext, Expression expression) {
@@ -177,38 +173,7 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
         if (!isStarted()) {
             throw new IllegalStateException("RecipientList has not been 
started: " + this);
         }
-
-        // use the evaluate expression result if exists
-        Object recipientList = 
exchange.removeProperty(ExchangePropertyKey.EVALUATE_EXPRESSION_RESULT);
-        if (recipientList == null && expression != null) {
-            // fallback and evaluate the expression
-            recipientList = expression.evaluate(exchange, Object.class);
-        }
-
-        return sendToRecipientList(exchange, recipientList, callback);
-    }
-
-    /**
-     * Sends the given exchange to the recipient list
-     */
-    public boolean sendToRecipientList(Exchange exchange, Object 
recipientList, AsyncCallback callback) {
-        // optimize to calculate number of recipients if possible
-        int size = 0;
-        if (recipientList instanceof Collection) {
-            size = ((Collection<?>) recipientList).size();
-        } else if (recipientList.getClass().isArray()) {
-            size = Array.getLength(recipientList);
-        }
-        Iterator<?> iter;
-
-        if (delimiter != null && 
delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
-            iter = ObjectHelper.createIterator(recipientList, null);
-        } else {
-            iter = ObjectHelper.createIterator(recipientList, delimiter);
-        }
-
-        // now let the multicast process the exchange
-        return recipientListProcessor.process(exchange, callback, iter, size);
+        return recipientListProcessor.process(exchange, callback);
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
@@ -238,7 +203,7 @@ public class RecipientList extends AsyncProcessorSupport 
implements IdAware, Rou
         }
 
         recipientListProcessor = new RecipientListProcessor(
-                camelContext, null, producerCache, getAggregationStrategy(),
+                camelContext, null, expression, delimiter, producerCache, 
getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), 
isShutdownExecutorService(),
                 isStreaming(), isStopOnException(), getTimeout(), 
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
                 isStopOnAggregateException());
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index 2e7021d..720c937 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.processor;
 
+import java.lang.reflect.Array;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +30,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExchangePropertyKey;
+import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Processor;
@@ -39,6 +42,7 @@ import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.EndpointHelper;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.ObjectHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +60,12 @@ import org.slf4j.LoggerFactory;
 public class RecipientListProcessor extends MulticastProcessor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RecipientListProcessor.class);
+
+    private static final String IGNORE_DELIMITER_MARKER = "false";
+
     private boolean ignoreInvalidEndpoints;
+    private final Expression expression;
+    private final String delimiter;
     private final ProducerCache producerCache;
     private int cacheSize;
 
@@ -147,19 +156,8 @@ public class RecipientListProcessor extends 
MulticastProcessor {
     }
 
     // TODO: camel-bean @RecipientList cacheSize
-
-    public RecipientListProcessor(CamelContext camelContext, Route route, 
ProducerCache producerCache) {
-        super(camelContext, route, null);
-        this.producerCache = producerCache;
-    }
-
-    public RecipientListProcessor(CamelContext camelContext, Route route, 
ProducerCache producerCache,
-                                  AggregationStrategy aggregationStrategy) {
-        super(camelContext, route, null, aggregationStrategy);
-        this.producerCache = producerCache;
-    }
-
-    public RecipientListProcessor(CamelContext camelContext, Route route, 
ProducerCache producerCache,
+    public RecipientListProcessor(CamelContext camelContext, Route route, 
Expression expression, String delimiter,
+                                  ProducerCache producerCache,
                                   AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
                                   boolean streaming, boolean stopOnException,
@@ -168,6 +166,8 @@ public class RecipientListProcessor extends 
MulticastProcessor {
         super(camelContext, route, null, aggregationStrategy, 
parallelProcessing, executorService, shutdownExecutorService,
               streaming, stopOnException, timeout, onPrepare,
               shareUnitOfWork, parallelAggregate, stopOnAggregateException);
+        this.expression = expression;
+        this.delimiter = delimiter;
         this.producerCache = producerCache;
     }
 
@@ -188,47 +188,97 @@ public class RecipientListProcessor extends 
MulticastProcessor {
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
+    protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange)
             throws Exception {
-        // here we iterate the recipient lists and create the exchange pair 
for each of those
-        List<ProcessorExchangePair> result = size > 0 ? new ArrayList<>(size) 
: new ArrayList<>();
 
-        // at first we must lookup the endpoint and acquire the producer which 
can send to the endpoint
+        // use the evaluate expression result if exists
+        Object recipientList = 
exchange.removeProperty(ExchangePropertyKey.EVALUATE_EXPRESSION_RESULT);
+        if (recipientList == null && expression != null) {
+            // fallback and evaluate the expression
+            recipientList = expression.evaluate(exchange, Object.class);
+        }
+
+        // optimize for recipient without need for using delimiter
+        // (if its list/collection/array type)
+        if (recipientList instanceof List) {
+            List col = (List) recipientList;
+            int size = col.size();
+            List<ProcessorExchangePair> result = new ArrayList<>(size);
+            int index = 0;
+            for (int i = 0; i < size; i++) {
+                Object recipient = col.get(i);
+                index = doCreateProcessorExchangePairs(exchange, recipient, 
result, index);
+            }
+            return result;
+        } else if (recipientList instanceof Collection) {
+            Collection col = (Collection) recipientList;
+            int size = col.size();
+            List<ProcessorExchangePair> result = new ArrayList<>(size);
+            int index = 0;
+            for (Object recipient : col) {
+                index = doCreateProcessorExchangePairs(exchange, recipient, 
result, index);
+            }
+            return result;
+        } else if (recipientList.getClass().isArray()) {
+            Object[] arr = (Object[]) recipientList;
+            int size = Array.getLength(recipientList);
+            List<ProcessorExchangePair> result = new ArrayList<>(size);
+            int index = 0;
+            for (int i = 0; i < size; i++) {
+                Object recipient = arr[i];
+                index = doCreateProcessorExchangePairs(exchange, recipient, 
result, index);
+            }
+            return result;
+        }
+
+        // okay we have to use iterator based separated by delimiter
+        Iterator<?> iter;
+        if (delimiter != null && 
delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
+            iter = ObjectHelper.createIterator(recipientList, null);
+        } else {
+            iter = ObjectHelper.createIterator(recipientList, delimiter);
+        }
+        List<ProcessorExchangePair> result = new ArrayList<>();
         int index = 0;
         while (iter.hasNext()) {
-            boolean prototype = cacheSize < 0;
+            index = doCreateProcessorExchangePairs(exchange, iter.next(), 
result, index);
+        }
+        return result;
+    }
 
-            Object recipient = iter.next();
-            Endpoint endpoint;
-            Producer producer;
-            ExchangePattern pattern;
-            try {
-                recipient = prepareRecipient(exchange, recipient);
-                Endpoint existing = getExistingEndpoint(exchange, recipient);
-                if (existing == null) {
-                    endpoint = resolveEndpoint(exchange, recipient, prototype);
-                } else {
-                    endpoint = existing;
-                    // we have an existing endpoint then its not a prototype 
scope
-                    prototype = false;
-                }
-                pattern = resolveExchangePattern(recipient);
-                producer = producerCache.acquireProducer(endpoint);
-            } catch (Exception e) {
-                if (isIgnoreInvalidEndpoints()) {
-                    LOG.debug("Endpoint uri is invalid: {}. This exception 
will be ignored.", recipient, e);
-                    continue;
-                } else {
-                    // failure so break out
-                    throw e;
-                }
+    private int doCreateProcessorExchangePairs(
+            Exchange exchange, Object recipient, List<ProcessorExchangePair> 
result, int index)
+            throws NoTypeConversionAvailableException {
+        boolean prototype = cacheSize < 0;
+
+        Endpoint endpoint;
+        Producer producer;
+        ExchangePattern pattern;
+        try {
+            recipient = prepareRecipient(exchange, recipient);
+            Endpoint existing = getExistingEndpoint(exchange, recipient);
+            if (existing == null) {
+                endpoint = resolveEndpoint(exchange, recipient, prototype);
+            } else {
+                endpoint = existing;
+                // we have an existing endpoint then its not a prototype scope
+                prototype = false;
+            }
+            pattern = resolveExchangePattern(recipient);
+            producer = producerCache.acquireProducer(endpoint);
+        } catch (Exception e) {
+            if (isIgnoreInvalidEndpoints()) {
+                LOG.debug("Endpoint uri is invalid: {}. This exception will be 
ignored.", recipient, e);
+                return index;
+            } else {
+                // failure so break out
+                throw e;
             }
-
-            // then create the exchange pair
-            result.add(createProcessorExchangePair(index++, endpoint, 
producer, exchange, pattern, prototype));
         }
 
-        return result;
+        // then create the exchange pair
+        result.add(createProcessorExchangePair(index++, endpoint, producer, 
exchange, pattern, prototype));
+        return index;
     }
 
     /**
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index dd69a2f..1d2b444 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -159,7 +159,7 @@ public class Splitter extends MulticastProcessor implements 
AsyncProcessor, Trac
     }
 
     @Override
-    protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size)
+    protected Iterable<ProcessorExchangePair> 
createProcessorExchangePairs(Exchange exchange)
             throws Exception {
         // iter is only currently used by Recipient List EIP so its null
 

Reply via email to