This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 1e4b501bb36ecdc289aef1e0ea7c84027d1fc281 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Apr 7 14:11:04 2021 +0200 CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations. --- .../apache/camel/processor/MulticastProcessor.java | 10 +- .../org/apache/camel/processor/RecipientList.java | 43 +------ .../camel/processor/RecipientListProcessor.java | 142 ++++++++++++++------- .../java/org/apache/camel/processor/Splitter.java | 2 +- 4 files changed, 105 insertions(+), 92 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index a7e913b..cd84349 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -320,13 +320,11 @@ public class MulticastProcessor extends AsyncProcessorSupport @Override public boolean process(Exchange exchange, AsyncCallback callback) { - return process(exchange, callback, null, 0); - } - - protected boolean process(Exchange exchange, AsyncCallback callback, Iterator iter, int size) { Iterable<ProcessorExchangePair> pairs; + // TODO: optimize size + int size = 0; try { - pairs = createProcessorExchangePairs(exchange, iter, size); + pairs = createProcessorExchangePairs(exchange); } catch (Throwable e) { exchange.setException(e); // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted @@ -925,7 +923,7 @@ public class MulticastProcessor extends AsyncProcessorSupport return exchange.getProperty(ExchangePropertyKey.MULTICAST_INDEX, Integer.class); } - protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size) + protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { List<ProcessorExchangePair> result = new ArrayList<>(processors.size()); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java index 4a6c551..c8a36de 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java @@ -16,16 +16,13 @@ */ package org.apache.camel.processor; -import java.lang.reflect.Array; import java.util.Collection; -import java.util.Iterator; import java.util.concurrent.ExecutorService; import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; @@ -36,7 +33,6 @@ import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; import org.apache.camel.spi.RouteIdAware; import org.apache.camel.support.AsyncProcessorSupport; -import org.apache.camel.support.ObjectHelper; import org.apache.camel.support.cache.DefaultProducerCache; import org.apache.camel.support.cache.EmptyProducerCache; import org.apache.camel.support.service.ServiceHelper; @@ -54,13 +50,12 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class); - private static final String IGNORE_DELIMITER_MARKER = "false"; private final CamelContext camelContext; private String id; private String routeId; private Processor errorHandler; private ProducerCache producerCache; - private Expression expression; + private final Expression expression; private final String delimiter; private boolean parallelProcessing; private boolean parallelAggregate; @@ -88,6 +83,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou StringHelper.notEmpty(delimiter, "delimiter"); this.camelContext = camelContext; this.delimiter = delimiter; + this.expression = null; } public RecipientList(CamelContext camelContext, Expression expression) { @@ -177,38 +173,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou if (!isStarted()) { throw new IllegalStateException("RecipientList has not been started: " + this); } - - // use the evaluate expression result if exists - Object recipientList = exchange.removeProperty(ExchangePropertyKey.EVALUATE_EXPRESSION_RESULT); - if (recipientList == null && expression != null) { - // fallback and evaluate the expression - recipientList = expression.evaluate(exchange, Object.class); - } - - return sendToRecipientList(exchange, recipientList, callback); - } - - /** - * Sends the given exchange to the recipient list - */ - public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) { - // optimize to calculate number of recipients if possible - int size = 0; - if (recipientList instanceof Collection) { - size = ((Collection<?>) recipientList).size(); - } else if (recipientList.getClass().isArray()) { - size = Array.getLength(recipientList); - } - Iterator<?> iter; - - if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) { - iter = ObjectHelper.createIterator(recipientList, null); - } else { - iter = ObjectHelper.createIterator(recipientList, delimiter); - } - - // now let the multicast process the exchange - return recipientListProcessor.process(exchange, callback, iter, size); + return recipientListProcessor.process(exchange, callback); } public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { @@ -238,7 +203,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou } recipientListProcessor = new RecipientListProcessor( - camelContext, null, producerCache, getAggregationStrategy(), + camelContext, null, expression, delimiter, producerCache, getAggregationStrategy(), isParallelProcessing(), getExecutorService(), isShutdownExecutorService(), isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(), isStopOnAggregateException()); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index 2e7021d..720c937 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -16,7 +16,9 @@ */ package org.apache.camel.processor; +import java.lang.reflect.Array; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; @@ -28,6 +30,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; +import org.apache.camel.Expression; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; @@ -39,6 +42,7 @@ import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.EndpointHelper; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.MessageHelper; +import org.apache.camel.support.ObjectHelper; import org.apache.camel.support.service.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +60,12 @@ import org.slf4j.LoggerFactory; public class RecipientListProcessor extends MulticastProcessor { private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class); + + private static final String IGNORE_DELIMITER_MARKER = "false"; + private boolean ignoreInvalidEndpoints; + private final Expression expression; + private final String delimiter; private final ProducerCache producerCache; private int cacheSize; @@ -147,19 +156,8 @@ public class RecipientListProcessor extends MulticastProcessor { } // TODO: camel-bean @RecipientList cacheSize - - public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache) { - super(camelContext, route, null); - this.producerCache = producerCache; - } - - public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, - AggregationStrategy aggregationStrategy) { - super(camelContext, route, null, aggregationStrategy); - this.producerCache = producerCache; - } - - public RecipientListProcessor(CamelContext camelContext, Route route, ProducerCache producerCache, + public RecipientListProcessor(CamelContext camelContext, Route route, Expression expression, String delimiter, + ProducerCache producerCache, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, @@ -168,6 +166,8 @@ public class RecipientListProcessor extends MulticastProcessor { super(camelContext, route, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, parallelAggregate, stopOnAggregateException); + this.expression = expression; + this.delimiter = delimiter; this.producerCache = producerCache; } @@ -188,47 +188,97 @@ public class RecipientListProcessor extends MulticastProcessor { } @Override - protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size) + protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { - // here we iterate the recipient lists and create the exchange pair for each of those - List<ProcessorExchangePair> result = size > 0 ? new ArrayList<>(size) : new ArrayList<>(); - // at first we must lookup the endpoint and acquire the producer which can send to the endpoint + // use the evaluate expression result if exists + Object recipientList = exchange.removeProperty(ExchangePropertyKey.EVALUATE_EXPRESSION_RESULT); + if (recipientList == null && expression != null) { + // fallback and evaluate the expression + recipientList = expression.evaluate(exchange, Object.class); + } + + // optimize for recipient without need for using delimiter + // (if its list/collection/array type) + if (recipientList instanceof List) { + List col = (List) recipientList; + int size = col.size(); + List<ProcessorExchangePair> result = new ArrayList<>(size); + int index = 0; + for (int i = 0; i < size; i++) { + Object recipient = col.get(i); + index = doCreateProcessorExchangePairs(exchange, recipient, result, index); + } + return result; + } else if (recipientList instanceof Collection) { + Collection col = (Collection) recipientList; + int size = col.size(); + List<ProcessorExchangePair> result = new ArrayList<>(size); + int index = 0; + for (Object recipient : col) { + index = doCreateProcessorExchangePairs(exchange, recipient, result, index); + } + return result; + } else if (recipientList.getClass().isArray()) { + Object[] arr = (Object[]) recipientList; + int size = Array.getLength(recipientList); + List<ProcessorExchangePair> result = new ArrayList<>(size); + int index = 0; + for (int i = 0; i < size; i++) { + Object recipient = arr[i]; + index = doCreateProcessorExchangePairs(exchange, recipient, result, index); + } + return result; + } + + // okay we have to use iterator based separated by delimiter + Iterator<?> iter; + if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) { + iter = ObjectHelper.createIterator(recipientList, null); + } else { + iter = ObjectHelper.createIterator(recipientList, delimiter); + } + List<ProcessorExchangePair> result = new ArrayList<>(); int index = 0; while (iter.hasNext()) { - boolean prototype = cacheSize < 0; + index = doCreateProcessorExchangePairs(exchange, iter.next(), result, index); + } + return result; + } - Object recipient = iter.next(); - Endpoint endpoint; - Producer producer; - ExchangePattern pattern; - try { - recipient = prepareRecipient(exchange, recipient); - Endpoint existing = getExistingEndpoint(exchange, recipient); - if (existing == null) { - endpoint = resolveEndpoint(exchange, recipient, prototype); - } else { - endpoint = existing; - // we have an existing endpoint then its not a prototype scope - prototype = false; - } - pattern = resolveExchangePattern(recipient); - producer = producerCache.acquireProducer(endpoint); - } catch (Exception e) { - if (isIgnoreInvalidEndpoints()) { - LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e); - continue; - } else { - // failure so break out - throw e; - } + private int doCreateProcessorExchangePairs( + Exchange exchange, Object recipient, List<ProcessorExchangePair> result, int index) + throws NoTypeConversionAvailableException { + boolean prototype = cacheSize < 0; + + Endpoint endpoint; + Producer producer; + ExchangePattern pattern; + try { + recipient = prepareRecipient(exchange, recipient); + Endpoint existing = getExistingEndpoint(exchange, recipient); + if (existing == null) { + endpoint = resolveEndpoint(exchange, recipient, prototype); + } else { + endpoint = existing; + // we have an existing endpoint then its not a prototype scope + prototype = false; + } + pattern = resolveExchangePattern(recipient); + producer = producerCache.acquireProducer(endpoint); + } catch (Exception e) { + if (isIgnoreInvalidEndpoints()) { + LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e); + return index; + } else { + // failure so break out + throw e; } - - // then create the exchange pair - result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern, prototype)); } - return result; + // then create the exchange pair + result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern, prototype)); + return index; } /** diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java index dd69a2f..1d2b444 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java @@ -159,7 +159,7 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac } @Override - protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange, Iterator<?> iter, int size) + protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { // iter is only currently used by Recipient List EIP so its null
