This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 61d3552429a Camel 16099: throttler EIP - throttle on concurrent requests (#12138) 61d3552429a is described below commit 61d3552429af0cb3688504cdec793b4436f33d4c Author: Jono Morris <j...@apache.org> AuthorDate: Sat Nov 25 19:49:34 2023 +1300 Camel 16099: throttler EIP - throttle on concurrent requests (#12138) * CAMEL-16099 throttle on number of concurrent requests * CAMEL-16099 update unit tests * CAMEL-16099 generated changes * CAMEL-16099 update throttle-group tests * CAMEL-16099 add note to 4_3 upgrade guide --- .../org/apache/camel/catalog/models/throttle.json | 13 +- .../apache/camel/catalog/schemas/camel-spring.xsd | 9 - .../resources/org/apache/camel/model/throttle.json | 13 +- .../apache/camel/model/ProcessorDefinition.java | 57 ++++--- .../org/apache/camel/model/ThrottleDefinition.java | 72 ++------ .../java/org/apache/camel/processor/Throttler.java | 187 +++++++++------------ .../org/apache/camel/reifier/ThrottleReifier.java | 5 +- .../ThrottlerAsyncDelayedCallerRunsTest.java | 2 +- .../camel/processor/ThrottlerAsyncDelayedTest.java | 4 +- .../apache/camel/processor/ThrottlerDslTest.java | 4 +- .../camel/processor/ThrottlerMethodCallTest.java | 2 +- .../org/apache/camel/processor/ThrottlerTest.java | 170 ++++++++++--------- .../camel/processor/ThrottlingGroupingTest.java | 150 +++++++---------- .../management/mbean/ManagedThrottlerMBean.java | 14 +- .../camel/management/mbean/ManagedThrottler.java | 18 +- .../camel/management/ManagedThrottlerTest.java | 30 ++-- .../java/org/apache/camel/xml/in/ModelParser.java | 1 - .../java/org/apache/camel/xml/out/ModelWriter.java | 1 - .../org/apache/camel/yaml/out/ModelWriter.java | 1 - .../ROOT/pages/camel-4x-upgrade-guide-4_3.adoc | 10 +- .../dsl/yaml/deserializers/ModelDeserializers.java | 8 +- .../generated/resources/schema/camelYamlDsl.json | 6 - etc/eclipse/camel_xml_templates.xml | 2 +- 23 files changed, 327 insertions(+), 452 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json index c9eaae7ee68..eda460dde65 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json @@ -15,12 +15,11 @@ "expression": { "index": 0, "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "java", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "sec [...] "correlationExpression": { "index": 1, "kind": "expression", "displayName": "Correlation Expression", "required": false, "type": "object", "javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "java", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, [...] "executorService": { "index": 2, "kind": "attribute", "displayName": "Executor Service", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom thread pool (ScheduledExecutorService) by the throttler." }, - "timePeriodMillis": { "index": 3, "kind": "attribute", "displayName": "Time Period Millis", "required": false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1000", "description": "Sets the time period during which the maximum request count is valid for" }, - "asyncDelayed": { "index": 4, "kind": "attribute", "displayName": "Async Delayed", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables asynchronous delay which means the thread will not block while delaying." }, - "callerRunsWhenRejected": { "index": 5, "kind": "attribute", "displayName": "Caller Runs When Rejected", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true" }, - "rejectExecution": { "index": 6, "kind": "attribute", "displayName": "Reject Execution", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false" }, - "disabled": { "index": 7, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." }, - "id": { "index": 8, "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, - "description": { "index": 9, "kind": "element", "displayName": "Description", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } + "asyncDelayed": { "index": 3, "kind": "attribute", "displayName": "Async Delayed", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables asynchronous delay which means the thread will not block while delaying." }, + "callerRunsWhenRejected": { "index": 4, "kind": "attribute", "displayName": "Caller Runs When Rejected", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true" }, + "rejectExecution": { "index": 5, "kind": "attribute", "displayName": "Reject Execution", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false" }, + "disabled": { "index": 6, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." }, + "id": { "index": 7, "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, + "description": { "index": 8, "kind": "element", "displayName": "Description", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } } } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd index ecdc5ffb115..cb3f2bc85ea 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd @@ -13086,15 +13086,6 @@ correlation key is throttled together. <xs:documentation xml:lang="en"> <![CDATA[ To use a custom thread pool (ScheduledExecutorService) by the throttler. -]]> - </xs:documentation> - </xs:annotation> - </xs:attribute> - <xs:attribute name="timePeriodMillis" type="xs:string"> - <xs:annotation> - <xs:documentation xml:lang="en"> -<![CDATA[ -Sets the time period during which the maximum request count is valid for. Default value: 1000 ]]> </xs:documentation> </xs:annotation> diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json index c9eaae7ee68..eda460dde65 100644 --- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json +++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json @@ -15,12 +15,11 @@ "expression": { "index": 0, "kind": "expression", "displayName": "Expression", "required": true, "type": "object", "javaType": "org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "java", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, "autowired": false, "sec [...] "correlationExpression": { "index": 1, "kind": "expression", "displayName": "Correlation Expression", "required": false, "type": "object", "javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [ "constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser", "java", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize" ], "deprecated": false, [...] "executorService": { "index": 2, "kind": "attribute", "displayName": "Executor Service", "label": "advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom thread pool (ScheduledExecutorService) by the throttler." }, - "timePeriodMillis": { "index": 3, "kind": "attribute", "displayName": "Time Period Millis", "required": false, "type": "duration", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "1000", "description": "Sets the time period during which the maximum request count is valid for" }, - "asyncDelayed": { "index": 4, "kind": "attribute", "displayName": "Async Delayed", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables asynchronous delay which means the thread will not block while delaying." }, - "callerRunsWhenRejected": { "index": 5, "kind": "attribute", "displayName": "Caller Runs When Rejected", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true" }, - "rejectExecution": { "index": 6, "kind": "attribute", "displayName": "Reject Execution", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false" }, - "disabled": { "index": 7, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." }, - "id": { "index": 8, "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, - "description": { "index": 9, "kind": "element", "displayName": "Description", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } + "asyncDelayed": { "index": 3, "kind": "attribute", "displayName": "Async Delayed", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Enables asynchronous delay which means the thread will not block while delaying." }, + "callerRunsWhenRejected": { "index": 4, "kind": "attribute", "displayName": "Caller Runs When Rejected", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true" }, + "rejectExecution": { "index": 5, "kind": "attribute", "displayName": "Reject Execution", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false" }, + "disabled": { "index": 6, "kind": "attribute", "displayName": "Disabled", "label": "advanced", "required": false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to disable this EIP from the route during build time. Once an EIP has been disabled then it cannot be enabled later at runtime." }, + "id": { "index": 7, "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" }, + "description": { "index": 8, "kind": "element", "displayName": "Description", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" } } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 8f4e1920f4f..55b5a9c09b8 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -1765,14 +1765,14 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * that a specific endpoint does not get overloaded, or that we don't exceed an agreed SLA with some external * service. * <p/> - * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 will default ensure at - * most 10 messages per second. + * Setting the maximumConcurrentRequest will ensure that no more than the specified number of messages will flow to + * the endpoint at any given time. * - * @param maximumRequestCount the maximum messages - * @return the builder + * @param maximumConcurrentRequests the maximum number of concurrent messages + * @return the builder */ - public ThrottleDefinition throttle(long maximumRequestCount) { - return throttle(ExpressionBuilder.constantExpression(maximumRequestCount)); + public ThrottleDefinition throttle(long maximumConcurrentRequests) { + return throttle(ExpressionBuilder.constantExpression(maximumConcurrentRequests)); } /** @@ -1780,14 +1780,14 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * that a specific endpoint does not get overloaded, or that we don't exceed an agreed SLA with some external * service. * <p/> - * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 will default ensure at - * most 10 messages per second. + * Setting the maximumConcurrentRequest will ensure that no more than the specified number of messages will flow to + * the endpoint at any given time. * - * @param maximumRequestCount an expression to calculate the maximum request count - * @return the builder + * @param maximumConcurrentRequests an expression to calculate the maximum concurrent request count + * @return the builder */ - public ThrottleDefinition throttle(Expression maximumRequestCount) { - ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount); + public ThrottleDefinition throttle(Expression maximumConcurrentRequests) { + ThrottleDefinition answer = new ThrottleDefinition(maximumConcurrentRequests); addOutput(answer); return answer; } @@ -1799,17 +1799,18 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * based on the key expression to group exchanges. This will make key-based throttling instead of overall * throttling. * <p/> - * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 will default ensure at - * most 10 messages per second. + * Setting the maximumConcurrentRequest will ensure that no more than the specified number of messages will flow to + * the endpoint at any given time. * - * @param maximumRequestCount an expression to calculate the maximum request count - * @param correlationExpressionKey is a correlation key that can throttle by the given key instead of overall - * throttling - * @return the builder + * @param maximumConcurrentRequests an expression to calculate the maximum concurrent request count + * @param correlationExpressionKey is a correlation key that can throttle by the given key instead of overall + * throttling + * @return the builder */ - public ThrottleDefinition throttle(Expression maximumRequestCount, long correlationExpressionKey) { + public ThrottleDefinition throttle(Expression maximumConcurrentRequests, long correlationExpressionKey) { ThrottleDefinition answer - = new ThrottleDefinition(maximumRequestCount, ExpressionBuilder.constantExpression(correlationExpressionKey)); + = new ThrottleDefinition( + maximumConcurrentRequests, ExpressionBuilder.constantExpression(correlationExpressionKey)); addOutput(answer); return answer; } @@ -1821,16 +1822,16 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * based on the key expression to group exchanges. This will make key-based throttling instead of overall * throttling. * <p/> - * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 will default ensure at - * most 10 messages per second. + * Setting the maximumConcurrentRequest will ensure that no more than the specified number of messages will flow to + * the endpoint at any given time. * - * @param maximumRequestCount an expression to calculate the maximum request count - * @param correlationExpressionKey is a correlation key as an expression that can throttle by the given key instead - * of overall throttling - * @return the builder + * @param maximumConcurrentRequests an expression to calculate the maximum concurrent request count + * @param correlationExpressionKey is a correlation key as an expression that can throttle by the given key + * instead of overall throttling + * @return the builder */ - public ThrottleDefinition throttle(Expression maximumRequestCount, Expression correlationExpressionKey) { - ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount, correlationExpressionKey); + public ThrottleDefinition throttle(Expression maximumConcurrentRequests, Expression correlationExpressionKey) { + ThrottleDefinition answer = new ThrottleDefinition(maximumConcurrentRequests, correlationExpressionKey); addOutput(answer); return answer; } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java index 8c0b78b3e78..2a7611997b4 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java @@ -49,9 +49,6 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic @Metadata(label = "advanced", javaType = "java.util.concurrent.ExecutorService") private String executorService; @XmlAttribute - @Metadata(defaultValue = "1000", javaType = "java.time.Duration") - private String timePeriodMillis; - @XmlAttribute @Metadata(label = "advanced", javaType = "java.lang.Boolean") private String asyncDelayed; @XmlAttribute @@ -64,16 +61,16 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic public ThrottleDefinition() { } - public ThrottleDefinition(Expression maximumRequestsPerPeriod) { - super(maximumRequestsPerPeriod); + public ThrottleDefinition(Expression maximumConcurrentRequests) { + super(maximumConcurrentRequests); } - public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) { - this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression); + public ThrottleDefinition(Expression maximumConcurrentRequests, Expression correlationExpression) { + this(ExpressionNodeHelper.toExpressionDefinition(maximumConcurrentRequests), correlationExpression); } - private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) { - super(maximumRequestsPerPeriod); + private ThrottleDefinition(ExpressionDefinition maximumConcurrentRequests, Expression correlationExpression) { + super(maximumConcurrentRequests); ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression)); @@ -82,11 +79,7 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic @Override public String toString() { - return "Throttle[" + description() + "]"; - } - - protected String description() { - return getExpression() + " request per " + getTimePeriodMillis() + " millis"; + return "Throttle[" + getExpression() + "]"; } @Override @@ -96,53 +89,32 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic @Override public String getLabel() { - return "throttle[" + description() + "]"; + return "throttle[" + getExpression() + "]"; } // Fluent API // ------------------------------------------------------------------------- /** - * Sets the time period during which the maximum request count is valid for - * - * @param timePeriodMillis period in millis - * @return the builder - */ - public ThrottleDefinition timePeriodMillis(long timePeriodMillis) { - return timePeriodMillis(Long.toString(timePeriodMillis)); - } - - /** - * Sets the time period during which the maximum request count is valid for - * - * @param timePeriodMillis period in millis - * @return the builder - */ - public ThrottleDefinition timePeriodMillis(String timePeriodMillis) { - setTimePeriodMillis(timePeriodMillis); - return this; - } - - /** - * Sets the time period during which the maximum request count per period + * Sets the maximum number of concurrent requests * - * @param maximumRequestsPerPeriod the maximum request count number per time period - * @return the builder + * @param maximumConcurrentRequests the maximum number of concurrent requests + * @return the builder */ - public ThrottleDefinition maximumRequestsPerPeriod(long maximumRequestsPerPeriod) { + public ThrottleDefinition maximumConcurrentRequests(long maximumConcurrentRequests) { setExpression( - ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod))); + ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumConcurrentRequests))); return this; } /** - * Sets the time period during which the maximum request count per period + * Sets the number of concurrent requests * - * @param maximumRequestsPerPeriod the maximum request count number per time period - * @return the builder + * @param maximumConcurrentRequests the maximum number of concurrent requests + * @return the builder */ - public ThrottleDefinition maximumRequestsPerPeriod(String maximumRequestsPerPeriod) { + public ThrottleDefinition maximumConcurrentRequests(String maximumConcurrentRequests) { setExpression( - ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.simpleExpression(maximumRequestsPerPeriod))); + ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.simpleExpression(maximumConcurrentRequests))); return this; } @@ -297,14 +269,6 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic super.setExpression(expression); } - public String getTimePeriodMillis() { - return timePeriodMillis; - } - - public void setTimePeriodMillis(String timePeriodMillis) { - this.timePeriodMillis = timePeriodMillis; - } - public String getAsyncDelayed() { return asyncDelayed; } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java index a7854805cde..b74b145914e 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java @@ -18,11 +18,10 @@ package org.apache.camel.processor; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -34,6 +33,7 @@ import org.apache.camel.RuntimeExchangeException; import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.spi.Synchronization; import org.apache.camel.support.AsyncProcessorSupport; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; @@ -41,20 +41,17 @@ import org.slf4j.LoggerFactory; /** * A <a href="http://camel.apache.org/throttler.html">Throttler</a> will set a limit on the maximum number of message - * exchanges which can be sent to a processor within a specific time period. + * exchanges which can be sent to a processor concurrently. * <p/> - * This pattern can be extremely useful if you have some external system which meters access; such as only allowing 100 - * requests per second; or if huge load can cause a particular system to malfunction or to reduce its throughput you + * This pattern can be extremely useful if you have some external system which meters access; such as only allowing 10 + * concurrent requests; or if huge load can cause a particular system to malfunction or to reduce its throughput you * might want to introduce some throttling. * * This throttle implementation is thread-safe and is therefore safe to be used by multiple concurrent threads in a * single route. * - * The throttling mechanism is a DelayQueue with maxRequestsPerPeriod permits on it. Each permit is set to be delayed by - * timePeriodMillis (except when the throttler is initialized or the throttle rate increased, then there is no delay for - * those permits). Callers trying to acquire a permit from the DelayQueue will block if necessary. The end result is a - * rolling window of time. Where from the callers point of view in the last timePeriodMillis no more than - * maxRequestsPerPeriod have been allowed to be acquired. + * The throttling mechanism is a Semaphore with maxConcurrentRequests permits on it. Callers trying to acquire a permit + * will block if necessary when maxConcurrentRequests permits have been acquired. */ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware { @@ -64,6 +61,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp"; private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState"; + private static final long CLEAN_PERIOD = 1000L * 10; private enum State { SYNC, @@ -74,34 +72,24 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa private final CamelContext camelContext; private final ScheduledExecutorService asyncExecutor; private final boolean shutdownAsyncExecutor; - - private volatile long timePeriodMillis; - private final long cleanPeriodMillis; private String id; private String routeId; - private Expression maxRequestsPerPeriodExpression; + private Expression maxConcurrentRequestsExpression; private boolean rejectExecution; private boolean asyncDelayed; private boolean callerRunsWhenRejected = true; private final Expression correlationExpression; private final Map<String, ThrottlingState> states = new ConcurrentHashMap<>(); - public Throttler(final CamelContext camelContext, final Expression maxRequestsPerPeriodExpression, - final long timePeriodMillis, + public Throttler(final CamelContext camelContext, final Expression maxConcurrentRequestsExpression, final ScheduledExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) { this.camelContext = camelContext; this.rejectExecution = rejectExecution; this.shutdownAsyncExecutor = shutdownAsyncExecutor; - ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression"); - this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression; - - if (timePeriodMillis <= 0) { - throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis); - } - this.timePeriodMillis = timePeriodMillis; - this.cleanPeriodMillis = timePeriodMillis * 10; + ObjectHelper.notNull(maxConcurrentRequestsExpression, "maxConcurrentRequestsExpression"); + this.maxConcurrentRequestsExpression = maxConcurrentRequestsExpression; this.asyncExecutor = asyncExecutor; this.correlationExpression = correlation; } @@ -127,16 +115,12 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa key = correlationExpression.evaluate(exchange, String.class); } ThrottlingState throttlingState = states.computeIfAbsent(key, ThrottlingState::new); - throttlingState.calculateAndSetMaxRequestsPerPeriod(exchange); - - ThrottlePermit permit = throttlingState.poll(); + throttlingState.calculateAndSetMaxConcurrentRequestsExpression(exchange); - if (permit == null) { + if (!throttlingState.tryAcquire(exchange)) { if (isRejectExecution()) { throw new ThrottlerRejectedExecutionException( - "Exceeded the max throttle rate of " - + throttlingState.getThrottleRate() + " within " - + timePeriodMillis + "ms"); + "Exceeded the max throttle rate of " + throttlingState.getThrottleRate()); } else { // delegate to async pool if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) { @@ -154,12 +138,10 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa if (LOG.isTraceEnabled()) { start = System.currentTimeMillis(); } - permit = throttlingState.take(); + throttlingState.acquire(exchange); if (LOG.isTraceEnabled()) { elapsed = System.currentTimeMillis() - start; } - throttlingState.enqueue(permit, exchange); - if (state == State.ASYNC) { if (LOG.isTraceEnabled()) { long queuedTime = start - queuedStart; @@ -175,8 +157,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa } } } else { - throttlingState.enqueue(permit, exchange); - + // permit acquired if (state == State.ASYNC) { if (LOG.isTraceEnabled()) { long queuedTime = System.currentTimeMillis() - queuedStart; @@ -214,7 +195,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa } /** - * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission and + * Delegate blocking to an asyncExecutor. Except if the executor rejects the submission and * isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not before changing * the exchange state to stop any recursion. */ @@ -225,8 +206,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, System.currentTimeMillis()); } exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC); - long delay = throttlingState.peek().getDelay(TimeUnit.NANOSECONDS); - asyncExecutor.schedule(() -> process(exchange, callback), delay, TimeUnit.NANOSECONDS); + asyncExecutor.submit(() -> process(exchange, callback)); return false; } catch (final RejectedExecutionException e) { if (isCallerRunsWhenRejected()) { @@ -259,68 +239,82 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa private class ThrottlingState { private final String key; - private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>(); private final AtomicReference<ScheduledFuture<?>> cleanFuture = new AtomicReference<>(); private volatile int throttleRate; + private WrappedSemaphore semaphore; ThrottlingState(String key) { this.key = key; + semaphore = new WrappedSemaphore(); } public int getThrottleRate() { return throttleRate; } - public ThrottlePermit poll() { - return delayQueue.poll(); + public void clean() { + states.remove(key); } - public ThrottlePermit peek() { - return delayQueue.peek(); + public boolean tryAcquire(Exchange exchange) { + boolean acquired = semaphore.tryAcquire(); + if (acquired) { + addSynchronization(exchange); + } + return acquired; } - public ThrottlePermit take() throws InterruptedException { - return delayQueue.take(); + public void acquire(Exchange exchange) throws InterruptedException { + semaphore.acquire(); + addSynchronization(exchange); } - public void clean() { - states.remove(key); + private void addSynchronization(final Exchange exchange) { + exchange.getExchangeExtension().addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + release(exchange); + } + + @Override + public void onFailure(Exchange exchange) { + release(exchange); + } + }); } /** - * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now. + * Returns a permit. */ - public void enqueue(final ThrottlePermit permit, final Exchange exchange) { - permit.setDelayMs(getTimePeriodMillis()); - delayQueue.put(permit); + public void release(final Exchange exchange) { + semaphore.release(); try { - ScheduledFuture<?> next = asyncExecutor.schedule(this::clean, cleanPeriodMillis, TimeUnit.MILLISECONDS); + ScheduledFuture<?> next = asyncExecutor.schedule(this::clean, CLEAN_PERIOD, TimeUnit.MILLISECONDS); ScheduledFuture<?> prev = cleanFuture.getAndSet(next); if (prev != null) { prev.cancel(false); } - // try and incur the least amount of overhead while releasing permits back to the queue if (LOG.isTraceEnabled()) { LOG.trace("Permit released, for exchangeId: {}", exchange.getExchangeId()); } } catch (RejectedExecutionException e) { - LOG.debug("Throttling queue cleaning rejected", e); + LOG.debug("Throttle cleaning rejected", e); } } /** - * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down. + * Evaluates the maxConcurrentRequestsExpression and adjusts the throttle rate up or down. */ - public synchronized void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception { - Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class); + public synchronized void calculateAndSetMaxConcurrentRequestsExpression(final Exchange exchange) throws Exception { + Integer newThrottle = maxConcurrentRequestsExpression.evaluate(exchange, Integer.class); if (newThrottle != null && newThrottle < 0) { - throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle); + throw new IllegalStateException("The maximumConcurrentRequests must be a positive number, was: " + newThrottle); } if (newThrottle == null && throttleRate == 0) { throw new RuntimeExchangeException( - "The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, + "The maxConcurrentRequestsExpression was evaluated as null: " + maxConcurrentRequestsExpression, exchange); } @@ -331,14 +325,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa int delta = throttleRate - newThrottle; // discard any permits that are needed to decrease throttling - while (delta > 0) { - delayQueue.take(); - delta--; - if (LOG.isTraceEnabled()) { - LOG.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", - exchange.getExchangeId()); - } - } + semaphore.reducePermits(delta); if (LOG.isDebugEnabled()) { LOG.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId()); @@ -347,9 +334,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa // increase } else if (newThrottle > throttleRate) { int delta = newThrottle - throttleRate; - for (int i = 0; i < delta; i++) { - delayQueue.put(new ThrottlePermit(-1)); - } + semaphore.increasePermits(delta); if (throttleRate == 0) { if (LOG.isDebugEnabled()) { LOG.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, @@ -368,28 +353,29 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa } } - /** - * Permit that implements the Delayed interface needed by DelayQueue. - */ - private static class ThrottlePermit implements Delayed { - private volatile long scheduledTime; - - ThrottlePermit(final long delayMs) { - setDelayMs(delayMs); + // extend Semaphore so we can reduce permits if required + private class WrappedSemaphore extends Semaphore { + public WrappedSemaphore() { + super(0, true); } - public void setDelayMs(final long delayMs) { - this.scheduledTime = System.currentTimeMillis() + delayMs; + public boolean tryAcquire() { + try { + // honours fairness setting + return super.tryAcquire(0L, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + return false; + } } - @Override - public long getDelay(final TimeUnit unit) { - return unit.convert(scheduledTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + // decrease throttling + public void reducePermits(int n) { + super.reducePermits(n); } - @Override - public int compareTo(final Delayed o) { - return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + // increase throttling + public void increasePermits(int n) { + super.release(n); } } @@ -440,36 +426,25 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa /** * Sets the maximum number of requests per time period expression */ - public void setMaximumRequestsPerPeriodExpression(Expression maxRequestsPerPeriodExpression) { - this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression; + public void setMaximumConcurrentRequestsExpression(Expression maxConcurrentRequestsExpression) { + this.maxConcurrentRequestsExpression = maxConcurrentRequestsExpression; } - public Expression getMaximumRequestsPerPeriodExpression() { - return maxRequestsPerPeriodExpression; + public Expression getMaximumConcurrentRequests() { + return maxConcurrentRequestsExpression; } /** - * Gets the current maximum request per period value. If it is grouped throttling applied with correlationExpression - * than the max per period within the group will return + * Gets the current maximum request. If it is grouped throttling applied with correlationExpression then the max + * within the group will return */ - public int getCurrentMaximumRequestsPerPeriod() { + public int getCurrentMaximumConcurrentRequests() { return states.values().stream().mapToInt(ThrottlingState::getThrottleRate).max().orElse(0); } - /** - * Sets the time period during which the maximum number of requests apply - */ - public void setTimePeriodMillis(final long timePeriodMillis) { - this.timePeriodMillis = timePeriodMillis; - } - - public long getTimePeriodMillis() { - return timePeriodMillis; - } - @Override public String getTraceLabel() { - return "throttle[" + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + "]"; + return "throttle[" + maxConcurrentRequestsExpression + "]"; } @Override diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java index 20dc0bec3ef..a046d0bf1c4 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java @@ -37,9 +37,6 @@ public class ThrottleReifier extends ExpressionReifier<ThrottleDefinition> { boolean shutdownThreadPool = willCreateNewThreadPool(definition, true); ScheduledExecutorService threadPool = getConfiguredScheduledExecutorService("Throttle", definition, true); - // should be default 1000 millis - long period = parseDuration(definition.getTimePeriodMillis(), 1000L); - // max requests per period is mandatory Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(); if (maxRequestsExpression == null) { @@ -53,7 +50,7 @@ public class ThrottleReifier extends ExpressionReifier<ThrottleDefinition> { boolean reject = parseBoolean(definition.getRejectExecution(), false); Throttler answer = new Throttler( - camelContext, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation); + camelContext, maxRequestsExpression, threadPool, shutdownThreadPool, reject, correlation); answer.setAsyncDelayed(async); // should be true by default diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java index 56efee27f12..1af2943081c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java @@ -50,7 +50,7 @@ public class ThrottlerAsyncDelayedCallerRunsTest extends ContextTestSupport { builder.maxQueueSize(2); context.getExecutorServiceManager().registerThreadPoolProfile(builder.build()); - from("seda:start").throttle(1).timePeriodMillis(100).asyncDelayed().executorService("myThrottler") + from("seda:start").throttle(1).delay(100).asyncDelayed().executorService("myThrottler") .callerRunsWhenRejected(true).to("mock:result"); } }; diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java index ef14ff4b3e8..71adf7ea555 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java @@ -67,10 +67,10 @@ public class ThrottlerAsyncDelayedTest extends ContextTestSupport { return new RouteBuilder() { public void configure() { // START SNIPPET: ex - from("seda:a").throttle(3).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result", "mock:result"); + from("seda:a").throttle(3).delay(INTERVAL).asyncDelayed().to("log:result", "mock:result"); // END SNIPPET: ex - from("direct:a").throttle(3).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result", "mock:result"); + from("direct:a").throttle(3).delay(INTERVAL).asyncDelayed().to("log:result", "mock:result"); } }; } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java index 2d09d32c026..bb34216469a 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java @@ -49,7 +49,7 @@ public class ThrottlerDslTest extends ContextTestSupport { resultEndpoint.assertIsSatisfied(); // now assert that they have actually been throttled - long minimumTime = (messageCount - 1) * INTERVAL; + long minimumTime = messageCount * INTERVAL; // add a little slack long delta = System.currentTimeMillis() - start + 200; assertTrue(delta >= minimumTime, "Should take at least " + minimumTime + "ms, was: " + delta); @@ -61,7 +61,7 @@ public class ThrottlerDslTest extends ContextTestSupport { return new RouteBuilder() { public void configure() { from("direct:start").throttle().message(m -> m.getHeader("ThrottleCount", Integer.class)) - .timePeriodMillis(INTERVAL).to("log:result", "mock:result"); + .delay(INTERVAL).to("log:result", "mock:result"); } }; } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java index 7255f31da86..79289495426 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java @@ -75,7 +75,7 @@ public class ThrottlerMethodCallTest extends ContextTestSupport { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - from("direct:expressionMethod").throttle(method("myBean", "getMessagesPerInterval")).timePeriodMillis(INTERVAL) + from("direct:expressionMethod").throttle(method("myBean", "getMessagesPerInterval")).delay(INTERVAL) .to("log:result", "mock:result"); } }; diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java index b25f77941f1..37b92903927 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.processor; +import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -35,23 +36,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com", disabledReason = "Flaky on Github CI") public class ThrottlerTest extends ContextTestSupport { private static final int INTERVAL = 500; - private static final int TOLERANCE = 50; private static final int MESSAGE_COUNT = 9; - - @Test - public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds() throws Exception { - MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - resultEndpoint.expectedMessageCount(3); - resultEndpoint.setResultWaitTime(2000); - - for (int i = 0; i < MESSAGE_COUNT; i++) { - template.sendBody("seda:a", "<message>" + i + "</message>"); - } - - // lets pause to give the requests time to be processed - // to check that the throttle really does kick in - resultEndpoint.assertIsSatisfied(); - } + private static final int CONCURRENT_REQUESTS = 2; + private volatile int curr; + private volatile int max; @Test public void testSendLotsOfMessagesWithRejectExecution() throws Exception { @@ -61,27 +49,29 @@ public class ThrottlerTest extends ContextTestSupport { MockEndpoint errorEndpoint = resolveMandatoryEndpoint("mock:error", MockEndpoint.class); errorEndpoint.expectedMessageCount(4); - for (int i = 0; i < 6; i++) { - template.sendBody("direct:start", "<message>" + i + "</message>"); + ExecutorService executor = Executors.newFixedThreadPool(6); + try { + for (int i = 0; i < 6; i++) { + executor.execute(() -> template.sendBody("direct:start", "<message>payload</message>")); + } + assertMockEndpointsSatisfied(); + } finally { + executor.shutdownNow(); } - - // lets pause to give the requests time to be processed - // to check that the throttle really does kick in - assertMockEndpointsSatisfied(); } @Test public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a", MESSAGE_COUNT, resultEndpoint); - assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT); + sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a", MESSAGE_COUNT, resultEndpoint); + assertTrue(max <= CONCURRENT_REQUESTS); } @Test public void testConfigurationWithConstantExpression() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:expressionConstant", MESSAGE_COUNT, resultEndpoint); - assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT); + sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:expressionConstant", MESSAGE_COUNT, resultEndpoint); + assertTrue(max <= CONCURRENT_REQUESTS); } @Test @@ -91,7 +81,7 @@ public class ThrottlerTest extends ContextTestSupport { ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT); try { - sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, INTERVAL, MESSAGE_COUNT); + sendMessagesWithHeaderExpression(executor, resultEndpoint, CONCURRENT_REQUESTS, INTERVAL, MESSAGE_COUNT); } finally { executor.shutdownNow(); } @@ -102,47 +92,46 @@ public class ThrottlerTest extends ContextTestSupport { ExecutorService executor = Executors.newFixedThreadPool(5); try { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, INTERVAL, MESSAGE_COUNT); - Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the - // first throttle rate does not - // influence the next one. + sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, INTERVAL, MESSAGE_COUNT); + Thread.sleep(INTERVAL); // sleep here to ensure the + // first throttle rate does not + // influence the next one. resultEndpoint.reset(); - sendMessagesWithHeaderExpression(executor, resultEndpoint, 10, INTERVAL, MESSAGE_COUNT); - Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the - // first throttle rate does not - // influence the next one. + sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, INTERVAL, MESSAGE_COUNT); + Thread.sleep(INTERVAL); // sleep here to ensure the + // first throttle rate does not + // influence the next one. resultEndpoint.reset(); - sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, INTERVAL, MESSAGE_COUNT); - Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the - // first throttle rate does not - // influence the next one. + sendMessagesWithHeaderExpression(executor, resultEndpoint, 2, INTERVAL, MESSAGE_COUNT); + Thread.sleep(INTERVAL); // sleep here to ensure the + // first throttle rate does not + // influence the next one. resultEndpoint.reset(); - sendMessagesWithHeaderExpression(executor, resultEndpoint, 10, INTERVAL, MESSAGE_COUNT); + sendMessagesWithHeaderExpression(executor, resultEndpoint, 4, INTERVAL, MESSAGE_COUNT); } finally { executor.shutdownNow(); } } - private void assertThrottlerTiming( - final long elapsedTimeMs, final int throttle, final int intervalMs, final int messageCount) { - // now assert that they have actually been throttled (use +/- 50 as - // slack) - long minimum = calculateMinimum(intervalMs, throttle, messageCount) - 50; - long maximum = calculateMaximum(intervalMs, throttle, messageCount) + 50; - // add 500 in case running on slow CI boxes - maximum += 500; - log.info("Sent {} exchanges in {}ms, with throttle rate of {} per {}ms. Calculated min {}ms and max {}ms", messageCount, - elapsedTimeMs, throttle, intervalMs, minimum, - maximum); - - assertTrue(elapsedTimeMs >= minimum, "Should take at least " + minimum + "ms, was: " + elapsedTimeMs); - assertTrue(elapsedTimeMs <= maximum + TOLERANCE, "Should take at most " + maximum + "ms, was: " + elapsedTimeMs); + @Test + public void testFifo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C", "D", "E", "F", "G", "H"); + sendBody("direct:fifo"); + assertMockEndpointsSatisfied(); } - private long sendMessagesAndAwaitDelivery( + @Test + public void testPermitReleaseOnException() throws Exception { + // verify that failed processing releases throttle permit + getMockEndpoint("mock:error").expectedBodiesReceived("A", "B", "C", "D", "E", "F", "G", "H"); + sendBody("direct:release"); + assertMockEndpointsSatisfied(); + } + + private void sendMessagesAndAwaitDelivery( final int messageCount, final String endpointUri, final int threadPoolSize, final MockEndpoint receivingEndpoint) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize); @@ -151,7 +140,6 @@ public class ThrottlerTest extends ContextTestSupport { receivingEndpoint.expectedMessageCount(messageCount); } - long start = System.nanoTime(); for (int i = 0; i < messageCount; i++) { executor.execute(new Runnable() { public void run() { @@ -164,7 +152,6 @@ public class ThrottlerTest extends ContextTestSupport { if (receivingEndpoint != null) { receivingEndpoint.assertIsSatisfied(); } - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } finally { executor.shutdownNow(); } @@ -176,6 +163,7 @@ public class ThrottlerTest extends ContextTestSupport { throws InterruptedException { resultEndpoint.expectedMessageCount(messageCount); + max = 0; long start = System.nanoTime(); for (int i = 0; i < messageCount; i++) { executor.execute(new Runnable() { @@ -189,19 +177,12 @@ public class ThrottlerTest extends ContextTestSupport { // let's wait for the exchanges to arrive resultEndpoint.assertIsSatisfied(); long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount); - } - - private long calculateMinimum(final long periodMs, final long throttleRate, final long messageCount) { - if (messageCount % throttleRate > 0) { - return (long) Math.floor((double) messageCount / (double) throttleRate) * periodMs; - } else { - return (long) (Math.floor((double) messageCount / (double) throttleRate) * periodMs) - periodMs; - } + assertTrue(max <= throttle); } - private long calculateMaximum(final long periodMs, final long throttleRate, final long messageCount) { - return ((long) Math.ceil((double) messageCount / (double) throttleRate)) * periodMs; + private void sendBody(String endpoint) { + Arrays.stream(new String[] { "A", "B", "C", "D", "E", "F", "G", "H" }) + .forEach(b -> template.sendBody(endpoint, b)); } @Override @@ -211,21 +192,44 @@ public class ThrottlerTest extends ContextTestSupport { onException(ThrottlerRejectedExecutionException.class).handled(true).to("mock:error"); - // START SNIPPET: ex - from("seda:a").throttle(3).timePeriodMillis(1000).to("log:result", "mock:result"); - // END SNIPPET: ex - - from("direct:a").throttle(5).timePeriodMillis(INTERVAL).to("log:result", "mock:result"); - - from("direct:expressionConstant").throttle(constant(5)).timePeriodMillis(INTERVAL).to("log:result", - "mock:result"); - - from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result", - "mock:result"); - - from("direct:start").throttle(2).timePeriodMillis(1000).rejectExecution(true).to("log:result", "mock:result"); - - from("direct:highThrottleRate").throttle(10000).timePeriodMillis(INTERVAL).to("mock:result"); + from("direct:a").throttle(CONCURRENT_REQUESTS) + .process(exchange -> { + curr++; + }) + .delay(INTERVAL) + .process(exchange -> { + max = Math.max(max, curr--); + }) + .to("log:result", "mock:result"); + + from("direct:expressionConstant").throttle(constant(CONCURRENT_REQUESTS)) + .process(exchange -> { + curr++; + }) + .delay(INTERVAL) + .process(exchange -> { + max = Math.max(max, curr--); + }) + .to("log:result", "mock:result"); + + from("direct:expressionHeader").throttle(header("throttleValue")) + .process(exchange -> { + curr++; + }) + .delay(INTERVAL) + .process(exchange -> { + max = Math.max(max, curr--); + }) + .to("log:result", "mock:result"); + + from("direct:start").throttle(2).rejectExecution(true).delay(1000).to("log:result", "mock:result"); + + from("direct:fifo").throttle(1).delay(100).to("mock:result"); + + from("direct:release").errorHandler(deadLetterChannel("mock:error")).throttle(1).delay(100) + .process(exchange -> { + throw new RuntimeException(); + }).to("mock:result"); } }; } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java index d030cffb74e..3922f069748 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java @@ -18,9 +18,10 @@ package org.apache.camel.processor; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; @@ -33,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @Isolated public class ThrottlingGroupingTest extends ContextTestSupport { private static final int INTERVAL = 500; - private static final int MESSAGE_COUNT = 9; - private static final int TOLERANCE = 50; + private static final int MESSAGE_COUNT = 20; + private static final int CONCURRENT_REQUESTS = 2; + private Map<String, AtomicInteger> curr; + private static int max; @Test public void testGroupingWithSingleConstant() throws Exception { @@ -73,64 +76,35 @@ public class ThrottlingGroupingTest extends ContextTestSupport { } @Test - public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds() throws Exception { - + public void testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue() throws Exception { MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class); - resultEndpoint.expectedMessageCount(3); - resultEndpoint.setResultWaitTime(2000); - - Map<String, Object> headers = new HashMap<>(); - for (int i = 0; i < 9; i++) { - if (i % 2 == 0) { - headers.put("key", "1"); - } else { - headers.put("key", "2"); - } - template.sendBodyAndHeaders("seda:ga", "<message>" + i + "</message>", headers); - } - - // lets pause to give the requests time to be processed - // to check that the throttle really does kick in - resultEndpoint.assertIsSatisfied(); + sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:ga", CONCURRENT_REQUESTS, resultEndpoint); } - private void assertThrottlerTiming( - final long elapsedTimeMs, final int throttle, final int intervalMs, final int messageCount) { - // now assert that they have actually been throttled (use +/- 50 as - // slack) - long minimum = calculateMinimum(intervalMs, throttle, messageCount) - 50; - long maximum = calculateMaximum(intervalMs, throttle, messageCount) + 50; - // add 500 in case running on slow CI boxes - maximum += 500; - log.info("Sent {} exchanges in {}ms, with throttle rate of {} per {}ms. Calculated min {}ms and max {}ms", messageCount, - elapsedTimeMs, throttle, intervalMs, minimum, - maximum); - - assertTrue(elapsedTimeMs >= minimum, "Should take at least " + minimum + "ms, was: " + elapsedTimeMs); - assertTrue(elapsedTimeMs <= maximum + TOLERANCE, "Should take at most " + maximum + "ms, was: " + elapsedTimeMs); - } - - private long sendMessagesAndAwaitDelivery( - final int messageCount, final String endpointUri, final int threadPoolSize, final MockEndpoint receivingEndpoint) + private void sendMessagesAndAwaitDelivery( + final int messageCount, final String endpointUri, final int throttle, final MockEndpoint receivingEndpoint) throws InterruptedException { - ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize); + max = 0; + curr = new ConcurrentHashMap<>(); + // two throttle groups + curr.putIfAbsent("1", new AtomicInteger(0)); + curr.putIfAbsent("2", new AtomicInteger(0)); + ExecutorService executor = Executors.newFixedThreadPool(messageCount); try { if (receivingEndpoint != null) { receivingEndpoint.expectedMessageCount(messageCount); } - long start = System.nanoTime(); for (int i = 0; i < messageCount; i++) { - executor.execute(new Runnable() { - public void run() { - Map<String, Object> headers = new HashMap<>(); - if (messageCount % 2 == 0) { - headers.put("key", "1"); - } else { - headers.put("key", "2"); - } - template.sendBodyAndHeaders(endpointUri, "<message>payload</message>", headers); + int finalI = i; + executor.execute(() -> { + Map<String, Object> headers = new HashMap<>(); + if (finalI % 2 == 0) { + headers.put("key", "1"); + } else { + headers.put("key", "2"); } + template.sendBodyAndHeaders(endpointUri, "<message>payload</message>", headers); }); } @@ -138,17 +112,10 @@ public class ThrottlingGroupingTest extends ContextTestSupport { if (receivingEndpoint != null) { receivingEndpoint.assertIsSatisfied(); } - return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } finally { executor.shutdownNow(); } - } - - @Test - public void testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue() throws Exception { - MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class); - long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:ga", MESSAGE_COUNT, resultEndpoint); - assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT); + assertTrue(max <= throttle); } @Test @@ -158,50 +125,39 @@ public class ThrottlingGroupingTest extends ContextTestSupport { ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT); try { - sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, INTERVAL, MESSAGE_COUNT); + sendMessagesWithHeaderExpression(executor, resultEndpoint, CONCURRENT_REQUESTS, MESSAGE_COUNT); } finally { executor.shutdownNow(); } } - private long calculateMinimum(final long periodMs, final long throttleRate, final long messageCount) { - if (messageCount % throttleRate > 0) { - return (long) Math.floor((double) messageCount / (double) throttleRate) * periodMs; - } else { - return (long) (Math.floor((double) messageCount / (double) throttleRate) * periodMs) - periodMs; - } - } - - private long calculateMaximum(final long periodMs, final long throttleRate, final long messageCount) { - return ((long) Math.ceil((double) messageCount / (double) throttleRate)) * periodMs; - } - private void sendMessagesWithHeaderExpression( - final ExecutorService executor, final MockEndpoint resultEndpoint, final int throttle, final int intervalMs, - final int messageCount) + final ExecutorService executor, final MockEndpoint resultEndpoint, final int throttle, final int messageCount) throws InterruptedException { resultEndpoint.expectedMessageCount(messageCount); - long start = System.nanoTime(); + max = 0; + curr = new ConcurrentHashMap<>(); + // two throttle groups + curr.putIfAbsent("1", new AtomicInteger(0)); + curr.putIfAbsent("2", new AtomicInteger(0)); for (int i = 0; i < messageCount; i++) { - executor.execute(new Runnable() { - public void run() { - Map<String, Object> headers = new HashMap<>(); - headers.put("throttleValue", throttle); - if (messageCount % 2 == 0) { - headers.put("key", "1"); - } else { - headers.put("key", "2"); - } - template.sendBodyAndHeaders("direct:gexpressionHeader", "<message>payload</message>", headers); + int finalI = i; + executor.execute(() -> { + Map<String, Object> headers = new HashMap<>(); + headers.put("throttleValue", throttle); + if (finalI % 2 == 0) { + headers.put("key", "1"); + } else { + headers.put("key", "2"); } + template.sendBodyAndHeaders("direct:gexpressionHeader", "<message>payload</message>", headers); }); } // let's wait for the exchanges to arrive resultEndpoint.assertIsSatisfied(); - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount); + assertTrue(max <= throttle); } @Override @@ -215,12 +171,24 @@ public class ThrottlingGroupingTest extends ContextTestSupport { from("seda:b").throttle(header("max"), 2).to("mock:result2"); from("seda:c").throttle(header("max")).correlationExpression(header("key")).to("mock:resultdynamic"); - from("seda:ga").throttle(constant(3), header("key")).timePeriodMillis(1000).to("log:gresult", "mock:gresult"); - - from("direct:ga").throttle(constant(5), header("key")).timePeriodMillis(INTERVAL).to("log:gresult", - "mock:gresult"); + from("direct:ga").throttle(constant(CONCURRENT_REQUESTS), header("key")) + .process(exchange -> { + curr.get(exchange.getMessage().getHeader("key")).getAndIncrement(); + }) + .delay(INTERVAL) + .process(exchange -> { + max = Math.max(max, curr.get(exchange.getMessage().getHeader("key")).getAndDecrement()); + }) + .to("log:gresult", "mock:gresult"); - from("direct:gexpressionHeader").throttle(header("throttleValue"), header("key")).timePeriodMillis(INTERVAL) + from("direct:gexpressionHeader").throttle(header("throttleValue"), header("key")) + .process(exchange -> { + curr.get(exchange.getMessage().getHeader("key")).getAndIncrement(); + }) + .delay(INTERVAL) + .process(exchange -> { + max = Math.max(max, curr.get(exchange.getMessage().getHeader("key")).getAndDecrement()); + }) .to("log:gresult", "mock:gresult"); } }; diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java index 9acf6cc149e..95d5452693f 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java @@ -20,17 +20,11 @@ import org.apache.camel.api.management.ManagedAttribute; public interface ManagedThrottlerMBean extends ManagedProcessorMBean { - @ManagedAttribute(description = "Maximum requests per period") - long getMaximumRequestsPerPeriod(); + @ManagedAttribute(description = "Maximum concurrent requests") + long getMaximumConcurrentRequests(); - @ManagedAttribute(description = "Maximum requests per period") - void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod); - - @ManagedAttribute(description = "Time period in millis") - long getTimePeriodMillis(); - - @ManagedAttribute(description = "Time period in millis") - void setTimePeriodMillis(long timePeriodMillis); + @ManagedAttribute(description = "Maximum concurrent requests") + void setMaximumConcurrentRequests(long maximumConcurrentRequests); @ManagedAttribute(description = "Enables asynchronous delay which means the thread will not block while delaying") Boolean isAsyncDelayed(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java index 977d33cfdac..2abb6a14f16 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java @@ -38,23 +38,13 @@ public class ManagedThrottler extends ManagedProcessor implements ManagedThrottl } @Override - public long getMaximumRequestsPerPeriod() { - return throttler.getCurrentMaximumRequestsPerPeriod(); + public long getMaximumConcurrentRequests() { + return throttler.getCurrentMaximumConcurrentRequests(); } @Override - public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) { - throttler.setMaximumRequestsPerPeriodExpression(constant(maximumRequestsPerPeriod)); - } - - @Override - public long getTimePeriodMillis() { - return throttler.getTimePeriodMillis(); - } - - @Override - public void setTimePeriodMillis(long timePeriodMillis) { - throttler.setTimePeriodMillis(timePeriodMillis); + public void setMaximumConcurrentRequests(long maximumConcurrentRequests) { + throttler.setMaximumConcurrentRequestsExpression(constant(maximumConcurrentRequests)); } @Override diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java index 7e754f0b915..d1be4aeaeb0 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java @@ -81,15 +81,13 @@ public class ManagedThrottlerTest extends ManagementTestSupport { Long completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted"); assertEquals(10, completed.longValue()); - Long timePeriod = (Long) mbeanServer.getAttribute(throttlerName, "TimePeriodMillis"); - assertEquals(250, timePeriod.longValue()); - Long total = (Long) mbeanServer.getAttribute(routeName, "TotalProcessingTime"); - assertTrue(total < 1000, "Should take at most 1.0 sec: was " + total); + // 10 * delay (100) + tolerance (200) + assertTrue(total < 1200, "Should take at most 1.2 sec: was " + total); // change the throttler using JMX - mbeanServer.setAttribute(throttlerName, new Attribute("MaximumRequestsPerPeriod", (long) 2)); + mbeanServer.setAttribute(throttlerName, new Attribute("MaximumConcurrentRequests", (long) 2)); // reset the counters mbeanServer.invoke(routeName, "reset", null, null); @@ -99,15 +97,16 @@ public class ManagedThrottlerTest extends ManagementTestSupport { template.sendBody("direct:start", "Message " + i); } - Long period = (Long) mbeanServer.getAttribute(throttlerName, "MaximumRequestsPerPeriod"); - assertNotNull(period); - assertEquals(2, period.longValue()); + Long requests = (Long) mbeanServer.getAttribute(throttlerName, "MaximumConcurrentRequests"); + assertNotNull(requests); + assertEquals(2, requests.longValue()); completed = (Long) mbeanServer.getAttribute(routeName, "ExchangesCompleted"); assertEquals(10, completed.longValue()); total = (Long) mbeanServer.getAttribute(routeName, "TotalProcessingTime"); - assertTrue(total > 1000, "Should be around 1 sec now: was " + total); + // 10 * delay (100) + tolerance (200) + assertTrue(total < 1200, "Should take at most 1.2 sec: was " + total); } @DisabledOnOs(OS.WINDOWS) @@ -269,19 +268,20 @@ public class ManagedThrottlerTest extends ManagementTestSupport { public void configure() throws Exception { from("direct:start").id("route1") .to("log:foo") - .throttle(10).timePeriodMillis(250).id("mythrottler") + .throttle(10).id("mythrottler") + .delay(100) .to("mock:result"); from("seda:throttleCount").id("route2") - .throttle(1).timePeriodMillis(250).id("mythrottler2") + .throttle(1).id("mythrottler2").delay(250) .to("mock:end"); from("seda:throttleCountAsync").id("route3") - .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler3") + .throttle(1).asyncDelayed().id("mythrottler3").delay(250) .to("mock:endAsync"); from("seda:throttleCountAsyncException").id("route4") - .throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler4") + .throttle(1).asyncDelayed().id("mythrottler4").delay(250) .to("mock:endAsyncException") .process(exchange -> { throw new RuntimeException("Fail me"); @@ -289,21 +289,21 @@ public class ManagedThrottlerTest extends ManagementTestSupport { from("seda:throttleCountRejectExecutionCallerRuns").id("route5") .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end() .throttle(1) - .timePeriodMillis(250) .asyncDelayed() .executorService(badService) .callerRunsWhenRejected(true) .id("mythrottler5") + .delay(250) .to("mock:endAsyncRejectCallerRuns"); from("seda:throttleCountRejectExecution").id("route6") .onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end() .throttle(1) - .timePeriodMillis(250) .asyncDelayed() .executorService(badService) .callerRunsWhenRejected(false) .id("mythrottler6") + .delay(250) .to("mock:endAsyncReject"); } }; diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java index c4557ab90b5..d5ea55476bd 100644 --- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java +++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java @@ -1457,7 +1457,6 @@ public class ModelParser extends BaseParser { case "callerRunsWhenRejected": def.setCallerRunsWhenRejected(val); break; case "executorService": def.setExecutorService(val); break; case "rejectExecution": def.setRejectExecution(val); break; - case "timePeriodMillis": def.setTimePeriodMillis(val); break; default: return processorDefinitionAttributeHandler().accept(def, key, val); } return true; diff --git a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java index ab3770e1d5e..0b1ae97db63 100644 --- a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java +++ b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java @@ -2335,7 +2335,6 @@ public class ModelWriter extends BaseWriter { throws IOException { startElement(name); doWriteProcessorDefinitionAttributes(def); - doWriteAttribute("timePeriodMillis", def.getTimePeriodMillis()); doWriteAttribute("rejectExecution", def.getRejectExecution()); doWriteAttribute("callerRunsWhenRejected", def.getCallerRunsWhenRejected()); doWriteAttribute("executorService", def.getExecutorService()); diff --git a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java index 06bd9313014..f9de1b056d8 100644 --- a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java +++ b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java @@ -2335,7 +2335,6 @@ public class ModelWriter extends BaseWriter { throws IOException { startElement(name); doWriteProcessorDefinitionAttributes(def); - doWriteAttribute("timePeriodMillis", def.getTimePeriodMillis()); doWriteAttribute("rejectExecution", def.getRejectExecution()); doWriteAttribute("callerRunsWhenRejected", def.getCallerRunsWhenRejected()); doWriteAttribute("executorService", def.getExecutorService()); diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc index 931c33a16af..f97b3b25b7c 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc @@ -18,4 +18,12 @@ When using the `SyncCommitManager` then the offset will be committed so that the the behavior described in the documentation. When using the `AsyncCommitManager` then the offset will be committed so that the payload is continually retried. This was -the behavior described in the documentation. \ No newline at end of file +the behavior described in the documentation. + +=== throttle + +Throttle now uses the number of concurrent requests as the throttling measure instead of the number of requests +per period. + +The `throttle` parameter now specifies the maximum number of concurrent requests, +and there is no longer support for the `timePeriodMillis` option. diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java index e6bc1336d0d..8fec7a9903c 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java @@ -17238,8 +17238,7 @@ public final class ModelDeserializers extends YamlDeserializerSupport { @YamlProperty(name = "expression", type = "object:org.apache.camel.model.language.ExpressionDefinition", description = "Expression to configure the maximum number of messages to throttle per request", displayName = "Expression", oneOf = "expression"), @YamlProperty(name = "id", type = "string", description = "Sets the id of this node", displayName = "Id"), @YamlProperty(name = "inherit-error-handler", type = "boolean"), - @YamlProperty(name = "reject-execution", type = "boolean", description = "Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false", displayName = "Reject Execution"), - @YamlProperty(name = "time-period-millis", type = "string", defaultValue = "1000", description = "Sets the time period during which the maximum request count is valid for", displayName = "Time Period Millis") + @YamlProperty(name = "reject-execution", type = "boolean", description = "Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false", displayName = "Reject Execution") } ) public static class ThrottleDefinitionDeserializer extends YamlDeserializerBase<ThrottleDefinition> { @@ -17296,11 +17295,6 @@ public final class ModelDeserializers extends YamlDeserializerSupport { target.setRejectExecution(val); break; } - case "time-period-millis": { - String val = asText(node); - target.setTimePeriodMillis(val); - break; - } case "id": { String val = asText(node); target.setId(val); diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json index 82cfe6f5ee7..da067a148aa 100644 --- a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json @@ -6538,12 +6538,6 @@ "title" : "Reject Execution", "description" : "Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false" }, - "timePeriodMillis" : { - "type" : "string", - "title" : "Time Period Millis", - "description" : "Sets the time period during which the maximum request count is valid for", - "default" : "1000" - }, "constant" : { }, "csimple" : { }, "datasonnet" : { }, diff --git a/etc/eclipse/camel_xml_templates.xml b/etc/eclipse/camel_xml_templates.xml index 6af5acce144..e283c357d9e 100644 --- a/etc/eclipse/camel_xml_templates.xml +++ b/etc/eclipse/camel_xml_templates.xml @@ -104,7 +104,7 @@ </route> </template><template autoinsert="true" context="xml_all" deleted="false" description="Creates a Throttler" enabled="true" name="camel_throttler"><route> <from uri="from_uri" /> - <throttle maximumRequestsPerPeriod="number_of_messages" timePeriodMillis="milliseconds"> + <throttle maximumConcurrentRequests="number_of_messages"> <to uri="to_uri" /> </throttle> </route>