This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 7f50037 CAMEL-16222: PooledExchangeFactory experiment 7f50037 is described below commit 7f50037d3f0d8f61422333147655fe4c1013924f Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Feb 24 09:33:14 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../java/org/apache/camel/spi/ExchangeFactory.java | 10 ++++++++ .../camel/impl/engine/DefaultExchangeFactory.java | 9 +++++++ .../camel/impl/engine/PooledExchangeFactory.java | 29 +++++++++++++++++++--- .../MainConfigurationPropertiesConfigurer.java | 6 +++++ .../camel-main-configuration-metadata.json | 1 + core/camel-main/src/main/docs/main.adoc | 1 + .../camel/main/DefaultConfigurationProperties.java | 23 +++++++++++++++++ 7 files changed, 75 insertions(+), 4 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index 197d473..e2047e1 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -74,6 +74,16 @@ public interface ExchangeFactory { } /** + * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. + */ + int getCapacity(); + + /** + * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. + */ + void setCapacity(int capacity); + + /** * Whether statistics is enabled. 
*/ boolean isStatisticsEnabled(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java index 469fb7c..1ca6740 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java @@ -67,4 +67,13 @@ public final class DefaultExchangeFactory implements ExchangeFactory, CamelConte // not in use } + @Override + public int getCapacity() { + return 0; + } + + @Override + public void setCapacity(int capacity) { + // not in use + } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index 6bfe639..574ac36 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -16,7 +16,8 @@ */ package org.apache.camel.impl.engine; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.CamelContext; @@ -44,13 +45,14 @@ public final class PooledExchangeFactory extends ServiceSupport private final ReleaseOnDoneTask onDone = new ReleaseOnDoneTask(); private final Consumer consumer; - private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>(); + private BlockingQueue<Exchange> pool; private final AtomicLong acquired = new AtomicLong(); private final AtomicLong created = new AtomicLong(); private final AtomicLong released = new AtomicLong(); private final AtomicLong discarded = new AtomicLong(); private CamelContext camelContext; + 
private int capacity = 100; private boolean statisticsEnabled; public PooledExchangeFactory() { @@ -64,6 +66,11 @@ public final class PooledExchangeFactory extends ServiceSupport } @Override + protected void doBuild() throws Exception { + this.pool = new ArrayBlockingQueue<>(capacity); + } + + @Override public CamelContext getCamelContext() { return camelContext; } @@ -78,6 +85,14 @@ public final class PooledExchangeFactory extends ServiceSupport return new PooledExchangeFactory(consumer, camelContext, statisticsEnabled); } + public int getCapacity() { + return capacity; + } + + public void setCapacity(int capacity) { + this.capacity = capacity; + } + public boolean isStatisticsEnabled() { return statisticsEnabled; } @@ -136,10 +151,16 @@ public final class PooledExchangeFactory extends ServiceSupport ee.onDone(null); // only release back in pool if reset was success + boolean inserted = pool.offer(exchange); + if (statisticsEnabled) { - released.incrementAndGet(); + if (inserted) { + released.incrementAndGet(); + } else { + discarded.incrementAndGet(); + } } - return pool.offer(exchange); + return inserted; } catch (Exception e) { if (statisticsEnabled) { discarded.incrementAndGet(); diff --git a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java index 2cdf094..744cd80 100644 --- a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java +++ b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java @@ -69,6 +69,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "EndpointRuntimeStatisticsEnabled": target.setEndpointRuntimeStatisticsEnabled(property(camelContext, boolean.class, value)); return true; case "exchangefactory": case "ExchangeFactory": 
target.setExchangeFactory(property(camelContext, java.lang.String.class, value)); return true; + case "exchangefactorycapacity": + case "ExchangeFactoryCapacity": target.setExchangeFactoryCapacity(property(camelContext, int.class, value)); return true; case "exchangefactorystatisticsenabled": case "ExchangeFactoryStatisticsEnabled": target.setExchangeFactoryStatisticsEnabled(property(camelContext, boolean.class, value)); return true; case "fileconfigurations": @@ -254,6 +256,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "EndpointRuntimeStatisticsEnabled": return boolean.class; case "exchangefactory": case "ExchangeFactory": return java.lang.String.class; + case "exchangefactorycapacity": + case "ExchangeFactoryCapacity": return int.class; case "exchangefactorystatisticsenabled": case "ExchangeFactoryStatisticsEnabled": return boolean.class; case "fileconfigurations": @@ -440,6 +444,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp case "EndpointRuntimeStatisticsEnabled": return target.isEndpointRuntimeStatisticsEnabled(); case "exchangefactory": case "ExchangeFactory": return target.getExchangeFactory(); + case "exchangefactorycapacity": + case "ExchangeFactoryCapacity": return target.getExchangeFactoryCapacity(); case "exchangefactorystatisticsenabled": case "ExchangeFactoryStatisticsEnabled": return target.isExchangeFactoryStatisticsEnabled(); case "fileconfigurations": diff --git a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json index 6709f97..09d115c 100644 --- a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json +++ b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json @@ -34,6 +34,7 @@ { "name": "camel.main.endpointLazyStartProducer", "description": "Whether the producer should be 
started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first mes [...] { "name": "camel.main.endpointRuntimeStatisticsEnabled", "description": "Sets whether endpoint runtime statistics is enabled (gathers runtime usage of each incoming and outgoing endpoints). The default value is false.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean" }, { "name": "camel.main.exchangeFactory", "description": "Controls whether to pool (reuse) exchanges or create new fresh exchanges (default). Using pooled will reduce JVM garbage collection overhead by avoiding to re-create Exchange instances per message each consumer receives.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "string", "javaType": "java.lang.String", "defaultValue": "default", "enum": [ "default", "pooled" ] }, + { "name": "camel.main.exchangeFactoryCapacity", "description": "The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "integer", "javaType": "int", "defaultValue": 100 }, { "name": "camel.main.exchangeFactoryStatisticsEnabled", "description": "Configures whether statistics is enabled on exchange factory.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean" }, { "name": "camel.main.fileConfigurations", "description": "Directory to load additional configuration files that contains configuration values that takes precedence over any other configuration. 
This can be used to refer to files that may have secret configuration that has been mounted on the file system for containers. You can specify a pattern to load from sub directories and a name pattern such as \/var\/app\/secret\/.properties, multiple directories can be separated by comma.", " [...] { "name": "camel.main.inflightRepositoryBrowseEnabled", "description": "Sets whether the inflight repository should allow browsing each inflight exchange. This is by default disabled as there is a very slight performance overhead when enabled.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", "javaType": "boolean" }, diff --git a/core/camel-main/src/main/docs/main.adoc b/core/camel-main/src/main/docs/main.adoc index 38240f0..eb3b736 100644 --- a/core/camel-main/src/main/docs/main.adoc +++ b/core/camel-main/src/main/docs/main.adoc @@ -46,6 +46,7 @@ The following table lists all the options: | *camel.main.endpointLazyStart{zwsp}Producer* | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed the [...] | *camel.main.endpointRuntime{zwsp}StatisticsEnabled* | Sets whether endpoint runtime statistics is enabled (gathers runtime usage of each incoming and outgoing endpoints). The default value is false. | | boolean | *camel.main.exchangeFactory* | Controls whether to pool (reuse) exchanges or create new fresh exchanges (default). Using pooled will reduce JVM garbage collection overhead by avoiding to re-create Exchange instances per message each consumer receives. 
| default | String +| *camel.main.exchangeFactory{zwsp}Capacity* | The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. | 100 | int | *camel.main.exchangeFactory{zwsp}StatisticsEnabled* | Configures whether statistics is enabled on exchange factory. | | boolean | *camel.main.fileConfigurations* | Directory to load additional configuration files that contains configuration values that takes precedence over any other configuration. This can be used to refer to files that may have secret configuration that has been mounted on the file system for containers. You can specify a pattern to load from sub directories and a name pattern such as /var/app/secret/.properties, multiple directories can be separated by comma. | | String | *camel.main.inflightRepository{zwsp}BrowseEnabled* | Sets whether the inflight repository should allow browsing each inflight exchange. This is by default disabled as there is a very slight performance overhead when enabled. | | boolean diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java index 4b12a76..1707928 100644 --- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java +++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java @@ -89,6 +89,7 @@ public abstract class DefaultConfigurationProperties<T> { private boolean lightweight; @Metadata(defaultValue = "default", enums = "default,pooled") private String exchangeFactory = "default"; + private int exchangeFactoryCapacity = 100; private boolean exchangeFactoryStatisticsEnabled; // route controller @Metadata(defaultValue = "DEBUG") @@ -938,6 +939,20 @@ public abstract class DefaultConfigurationProperties<T> { this.exchangeFactory = exchangeFactory; } + /** + * The capacity the pool (for each consumer) uses for storing exchanges. 
The default capacity is 100. + */ + public int getExchangeFactoryCapacity() { + return exchangeFactoryCapacity; + } + + /** + * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. + */ + public void setExchangeFactoryCapacity(int exchangeFactoryCapacity) { + this.exchangeFactoryCapacity = exchangeFactoryCapacity; + } + public boolean isExchangeFactoryStatisticsEnabled() { return exchangeFactoryStatisticsEnabled; } @@ -1814,6 +1829,14 @@ public abstract class DefaultConfigurationProperties<T> { } /** + * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. + */ + public T withExchangeFactoryCapacity(int exchangeFactoryCapacity) { + this.exchangeFactoryCapacity = exchangeFactoryCapacity; + return (T) this; + } + + /** * Configures whether statistics is enabled on exchange factory. */ public T withExchangeFactoryStatisticsEnabled(boolean exchangeFactoryStatisticsEnabled) {