[ 
https://issues.apache.org/jira/browse/CAMEL-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507310#comment-16507310
 ] 

ASF GitHub Bot commented on CAMEL-6840:
---------------------------------------

onderson closed pull request #2366: CAMEL-6840 make it possible grouped 
throttling
URL: https://github.com/apache/camel/pull/2366

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc
index 71da9599589..aa0582b297e 100644
--- a/camel-core/src/main/docs/eips/throttle-eip.adoc
+++ b/camel-core/src/main/docs/eips/throttle-eip.adoc
@@ -6,11 +6,12 @@ The Throttler Pattern allows you to ensure that a specific endpoint does not get
 === Options
 
 // eip options: START
-The Throttle EIP supports 5 options which are listed below:
+The Throttle EIP supports 6 options which are listed below:
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *correlationExpression* | The expression used to calculate the correlation key to use for throttle grouping. The Exchange which has the same correlation key is throttled together. |  | NamespaceAware Expression
 | *executorServiceRef* | To use a custom thread pool (ScheduledExecutorService) by the throttler. |  | String
 | *timePeriodMillis* | Sets the time period during which the maximum request count is valid for | 1000 | Long
 | *asyncDelayed* | Enables asynchronous delay which means the thread will not block while delaying. | false | Boolean
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 2e60ec370de..1aa34c4c409 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -142,7 +142,7 @@ public AggregateDefinition(Expression expression) {
         this(ExpressionNodeHelper.toExpressionDefinition(expression));
     }
 
-    public AggregateDefinition(ExpressionDefinition correlationExpression) {
+    private AggregateDefinition(ExpressionDefinition correlationExpression) {
         setExpression(correlationExpression);
 
         ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 005270e313b..8a5fbf11338 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2284,6 +2284,48 @@ public ThrottleDefinition throttle(Expression maximumRequestCount) {
         addOutput(answer);
         return answer;
     }
+
+    /**
+     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
+     * or that we don't exceed an agreed SLA with some external service.
+     * Here another parameter correlationExpressionKey is introduced for the functionality which
+     * will throttle based on the key expression to group exchanges. This will make key-based throttling
+     * instead of overall throttling.
+     * <p/>
+     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
+     * will default ensure at most 10 messages per second.
+     *
+     * @param correlationExpressionKey  is a correlation key that can throttle by the given key instead of overall throttling
+     * @param maximumRequestCount  an expression to calculate the maximum request count
+     * @return the builder
+     */
+    public ThrottleDefinition throttle(long correlationExpressionKey, Expression maximumRequestCount) {
+        ThrottleDefinition answer = new ThrottleDefinition(ExpressionBuilder.constantExpression(correlationExpressionKey), maximumRequestCount);
+        addOutput(answer);
+        return answer;
+    }
+
+    /**
+     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
+     * or that we don't exceed an agreed SLA with some external service.
+     * Here another parameter correlationExpressionKey is introduced for the functionality which
+     * will throttle based on the key expression to group exchanges. This will make key-based throttling
+     * instead of overall throttling.
+     * <p/>
+     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
+     * will default ensure at most 10 messages per second.
+     *
+     * @param correlationExpressionKey  is a correlation key as an expression that can throttle by the given key instead of overall throttling
+     * @param maximumRequestCount  an expression to calculate the maximum request count
+     * @return the builder
+     */
+    public ThrottleDefinition throttle(Expression correlationExpressionKey, Expression maximumRequestCount) {
+        ThrottleDefinition answer = new ThrottleDefinition(correlationExpressionKey, maximumRequestCount);
+        addOutput(answer);
+        return answer;
+    }
     
     /**
      * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 613d2b351c5..06ac79ca658 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -21,6 +21,7 @@
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
@@ -43,6 +44,8 @@
 public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
     // TODO: Camel 3.0 Should not support outputs
 
+    @XmlElement(name = "correlationExpression")
+    private ExpressionSubElementDefinition correlationExpression;
     @XmlTransient
     private ExecutorService executorService;
     @XmlAttribute
@@ -55,7 +58,7 @@
     private Boolean callerRunsWhenRejected;
     @XmlAttribute
     private Boolean rejectExecution;
-    
+
     public ThrottleDefinition() {
     }
 
@@ -63,6 +66,18 @@ public ThrottleDefinition(Expression maximumRequestsPerPeriod) {
         super(maximumRequestsPerPeriod);
     }
 
+    public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) {
+        this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression);
+    }
+
+    private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) {
+        super(maximumRequestsPerPeriod);
+
+        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
+        cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
+        setCorrelationExpression(cor);
+    }
+
     @Override
     public String toString() {
         return "Throttle[" + description() + " -> " + getOutputs() + "]";
@@ -93,9 +108,14 @@ public Processor createProcessor(RouteContext routeContext) throws Exception {
         if (maxRequestsExpression == null) {
             throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
         }
+        
+        Expression correlation = null;
+        if (correlationExpression != null) {
+            correlation = correlationExpression.createExpression(routeContext);
+        }
 
         boolean reject = getRejectExecution() != null && getRejectExecution();
-        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject);
+        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation);
 
         answer.setAsyncDelayed(async);
         if (getCallerRunsWhenRejected() == null) {
@@ -104,6 +124,7 @@ public Processor createProcessor(RouteContext routeContext) throws Exception {
         } else {
             answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
         }
+
         return answer;
     }
 
@@ -256,4 +277,16 @@ public Boolean getRejectExecution() {
     public void setRejectExecution(Boolean rejectExecution) {
         this.rejectExecution = rejectExecution;
     }
+
+    /**
+     * The expression used to calculate the correlation key to use for throttle grouping.
+     * The Exchange which has the same correlation key is throttled together.
+     */
+    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
+        this.correlationExpression = correlationExpression;
+    }
+
+    public ExpressionSubElementDefinition getCorrelationExpression() {
+        return correlationExpression;
+    }
 }
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 543ec9a9cb0..73d53f09b4a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,8 +16,11 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -31,7 +34,11 @@
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,12 +68,14 @@
 
     private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp";
     private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState";
+    // (throttling grouping) defaulted as 1 because there will be only one queue which is similar to implementation
+    // when there is no grouping for throttling
+    private static final Integer NO_CORRELATION_QUEUE_ID = new Integer(1);
 
     private enum State { SYNC, ASYNC, ASYNC_REJECTED }
 
     private final Logger log = LoggerFactory.getLogger(Throttler.class);
     private final CamelContext camelContext;
-    private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
     private final ExecutorService asyncExecutor;
     private final boolean shutdownAsyncExecutor;
 
@@ -77,9 +86,14 @@
     private boolean rejectExecution;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
+    private Expression correlationExpression;
+    // below 2 fields added for (throttling grouping)
+    private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
+    private ExecutorService delayQueueCacheExecutorService;
+    
 
     public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
-                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) {
+                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
         super(processor);
         this.camelContext = camelContext;
         this.rejectExecution = rejectExecution;
@@ -93,6 +107,7 @@ public Throttler(final CamelContext camelContext, final Processor processor, fin
         }
         this.timePeriodMillis = timePeriodMillis;
         this.asyncExecutor = asyncExecutor;
+        this.correlationExpression = correlation;
     }
 
     @Override
@@ -111,7 +126,8 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
                 throw new RejectedExecutionException("Run is not allowed");
             }
 
-            calculateAndSetMaxRequestsPerPeriod(exchange);
+            calculateAndSetMaxRequestsPerPeriod(exchange, doneSync);
+            DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync);
             ThrottlePermit permit = delayQueue.poll();
 
             if (permit == null) {
@@ -135,7 +151,7 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
                     if (log.isTraceEnabled()) {
                         elapsed = System.currentTimeMillis() - start;
                     }
-                    enqueuePermit(permit, exchange);
+                    enqueuePermit(permit, exchange, doneSync);
 
                     if (state == State.ASYNC) {
                         if (log.isTraceEnabled()) {
@@ -147,7 +163,7 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
                     }
                 }
             } else {
-                enqueuePermit(permit, exchange);
+                enqueuePermit(permit, exchange, doneSync);
 
                 if (state == State.ASYNC) {
                     if (log.isTraceEnabled()) {
@@ -192,6 +208,34 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) {
         }
     }
 
+    private DelayQueue<ThrottlePermit> locateDelayQueue(final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException {
+        Integer key;
+        CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>();
+        
+        if (correlationExpression != null) {
+            key = correlationExpression.evaluate(exchange, Integer.class);
+        } else {
+            key = NO_CORRELATION_QUEUE_ID;
+        }
+        
+        if (!doneSync) {
+            delayQueueCacheExecutorService.submit(() -> {
+                futureDelayQueue.complete(findDelayQueue(key));
+            });
+        }
+            
+        return (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key);
+    }
+
+    private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) {
+        DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key);
+        if (currentDelayQueue == null) {
+            currentDelayQueue = new DelayQueue<>();
+            delayQueueCache.put(key, currentDelayQueue);
+        }
+        return currentDelayQueue;
+    }
+
     /**
      * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
      * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
@@ -222,10 +266,12 @@ public void run() {
 
     /**
      * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
+     * @throws ExecutionException 
+     * @throws InterruptedException 
      */
-    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) {
+    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, final boolean doneSync) throws InterruptedException, ExecutionException {
         permit.setDelayMs(getTimePeriodMillis());
-        delayQueue.put(permit);
+        locateDelayQueue(exchange, doneSync).put(permit);
         // try and incur the least amount of overhead while releasing permits back to the queue
         if (log.isTraceEnabled()) {
             log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
@@ -235,7 +281,7 @@ protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchang
     /**
      * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
      */
-    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
+    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange, final boolean doneSync) throws Exception {
         Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
 
         if (newThrottle != null && newThrottle < 0) {
@@ -249,6 +295,8 @@ protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) thro
 
             if (newThrottle != null) {
                 if (newThrottle != throttleRate) {
+                    // get the queue from the cache
+                    DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(exchange, doneSync);
                     // decrease
                     if (throttleRate > newThrottle) {
                         int delta = throttleRate - newThrottle;
@@ -279,19 +327,62 @@ protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) thro
         }
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     protected void doStart() throws Exception {
         if (isAsyncDelayed()) {
             ObjectHelper.notNull(asyncExecutor, "executorService", this);
         }
+        if (camelContext != null) {
+            int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
+            if (maxSize > 0) {
+                delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
+                log.debug("DelayQueues cache size: {}", maxSize);
+            } else {
+                delayQueueCache = LRUCacheFactory.newLRUCache(100);
+                log.debug("Defaulting DelayQueues cache size: {}", 100);
+            }
+        }
+        if (delayQueueCache != null) {
+            ServiceHelper.startService(delayQueueCache);
+        }
+        if (delayQueueCacheExecutorService == null) {
+            String name = getClass().getSimpleName() + "-DelayQueueLocatorTask";
+            delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name);
+        }
         super.doStart();
     }
+    
+    /**
+     * Strategy to create the thread pool for locating the right DelayQueue from the cache as a background task
+     *
+     * @param name  the suggested name for the background thread
+     * @return the thread pool
+     */
+    protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) {
+        // use a cached thread pool so each on-the-fly task has a dedicated thread to process completions as they come in
+        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
+    }
 
+    @SuppressWarnings("rawtypes")
     @Override
     protected void doShutdown() throws Exception {
         if (shutdownAsyncExecutor && asyncExecutor != null) {
             camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
         }
+        if (delayQueueCacheExecutorService != null) {
+            camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService);
+        }
+        if (delayQueueCache != null) {
+            ServiceHelper.stopService(delayQueueCache);
+            if (log.isDebugEnabled()) {
+                if (delayQueueCache instanceof LRUCache) {
+                    log.debug("Clearing delay queues cache[size={}, hits={}, misses={}, evicted={}]",
+                            delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
+                }
+            }
+            delayQueueCache.clear();
+        }
         super.doShutdown();
     }
 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
new file mode 100644
index 00000000000..01cd378cfe8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version 
+ */
+public class ThrottlingGroupingTest extends ContextTestSupport {
+
+    public void testGroupingWithSingleConstant() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+        template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
+        template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+        template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+    
+    public void testGroupingWithDynamicHeaderExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
+        getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+        getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World");
+        
+        Map<String, Object> headers = new HashMap<String, Object>();
+        headers.put("max", null);
+
+        template.sendBodyAndHeaders("seda:a", "Kaboom", headers);
+        
+        headers.put("max", "2");
+        template.sendBodyAndHeaders("seda:a", "Hello World", headers);
+        template.sendBodyAndHeaders("seda:b", "Bye World", headers);
+
+        headers.put("key", "1");
+        template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers);
+        headers.put("key", "2");
+        template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers);
+        
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead"));
+
+                from("seda:a").throttle(1, header("max")).to("mock:result");
+                from("seda:b").throttle(2, header("max")).to("mock:result");
+                from("seda:c").throttle(header("key"), header("max")).to("mock:resultdynamic");
+            }
+        };
+    }
+}
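
For readers skimming the diff, the core idea (one DelayQueue of permits per
correlation key, looked up on each exchange) can be sketched in isolation.
The following is a minimal standalone illustration and is not code from this
PR or from Camel: the class KeyedThrottleSketch, its Permit type, and the
tryAcquire method are hypothetical names, the keys are plain strings, and an
unbounded ConcurrentHashMap with computeIfAbsent stands in for the LRU cache
and the get-then-put lookup used in the diff. It only models the
non-blocking "reject when no permit is free" case.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of per-key throttling; not a Camel class. */
class KeyedThrottleSketch {

    /** A permit that becomes available again once its expiry has passed. */
    static final class Permit implements Delayed {
        // A fresh permit is available immediately.
        private volatile long expiryNanos = System.nanoTime();

        /** Makes the permit unavailable for the next periodMillis. */
        void rearm(long periodMillis) {
            expiryNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(periodMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiryNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // One permit queue per correlation key (assumption: unbounded map, no eviction).
    private final ConcurrentMap<String, DelayQueue<Permit>> queues = new ConcurrentHashMap<>();
    private final int maxPerPeriod;
    private final long periodMillis;

    KeyedThrottleSketch(int maxPerPeriod, long periodMillis) {
        this.maxPerPeriod = maxPerPeriod;
        this.periodMillis = periodMillis;
    }

    /** Finds the queue for a key, creating and seeding it atomically on first use. */
    private DelayQueue<Permit> queueFor(String key) {
        return queues.computeIfAbsent(key, k -> {
            DelayQueue<Permit> queue = new DelayQueue<>();
            for (int i = 0; i < maxPerPeriod; i++) {
                queue.put(new Permit());
            }
            return queue;
        });
    }

    /** Returns true if the key still has a free permit in the current period. */
    boolean tryAcquire(String key) {
        DelayQueue<Permit> queue = queueFor(key);
        Permit permit = queue.poll(); // null when no permit has expired yet
        if (permit == null) {
            return false;
        }
        permit.rearm(periodMillis);   // permit is reusable after one period
        queue.put(permit);
        return true;
    }

    public static void main(String[] args) {
        KeyedThrottleSketch throttle = new KeyedThrottleSketch(2, 60_000L);
        System.out.println(throttle.tryAcquire("client-1"));
        System.out.println(throttle.tryAcquire("client-1"));
        System.out.println(throttle.tryAcquire("client-1")); // third request within the period
        System.out.println(throttle.tryAcquire("client-2")); // a different key has its own permits
    }
}
```

The point of the sketch is only that exhausting the permits for one key
leaves other keys unaffected; blocking, asynchronous delay, and dynamic
changes to the maximum request count are deliberately left out.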


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow Throttler EIP to specify SLA per client/correlated-group
> --------------------------------------------------------------
>
>                 Key: CAMEL-6840
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6840
>             Project: Camel
>          Issue Type: New Feature
>          Components: camel-core, eip
>            Reporter: Christian Posta
>            Priority: Major
>             Fix For: Future
>
>
> Basic idea is to allow throttler to have a predicate to determine whether or 
> not to apply throttling to that exchange. 
> From this Mailing List discussion:
> http://camel.465427.n5.nabble.com/Throttling-by-client-ID-td5741032.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
