This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 79b25c6  CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
79b25c6 is described below

commit 79b25c60bc6afb678e77d505bb3ff21c54e823e9
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Apr 7 11:00:46 2021 +0200

    CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
---
 .../apache/camel/processor/MulticastProcessor.java | 28 ++++++++++-------
 .../org/apache/camel/processor/RecipientList.java  | 10 ++++++-
 .../util/concurrent/AsyncCompletionService.java    | 35 ++++++++++++++--------
 3 files changed, 50 insertions(+), 23 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index c010738..96d3502 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -320,10 +320,10 @@ public class MulticastProcessor extends AsyncProcessorSupport
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        return process(exchange, callback, null);
+        return process(exchange, callback, null, 0);
     }
 
-    protected boolean process(Exchange exchange, AsyncCallback callback, Iterator iter) {
+    protected boolean process(Exchange exchange, AsyncCallback callback, Iterator iter, int size) {
         Iterable<ProcessorExchangePair> pairs;
         try {
             pairs = createProcessorExchangePairs(exchange, iter);
@@ -343,7 +343,8 @@ public class MulticastProcessor extends AsyncProcessorSupport
         // which is how the routing engine normally operates
         // if we have parallel processing enabled then we cannot run in transacted mode (requires synchronous processing via same thread)
         MulticastTask state = !isParallelProcessing() && exchange.isTransacted()
-                ? new MulticastTransactedTask(exchange, pairs, callback) : new MulticastReactiveTask(exchange, pairs, callback);
+                ? new MulticastTransactedTask(exchange, pairs, callback, size)
+                : new MulticastReactiveTask(exchange, pairs, callback, size);
         if (isParallelProcessing()) {
             executorService.submit(() -> reactiveExecutor.schedule(state));
         } else {
@@ -407,8 +408,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
         final AsyncCallback callback;
         final Iterator<ProcessorExchangePair> iterator;
         final ReentrantLock lock = new ReentrantLock();
-        final AsyncCompletionService<Exchange> completion
-                = new AsyncCompletionService<>(scheduler, !isStreaming(), lock);
+        final AsyncCompletionService<Exchange> completion;
         final AtomicReference<Exchange> result = new AtomicReference<>();
         final AtomicInteger nbExchangeSent = new AtomicInteger();
         final AtomicInteger nbAggregated = new AtomicInteger();
@@ -423,9 +423,10 @@ public class MulticastProcessor extends AsyncProcessorSupport
             this.callback = null;
             this.iterator = null;
             this.mdc = null;
+            this.completion = null;
         }
 
-        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
+        MulticastTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback, int capacity) {
             this.original = original;
             this.pairs = pairs;
             this.callback = callback;
@@ -441,6 +442,11 @@ public class MulticastProcessor extends AsyncProcessorSupport
             } else {
                 this.mdc = null;
             }
+            if (capacity > 0) {
+                this.completion = new AsyncCompletionService<>(scheduler, !isStreaming(), lock, capacity);
+            } else {
+                this.completion = new AsyncCompletionService<>(scheduler, !isStreaming(), lock);
+            }
         }
 
         @Override
@@ -519,8 +525,9 @@ public class MulticastProcessor extends AsyncProcessorSupport
         private MulticastReactiveTask() {
         }
 
-        public MulticastReactiveTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
-            super(original, pairs, callback);
+        public MulticastReactiveTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
+                                     int size) {
+            super(original, pairs, callback, size);
         }
 
         @Override
@@ -621,8 +628,9 @@ public class MulticastProcessor extends AsyncProcessorSupport
         private MulticastTransactedTask() {
         }
 
-        public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
-            super(original, pairs, callback);
+        public MulticastTransactedTask(Exchange original, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
+                                       int size) {
+            super(original, pairs, callback, size);
         }
 
         @Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index 78ef5bc..4a6c551 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.lang.reflect.Array;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.ExecutorService;
@@ -191,6 +192,13 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
      * Sends the given exchange to the recipient list
      */
     public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
+        // optimize to calculate number of recipients if possible
+        int size = 0;
+        if (recipientList instanceof Collection) {
+            size = ((Collection<?>) recipientList).size();
+        } else if (recipientList.getClass().isArray()) {
+            size = Array.getLength(recipientList);
+        }
         Iterator<?> iter;
 
         if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
@@ -200,7 +208,7 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou
         }
 
         // now let the multicast process the exchange
-        return recipientListProcessor.process(exchange, callback, iter);
+        return recipientListProcessor.process(exchange, callback, iter, size);
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
index 9ce4fd3..4cdc311 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
@@ -19,7 +19,8 @@ package org.apache.camel.util.concurrent;
 import java.util.PriorityQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -28,21 +29,30 @@ public class AsyncCompletionService<V> {
 
     private final Executor executor;
     private final boolean ordered;
-    private final PriorityQueue<Task> queue = new PriorityQueue<>();
-    private final AtomicLong nextId = new AtomicLong();
-    private final AtomicLong index = new AtomicLong();
+    private final PriorityQueue<Task> queue;
+    private final AtomicInteger nextId = new AtomicInteger();
+    private final AtomicInteger index = new AtomicInteger();
     private final ReentrantLock lock;
     private final Condition available;
 
     public AsyncCompletionService(Executor executor, boolean ordered) {
-        this(executor, ordered, null);
+        this(executor, ordered, null, 0);
     }
 
     public AsyncCompletionService(Executor executor, boolean ordered, ReentrantLock lock) {
+        this(executor, ordered, lock, 0);
+    }
+
+    public AsyncCompletionService(Executor executor, boolean ordered, ReentrantLock lock, int capacity) {
         this.executor = executor;
         this.ordered = ordered;
         this.lock = lock != null ? lock : new ReentrantLock();
         this.available = this.lock.newCondition();
+        if (capacity > 0) {
+            queue = new PriorityQueue<>(capacity);
+        } else {
+            queue = new PriorityQueue<>();
+        }
     }
 
     public ReentrantLock getLock() {
@@ -135,34 +145,35 @@ public class AsyncCompletionService<V> {
         }
     }
 
-    private class Task implements Runnable, Comparable<Task> {
-        private final long id;
+    private class Task implements Runnable, Comparable<Task>, Consumer<V> {
+        private final int id;
         private final Consumer<Consumer<V>> runner;
         private V result;
 
-        Task(long id, Consumer<Consumer<V>> runner) {
+        Task(int id, Consumer<Consumer<V>> runner) {
             this.id = id;
             this.runner = runner;
         }
 
         @Override
         public void run() {
-            runner.accept(this::setResult);
+            runner.accept(this);
         }
 
-        protected void setResult(V result) {
+        @Override
+        public void accept(V result) {
             this.result = result;
             complete(this);
         }
 
         @Override
         public int compareTo(Task other) {
-            return Long.compare(this.id, other.id);
+            return Integer.compare(this.id, other.id);
         }
 
         @Override
         public String toString() {
-            return "SubmitOrderedFutureTask[" + this.id + "]";
+            return "SubmitOrderedTask[" + this.id + "]";
         }
     }
 }

Reply via email to