This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new ee5a18c CAMEL-16279: camel-core - Optimize core to reduce object
allocations by pooling reusable tasks in the routing engine.
ee5a18c is described below
commit ee5a18c8b4989ce2933203c47220830c9c7a8a82
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Mar 9 09:08:06 2021 +0100
CAMEL-16279: camel-core - Optimize core to reduce object allocations by
pooling reusable tasks in the routing engine.
---
.../java/org/apache/camel/spi/ExchangeFactory.java | 5 --
.../org/apache/camel/spi/PooledObjectFactory.java | 5 ++
.../engine/CamelInternalPooledTaskFactory.java | 47 +++++++++++++
.../camel/impl/engine/CamelInternalProcessor.java | 79 ++++++++++++++++++----
.../camel/impl/engine/CamelInternalTask.java | 45 ++++++++++++
.../camel/impl/engine/PooledExchangeFactory.java | 35 ----------
.../java/org/apache/camel/processor/Pipeline.java | 15 ++++
.../camel/main/DefaultConfigurationConfigurer.java | 2 +-
.../camel/support/PooledObjectFactorySupport.java | 17 ++++-
.../support/PrototypeObjectFactorySupport.java | 5 ++
10 files changed, 199 insertions(+), 56 deletions(-)
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 29b3fdb..75b9d21 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -79,9 +79,4 @@ public interface ExchangeFactory extends
PooledObjectFactory<Exchange>, NonManag
return true;
}
- /**
- * Whether the factory is pooled.
- */
- boolean isPooled();
-
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
index db4c0d1..a9c38bc 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
@@ -108,6 +108,11 @@ public interface PooledObjectFactory<T> extends Service,
CamelContextAware {
Statistics getStatistics();
/**
+ * Whether the factory is pooled.
+ */
+ boolean isPooled();
+
+ /**
* Acquires an object from the pool (if any)
*
* @return the object or <tt>null</tt> if the pool is empty
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java
new file mode 100644
index 0000000..faace27
--- /dev/null
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalPooledTaskFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.support.PooledObjectFactorySupport;
+
+/**
+ * A pool for reusing {@link CamelInternalTask} to reduce object allocations.
+ */
+final class CamelInternalPooledTaskFactory extends
PooledObjectFactorySupport<CamelInternalTask> {
+
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ // we do not want to capture statistics so its disabled
+ }
+
+ @Override
+ public CamelInternalTask acquire() {
+ return pool.poll();
+ }
+
+ @Override
+ public boolean release(CamelInternalTask task) {
+ task.reset();
+ return pool.offer(task);
+ }
+
+ @Override
+ public String toString() {
+ return "CamelInternalPooledTaskFactory[capacity: " + getCapacity() +
"]";
+ }
+
+}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 9e847fe..79919ea 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -17,6 +17,7 @@
package org.apache.camel.impl.engine;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -45,6 +46,7 @@ import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.InternalProcessor;
import
org.apache.camel.spi.ManagementInterceptStrategy.InstrumentationProcessor;
import org.apache.camel.spi.MessageHistoryFactory;
+import org.apache.camel.spi.PooledObjectFactory;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.ShutdownStrategy;
@@ -61,6 +63,7 @@ import org.apache.camel.support.OrderedComparator;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.support.UnitOfWorkHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,6 +106,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
private final ShutdownStrategy shutdownStrategy;
private final List<CamelInternalProcessorAdvice<?>> advices = new
ArrayList<>();
private byte statefulAdvices;
+ private PooledObjectFactory<CamelInternalTask> taskFactory;
public CamelInternalProcessor(CamelContext camelContext) {
this.camelContext = camelContext;
@@ -118,6 +122,27 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
}
@Override
+ protected void doBuild() throws Exception {
+ boolean pooled =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+
+ // only create pooled task factory
+ if (pooled) {
+ taskFactory = new CamelInternalPooledTaskFactory();
+ int capacity =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+ taskFactory.setCapacity(capacity);
+ LOG.trace("Using TaskFactory: {}", taskFactory);
+ }
+
+ ServiceHelper.buildService(taskFactory, processor);
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ super.doShutdown();
+ ServiceHelper.stopAndShutdownServices(taskFactory, processor);
+ }
+
+ @Override
public void addAdvice(CamelInternalProcessorAdvice<?> advice) {
advices.add(advice);
// ensure advices are sorted so they are in the order we want
@@ -174,19 +199,35 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
/**
* Callback task to process the advices after processing.
*/
- private final class AsyncAfterTask implements AsyncCallback {
+ private final class AsyncAfterTask implements CamelInternalTask {
private final Object[] states;
- private final Exchange exchange;
- private final AsyncCallback originalCallback;
+ private Exchange exchange;
+ private AsyncCallback originalCallback;
- private AsyncAfterTask(Object[] states, Exchange exchange,
AsyncCallback originalCallback) {
+ private AsyncAfterTask(Object[] states) {
this.states = states;
+ }
+
+ @Override
+ public void prepare(Exchange exchange, AsyncCallback originalCallback)
{
this.exchange = exchange;
this.originalCallback = originalCallback;
}
@Override
+ public Object[] getStates() {
+ return states;
+ }
+
+ @Override
+ public void reset() {
+ Arrays.fill(states, null);
+ this.exchange = null;
+ this.originalCallback = null;
+ }
+
+ @Override
public void done(boolean doneSync) {
try {
for (int i = advices.size() - 1, j = states.length - 1; i >=
0; i--) {
@@ -213,6 +254,11 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
// ----------------------------------------------------------
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
+
+ // task is done so reset
+ if (taskFactory != null) {
+ taskFactory.release(this);
+ }
}
}
}
@@ -253,8 +299,19 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
return true;
}
+ Object[] states;
+
+ // create internal callback which will execute the advices in reverse
order when done
+ CamelInternalTask afterTask = taskFactory != null ?
taskFactory.acquire() : null;
+ if (afterTask == null) {
+ states = statefulAdvices > 0 ? new Object[statefulAdvices] :
EMPTY_STATES;
+ afterTask = new AsyncAfterTask(states);
+ } else {
+ states = afterTask.getStates();
+ }
+ afterTask.prepare(exchange, originalCallback);
+
// optimise to use object array for states, and only for the number of
advices that keep state
- final Object[] states = statefulAdvices > 0 ? new
Object[statefulAdvices] : EMPTY_STATES;
// optimise for loop using index access to avoid creating iterator
object
for (int i = 0, j = 0; i < advices.size(); i++) {
CamelInternalProcessorAdvice<?> task = advices.get(i);
@@ -270,10 +327,6 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
}
}
- // create internal callback which will execute the advices in reverse
order when done
- // TODO: pool this task, and the states array
- AsyncCallback callback = new AsyncAfterTask(states, exchange,
originalCallback);
-
if (exchange.isTransacted()) {
// must be synchronized for transacted exchanges
if (LOG.isTraceEnabled()) {
@@ -291,14 +344,16 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
// ----------------------------------------------------------
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
- callback.done(true);
+ if (taskFactory != null) {
+ taskFactory.release(afterTask);
+ }
return true;
} else {
final UnitOfWork uow = exchange.getUnitOfWork();
// do uow before processing and if a value is returned the the uow
wants to be processed after
// was well in the same thread
- AsyncCallback async = callback;
+ AsyncCallback async = afterTask;
boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess();
if (beforeAndAfter) {
async = uow.beforeProcess(processor, exchange, async);
@@ -318,7 +373,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
// optimize to only do after uow processing if really needed
if (beforeAndAfter) {
// execute any after processor work (in current thread, not in
the callback)
- uow.afterProcess(processor, exchange, callback, false);
+ uow.afterProcess(processor, exchange, afterTask, false);
}
if (LOG.isTraceEnabled()) {
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java
new file mode 100644
index 0000000..9fc5caa
--- /dev/null
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalTask.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * Task uses to hold state during {@link CamelInternalProcessor}.
+ */
+interface CamelInternalTask extends AsyncCallback {
+
+ /**
+ * Prepares the task for the given exchange and its callback
+ *
+ * @param exchange the exchange
+ * @param callback the callback
+ */
+ void prepare(Exchange exchange, AsyncCallback callback);
+
+ /**
+ * Gets the states
+ */
+ Object[] getStates();
+
+ /**
+ * Resets the task after its done and can be reused for another exchange.
+ */
+ void reset();
+
+}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index d1effc9..ab64529 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -16,9 +16,6 @@
*/
package org.apache.camel.impl.engine;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -36,8 +33,6 @@ public final class PooledExchangeFactory extends
PrototypeExchangeFactory {
private static final Logger LOG =
LoggerFactory.getLogger(PooledExchangeFactory.class);
private final ReleaseOnDoneTask onDone = new ReleaseOnDoneTask();
- private BlockingQueue<Exchange> pool;
- private int capacity = 100;
public PooledExchangeFactory() {
}
@@ -47,12 +42,6 @@ public final class PooledExchangeFactory extends
PrototypeExchangeFactory {
}
@Override
- protected void doBuild() throws Exception {
- super.doBuild();
- this.pool = new ArrayBlockingQueue<>(capacity);
- }
-
- @Override
public ExchangeFactory newExchangeFactory(Consumer consumer) {
PooledExchangeFactory answer = new PooledExchangeFactory(consumer);
answer.setCamelContext(camelContext);
@@ -61,23 +50,6 @@ public final class PooledExchangeFactory extends
PrototypeExchangeFactory {
return answer;
}
- public int getCapacity() {
- return capacity;
- }
-
- @Override
- public int getSize() {
- if (pool != null) {
- return pool.size();
- } else {
- return 0;
- }
- }
-
- public void setCapacity(int capacity) {
- this.capacity = capacity;
- }
-
@Override
public Exchange create(boolean autoRelease) {
Exchange exchange = pool.poll();
@@ -163,13 +135,6 @@ public final class PooledExchangeFactory extends
PrototypeExchangeFactory {
}
@Override
- public void purge() {
- if (pool != null) {
- pool.clear();
- }
- }
-
- @Override
public boolean isPooled() {
return true;
}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
index 6d6b3d1..728e31a 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -177,15 +177,30 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
public PooledExchangeTask create(Exchange exchange,
AsyncCallback callback) {
return new PipelineTask();
}
+
+ @Override
+ public String toString() {
+ return "PooledTaskFactory[capacity: " + getCapacity() +
"]";
+ }
};
int capacity =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
taskFactory.setCapacity(capacity);
} else {
taskFactory = new PrototypeTaskFactory() {
@Override
+ public boolean isPooled() {
+ return false;
+ }
+
+ @Override
public PooledExchangeTask create(Exchange exchange,
AsyncCallback callback) {
return new PipelineTask();
}
+
+ @Override
+ public String toString() {
+ return "PrototypeTaskFactory";
+ }
};
}
LOG.trace("Using TaskFactory: {}", taskFactory);
diff --git
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index 73953bb..67c6162 100644
---
a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++
b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -128,8 +128,8 @@ public final class DefaultConfigurationConfigurer {
} else if ("prototype".equals(config.getExchangeFactory())) {
ecc.setExchangeFactory(new PrototypeExchangeFactory());
}
-
ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
ecc.getExchangeFactory().setCapacity(config.getExchangeFactoryCapacity());
+
ecc.getExchangeFactory().setStatisticsEnabled(config.isExchangeFactoryStatisticsEnabled());
if (!config.isJmxEnabled()) {
camelContext.disableJMX();
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
index 64867b3..6aa402c 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
@@ -35,7 +35,9 @@ public abstract class PooledObjectFactorySupport<T> extends
ServiceSupport imple
@Override
protected void doBuild() throws Exception {
super.doBuild();
- this.pool = new ArrayBlockingQueue<>(capacity);
+ if (isPooled()) {
+ this.pool = new ArrayBlockingQueue<>(capacity);
+ }
}
@Override
@@ -83,8 +85,15 @@ public abstract class PooledObjectFactorySupport<T> extends
ServiceSupport imple
}
@Override
+ public boolean isPooled() {
+ return true;
+ }
+
+ @Override
public void purge() {
- pool.clear();
+ if (pool != null) {
+ pool.clear();
+ }
}
@Override
@@ -96,7 +105,9 @@ public abstract class PooledObjectFactorySupport<T> extends
ServiceSupport imple
protected void doShutdown() throws Exception {
super.doShutdown();
statistics.reset();
- pool.clear();
+ if (pool != null) {
+ pool.clear();
+ }
}
/**
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
index 8c038e4..a2d6bfe 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
@@ -81,6 +81,11 @@ public abstract class PrototypeObjectFactorySupport<T>
extends ServiceSupport im
}
@Override
+ public boolean isPooled() {
+ return false;
+ }
+
+ @Override
protected void doShutdown() throws Exception {
super.doShutdown();
statistics.reset();