This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 61d3552429a Camel 16099: throttler EIP - throttle on concurrent 
requests (#12138)
61d3552429a is described below

commit 61d3552429af0cb3688504cdec793b4436f33d4c
Author: Jono Morris <j...@apache.org>
AuthorDate: Sat Nov 25 19:49:34 2023 +1300

    Camel 16099: throttler EIP - throttle on concurrent requests (#12138)
    
    * CAMEL-16099 throttle on number of concurrent requests
    
    * CAMEL-16099 update unit tests
    
    * CAMEL-16099 generated changes
    
    * CAMEL-16099 update throttle-group tests
    
    * CAMEL-16099 add note to 4_3 upgrade guide
---
 .../org/apache/camel/catalog/models/throttle.json  |  13 +-
 .../apache/camel/catalog/schemas/camel-spring.xsd  |   9 -
 .../resources/org/apache/camel/model/throttle.json |  13 +-
 .../apache/camel/model/ProcessorDefinition.java    |  57 ++++---
 .../org/apache/camel/model/ThrottleDefinition.java |  72 ++------
 .../java/org/apache/camel/processor/Throttler.java | 187 +++++++++------------
 .../org/apache/camel/reifier/ThrottleReifier.java  |   5 +-
 .../ThrottlerAsyncDelayedCallerRunsTest.java       |   2 +-
 .../camel/processor/ThrottlerAsyncDelayedTest.java |   4 +-
 .../apache/camel/processor/ThrottlerDslTest.java   |   4 +-
 .../camel/processor/ThrottlerMethodCallTest.java   |   2 +-
 .../org/apache/camel/processor/ThrottlerTest.java  | 170 ++++++++++---------
 .../camel/processor/ThrottlingGroupingTest.java    | 150 +++++++----------
 .../management/mbean/ManagedThrottlerMBean.java    |  14 +-
 .../camel/management/mbean/ManagedThrottler.java   |  18 +-
 .../camel/management/ManagedThrottlerTest.java     |  30 ++--
 .../java/org/apache/camel/xml/in/ModelParser.java  |   1 -
 .../java/org/apache/camel/xml/out/ModelWriter.java |   1 -
 .../org/apache/camel/yaml/out/ModelWriter.java     |   1 -
 .../ROOT/pages/camel-4x-upgrade-guide-4_3.adoc     |  10 +-
 .../dsl/yaml/deserializers/ModelDeserializers.java |   8 +-
 .../generated/resources/schema/camelYamlDsl.json   |   6 -
 etc/eclipse/camel_xml_templates.xml                |   2 +-
 23 files changed, 327 insertions(+), 452 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json
index c9eaae7ee68..eda460dde65 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json
@@ -15,12 +15,11 @@
     "expression": { "index": 0, "kind": "expression", "displayName": 
"Expression", "required": true, "type": "object", "javaType": 
"org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", 
"csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", 
"java", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", 
"python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" 
], "deprecated": false, "autowired": false, "sec [...]
     "correlationExpression": { "index": 1, "kind": "expression", 
"displayName": "Correlation Expression", "required": false, "type": "object", 
"javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [ 
"constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", 
"hl7terser", "java", "joor", "jq", "js", "jsonpath", "language", "method", 
"mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", 
"xquery", "xtokenize" ], "deprecated": false, [...]
     "executorService": { "index": 2, "kind": "attribute", "displayName": 
"Executor Service", "label": "advanced", "required": false, "type": "object", 
"javaType": "java.util.concurrent.ExecutorService", "deprecated": false, 
"autowired": false, "secret": false, "description": "To use a custom thread 
pool (ScheduledExecutorService) by the throttler." },
-    "timePeriodMillis": { "index": 3, "kind": "attribute", "displayName": 
"Time Period Millis", "required": false, "type": "duration", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "1000", "description": "Sets the time period during which the 
maximum request count is valid for" },
-    "asyncDelayed": { "index": 4, "kind": "attribute", "displayName": "Async 
Delayed", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Enables asynchronous 
delay which means the thread will not block while delaying." },
-    "callerRunsWhenRejected": { "index": 5, "kind": "attribute", 
"displayName": "Caller Runs When Rejected", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether or not the caller should run the task when it was rejected by the 
thread pool. Is by default true" },
-    "rejectExecution": { "index": 6, "kind": "attribute", "displayName": 
"Reject Execution", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether or not 
throttler throws the ThrottlerRejectedExecutionException when the exchange 
exceeds the request limit Is by default false" },
-    "disabled": { "index": 7, "kind": "attribute", "displayName": "Disabled", 
"label": "advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether to disable this EIP from the 
route during build time. Once an EIP has been disabled then it cannot be 
enabled later at runtime." },
-    "id": { "index": 8, "kind": "attribute", "displayName": "Id", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the id of this node" 
},
-    "description": { "index": 9, "kind": "element", "displayName": 
"Description", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the description of this node" }
+    "asyncDelayed": { "index": 3, "kind": "attribute", "displayName": "Async 
Delayed", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Enables asynchronous 
delay which means the thread will not block while delaying." },
+    "callerRunsWhenRejected": { "index": 4, "kind": "attribute", 
"displayName": "Caller Runs When Rejected", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether or not the caller should run the task when it was rejected by the 
thread pool. Is by default true" },
+    "rejectExecution": { "index": 5, "kind": "attribute", "displayName": 
"Reject Execution", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether or not 
throttler throws the ThrottlerRejectedExecutionException when the exchange 
exceeds the request limit Is by default false" },
+    "disabled": { "index": 6, "kind": "attribute", "displayName": "Disabled", 
"label": "advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether to disable this EIP from the 
route during build time. Once an EIP has been disabled then it cannot be 
enabled later at runtime." },
+    "id": { "index": 7, "kind": "attribute", "displayName": "Id", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the id of this node" 
},
+    "description": { "index": 8, "kind": "element", "displayName": 
"Description", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the description of this node" }
   }
 }
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index ecdc5ffb115..cb3f2bc85ea 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -13086,15 +13086,6 @@ correlation key is throttled together.
             <xs:documentation xml:lang="en">
 <![CDATA[
 To use a custom thread pool (ScheduledExecutorService) by the throttler.
-]]>
-            </xs:documentation>
-          </xs:annotation>
-        </xs:attribute>
-        <xs:attribute name="timePeriodMillis" type="xs:string">
-          <xs:annotation>
-            <xs:documentation xml:lang="en">
-<![CDATA[
-Sets the time period during which the maximum request count is valid for. 
Default value: 1000
 ]]>
             </xs:documentation>
           </xs:annotation>
diff --git 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json
 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json
index c9eaae7ee68..eda460dde65 100644
--- 
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json
+++ 
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json
@@ -15,12 +15,11 @@
     "expression": { "index": 0, "kind": "expression", "displayName": 
"Expression", "required": true, "type": "object", "javaType": 
"org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", 
"csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", 
"java", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", 
"python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" 
], "deprecated": false, "autowired": false, "sec [...]
     "correlationExpression": { "index": 1, "kind": "expression", 
"displayName": "Correlation Expression", "required": false, "type": "object", 
"javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [ 
"constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", 
"hl7terser", "java", "joor", "jq", "js", "jsonpath", "language", "method", 
"mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", 
"xquery", "xtokenize" ], "deprecated": false, [...]
     "executorService": { "index": 2, "kind": "attribute", "displayName": 
"Executor Service", "label": "advanced", "required": false, "type": "object", 
"javaType": "java.util.concurrent.ExecutorService", "deprecated": false, 
"autowired": false, "secret": false, "description": "To use a custom thread 
pool (ScheduledExecutorService) by the throttler." },
-    "timePeriodMillis": { "index": 3, "kind": "attribute", "displayName": 
"Time Period Millis", "required": false, "type": "duration", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": "1000", "description": "Sets the time period during which the 
maximum request count is valid for" },
-    "asyncDelayed": { "index": 4, "kind": "attribute", "displayName": "Async 
Delayed", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Enables asynchronous 
delay which means the thread will not block while delaying." },
-    "callerRunsWhenRejected": { "index": 5, "kind": "attribute", 
"displayName": "Caller Runs When Rejected", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether or not the caller should run the task when it was rejected by the 
thread pool. Is by default true" },
-    "rejectExecution": { "index": 6, "kind": "attribute", "displayName": 
"Reject Execution", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether or not 
throttler throws the ThrottlerRejectedExecutionException when the exchange 
exceeds the request limit Is by default false" },
-    "disabled": { "index": 7, "kind": "attribute", "displayName": "Disabled", 
"label": "advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether to disable this EIP from the 
route during build time. Once an EIP has been disabled then it cannot be 
enabled later at runtime." },
-    "id": { "index": 8, "kind": "attribute", "displayName": "Id", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the id of this node" 
},
-    "description": { "index": 9, "kind": "element", "displayName": 
"Description", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the description of this node" }
+    "asyncDelayed": { "index": 3, "kind": "attribute", "displayName": "Async 
Delayed", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Enables asynchronous 
delay which means the thread will not block while delaying." },
+    "callerRunsWhenRejected": { "index": 4, "kind": "attribute", 
"displayName": "Caller Runs When Rejected", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, "description": 
"Whether or not the caller should run the task when it was rejected by the 
thread pool. Is by default true" },
+    "rejectExecution": { "index": 5, "kind": "attribute", "displayName": 
"Reject Execution", "label": "advanced", "required": false, "type": "boolean", 
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Whether or not 
throttler throws the ThrottlerRejectedExecutionException when the exchange 
exceeds the request limit Is by default false" },
+    "disabled": { "index": 6, "kind": "attribute", "displayName": "Disabled", 
"label": "advanced", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether to disable this EIP from the 
route during build time. Once an EIP has been disabled then it cannot be 
enabled later at runtime." },
+    "id": { "index": 7, "kind": "attribute", "displayName": "Id", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "description": "Sets the id of this node" 
},
+    "description": { "index": 8, "kind": "element", "displayName": 
"Description", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "Sets the description of this node" }
   }
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 8f4e1920f4f..55b5a9c09b8 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -1765,14 +1765,14 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * that a specific endpoint does not get overloaded, or that we don't 
exceed an agreed SLA with some external
      * service.
      * <p/>
-     * Will default use a time period of 1 second, so setting the 
maximumRequestCount to eg 10 will default ensure at
-     * most 10 messages per second.
+     * Setting the maximumConcurrentRequest will ensure that no more than the 
specified number of messages will flow to
+     * the endpoint at any given time.
      *
-     * @param  maximumRequestCount the maximum messages
-     * @return                     the builder
+     * @param  maximumConcurrentRequests the maximum number of concurrent 
messages
+     * @return                           the builder
      */
-    public ThrottleDefinition throttle(long maximumRequestCount) {
-        return 
throttle(ExpressionBuilder.constantExpression(maximumRequestCount));
+    public ThrottleDefinition throttle(long maximumConcurrentRequests) {
+        return 
throttle(ExpressionBuilder.constantExpression(maximumConcurrentRequests));
     }
 
     /**
@@ -1780,14 +1780,14 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * that a specific endpoint does not get overloaded, or that we don't 
exceed an agreed SLA with some external
      * service.
      * <p/>
-     * Will default use a time period of 1 second, so setting the 
maximumRequestCount to eg 10 will default ensure at
-     * most 10 messages per second.
+     * Setting the maximumConcurrentRequest will ensure that no more than the 
specified number of messages will flow to
+     * the endpoint at any given time.
      *
-     * @param  maximumRequestCount an expression to calculate the maximum 
request count
-     * @return                     the builder
+     * @param  maximumConcurrentRequests an expression to calculate the 
maximum concurrent request count
+     * @return                           the builder
      */
-    public ThrottleDefinition throttle(Expression maximumRequestCount) {
-        ThrottleDefinition answer = new 
ThrottleDefinition(maximumRequestCount);
+    public ThrottleDefinition throttle(Expression maximumConcurrentRequests) {
+        ThrottleDefinition answer = new 
ThrottleDefinition(maximumConcurrentRequests);
         addOutput(answer);
         return answer;
     }
@@ -1799,17 +1799,18 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * based on the key expression to group exchanges. This will make 
key-based throttling instead of overall
      * throttling.
      * <p/>
-     * Will default use a time period of 1 second, so setting the 
maximumRequestCount to eg 10 will default ensure at
-     * most 10 messages per second.
+     * Setting the maximumConcurrentRequest will ensure that no more than the 
specified number of messages will flow to
+     * the endpoint at any given time.
      *
-     * @param  maximumRequestCount      an expression to calculate the maximum 
request count
-     * @param  correlationExpressionKey is a correlation key that can throttle 
by the given key instead of overall
-     *                                  throttling
-     * @return                          the builder
+     * @param  maximumConcurrentRequests an expression to calculate the 
maximum concurrent request count
+     * @param  correlationExpressionKey  is a correlation key that can 
throttle by the given key instead of overall
+     *                                   throttling
+     * @return                           the builder
      */
-    public ThrottleDefinition throttle(Expression maximumRequestCount, long 
correlationExpressionKey) {
+    public ThrottleDefinition throttle(Expression maximumConcurrentRequests, 
long correlationExpressionKey) {
         ThrottleDefinition answer
-                = new ThrottleDefinition(maximumRequestCount, 
ExpressionBuilder.constantExpression(correlationExpressionKey));
+                = new ThrottleDefinition(
+                        maximumConcurrentRequests, 
ExpressionBuilder.constantExpression(correlationExpressionKey));
         addOutput(answer);
         return answer;
     }
@@ -1821,16 +1822,16 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * based on the key expression to group exchanges. This will make 
key-based throttling instead of overall
      * throttling.
      * <p/>
-     * Will default use a time period of 1 second, so setting the 
maximumRequestCount to eg 10 will default ensure at
-     * most 10 messages per second.
+     * Setting the maximumConcurrentRequest will ensure that no more than the 
specified number of messages will flow to
+     * the endpoint at any given time.
      *
-     * @param  maximumRequestCount      an expression to calculate the maximum 
request count
-     * @param  correlationExpressionKey is a correlation key as an expression 
that can throttle by the given key instead
-     *                                  of overall throttling
-     * @return                          the builder
+     * @param  maximumConcurrentRequests an expression to calculate the 
maximum concurrent request count
+     * @param  correlationExpressionKey  is a correlation key as an expression 
that can throttle by the given key
+     *                                   instead of overall throttling
+     * @return                           the builder
      */
-    public ThrottleDefinition throttle(Expression maximumRequestCount, 
Expression correlationExpressionKey) {
-        ThrottleDefinition answer = new 
ThrottleDefinition(maximumRequestCount, correlationExpressionKey);
+    public ThrottleDefinition throttle(Expression maximumConcurrentRequests, 
Expression correlationExpressionKey) {
+        ThrottleDefinition answer = new 
ThrottleDefinition(maximumConcurrentRequests, correlationExpressionKey);
         addOutput(answer);
         return answer;
     }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 8c0b78b3e78..2a7611997b4 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -49,9 +49,6 @@ public class ThrottleDefinition extends ExpressionNode 
implements ExecutorServic
     @Metadata(label = "advanced", javaType = 
"java.util.concurrent.ExecutorService")
     private String executorService;
     @XmlAttribute
-    @Metadata(defaultValue = "1000", javaType = "java.time.Duration")
-    private String timePeriodMillis;
-    @XmlAttribute
     @Metadata(label = "advanced", javaType = "java.lang.Boolean")
     private String asyncDelayed;
     @XmlAttribute
@@ -64,16 +61,16 @@ public class ThrottleDefinition extends ExpressionNode 
implements ExecutorServic
     public ThrottleDefinition() {
     }
 
-    public ThrottleDefinition(Expression maximumRequestsPerPeriod) {
-        super(maximumRequestsPerPeriod);
+    public ThrottleDefinition(Expression maximumConcurrentRequests) {
+        super(maximumConcurrentRequests);
     }
 
-    public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression 
correlationExpression) {
-        
this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), 
correlationExpression);
+    public ThrottleDefinition(Expression maximumConcurrentRequests, Expression 
correlationExpression) {
+        
this(ExpressionNodeHelper.toExpressionDefinition(maximumConcurrentRequests), 
correlationExpression);
     }
 
-    private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, 
Expression correlationExpression) {
-        super(maximumRequestsPerPeriod);
+    private ThrottleDefinition(ExpressionDefinition maximumConcurrentRequests, 
Expression correlationExpression) {
+        super(maximumConcurrentRequests);
 
         ExpressionSubElementDefinition cor = new 
ExpressionSubElementDefinition();
         
cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
@@ -82,11 +79,7 @@ public class ThrottleDefinition extends ExpressionNode 
implements ExecutorServic
 
     @Override
     public String toString() {
-        return "Throttle[" + description() + "]";
-    }
-
-    protected String description() {
-        return getExpression() + " request per " + getTimePeriodMillis() + " 
millis";
+        return "Throttle[" + getExpression() + "]";
     }
 
     @Override
@@ -96,53 +89,32 @@ public class ThrottleDefinition extends ExpressionNode 
implements ExecutorServic
 
     @Override
     public String getLabel() {
-        return "throttle[" + description() + "]";
+        return "throttle[" + getExpression() + "]";
     }
 
     // Fluent API
     // 
-------------------------------------------------------------------------
     /**
-     * Sets the time period during which the maximum request count is valid for
-     *
-     * @param  timePeriodMillis period in millis
-     * @return                  the builder
-     */
-    public ThrottleDefinition timePeriodMillis(long timePeriodMillis) {
-        return timePeriodMillis(Long.toString(timePeriodMillis));
-    }
-
-    /**
-     * Sets the time period during which the maximum request count is valid for
-     *
-     * @param  timePeriodMillis period in millis
-     * @return                  the builder
-     */
-    public ThrottleDefinition timePeriodMillis(String timePeriodMillis) {
-        setTimePeriodMillis(timePeriodMillis);
-        return this;
-    }
-
-    /**
-     * Sets the time period during which the maximum request count per period
+     * Sets the maximum number of concurrent requests
      *
-     * @param  maximumRequestsPerPeriod the maximum request count number per 
time period
-     * @return                          the builder
+     * @param  maximumConcurrentRequests the maximum number of concurrent 
requests
+     * @return                           the builder
      */
-    public ThrottleDefinition maximumRequestsPerPeriod(long 
maximumRequestsPerPeriod) {
+    public ThrottleDefinition maximumConcurrentRequests(long 
maximumConcurrentRequests) {
         setExpression(
-                
ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod)));
+                
ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumConcurrentRequests)));
         return this;
     }
 
     /**
-     * Sets the time period during which the maximum request count per period
+     * Sets the number of concurrent requests
      *
-     * @param  maximumRequestsPerPeriod the maximum request count number per 
time period
-     * @return                          the builder
+     * @param  maximumConcurrentRequests the maximum number of concurrent 
requests
+     * @return                           the builder
      */
-    public ThrottleDefinition maximumRequestsPerPeriod(String 
maximumRequestsPerPeriod) {
+    public ThrottleDefinition maximumConcurrentRequests(String 
maximumConcurrentRequests) {
         setExpression(
-                
ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.simpleExpression(maximumRequestsPerPeriod)));
+                
ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.simpleExpression(maximumConcurrentRequests)));
         return this;
     }
 
@@ -297,14 +269,6 @@ public class ThrottleDefinition extends ExpressionNode 
implements ExecutorServic
         super.setExpression(expression);
     }
 
-    public String getTimePeriodMillis() {
-        return timePeriodMillis;
-    }
-
-    public void setTimePeriodMillis(String timePeriodMillis) {
-        this.timePeriodMillis = timePeriodMillis;
-    }
-
     public String getAsyncDelayed() {
         return asyncDelayed;
     }
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
index a7854805cde..b74b145914e 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
@@ -18,11 +18,10 @@ package org.apache.camel.processor;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -34,6 +33,7 @@ import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.AsyncProcessorSupport;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -41,20 +41,17 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A <a href="http://camel.apache.org/throttler.html";>Throttler</a> will set a 
limit on the maximum number of message
- * exchanges which can be sent to a processor within a specific time period.
+ * exchanges which can be sent to a processor concurrently.
  * <p/>
- * This pattern can be extremely useful if you have some external system which 
meters access; such as only allowing 100
- * requests per second; or if huge load can cause a particular system to 
malfunction or to reduce its throughput you
+ * This pattern can be extremely useful if you have some external system which 
meters access; such as only allowing 10
+ * concurrent requests; or if huge load can cause a particular system to 
malfunction or to reduce its throughput you
  * might want to introduce some throttling.
  *
  * This throttle implementation is thread-safe and is therefore safe to be 
used by multiple concurrent threads in a
  * single route.
  *
- * The throttling mechanism is a DelayQueue with maxRequestsPerPeriod permits 
on it. Each permit is set to be delayed by
- * timePeriodMillis (except when the throttler is initialized or the throttle 
rate increased, then there is no delay for
- * those permits). Callers trying to acquire a permit from the DelayQueue will 
block if necessary. The end result is a
- * rolling window of time. Where from the callers point of view in the last 
timePeriodMillis no more than
- * maxRequestsPerPeriod have been allowed to be acquired.
+ * The throttling mechanism is a Semaphore with maxConcurrentRequests permits 
on it. Callers trying to acquire a permit
+ * will block if necessary when maxConcurrentRequests permits have been 
acquired.
  */
 public class Throttler extends AsyncProcessorSupport implements Traceable, 
IdAware, RouteIdAware {
 
@@ -64,6 +61,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
 
     private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = 
"CamelThrottlerExchangeQueuedTimestamp";
     private static final String PROPERTY_EXCHANGE_STATE = 
"CamelThrottlerExchangeState";
+    private static final long CLEAN_PERIOD = 1000L * 10;
 
     private enum State {
         SYNC,
@@ -74,34 +72,24 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
     private final CamelContext camelContext;
     private final ScheduledExecutorService asyncExecutor;
     private final boolean shutdownAsyncExecutor;
-
-    private volatile long timePeriodMillis;
-    private final long cleanPeriodMillis;
     private String id;
     private String routeId;
-    private Expression maxRequestsPerPeriodExpression;
+    private Expression maxConcurrentRequestsExpression;
     private boolean rejectExecution;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
     private final Expression correlationExpression;
     private final Map<String, ThrottlingState> states = new 
ConcurrentHashMap<>();
 
-    public Throttler(final CamelContext camelContext, final Expression 
maxRequestsPerPeriodExpression,
-                     final long timePeriodMillis,
+    public Throttler(final CamelContext camelContext, final Expression 
maxConcurrentRequestsExpression,
                      final ScheduledExecutorService asyncExecutor, final 
boolean shutdownAsyncExecutor,
                      final boolean rejectExecution, Expression correlation) {
         this.camelContext = camelContext;
         this.rejectExecution = rejectExecution;
         this.shutdownAsyncExecutor = shutdownAsyncExecutor;
 
-        ObjectHelper.notNull(maxRequestsPerPeriodExpression, 
"maxRequestsPerPeriodExpression");
-        this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
-
-        if (timePeriodMillis <= 0) {
-            throw new IllegalArgumentException("TimePeriodMillis should be a 
positive number, was: " + timePeriodMillis);
-        }
-        this.timePeriodMillis = timePeriodMillis;
-        this.cleanPeriodMillis = timePeriodMillis * 10;
+        ObjectHelper.notNull(maxConcurrentRequestsExpression, 
"maxConcurrentRequestsExpression");
+        this.maxConcurrentRequestsExpression = maxConcurrentRequestsExpression;
         this.asyncExecutor = asyncExecutor;
         this.correlationExpression = correlation;
     }
@@ -127,16 +115,12 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
                 key = correlationExpression.evaluate(exchange, String.class);
             }
             ThrottlingState throttlingState = states.computeIfAbsent(key, 
ThrottlingState::new);
-            throttlingState.calculateAndSetMaxRequestsPerPeriod(exchange);
-
-            ThrottlePermit permit = throttlingState.poll();
+            
throttlingState.calculateAndSetMaxConcurrentRequestsExpression(exchange);
 
-            if (permit == null) {
+            if (!throttlingState.tryAcquire(exchange)) {
                 if (isRejectExecution()) {
                     throw new ThrottlerRejectedExecutionException(
-                            "Exceeded the max throttle rate of "
-                                                                  + 
throttlingState.getThrottleRate() + " within "
-                                                                  + 
timePeriodMillis + "ms");
+                            "Exceeded the max throttle rate of " + 
throttlingState.getThrottleRate());
                 } else {
                     // delegate to async pool
                     if (isAsyncDelayed() && !exchange.isTransacted() && state 
== State.SYNC) {
@@ -154,12 +138,10 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
                     if (LOG.isTraceEnabled()) {
                         start = System.currentTimeMillis();
                     }
-                    permit = throttlingState.take();
+                    throttlingState.acquire(exchange);
                     if (LOG.isTraceEnabled()) {
                         elapsed = System.currentTimeMillis() - start;
                     }
-                    throttlingState.enqueue(permit, exchange);
-
                     if (state == State.ASYNC) {
                         if (LOG.isTraceEnabled()) {
                             long queuedTime = start - queuedStart;
@@ -175,8 +157,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
                     }
                 }
             } else {
-                throttlingState.enqueue(permit, exchange);
-
+                // permit acquired
                 if (state == State.ASYNC) {
                     if (LOG.isTraceEnabled()) {
                         long queuedTime = System.currentTimeMillis() - 
queuedStart;
@@ -214,7 +195,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
     }
 
     /**
-     * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the 
executor rejects the submission and
+     * Delegate blocking to an asyncExecutor. Except if the executor rejects 
the submission and
      * isCallerRunsWhenRejected() is enabled, then this method will delegate 
back to process(), but not before changing
      * the exchange state to stop any recursion.
      */
@@ -225,8 +206,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
                 exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, 
System.currentTimeMillis());
             }
             exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC);
-            long delay = throttlingState.peek().getDelay(TimeUnit.NANOSECONDS);
-            asyncExecutor.schedule(() -> process(exchange, callback), delay, 
TimeUnit.NANOSECONDS);
+            asyncExecutor.submit(() -> process(exchange, callback));
             return false;
         } catch (final RejectedExecutionException e) {
             if (isCallerRunsWhenRejected()) {
@@ -259,68 +239,82 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
 
     private class ThrottlingState {
         private final String key;
-        private final DelayQueue<ThrottlePermit> delayQueue = new 
DelayQueue<>();
         private final AtomicReference<ScheduledFuture<?>> cleanFuture = new 
AtomicReference<>();
         private volatile int throttleRate;
+        private WrappedSemaphore semaphore;
 
         ThrottlingState(String key) {
             this.key = key;
+            semaphore = new WrappedSemaphore();
         }
 
         public int getThrottleRate() {
             return throttleRate;
         }
 
-        public ThrottlePermit poll() {
-            return delayQueue.poll();
+        public void clean() {
+            states.remove(key);
         }
 
-        public ThrottlePermit peek() {
-            return delayQueue.peek();
+        public boolean tryAcquire(Exchange exchange) {
+            boolean acquired = semaphore.tryAcquire();
+            if (acquired) {
+                addSynchronization(exchange);
+            }
+            return acquired;
         }
 
-        public ThrottlePermit take() throws InterruptedException {
-            return delayQueue.take();
+        public void acquire(Exchange exchange) throws InterruptedException {
+            semaphore.acquire();
+            addSynchronization(exchange);
         }
 
-        public void clean() {
-            states.remove(key);
+        private void addSynchronization(final Exchange exchange) {
+            exchange.getExchangeExtension().addOnCompletion(new 
Synchronization() {
+                @Override
+                public void onComplete(Exchange exchange) {
+                    release(exchange);
+                }
+
+                @Override
+                public void onFailure(Exchange exchange) {
+                    release(exchange);
+                }
+            });
         }
 
         /**
-         * Returns a permit to the DelayQueue, first resetting it's delay to 
be relative to now.
+         * Returns a permit.
          */
-        public void enqueue(final ThrottlePermit permit, final Exchange 
exchange) {
-            permit.setDelayMs(getTimePeriodMillis());
-            delayQueue.put(permit);
+        public void release(final Exchange exchange) {
+            semaphore.release();
             try {
-                ScheduledFuture<?> next = asyncExecutor.schedule(this::clean, 
cleanPeriodMillis, TimeUnit.MILLISECONDS);
+                ScheduledFuture<?> next = asyncExecutor.schedule(this::clean, 
CLEAN_PERIOD, TimeUnit.MILLISECONDS);
                 ScheduledFuture<?> prev = cleanFuture.getAndSet(next);
                 if (prev != null) {
                     prev.cancel(false);
                 }
-                // try and incur the least amount of overhead while releasing 
permits back to the queue
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Permit released, for exchangeId: {}", 
exchange.getExchangeId());
                 }
             } catch (RejectedExecutionException e) {
-                LOG.debug("Throttling queue cleaning rejected", e);
+                LOG.debug("Throttle cleaning rejected", e);
             }
         }
 
         /**
-         * Evaluates the maxRequestsPerPeriodExpression and adjusts the 
throttle rate up or down.
+         * Evaluates the maxConcurrentRequestsExpression and adjusts the 
throttle rate up or down.
          */
-        public synchronized void calculateAndSetMaxRequestsPerPeriod(final 
Exchange exchange) throws Exception {
-            Integer newThrottle = 
maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
+        public synchronized void 
calculateAndSetMaxConcurrentRequestsExpression(final Exchange exchange) throws 
Exception {
+            Integer newThrottle = 
maxConcurrentRequestsExpression.evaluate(exchange, Integer.class);
 
             if (newThrottle != null && newThrottle < 0) {
-                throw new IllegalStateException("The maximumRequestsPerPeriod 
must be a positive number, was: " + newThrottle);
+                throw new IllegalStateException("The maximumConcurrentRequests 
must be a positive number, was: " + newThrottle);
             }
 
             if (newThrottle == null && throttleRate == 0) {
                 throw new RuntimeExchangeException(
-                        "The maxRequestsPerPeriodExpression was evaluated as 
null: " + maxRequestsPerPeriodExpression,
+                        "The maxConcurrentRequestsExpression was evaluated as 
null: " + maxConcurrentRequestsExpression,
                         exchange);
             }
 
@@ -331,14 +325,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
                         int delta = throttleRate - newThrottle;
 
                         // discard any permits that are needed to decrease 
throttling
-                        while (delta > 0) {
-                            delayQueue.take();
-                            delta--;
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Permit discarded due to throttling 
rate decrease, triggered by ExchangeId: {}",
-                                        exchange.getExchangeId());
-                            }
-                        }
+                        semaphore.reducePermits(delta);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Throttle rate decreased from {} to {}, 
triggered by ExchangeId: {}", throttleRate,
                                     newThrottle, exchange.getExchangeId());
@@ -347,9 +334,7 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
                         // increase
                     } else if (newThrottle > throttleRate) {
                         int delta = newThrottle - throttleRate;
-                        for (int i = 0; i < delta; i++) {
-                            delayQueue.put(new ThrottlePermit(-1));
-                        }
+                        semaphore.increasePermits(delta);
                         if (throttleRate == 0) {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Initial throttle rate set to {}, 
triggered by ExchangeId: {}", newThrottle,
@@ -368,28 +353,29 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
         }
     }
 
-    /**
-     * Permit that implements the Delayed interface needed by DelayQueue.
-     */
-    private static class ThrottlePermit implements Delayed {
-        private volatile long scheduledTime;
-
-        ThrottlePermit(final long delayMs) {
-            setDelayMs(delayMs);
+    // extend Semaphore so we can reduce permits if required
+    private class WrappedSemaphore extends Semaphore {
+        public WrappedSemaphore() {
+            super(0, true);
         }
 
-        public void setDelayMs(final long delayMs) {
-            this.scheduledTime = System.currentTimeMillis() + delayMs;
+        public boolean tryAcquire() {
+            try {
+                // honours fairness setting
+                return super.tryAcquire(0L, TimeUnit.NANOSECONDS);
+            } catch (InterruptedException e) {
+                return false;
+            }
         }
 
-        @Override
-        public long getDelay(final TimeUnit unit) {
-            return unit.convert(scheduledTime - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
+        // decrease throttling
+        public void reducePermits(int n) {
+            super.reducePermits(n);
         }
 
-        @Override
-        public int compareTo(final Delayed o) {
-            return Long.compare(getDelay(TimeUnit.MILLISECONDS), 
o.getDelay(TimeUnit.MILLISECONDS));
+        // increase throttling
+        public void increasePermits(int n) {
+            super.release(n);
         }
     }
 
@@ -440,36 +426,25 @@ public class Throttler extends AsyncProcessorSupport 
implements Traceable, IdAwa
     /**
      * Sets the maximum number of requests per time period expression
      */
-    public void setMaximumRequestsPerPeriodExpression(Expression 
maxRequestsPerPeriodExpression) {
-        this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
+    public void setMaximumConcurrentRequestsExpression(Expression 
maxConcurrentRequestsExpression) {
+        this.maxConcurrentRequestsExpression = maxConcurrentRequestsExpression;
     }
 
-    public Expression getMaximumRequestsPerPeriodExpression() {
-        return maxRequestsPerPeriodExpression;
+    public Expression getMaximumConcurrentRequests() {
+        return maxConcurrentRequestsExpression;
     }
 
     /**
-     * Gets the current maximum request per period value. If it is grouped 
throttling applied with correlationExpression
-     * than the max per period within the group will return
+     * Gets the current maximum request. If it is grouped throttling applied 
with correlationExpression then the max
+     * within the group will return
      */
-    public int getCurrentMaximumRequestsPerPeriod() {
+    public int getCurrentMaximumConcurrentRequests() {
         return 
states.values().stream().mapToInt(ThrottlingState::getThrottleRate).max().orElse(0);
     }
 
-    /**
-     * Sets the time period during which the maximum number of requests apply
-     */
-    public void setTimePeriodMillis(final long timePeriodMillis) {
-        this.timePeriodMillis = timePeriodMillis;
-    }
-
-    public long getTimePeriodMillis() {
-        return timePeriodMillis;
-    }
-
     @Override
     public String getTraceLabel() {
-        return "throttle[" + maxRequestsPerPeriodExpression + " per: " + 
timePeriodMillis + "]";
+        return "throttle[" + maxConcurrentRequestsExpression + "]";
     }
 
     @Override
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java
index 20dc0bec3ef..a046d0bf1c4 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java
@@ -37,9 +37,6 @@ public class ThrottleReifier extends 
ExpressionReifier<ThrottleDefinition> {
         boolean shutdownThreadPool = willCreateNewThreadPool(definition, true);
         ScheduledExecutorService threadPool = 
getConfiguredScheduledExecutorService("Throttle", definition, true);
 
-        // should be default 1000 millis
-        long period = parseDuration(definition.getTimePeriodMillis(), 1000L);
-
         // max requests per period is mandatory
         Expression maxRequestsExpression = 
createMaxRequestsPerPeriodExpression();
         if (maxRequestsExpression == null) {
@@ -53,7 +50,7 @@ public class ThrottleReifier extends 
ExpressionReifier<ThrottleDefinition> {
 
         boolean reject = parseBoolean(definition.getRejectExecution(), false);
         Throttler answer = new Throttler(
-                camelContext, maxRequestsExpression, period, threadPool, 
shutdownThreadPool, reject, correlation);
+                camelContext, maxRequestsExpression, threadPool, 
shutdownThreadPool, reject, correlation);
 
         answer.setAsyncDelayed(async);
         // should be true by default
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
index 56efee27f12..1af2943081c 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
@@ -50,7 +50,7 @@ public class ThrottlerAsyncDelayedCallerRunsTest extends 
ContextTestSupport {
                 builder.maxQueueSize(2);
                 
context.getExecutorServiceManager().registerThreadPoolProfile(builder.build());
 
-                
from("seda:start").throttle(1).timePeriodMillis(100).asyncDelayed().executorService("myThrottler")
+                
from("seda:start").throttle(1).delay(100).asyncDelayed().executorService("myThrottler")
                         .callerRunsWhenRejected(true).to("mock:result");
             }
         };
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
index ef14ff4b3e8..71adf7ea555 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
@@ -67,10 +67,10 @@ public class ThrottlerAsyncDelayedTest extends 
ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
-                
from("seda:a").throttle(3).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result",
 "mock:result");
+                
from("seda:a").throttle(3).delay(INTERVAL).asyncDelayed().to("log:result", 
"mock:result");
                 // END SNIPPET: ex
 
-                
from("direct:a").throttle(3).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result",
 "mock:result");
+                
from("direct:a").throttle(3).delay(INTERVAL).asyncDelayed().to("log:result", 
"mock:result");
             }
         };
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
index 2d09d32c026..bb34216469a 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
@@ -49,7 +49,7 @@ public class ThrottlerDslTest extends ContextTestSupport {
         resultEndpoint.assertIsSatisfied();
 
         // now assert that they have actually been throttled
-        long minimumTime = (messageCount - 1) * INTERVAL;
+        long minimumTime = messageCount * INTERVAL;
         // add a little slack
         long delta = System.currentTimeMillis() - start + 200;
         assertTrue(delta >= minimumTime, "Should take at least " + minimumTime 
+ "ms, was: " + delta);
@@ -61,7 +61,7 @@ public class ThrottlerDslTest extends ContextTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 from("direct:start").throttle().message(m -> 
m.getHeader("ThrottleCount", Integer.class))
-                        .timePeriodMillis(INTERVAL).to("log:result", 
"mock:result");
+                        .delay(INTERVAL).to("log:result", "mock:result");
             }
         };
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java
index 7255f31da86..79289495426 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java
@@ -75,7 +75,7 @@ public class ThrottlerMethodCallTest extends 
ContextTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:expressionMethod").throttle(method("myBean", 
"getMessagesPerInterval")).timePeriodMillis(INTERVAL)
+                from("direct:expressionMethod").throttle(method("myBean", 
"getMessagesPerInterval")).delay(INTERVAL)
                         .to("log:result", "mock:result");
             }
         };
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index b25f77941f1..37b92903927 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Arrays;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -35,23 +36,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com", 
disabledReason = "Flaky on Github CI")
 public class ThrottlerTest extends ContextTestSupport {
     private static final int INTERVAL = 500;
-    private static final int TOLERANCE = 50;
     private static final int MESSAGE_COUNT = 9;
-
-    @Test
-    public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds() 
throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(3);
-        resultEndpoint.setResultWaitTime(2000);
-
-        for (int i = 0; i < MESSAGE_COUNT; i++) {
-            template.sendBody("seda:a", "<message>" + i + "</message>");
-        }
-
-        // lets pause to give the requests time to be processed
-        // to check that the throttle really does kick in
-        resultEndpoint.assertIsSatisfied();
-    }
+    private static final int CONCURRENT_REQUESTS = 2;
+    private volatile int curr;
+    private volatile int max;
 
     @Test
     public void testSendLotsOfMessagesWithRejectExecution() throws Exception {
@@ -61,27 +49,29 @@ public class ThrottlerTest extends ContextTestSupport {
         MockEndpoint errorEndpoint = resolveMandatoryEndpoint("mock:error", 
MockEndpoint.class);
         errorEndpoint.expectedMessageCount(4);
 
-        for (int i = 0; i < 6; i++) {
-            template.sendBody("direct:start", "<message>" + i + "</message>");
+        ExecutorService executor = Executors.newFixedThreadPool(6);
+        try {
+            for (int i = 0; i < 6; i++) {
+                executor.execute(() -> template.sendBody("direct:start", 
"<message>payload</message>"));
+            }
+            assertMockEndpointsSatisfied();
+        } finally {
+            executor.shutdownNow();
         }
-
-        // lets pause to give the requests time to be processed
-        // to check that the throttle really does kick in
-        assertMockEndpointsSatisfied();
     }
 
     @Test
     public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() 
throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
-        long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a", 
MESSAGE_COUNT, resultEndpoint);
-        assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT);
+        sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a", MESSAGE_COUNT, 
resultEndpoint);
+        assertTrue(max <= CONCURRENT_REQUESTS);
     }
 
     @Test
     public void testConfigurationWithConstantExpression() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
-        long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, 
"direct:expressionConstant", MESSAGE_COUNT, resultEndpoint);
-        assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT);
+        sendMessagesAndAwaitDelivery(MESSAGE_COUNT, 
"direct:expressionConstant", MESSAGE_COUNT, resultEndpoint);
+        assertTrue(max <= CONCURRENT_REQUESTS);
     }
 
     @Test
@@ -91,7 +81,7 @@ public class ThrottlerTest extends ContextTestSupport {
 
         ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT);
         try {
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, 
INTERVAL, MESSAGE_COUNT);
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 
CONCURRENT_REQUESTS, INTERVAL, MESSAGE_COUNT);
         } finally {
             executor.shutdownNow();
         }
@@ -102,47 +92,46 @@ public class ThrottlerTest extends ContextTestSupport {
         ExecutorService executor = Executors.newFixedThreadPool(5);
         try {
             MockEndpoint resultEndpoint = 
resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, 
INTERVAL, MESSAGE_COUNT);
-            Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the
-                                               // first throttle rate does not
-                                               // influence the next one.
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 
INTERVAL, MESSAGE_COUNT);
+            Thread.sleep(INTERVAL); // sleep here to ensure the
+                                   // first throttle rate does not
+                                   // influence the next one.
 
             resultEndpoint.reset();
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 10, 
INTERVAL, MESSAGE_COUNT);
-            Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the
-                                               // first throttle rate does not
-                                               // influence the next one.
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, 
INTERVAL, MESSAGE_COUNT);
+            Thread.sleep(INTERVAL); // sleep here to ensure the
+                                   // first throttle rate does not
+                                   // influence the next one.
 
             resultEndpoint.reset();
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, 
INTERVAL, MESSAGE_COUNT);
-            Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the
-                                               // first throttle rate does not
-                                               // influence the next one.
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, 
INTERVAL, MESSAGE_COUNT);
+            Thread.sleep(INTERVAL); // sleep here to ensure the
+                                   // first throttle rate does not
+                                   // influence the next one.
 
             resultEndpoint.reset();
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 10, 
INTERVAL, MESSAGE_COUNT);
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, 
INTERVAL, MESSAGE_COUNT);
         } finally {
             executor.shutdownNow();
         }
     }
 
-    private void assertThrottlerTiming(
-            final long elapsedTimeMs, final int throttle, final int 
intervalMs, final int messageCount) {
-        // now assert that they have actually been throttled (use +/- 50 as
-        // slack)
-        long minimum = calculateMinimum(intervalMs, throttle, messageCount) - 
50;
-        long maximum = calculateMaximum(intervalMs, throttle, messageCount) + 
50;
-        // add 500 in case running on slow CI boxes
-        maximum += 500;
-        log.info("Sent {} exchanges in {}ms, with throttle rate of {} per 
{}ms. Calculated min {}ms and max {}ms", messageCount,
-                elapsedTimeMs, throttle, intervalMs, minimum,
-                maximum);
-
-        assertTrue(elapsedTimeMs >= minimum, "Should take at least " + minimum 
+ "ms, was: " + elapsedTimeMs);
-        assertTrue(elapsedTimeMs <= maximum + TOLERANCE, "Should take at most 
" + maximum + "ms, was: " + elapsedTimeMs);
+    @Test
+    public void testFifo() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", 
"D", "E", "F", "G", "H");
+        sendBody("direct:fifo");
+        assertMockEndpointsSatisfied();
     }
 
-    private long sendMessagesAndAwaitDelivery(
+    @Test
+    public void testPermitReleaseOnException() throws Exception {
+        // verify that failed processing releases throttle permit
+        getMockEndpoint("mock:error").expectedBodiesReceived("A", "B", "C", 
"D", "E", "F", "G", "H");
+        sendBody("direct:release");
+        assertMockEndpointsSatisfied();
+    }
+
+    private void sendMessagesAndAwaitDelivery(
             final int messageCount, final String endpointUri, final int 
threadPoolSize, final MockEndpoint receivingEndpoint)
             throws InterruptedException {
         ExecutorService executor = 
Executors.newFixedThreadPool(threadPoolSize);
@@ -151,7 +140,6 @@ public class ThrottlerTest extends ContextTestSupport {
                 receivingEndpoint.expectedMessageCount(messageCount);
             }
 
-            long start = System.nanoTime();
             for (int i = 0; i < messageCount; i++) {
                 executor.execute(new Runnable() {
                     public void run() {
@@ -164,7 +152,6 @@ public class ThrottlerTest extends ContextTestSupport {
             if (receivingEndpoint != null) {
                 receivingEndpoint.assertIsSatisfied();
             }
-            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
         } finally {
             executor.shutdownNow();
         }
@@ -176,6 +163,7 @@ public class ThrottlerTest extends ContextTestSupport {
             throws InterruptedException {
         resultEndpoint.expectedMessageCount(messageCount);
 
+        max = 0;
         long start = System.nanoTime();
         for (int i = 0; i < messageCount; i++) {
             executor.execute(new Runnable() {
@@ -189,19 +177,12 @@ public class ThrottlerTest extends ContextTestSupport {
         // let's wait for the exchanges to arrive
         resultEndpoint.assertIsSatisfied();
         long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
-        assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount);
-    }
-
-    private long calculateMinimum(final long periodMs, final long 
throttleRate, final long messageCount) {
-        if (messageCount % throttleRate > 0) {
-            return (long) Math.floor((double) messageCount / (double) 
throttleRate) * periodMs;
-        } else {
-            return (long) (Math.floor((double) messageCount / (double) 
throttleRate) * periodMs) - periodMs;
-        }
+        assertTrue(max <= throttle);
     }
 
-    private long calculateMaximum(final long periodMs, final long 
throttleRate, final long messageCount) {
-        return ((long) Math.ceil((double) messageCount / (double) 
throttleRate)) * periodMs;
+    private void sendBody(String endpoint) {
+        Arrays.stream(new String[] { "A", "B", "C", "D", "E", "F", "G", "H" })
+                .forEach(b -> template.sendBody(endpoint, b));
     }
 
     @Override
@@ -211,21 +192,44 @@ public class ThrottlerTest extends ContextTestSupport {
 
                 
onException(ThrottlerRejectedExecutionException.class).handled(true).to("mock:error");
 
-                // START SNIPPET: ex
-                
from("seda:a").throttle(3).timePeriodMillis(1000).to("log:result", 
"mock:result");
-                // END SNIPPET: ex
-
-                
from("direct:a").throttle(5).timePeriodMillis(INTERVAL).to("log:result", 
"mock:result");
-
-                
from("direct:expressionConstant").throttle(constant(5)).timePeriodMillis(INTERVAL).to("log:result",
-                        "mock:result");
-
-                
from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result",
-                        "mock:result");
-
-                
from("direct:start").throttle(2).timePeriodMillis(1000).rejectExecution(true).to("log:result",
 "mock:result");
-
-                
from("direct:highThrottleRate").throttle(10000).timePeriodMillis(INTERVAL).to("mock:result");
+                from("direct:a").throttle(CONCURRENT_REQUESTS)
+                        .process(exchange -> {
+                            curr++;
+                        })
+                        .delay(INTERVAL)
+                        .process(exchange -> {
+                            max = Math.max(max, curr--);
+                        })
+                        .to("log:result", "mock:result");
+
+                
from("direct:expressionConstant").throttle(constant(CONCURRENT_REQUESTS))
+                        .process(exchange -> {
+                            curr++;
+                        })
+                        .delay(INTERVAL)
+                        .process(exchange -> {
+                            max = Math.max(max, curr--);
+                        })
+                        .to("log:result", "mock:result");
+
+                
from("direct:expressionHeader").throttle(header("throttleValue"))
+                        .process(exchange -> {
+                            curr++;
+                        })
+                        .delay(INTERVAL)
+                        .process(exchange -> {
+                            max = Math.max(max, curr--);
+                        })
+                        .to("log:result", "mock:result");
+
+                
from("direct:start").throttle(2).rejectExecution(true).delay(1000).to("log:result",
 "mock:result");
+
+                from("direct:fifo").throttle(1).delay(100).to("mock:result");
+
+                
from("direct:release").errorHandler(deadLetterChannel("mock:error")).throttle(1).delay(100)
+                        .process(exchange -> {
+                            throw new RuntimeException();
+                        }).to("mock:result");
             }
         };
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
index d030cffb74e..3922f069748 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -18,9 +18,10 @@ package org.apache.camel.processor;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
@@ -33,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Isolated
 public class ThrottlingGroupingTest extends ContextTestSupport {
     private static final int INTERVAL = 500;
-    private static final int MESSAGE_COUNT = 9;
-    private static final int TOLERANCE = 50;
+    private static final int MESSAGE_COUNT = 20;
+    private static final int CONCURRENT_REQUESTS = 2;
+    private Map<String, AtomicInteger> curr;
+    private static int max;
 
     @Test
     public void testGroupingWithSingleConstant() throws Exception {
@@ -73,64 +76,35 @@ public class ThrottlingGroupingTest extends 
ContextTestSupport {
     }
 
     @Test
-    public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds() 
throws Exception {
-
+    public void 
testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue() 
throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult", 
MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(3);
-        resultEndpoint.setResultWaitTime(2000);
-
-        Map<String, Object> headers = new HashMap<>();
-        for (int i = 0; i < 9; i++) {
-            if (i % 2 == 0) {
-                headers.put("key", "1");
-            } else {
-                headers.put("key", "2");
-            }
-            template.sendBodyAndHeaders("seda:ga", "<message>" + i + 
"</message>", headers);
-        }
-
-        // lets pause to give the requests time to be processed
-        // to check that the throttle really does kick in
-        resultEndpoint.assertIsSatisfied();
+        sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:ga", 
CONCURRENT_REQUESTS, resultEndpoint);
     }
 
-    private void assertThrottlerTiming(
-            final long elapsedTimeMs, final int throttle, final int 
intervalMs, final int messageCount) {
-        // now assert that they have actually been throttled (use +/- 50 as
-        // slack)
-        long minimum = calculateMinimum(intervalMs, throttle, messageCount) - 
50;
-        long maximum = calculateMaximum(intervalMs, throttle, messageCount) + 
50;
-        // add 500 in case running on slow CI boxes
-        maximum += 500;
-        log.info("Sent {} exchanges in {}ms, with throttle rate of {} per 
{}ms. Calculated min {}ms and max {}ms", messageCount,
-                elapsedTimeMs, throttle, intervalMs, minimum,
-                maximum);
-
-        assertTrue(elapsedTimeMs >= minimum, "Should take at least " + minimum 
+ "ms, was: " + elapsedTimeMs);
-        assertTrue(elapsedTimeMs <= maximum + TOLERANCE, "Should take at most 
" + maximum + "ms, was: " + elapsedTimeMs);
-    }
-
-    private long sendMessagesAndAwaitDelivery(
-            final int messageCount, final String endpointUri, final int 
threadPoolSize, final MockEndpoint receivingEndpoint)
+    private void sendMessagesAndAwaitDelivery(
+            final int messageCount, final String endpointUri, final int 
throttle, final MockEndpoint receivingEndpoint)
             throws InterruptedException {
-        ExecutorService executor = 
Executors.newFixedThreadPool(threadPoolSize);
+        max = 0;
+        curr = new ConcurrentHashMap<>();
+        // two throttle groups
+        curr.putIfAbsent("1", new AtomicInteger(0));
+        curr.putIfAbsent("2", new AtomicInteger(0));
+        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
         try {
             if (receivingEndpoint != null) {
                 receivingEndpoint.expectedMessageCount(messageCount);
             }
 
-            long start = System.nanoTime();
             for (int i = 0; i < messageCount; i++) {
-                executor.execute(new Runnable() {
-                    public void run() {
-                        Map<String, Object> headers = new HashMap<>();
-                        if (messageCount % 2 == 0) {
-                            headers.put("key", "1");
-                        } else {
-                            headers.put("key", "2");
-                        }
-                        template.sendBodyAndHeaders(endpointUri, 
"<message>payload</message>", headers);
+                int finalI = i;
+                executor.execute(() -> {
+                    Map<String, Object> headers = new HashMap<>();
+                    if (finalI % 2 == 0) {
+                        headers.put("key", "1");
+                    } else {
+                        headers.put("key", "2");
                     }
+                    template.sendBodyAndHeaders(endpointUri, 
"<message>payload</message>", headers);
                 });
             }
 
@@ -138,17 +112,10 @@ public class ThrottlingGroupingTest extends 
ContextTestSupport {
             if (receivingEndpoint != null) {
                 receivingEndpoint.assertIsSatisfied();
             }
-            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
         } finally {
             executor.shutdownNow();
         }
-    }
-
-    @Test
-    public void 
testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue() 
throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult", 
MockEndpoint.class);
-        long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, 
"direct:ga", MESSAGE_COUNT, resultEndpoint);
-        assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT);
+        assertTrue(max <= throttle);
     }
 
     @Test
@@ -158,50 +125,39 @@ public class ThrottlingGroupingTest extends 
ContextTestSupport {
 
         ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT);
         try {
-            sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, 
INTERVAL, MESSAGE_COUNT);
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 
CONCURRENT_REQUESTS, MESSAGE_COUNT);
         } finally {
             executor.shutdownNow();
         }
     }
 
-    private long calculateMinimum(final long periodMs, final long 
throttleRate, final long messageCount) {
-        if (messageCount % throttleRate > 0) {
-            return (long) Math.floor((double) messageCount / (double) 
throttleRate) * periodMs;
-        } else {
-            return (long) (Math.floor((double) messageCount / (double) 
throttleRate) * periodMs) - periodMs;
-        }
-    }
-
-    private long calculateMaximum(final long periodMs, final long 
throttleRate, final long messageCount) {
-        return ((long) Math.ceil((double) messageCount / (double) 
throttleRate)) * periodMs;
-    }
-
     private void sendMessagesWithHeaderExpression(
-            final ExecutorService executor, final MockEndpoint resultEndpoint, 
final int throttle, final int intervalMs,
-            final int messageCount)
+            final ExecutorService executor, final MockEndpoint resultEndpoint, 
final int throttle, final int messageCount)
             throws InterruptedException {
         resultEndpoint.expectedMessageCount(messageCount);
 
-        long start = System.nanoTime();
+        max = 0;
+        curr = new ConcurrentHashMap<>();
+        // two throttle groups
+        curr.putIfAbsent("1", new AtomicInteger(0));
+        curr.putIfAbsent("2", new AtomicInteger(0));
         for (int i = 0; i < messageCount; i++) {
-            executor.execute(new Runnable() {
-                public void run() {
-                    Map<String, Object> headers = new HashMap<>();
-                    headers.put("throttleValue", throttle);
-                    if (messageCount % 2 == 0) {
-                        headers.put("key", "1");
-                    } else {
-                        headers.put("key", "2");
-                    }
-                    template.sendBodyAndHeaders("direct:gexpressionHeader", 
"<message>payload</message>", headers);
+            int finalI = i;
+            executor.execute(() -> {
+                Map<String, Object> headers = new HashMap<>();
+                headers.put("throttleValue", throttle);
+                if (finalI % 2 == 0) {
+                    headers.put("key", "1");
+                } else {
+                    headers.put("key", "2");
                 }
+                template.sendBodyAndHeaders("direct:gexpressionHeader", 
"<message>payload</message>", headers);
             });
         }
 
         // let's wait for the exchanges to arrive
         resultEndpoint.assertIsSatisfied();
-        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
-        assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount);
+        assertTrue(max <= throttle);
     }
 
     @Override
@@ -215,12 +171,24 @@ public class ThrottlingGroupingTest extends 
ContextTestSupport {
                 from("seda:b").throttle(header("max"), 2).to("mock:result2");
                 
from("seda:c").throttle(header("max")).correlationExpression(header("key")).to("mock:resultdynamic");
 
-                from("seda:ga").throttle(constant(3), 
header("key")).timePeriodMillis(1000).to("log:gresult", "mock:gresult");
-
-                from("direct:ga").throttle(constant(5), 
header("key")).timePeriodMillis(INTERVAL).to("log:gresult",
-                        "mock:gresult");
+                from("direct:ga").throttle(constant(CONCURRENT_REQUESTS), 
header("key"))
+                        .process(exchange -> {
+                            
curr.get(exchange.getMessage().getHeader("key")).getAndIncrement();
+                        })
+                        .delay(INTERVAL)
+                        .process(exchange -> {
+                            max = Math.max(max, 
curr.get(exchange.getMessage().getHeader("key")).getAndDecrement());
+                        })
+                        .to("log:gresult", "mock:gresult");
 
-                
from("direct:gexpressionHeader").throttle(header("throttleValue"), 
header("key")).timePeriodMillis(INTERVAL)
+                
from("direct:gexpressionHeader").throttle(header("throttleValue"), 
header("key"))
+                        .process(exchange -> {
+                            
curr.get(exchange.getMessage().getHeader("key")).getAndIncrement();
+                        })
+                        .delay(INTERVAL)
+                        .process(exchange -> {
+                            max = Math.max(max, 
curr.get(exchange.getMessage().getHeader("key")).getAndDecrement());
+                        })
                         .to("log:gresult", "mock:gresult");
             }
         };
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index 9acf6cc149e..95d5452693f 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -20,17 +20,11 @@ import org.apache.camel.api.management.ManagedAttribute;
 
 public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
 
-    @ManagedAttribute(description = "Maximum requests per period")
-    long getMaximumRequestsPerPeriod();
+    @ManagedAttribute(description = "Maximum concurrent requests")
+    long getMaximumConcurrentRequests();
 
-    @ManagedAttribute(description = "Maximum requests per period")
-    void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod);
-
-    @ManagedAttribute(description = "Time period in millis")
-    long getTimePeriodMillis();
-
-    @ManagedAttribute(description = "Time period in millis")
-    void setTimePeriodMillis(long timePeriodMillis);
+    @ManagedAttribute(description = "Maximum concurrent requests")
+    void setMaximumConcurrentRequests(long maximumConcurrentRequests);
 
     @ManagedAttribute(description = "Enables asynchronous delay which means 
the thread will not block while delaying")
     Boolean isAsyncDelayed();
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
index 977d33cfdac..2abb6a14f16 100644
--- 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
@@ -38,23 +38,13 @@ public class ManagedThrottler extends ManagedProcessor 
implements ManagedThrottl
     }
 
     @Override
-    public long getMaximumRequestsPerPeriod() {
-        return throttler.getCurrentMaximumRequestsPerPeriod();
+    public long getMaximumConcurrentRequests() {
+        return throttler.getCurrentMaximumConcurrentRequests();
     }
 
     @Override
-    public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
-        
throttler.setMaximumRequestsPerPeriodExpression(constant(maximumRequestsPerPeriod));
-    }
-
-    @Override
-    public long getTimePeriodMillis() {
-        return throttler.getTimePeriodMillis();
-    }
-
-    @Override
-    public void setTimePeriodMillis(long timePeriodMillis) {
-        throttler.setTimePeriodMillis(timePeriodMillis);
+    public void setMaximumConcurrentRequests(long maximumConcurrentRequests) {
+        
throttler.setMaximumConcurrentRequestsExpression(constant(maximumConcurrentRequests));
     }
 
     @Override
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
index 7e754f0b915..d1be4aeaeb0 100644
--- 
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
@@ -81,15 +81,13 @@ public class ManagedThrottlerTest extends 
ManagementTestSupport {
         Long completed = (Long) mbeanServer.getAttribute(routeName, 
"ExchangesCompleted");
         assertEquals(10, completed.longValue());
 
-        Long timePeriod = (Long) mbeanServer.getAttribute(throttlerName, 
"TimePeriodMillis");
-        assertEquals(250, timePeriod.longValue());
-
         Long total = (Long) mbeanServer.getAttribute(routeName, 
"TotalProcessingTime");
 
-        assertTrue(total < 1000, "Should take at most 1.0 sec: was " + total);
+        // 10 * delay (100) + tolerance (200)
+        assertTrue(total < 1200, "Should take at most 1.2 sec: was " + total);
 
         // change the throttler using JMX
-        mbeanServer.setAttribute(throttlerName, new 
Attribute("MaximumRequestsPerPeriod", (long) 2));
+        mbeanServer.setAttribute(throttlerName, new 
Attribute("MaximumConcurrentRequests", (long) 2));
 
         // reset the counters
         mbeanServer.invoke(routeName, "reset", null, null);
@@ -99,15 +97,16 @@ public class ManagedThrottlerTest extends 
ManagementTestSupport {
             template.sendBody("direct:start", "Message " + i);
         }
 
-        Long period = (Long) mbeanServer.getAttribute(throttlerName, 
"MaximumRequestsPerPeriod");
-        assertNotNull(period);
-        assertEquals(2, period.longValue());
+        Long requests = (Long) mbeanServer.getAttribute(throttlerName, 
"MaximumConcurrentRequests");
+        assertNotNull(requests);
+        assertEquals(2, requests.longValue());
 
         completed = (Long) mbeanServer.getAttribute(routeName, 
"ExchangesCompleted");
         assertEquals(10, completed.longValue());
         total = (Long) mbeanServer.getAttribute(routeName, 
"TotalProcessingTime");
 
-        assertTrue(total > 1000, "Should be around 1 sec now: was " + total);
+        // 10 * delay (100) + tolerance (200)
+        assertTrue(total < 1200, "Should take at most 1.2 sec: was " + total);
     }
 
     @DisabledOnOs(OS.WINDOWS)
@@ -269,19 +268,20 @@ public class ManagedThrottlerTest extends 
ManagementTestSupport {
             public void configure() throws Exception {
                 from("direct:start").id("route1")
                         .to("log:foo")
-                        .throttle(10).timePeriodMillis(250).id("mythrottler")
+                        .throttle(10).id("mythrottler")
+                        .delay(100)
                         .to("mock:result");
 
                 from("seda:throttleCount").id("route2")
-                        .throttle(1).timePeriodMillis(250).id("mythrottler2")
+                        .throttle(1).id("mythrottler2").delay(250)
                         .to("mock:end");
 
                 from("seda:throttleCountAsync").id("route3")
-                        
.throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler3")
+                        
.throttle(1).asyncDelayed().id("mythrottler3").delay(250)
                         .to("mock:endAsync");
 
                 from("seda:throttleCountAsyncException").id("route4")
-                        
.throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler4")
+                        
.throttle(1).asyncDelayed().id("mythrottler4").delay(250)
                         .to("mock:endAsyncException")
                         .process(exchange -> {
                             throw new RuntimeException("Fail me");
@@ -289,21 +289,21 @@ public class ManagedThrottlerTest extends 
ManagementTestSupport {
                 
from("seda:throttleCountRejectExecutionCallerRuns").id("route5")
                         
.onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
                         .throttle(1)
-                        .timePeriodMillis(250)
                         .asyncDelayed()
                         .executorService(badService)
                         .callerRunsWhenRejected(true)
                         .id("mythrottler5")
+                        .delay(250)
                         .to("mock:endAsyncRejectCallerRuns");
 
                 from("seda:throttleCountRejectExecution").id("route6")
                         
.onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
                         .throttle(1)
-                        .timePeriodMillis(250)
                         .asyncDelayed()
                         .executorService(badService)
                         .callerRunsWhenRejected(false)
                         .id("mythrottler6")
+                        .delay(250)
                         .to("mock:endAsyncReject");
             }
         };
diff --git 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java 
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index c4557ab90b5..d5ea55476bd 100644
--- 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++ 
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -1457,7 +1457,6 @@ public class ModelParser extends BaseParser {
                 case "callerRunsWhenRejected": 
def.setCallerRunsWhenRejected(val); break;
                 case "executorService": def.setExecutorService(val); break;
                 case "rejectExecution": def.setRejectExecution(val); break;
-                case "timePeriodMillis": def.setTimePeriodMillis(val); break;
                 default: return 
processorDefinitionAttributeHandler().accept(def, key, val);
             }
             return true;
diff --git 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
 
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
index ab3770e1d5e..0b1ae97db63 100644
--- 
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
+++ 
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
@@ -2335,7 +2335,6 @@ public class ModelWriter extends BaseWriter {
             throws IOException {
         startElement(name);
         doWriteProcessorDefinitionAttributes(def);
-        doWriteAttribute("timePeriodMillis", def.getTimePeriodMillis());
         doWriteAttribute("rejectExecution", def.getRejectExecution());
         doWriteAttribute("callerRunsWhenRejected", 
def.getCallerRunsWhenRejected());
         doWriteAttribute("executorService", def.getExecutorService());
diff --git 
a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
 
b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
index 06bd9313014..f9de1b056d8 100644
--- 
a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
+++ 
b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
@@ -2335,7 +2335,6 @@ public class ModelWriter extends BaseWriter {
             throws IOException {
         startElement(name);
         doWriteProcessorDefinitionAttributes(def);
-        doWriteAttribute("timePeriodMillis", def.getTimePeriodMillis());
         doWriteAttribute("rejectExecution", def.getRejectExecution());
         doWriteAttribute("callerRunsWhenRejected", 
def.getCallerRunsWhenRejected());
         doWriteAttribute("executorService", def.getExecutorService());
diff --git 
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
index 931c33a16af..f97b3b25b7c 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
@@ -18,4 +18,12 @@ When using the `SyncCommitManager` then the offset will be 
committed so that the
 the behavior described in the documentation.
 
 When using the `AsyncCommitManager` then the offset will be committed so that 
the payload is continually retried. This was
-the behavior described in the documentation.
\ No newline at end of file
+the behavior described in the documentation.
+
+=== throttle
+
+Throttle now uses the number of concurrent requests as the throttling measure 
instead of the number of requests
+per period.
+
+The `throttle` parameter now specifies the maximum number of concurrent 
requests,
+and there is no longer support for the `timePeriodMillis` option.
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index e6bc1336d0d..8fec7a9903c 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -17238,8 +17238,7 @@ public final class ModelDeserializers extends 
YamlDeserializerSupport {
                     @YamlProperty(name = "expression", type = 
"object:org.apache.camel.model.language.ExpressionDefinition", description = 
"Expression to configure the maximum number of messages to throttle per 
request", displayName = "Expression", oneOf = "expression"),
                     @YamlProperty(name = "id", type = "string", description = 
"Sets the id of this node", displayName = "Id"),
                     @YamlProperty(name = "inherit-error-handler", type = 
"boolean"),
-                    @YamlProperty(name = "reject-execution", type = "boolean", 
description = "Whether or not throttler throws the 
ThrottlerRejectedExecutionException when the exchange exceeds the request limit 
Is by default false", displayName = "Reject Execution"),
-                    @YamlProperty(name = "time-period-millis", type = 
"string", defaultValue = "1000", description = "Sets the time period during 
which the maximum request count is valid for", displayName = "Time Period 
Millis")
+                    @YamlProperty(name = "reject-execution", type = "boolean", 
description = "Whether or not throttler throws the 
ThrottlerRejectedExecutionException when the exchange exceeds the request limit 
Is by default false", displayName = "Reject Execution")
             }
     )
     public static class ThrottleDefinitionDeserializer extends 
YamlDeserializerBase<ThrottleDefinition> {
@@ -17296,11 +17295,6 @@ public final class ModelDeserializers extends 
YamlDeserializerSupport {
                     target.setRejectExecution(val);
                     break;
                 }
-                case "time-period-millis": {
-                    String val = asText(node);
-                    target.setTimePeriodMillis(val);
-                    break;
-                }
                 case "id": {
                     String val = asText(node);
                     target.setId(val);
diff --git 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
index 82cfe6f5ee7..da067a148aa 100644
--- 
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
+++ 
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
@@ -6538,12 +6538,6 @@
             "title" : "Reject Execution",
             "description" : "Whether or not throttler throws the 
ThrottlerRejectedExecutionException when the exchange exceeds the request limit 
Is by default false"
           },
-          "timePeriodMillis" : {
-            "type" : "string",
-            "title" : "Time Period Millis",
-            "description" : "Sets the time period during which the maximum 
request count is valid for",
-            "default" : "1000"
-          },
           "constant" : { },
           "csimple" : { },
           "datasonnet" : { },
diff --git a/etc/eclipse/camel_xml_templates.xml 
b/etc/eclipse/camel_xml_templates.xml
index 6af5acce144..e283c357d9e 100644
--- a/etc/eclipse/camel_xml_templates.xml
+++ b/etc/eclipse/camel_xml_templates.xml
@@ -104,7 +104,7 @@
 &lt;/route&gt;
 </template><template autoinsert="true" context="xml_all" deleted="false" 
description="Creates a Throttler" enabled="true" 
name="camel_throttler">&lt;route&gt;
   &lt;from uri="from_uri" /&gt;
-  &lt;throttle maximumRequestsPerPeriod="number_of_messages" 
timePeriodMillis="milliseconds"&gt;
+  &lt;throttle maximumConcurrentRequests="number_of_messages"&gt;
     &lt;to uri="to_uri" /&gt;
   &lt;/throttle&gt;
 &lt;/route&gt;


Reply via email to