[ https://issues.apache.org/jira/browse/CAMEL-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16512393#comment-16512393 ]
ASF GitHub Bot commented on CAMEL-6840: --------------------------------------- onderson closed pull request #2375: CAMEL-6840- New PR for grouped throttling - Java DSL, XML DSL is WIP. URL: https://github.com/apache/camel/pull/2375 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java index 673c13e88fe..6e993b0633f 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java @@ -20,10 +20,10 @@ public interface ManagedThrottlerMBean extends ManagedProcessorMBean { - @ManagedAttribute(description = "Maximum requires per period") + @ManagedAttribute(description = "Maximum requests per period") long getMaximumRequestsPerPeriod(); - @ManagedAttribute(description = "Maximum requires per period") + @ManagedAttribute(description = "Maximum requests per period") void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod); @ManagedAttribute(description = "Time period in millis") diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 005270e313b..0fdd5b6325a 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -2284,6 +2284,48 @@ public ThrottleDefinition throttle(Expression maximumRequestCount) { addOutput(answer); return answer; } + + /** + * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a> + * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded, + * or that we don't exceed an agreed SLA with some external service. + * Here another parameter correlationExpressionKey is introduced for the functionality which + * will throttle based on the key expression to group exchanges. This will make key-based throttling + * instead of overall throttling. + * <p/> + * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 + * will default ensure at most 10 messages per second. + * + * @param correlationExpressionKey is a correlation key that can throttle by the given key instead of overall throttling + * @param maximumRequestCount an expression to calculate the maximum request count + * @return the builder + */ + public ThrottleDefinition throttle(long correlationExpressionKey, Expression maximumRequestCount) { + ThrottleDefinition answer = new ThrottleDefinition(ExpressionBuilder.constantExpression(correlationExpressionKey), maximumRequestCount); + addOutput(answer); + return answer; + } + + /** + * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a> + * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded, + * or that we don't exceed an agreed SLA with some external service. + * Here another parameter correlationExpressionKey is introduced for the functionality which + * will throttle based on the key expression to group exchanges. This will make key-based throttling + * instead of overall throttling. + * <p/> + * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10 + * will default ensure at most 10 messages per second. + * + * @param correlationExpressionKey is a correlation key as an expression that can throttle by the given key instead of overall throttling + * @param maximumRequestCount an expression to calculate the maximum request count + * @return the builder + */ + public ThrottleDefinition throttle(Expression correlationExpressionKey, Expression maximumRequestCount) { + ThrottleDefinition answer = new ThrottleDefinition(correlationExpressionKey, maximumRequestCount); + addOutput(answer); + return answer; + } /** * <a href="http://camel.apache.org/loop.html">Loop EIP:</a> @@ -4247,4 +4289,4 @@ public void setOtherAttributes(Map<QName, Object> otherAttributes) { public String getLabel() { return ""; } -} +} \ No newline at end of file diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java index 613d2b351c5..3986de90f6b 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java @@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; @@ -43,6 +44,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> { // TODO: Camel 3.0 Should not support outputs + @XmlElement(name = "correlationExpression") + private ExpressionSubElementDefinition correlationExpression; @XmlTransient private ExecutorService executorService; @XmlAttribute @@ -55,7 +58,7 @@ private Boolean callerRunsWhenRejected; @XmlAttribute private Boolean rejectExecution; - + public ThrottleDefinition() { } @@ -63,6 +66,18 @@ public ThrottleDefinition(Expression maximumRequestsPerPeriod) { super(maximumRequestsPerPeriod); } + public ThrottleDefinition(Expression correlationExpression, Expression maximumRequestsPerPeriod) { + this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression); + } + + private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) { + super(maximumRequestsPerPeriod); + + ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); + cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression)); + setCorrelationExpression(cor); + } + @Override public String toString() { return "Throttle[" + description() + " -> " + getOutputs() + "]"; @@ -93,9 +108,14 @@ public Processor createProcessor(RouteContext routeContext) throws Exception { if (maxRequestsExpression == null) { throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this); } + + Expression correlation = null; + if (correlationExpression != null) { + correlation = correlationExpression.createExpression(routeContext); + } boolean reject = getRejectExecution() != null && getRejectExecution(); - Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject); + Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation); answer.setAsyncDelayed(async); if (getCallerRunsWhenRejected() == null) { @@ -104,6 +124,7 @@ public Processor createProcessor(RouteContext routeContext) throws Exception { } else { answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); } + return answer; } @@ -256,4 +277,16 @@ public Boolean getRejectExecution() { public void setRejectExecution(Boolean rejectExecution) { this.rejectExecution = rejectExecution; } -} + + /** + * The expression used to calculate the correlation key to use for throttle grouping. + * The Exchange which has the same correlation key is throttled together. + */ + public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { + this.correlationExpression = correlationExpression; + } + + public ExpressionSubElementDefinition getCorrelationExpression() { + return correlationExpression; + } +} \ No newline at end of file diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java index 543ec9a9cb0..473c3d09697 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java @@ -16,8 +16,13 @@ */ package org.apache.camel.processor; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -31,7 +36,11 @@ import org.apache.camel.Traceable; import org.apache.camel.spi.IdAware; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.CamelContextHelper; +import org.apache.camel.util.LRUCache; +import org.apache.camel.util.LRUCacheFactory; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,25 +70,32 @@ private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp"; private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState"; + // (throttling grouping) defaulted as 1 because there will be only one queue which is similar to implementation + // when there is no grouping for throttling + private static final Integer NO_CORRELATION_QUEUE_ID = new Integer(1); private enum State { SYNC, ASYNC, ASYNC_REJECTED } private final Logger log = LoggerFactory.getLogger(Throttler.class); private final CamelContext camelContext; - private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>(); private final ExecutorService asyncExecutor; private final boolean shutdownAsyncExecutor; private volatile long timePeriodMillis; - private volatile int throttleRate; private String id; private Expression maxRequestsPerPeriodExpression; private boolean rejectExecution; private boolean asyncDelayed; private boolean callerRunsWhenRejected = true; + private Expression correlationExpression; + // below 2 fields added for (throttling grouping) + private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache; + private Map<Integer, Integer> throttleRatesMap = new HashMap<>(); + private ExecutorService delayQueueCacheExecutorService; + public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis, - final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution) { + final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) { super(processor); this.camelContext = camelContext; this.rejectExecution = rejectExecution; @@ -93,6 +109,7 @@ public Throttler(final CamelContext camelContext, final Processor processor, fin } this.timePeriodMillis = timePeriodMillis; this.asyncExecutor = asyncExecutor; + this.correlationExpression = correlation; } @Override @@ -111,13 +128,21 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) { throw new RejectedExecutionException("Run is not allowed"); } - calculateAndSetMaxRequestsPerPeriod(exchange); + Integer key; + if (correlationExpression != null) { + key = correlationExpression.evaluate(exchange, Integer.class); + } else { + key = NO_CORRELATION_QUEUE_ID; + } + + DelayQueue<ThrottlePermit> delayQueue = locateDelayQueue(key); + calculateAndSetMaxRequestsPerPeriod(delayQueue, exchange, key); ThrottlePermit permit = delayQueue.poll(); if (permit == null) { if (isRejectExecution()) { throw new ThrottlerRejectedExecutionException("Exceeded the max throttle rate of " - + throttleRate + " within " + timePeriodMillis + "ms"); + + throttleRatesMap.get(key) + " within " + timePeriodMillis + "ms"); } else { // delegate to async pool if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) { @@ -135,7 +160,7 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) { if (log.isTraceEnabled()) { elapsed = System.currentTimeMillis() - start; } - enqueuePermit(permit, exchange); + enqueuePermit(permit, exchange, delayQueue); if (state == State.ASYNC) { if (log.isTraceEnabled()) { @@ -147,7 +172,7 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) { } } } else { - enqueuePermit(permit, exchange); + enqueuePermit(permit, exchange, delayQueue); if (state == State.ASYNC) { if (log.isTraceEnabled()) { @@ -192,6 +217,26 @@ public boolean process(final Exchange exchange, final AsyncCallback callback) { } } + private DelayQueue<ThrottlePermit> locateDelayQueue(final Integer key) throws InterruptedException, ExecutionException { + CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>(); + + delayQueueCacheExecutorService.submit(() -> { + futureDelayQueue.complete(findDelayQueue(key)); + }); + DelayQueue<ThrottlePermit> currentQueue = futureDelayQueue.get(); + return currentQueue; + } + + private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) { + DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key); + if (currentDelayQueue == null) { + currentDelayQueue = new DelayQueue<>(); + throttleRatesMap.put(key, 0); + delayQueueCache.put(key, currentDelayQueue); + } + return currentDelayQueue; + } + /** * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not @@ -222,8 +267,10 @@ public void run() { /** * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now. + * @throws ExecutionException + * @throws InterruptedException */ - protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange) { + protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, DelayQueue<ThrottlePermit> delayQueue) throws InterruptedException, ExecutionException { permit.setDelayMs(getTimePeriodMillis()); delayQueue.put(permit); // try and incur the least amount of overhead while releasing permits back to the queue @@ -235,7 +282,7 @@ protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchang /** * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down. */ - protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception { + protected void calculateAndSetMaxRequestsPerPeriod(DelayQueue<ThrottlePermit> delayQueue, final Exchange exchange, final Integer key) throws Exception { Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class); if (newThrottle != null && newThrottle < 0) { @@ -243,12 +290,14 @@ protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) thro } synchronized (this) { + Integer throttleRate = throttleRatesMap.get(key); if (newThrottle == null && throttleRate == 0) { throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange); } if (newThrottle != null) { if (newThrottle != throttleRate) { + // get the queue from the cache // decrease if (throttleRate > newThrottle) { int delta = throttleRate - newThrottle; @@ -273,25 +322,68 @@ protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) thro log.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId()); } } - throttleRate = newThrottle; + throttleRatesMap.put(key, newThrottle); } } } } + @SuppressWarnings("unchecked") @Override protected void doStart() throws Exception { if (isAsyncDelayed()) { ObjectHelper.notNull(asyncExecutor, "executorService", this); } + if (camelContext != null) { + int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext); + if (maxSize > 0) { + delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false); + log.debug("DelayQueues cache size: {}", maxSize); + } else { + delayQueueCache = LRUCacheFactory.newLRUCache(100); + log.debug("Defaulting DelayQueues cache size: {}", 100); + } + } + if (delayQueueCache != null) { + ServiceHelper.startService(delayQueueCache); + } + if (delayQueueCacheExecutorService == null) { + String name = getClass().getSimpleName() + "-DelayQueueLocatorTask"; + delayQueueCacheExecutorService = createDelayQueueCacheExecutorService(name); + } super.doStart(); } + + /** + * Strategy to create the thread pool for locating right DelayQueue from the case as a background task + * + * @param name the suggested name for the background thread + * @return the thread pool + */ + protected synchronized ExecutorService createDelayQueueCacheExecutorService(String name) { + // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in + return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); + } + @SuppressWarnings("rawtypes") @Override protected void doShutdown() throws Exception { if (shutdownAsyncExecutor && asyncExecutor != null) { camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor); } + if (delayQueueCacheExecutorService != null) { + camelContext.getExecutorServiceManager().shutdownNow(delayQueueCacheExecutorService); + } + if (delayQueueCache != null) { + ServiceHelper.stopService(delayQueueCache); + if (log.isDebugEnabled()) { + if (delayQueueCache instanceof LRUCache) { + log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]", + delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted()); + } + } + delayQueueCache.clear(); + } super.doShutdown(); } @@ -365,9 +457,11 @@ public Expression getMaximumRequestsPerPeriodExpression() { /** * Gets the current maximum request per period value. + * If it is grouped throttling applied with correlationExpression + * than the max per period within the group will return */ public int getCurrentMaximumRequestsPerPeriod() { - return throttleRate; + return Collections.max(throttleRatesMap.entrySet(), (entry1, entry2) -> entry1.getValue() - entry2.getValue()).getValue(); } /** @@ -390,4 +484,4 @@ public String toString() { return "Throttler[requests: " + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + " (ms) to: " + getProcessor() + "]"; } -} +} \ No newline at end of file diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java new file mode 100644 index 00000000000..1886a8ebf20 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +/** + * @version + */ +public class ThrottlingGroupingTest extends ContextTestSupport { + + public void testGroupingWithSingleConstant() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom"); + + template.sendBodyAndHeader("seda:a", "Kaboom", "max", null); + template.sendBodyAndHeader("seda:a", "Hello World", "max", 2); + template.sendBodyAndHeader("seda:a", "Bye World", "max", 2); + + assertMockEndpointsSatisfied(); + } + + public void testGroupingWithDynamicHeaderExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result2").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom", "Saloon"); + getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello Dynamic World", "Bye Dynamic World"); + + Map<String, Object> headers = new HashMap<String, Object>(); + + template.sendBodyAndHeaders("seda:a", "Kaboom", headers); + template.sendBodyAndHeaders("seda:a", "Saloon", headers); + + headers.put("max", "2"); + template.sendBodyAndHeaders("seda:a", "Hello World", headers); + template.sendBodyAndHeaders("seda:b", "Bye World", headers); + headers.put("max", "2"); + headers.put("key", "1"); + template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers); + headers.put("key", "2"); + template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead")); + + from("seda:a").throttle(1, header("max")).to("mock:result"); + from("seda:b").throttle(2, header("max")).to("mock:result2"); + from("seda:c").throttle(header("key"), header("max")).timePeriodMillis(2000).to("mock:resultdynamic"); + } + }; + } +} \ No newline at end of file diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java index 3ba12bf138a..95705c7a83d 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java @@ -191,7 +191,7 @@ public void done(boolean doneSync) { } }); - } else if (continuation.isResumed()) { + } else if (!continuation.isTimeout() && continuation.isResumed()) { org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); try { setResponseBack(cxfExchange, camelExchange); @@ -199,7 +199,8 @@ public void done(boolean doneSync) { CxfConsumer.this.doneUoW(camelExchange); throw ex; } - } else if (!continuation.isResumed() && !continuation.isPending()) { + + } else if (continuation.isTimeout() || (!continuation.isResumed() && !continuation.isPending())) { org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); try { if (!continuation.isPending()) { diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java index 29d9fa3eb90..fef3818f43a 100644 --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java @@ -104,7 +104,7 @@ public void done(boolean doneSync) { }); return null; } - if (continuation.isResumed()) { + if (!continuation.isTimeout() && continuation.isResumed()) { cxfExchange.put(SUSPENED, Boolean.FALSE); org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); try { @@ -114,7 +114,7 @@ public void done(boolean doneSync) { throw ex; } } else { - if (!continuation.isPending()) { + if (continuation.isTimeout() || !continuation.isPending()) { cxfExchange.put(SUSPENED, Boolean.FALSE); org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject(); camelExchange.setException(new ExchangeTimedOutException(camelExchange, endpoint.getContinuationTimeout())); diff --git a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerContinuationTimeoutTest.java b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerContinuationTimeoutTest.java index cfe31bde6e5..d628501ddb3 100644 --- a/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerContinuationTimeoutTest.java +++ b/components/camel-cxf/src/test/java/org/apache/camel/component/cxf/CxfConsumerContinuationTimeoutTest.java @@ -25,7 +25,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.camel.util.AsyncProcessorHelper; -import org.junit.Ignore; import org.junit.Test; public class CxfConsumerContinuationTimeoutTest extends CamelTestSupport { @@ -114,7 +113,6 @@ public void testNoTimeout() throws Exception { } @Test - @Ignore("CAMEL-12104") public void testTimeout() throws Exception { String out = template.requestBodyAndHeader("direct:start", "Bye World", "priority", "slow", String.class); assertTrue(out.contains("The OUT message was not received within: 5000 millis.")); diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java new file mode 100644 index 00000000000..a0a7a445110 --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.ThrottlingGroupingTest; +import org.junit.Ignore; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +@Ignore +public class SpringThrottlerGroupingTest extends ThrottlingGroupingTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, + "org/apache/camel/spring/processor/ThrottlerGroupingTest.xml"); + } +} diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml new file mode 100644 index 00000000000..ab74a3d9820 --- /dev/null +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml @@ -0,0 +1,69 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <bean id="myBean" class="org.apache.camel.spring.processor.SpringThrottlerMethodCallTest"/> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <errorHandler id="dlc" deadLetterUri="mock:dead" type="DeadLetterChannel"/> + <route errorHandlerRef="dlc"> + <from uri="seda:a"/> + <throttle timePeriodMillis="1000"> + <correlationExpression> + <constant>1</constant> + </correlationExpression> + <header>max</header> + <to uri="log:result"/> + <to uri="mock:result"/> + </throttle> + </route> + + <route errorHandlerRef="dlc"> + <from uri="seda:b"/> + <throttle timePeriodMillis="1000"> + <correlationExpression> + <constant>2</constant> + </correlationExpression> + <header>max</header> + <to uri="log:result"/> + <to uri="mock:result"/> + </throttle> + </route> + + <route errorHandlerRef="dlc"> + <from uri="seda:a"/> + <throttle timePeriodMillis="1000"> + <correlationExpression> + <header>key</header> + </correlationExpression> + <header>max</header> + <to uri="log:result"/> + <to uri="mock:resultdynamic"/> + </throttle> + </route> + + </camelContext> + +</beans> diff --git a/parent/pom.xml b/parent/pom.xml index 08ecae38360..d9e6c0e6a2d 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -542,7 +542,7 @@ <openhft-affinity-version>3.1.7</openhft-affinity-version> <openhft-compiler-version>2.3.0</openhft-compiler-version> <openhft-lang-version>6.8.2</openhft-lang-version> - <openjpa-version>2.4.2</openjpa-version> + <openjpa-version>2.4.3</openjpa-version> <openshift-java-client-version>2.7.0.Final</openshift-java-client-version> <openstack4j-version>3.0.2</openstack4j-version> <openstack4j-guava-version>17.0</openstack4j-guava-version> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow Throttler EIP to specify SLA per client/correlated-group > -------------------------------------------------------------- > > Key: CAMEL-6840 > URL: https://issues.apache.org/jira/browse/CAMEL-6840 > Project: Camel > Issue Type: New Feature > Components: camel-core, eip > Reporter: Christian Posta > Assignee: Önder Sezgin > Priority: Major > Fix For: 2.22.0 > > > Basic idea is to allow throttler to have a predicate to determine whether or > not to apply throttling to that exchange. > From this Mailing List discussion: > http://camel.465427.n5.nabble.com/Throttling-by-client-ID-td5741032.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)