This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 7f50037  CAMEL-16222: PooledExchangeFactory experiment
7f50037 is described below

commit 7f50037d3f0d8f61422333147655fe4c1013924f
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Feb 24 09:33:14 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/spi/ExchangeFactory.java | 10 ++++++++
 .../camel/impl/engine/DefaultExchangeFactory.java  |  9 +++++++
 .../camel/impl/engine/PooledExchangeFactory.java   | 29 +++++++++++++++++++---
 .../MainConfigurationPropertiesConfigurer.java     |  6 +++++
 .../camel-main-configuration-metadata.json         |  1 +
 core/camel-main/src/main/docs/main.adoc            |  1 +
 .../camel/main/DefaultConfigurationProperties.java | 23 +++++++++++++++++
 7 files changed, 75 insertions(+), 4 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 197d473..e2047e1 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -74,6 +74,16 @@ public interface ExchangeFactory {
     }
 
     /**
+     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
+     */
+    int getCapacity();
+
+    /**
+     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
+     */
+    void setCapacity(int capacity);
+
+    /**
      * Whether statistics is enabled.
      */
     boolean isStatisticsEnabled();
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index 469fb7c..1ca6740 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -67,4 +67,13 @@ public final class DefaultExchangeFactory implements ExchangeFactory, CamelConte
         // not in use
     }
 
+    @Override
+    public int getCapacity() {
+        return 0;
+    }
+
+    @Override
+    public void setCapacity(int capacity) {
+        // not in use
+    }
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 6bfe639..574ac36 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -16,7 +16,8 @@
  */
 package org.apache.camel.impl.engine;
 
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
@@ -44,13 +45,14 @@ public final class PooledExchangeFactory extends ServiceSupport
 
     private final ReleaseOnDoneTask onDone = new ReleaseOnDoneTask();
     private final Consumer consumer;
-    private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
+    private BlockingQueue<Exchange> pool;
     private final AtomicLong acquired = new AtomicLong();
     private final AtomicLong created = new AtomicLong();
     private final AtomicLong released = new AtomicLong();
     private final AtomicLong discarded = new AtomicLong();
 
     private CamelContext camelContext;
+    private int capacity = 100;
     private boolean statisticsEnabled;
 
     public PooledExchangeFactory() {
@@ -64,6 +66,11 @@ public final class PooledExchangeFactory extends ServiceSupport
     }
 
     @Override
+    protected void doBuild() throws Exception {
+        this.pool = new ArrayBlockingQueue<>(capacity);
+    }
+
+    @Override
     public CamelContext getCamelContext() {
         return camelContext;
     }
@@ -78,6 +85,14 @@ public final class PooledExchangeFactory extends ServiceSupport
         return new PooledExchangeFactory(consumer, camelContext, statisticsEnabled);
     }
 
+    public int getCapacity() {
+        return capacity;
+    }
+
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
     public boolean isStatisticsEnabled() {
         return statisticsEnabled;
     }
@@ -136,10 +151,16 @@ public final class PooledExchangeFactory extends ServiceSupport
             ee.onDone(null);
 
             // only release back in pool if reset was success
+            boolean inserted = pool.offer(exchange);
+
             if (statisticsEnabled) {
-                released.incrementAndGet();
+                if (inserted) {
+                    released.incrementAndGet();
+                } else {
+                    discarded.incrementAndGet();
+                }
             }
-            return pool.offer(exchange);
+            return inserted;
         } catch (Exception e) {
             if (statisticsEnabled) {
                 discarded.incrementAndGet();
diff --git a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
index 2cdf094..744cd80 100644
--- a/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
+++ b/core/camel-main/src/generated/java/org/apache/camel/main/MainConfigurationPropertiesConfigurer.java
@@ -69,6 +69,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp
         case "EndpointRuntimeStatisticsEnabled": target.setEndpointRuntimeStatisticsEnabled(property(camelContext, boolean.class, value)); return true;
         case "exchangefactory":
         case "ExchangeFactory": target.setExchangeFactory(property(camelContext, java.lang.String.class, value)); return true;
+        case "exchangefactorycapacity":
+        case "ExchangeFactoryCapacity": target.setExchangeFactoryCapacity(property(camelContext, int.class, value)); return true;
         case "exchangefactorystatisticsenabled":
         case "ExchangeFactoryStatisticsEnabled": target.setExchangeFactoryStatisticsEnabled(property(camelContext, boolean.class, value)); return true;
         case "fileconfigurations":
@@ -254,6 +256,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp
         case "EndpointRuntimeStatisticsEnabled": return boolean.class;
         case "exchangefactory":
         case "ExchangeFactory": return java.lang.String.class;
+        case "exchangefactorycapacity":
+        case "ExchangeFactoryCapacity": return int.class;
         case "exchangefactorystatisticsenabled":
         case "ExchangeFactoryStatisticsEnabled": return boolean.class;
         case "fileconfigurations":
@@ -440,6 +444,8 @@ public class MainConfigurationPropertiesConfigurer extends org.apache.camel.supp
         case "EndpointRuntimeStatisticsEnabled": return target.isEndpointRuntimeStatisticsEnabled();
         case "exchangefactory":
         case "ExchangeFactory": return target.getExchangeFactory();
+        case "exchangefactorycapacity":
+        case "ExchangeFactoryCapacity": return target.getExchangeFactoryCapacity();
         case "exchangefactorystatisticsenabled":
         case "ExchangeFactoryStatisticsEnabled": return target.isExchangeFactoryStatisticsEnabled();
         case "fileconfigurations":
diff --git a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
index 6709f97..09d115c 100644
--- a/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
+++ b/core/camel-main/src/generated/resources/META-INF/camel-main-configuration-metadata.json
@@ -34,6 +34,7 @@
     { "name": "camel.main.endpointLazyStartProducer", "description": "Whether 
the producer should be started lazy (on the first message). By starting lazy 
you can use this to allow CamelContext and routes to startup in situations 
where a producer may otherwise fail during starting and cause the route to fail 
being started. By deferring this startup to be lazy then the startup failure 
can be handled during routing messages via Camel's routing error handlers. 
Beware that when the first mes [...]
     { "name": "camel.main.endpointRuntimeStatisticsEnabled", "description": 
"Sets whether endpoint runtime statistics is enabled (gathers runtime usage of 
each incoming and outgoing endpoints). The default value is false.", 
"sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": 
"boolean", "javaType": "boolean" },
     { "name": "camel.main.exchangeFactory", "description": "Controls whether 
to pool (reuse) exchanges or create new fresh exchanges (default). Using pooled 
will reduce JVM garbage collection overhead by avoiding to re-create Exchange 
instances per message each consumer receives.", "sourceType": 
"org.apache.camel.main.DefaultConfigurationProperties", "type": "string", 
"javaType": "java.lang.String", "defaultValue": "default", "enum": [ "default", 
"pooled" ] },
+    { "name": "camel.main.exchangeFactoryCapacity", "description": "The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.", "sourceType": "org.apache.camel.main.DefaultConfigurationProperties", "type": "integer", "javaType": "int", "defaultValue": 100 },
     { "name": "camel.main.exchangeFactoryStatisticsEnabled", "description": 
"Configures whether statistics is enabled on exchange factory.", "sourceType": 
"org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", 
"javaType": "boolean" },
     { "name": "camel.main.fileConfigurations", "description": "Directory to 
load additional configuration files that contains configuration values that 
takes precedence over any other configuration. This can be used to refer to 
files that may have secret configuration that has been mounted on the file 
system for containers. You can specify a pattern to load from sub directories 
and a name pattern such as \/var\/app\/secret\/.properties, multiple 
directories can be separated by comma.", " [...]
     { "name": "camel.main.inflightRepositoryBrowseEnabled", "description": 
"Sets whether the inflight repository should allow browsing each inflight 
exchange. This is by default disabled as there is a very slight performance 
overhead when enabled.", "sourceType": 
"org.apache.camel.main.DefaultConfigurationProperties", "type": "boolean", 
"javaType": "boolean" },
diff --git a/core/camel-main/src/main/docs/main.adoc b/core/camel-main/src/main/docs/main.adoc
index 38240f0..eb3b736 100644
--- a/core/camel-main/src/main/docs/main.adoc
+++ b/core/camel-main/src/main/docs/main.adoc
@@ -46,6 +46,7 @@ The following table lists all the options:
 | *camel.main.endpointLazyStart{zwsp}Producer* | Whether the producer should 
be started lazy (on the first message). By starting lazy you can use this to 
allow CamelContext and routes to startup in situations where a producer may 
otherwise fail during starting and cause the route to fail being started. By 
deferring this startup to be lazy then the startup failure can be handled 
during routing messages via Camel's routing error handlers. Beware that when 
the first message is processed the [...]
 | *camel.main.endpointRuntime{zwsp}StatisticsEnabled* | Sets whether endpoint 
runtime statistics is enabled (gathers runtime usage of each incoming and 
outgoing endpoints). The default value is false. |  | boolean
 | *camel.main.exchangeFactory* | Controls whether to pool (reuse) exchanges or 
create new fresh exchanges (default). Using pooled will reduce JVM garbage 
collection overhead by avoiding to re-create Exchange instances per message 
each consumer receives. | default | String
+| *camel.main.exchangeFactory{zwsp}Capacity* | The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. | 100 | int
 | *camel.main.exchangeFactory{zwsp}StatisticsEnabled* | Configures whether 
statistics is enabled on exchange factory. |  | boolean
 | *camel.main.fileConfigurations* | Directory to load additional configuration 
files that contains configuration values that takes precedence over any other 
configuration. This can be used to refer to files that may have secret 
configuration that has been mounted on the file system for containers. You can 
specify a pattern to load from sub directories and a name pattern such as 
/var/app/secret/.properties, multiple directories can be separated by comma. |  
| String
 | *camel.main.inflightRepository{zwsp}BrowseEnabled* | Sets whether the 
inflight repository should allow browsing each inflight exchange. This is by 
default disabled as there is a very slight performance overhead when enabled. | 
 | boolean
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
index 4b12a76..1707928 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationProperties.java
@@ -89,6 +89,7 @@ public abstract class DefaultConfigurationProperties<T> {
     private boolean lightweight;
     @Metadata(defaultValue = "default", enums = "default,pooled")
     private String exchangeFactory = "default";
+    private int exchangeFactoryCapacity = 100;
     private boolean exchangeFactoryStatisticsEnabled;
     // route controller
     @Metadata(defaultValue = "DEBUG")
@@ -938,6 +939,20 @@ public abstract class DefaultConfigurationProperties<T> {
         this.exchangeFactory = exchangeFactory;
     }
 
+    /**
+     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
+     */
+    public int getExchangeFactoryCapacity() {
+        return exchangeFactoryCapacity;
+    }
+
+    /**
+     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
+     */
+    public void setExchangeFactoryCapacity(int exchangeFactoryCapacity) {
+        this.exchangeFactoryCapacity = exchangeFactoryCapacity;
+    }
+
     public boolean isExchangeFactoryStatisticsEnabled() {
         return exchangeFactoryStatisticsEnabled;
     }
@@ -1814,6 +1829,14 @@ public abstract class DefaultConfigurationProperties<T> {
     }
 
     /**
+     * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100.
+     */
+    public T withExchangeFactoryCapacity(int exchangeFactoryCapacity) {
+        this.exchangeFactoryCapacity = exchangeFactoryCapacity;
+        return (T) this;
+    }
+
+    /**
      * Configures whether statistics is enabled on exchange factory.
      */
     public T withExchangeFactoryStatisticsEnabled(boolean exchangeFactoryStatisticsEnabled) {

Reply via email to