Added unit tests, usage of headers, and other imrpovements to Hystrix component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49678cfb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49678cfb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49678cfb Branch: refs/heads/jdk8-lambdas Commit: 49678cfbc040ed8da6214d5d46f4c0ba69649aff Parents: fe66ce3 Author: bibryam <bibr...@apache.org> Authored: Sat Mar 26 18:59:54 2016 +0000 Committer: bibryam <bibr...@apache.org> Committed: Sat Mar 26 18:59:54 2016 +0000 ---------------------------------------------------------------------- .../component/hystrix/CamelHystrixCommand.java | 22 +- .../component/hystrix/HystrixConfiguration.java | 58 +++-- .../component/hystrix/HystrixConstants.java | 58 ++++- .../component/hystrix/HystrixProducer.java | 255 +++++++++++++------ .../component/hystrix/HystrixComponentBase.java | 50 ++++ .../hystrix/HystrixComponentCacheTest.java | 84 ++++++ .../HystrixComponentCircuitBreakerTest.java | 30 +-- .../hystrix/HystrixComponentFallbackTest.java | 98 +++++++ .../HystrixComponentRequestContextTest.java | 25 +- .../component/hystrix/HystrixComponentTest.java | 154 ----------- .../hystrix/HystrixComponentTimeOutTest.java | 98 +++++++ 11 files changed, 623 insertions(+), 309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/CamelHystrixCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/CamelHystrixCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/CamelHystrixCommand.java index b728d37..184f036 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/CamelHystrixCommand.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/CamelHystrixCommand.java @@ -47,9 +47,17 @@ public class CamelHystrixCommand extends HystrixCommand<Exchange> { } try { Endpoint endpoint = findEndpoint(fallbackEndpointId); + if (exchange.getException() != null) { + Exception exception = exchange.getException(); + exchange.setException(null); + if (exception instanceof InterruptedException) { + exchange.removeProperty(Exchange.ROUTE_STOP); + } + } + endpoint.createProducer().process(exchange); - } catch (Exception e) { - throw new RuntimeException(e.getMessage()); + } catch (Exception exception) { + throw new RuntimeException(exception.getMessage()); } return exchange; } @@ -59,14 +67,20 @@ public class CamelHystrixCommand extends HystrixCommand<Exchange> { try { Endpoint endpoint = findEndpoint(runEndpointId); endpoint.createProducer().process(exchange); - } catch (Exception e) { + } catch (Exception exception) { exchange.setException(null); - throw new RuntimeException(e.getMessage()); + if (exception instanceof InterruptedException) { + exchange.removeProperty(Exchange.ROUTE_STOP); + } + throw new RuntimeException(exception.getMessage()); } if (exchange.getException() != null) { Exception exception = exchange.getException(); exchange.setException(null); + if (exception instanceof InterruptedException) { + exchange.removeProperty(Exchange.ROUTE_STOP); + } throw new RuntimeException(exception.getMessage()); } return exchange; http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java index 8e0ee8e..157b5ac 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java @@ -52,10 +52,10 @@ public class HystrixConfiguration { /** - * Specifies the propagateRequestContext to use + * Specifies the initializeRequestContext to use */ @UriParam(label = "producer") - private Boolean propagateRequestContext; + private Boolean initializeRequestContext; /** * Specifies the endpoint to use @@ -69,12 +69,12 @@ public class HystrixConfiguration { @UriParam(label = "producer") private String fallbackEndpointId; - private Integer coreSize; - private Integer keepAliveTimeMinutes; + private Integer corePoolSize; + private Integer keepAliveTime; private Integer maxQueueSize; private Integer queueSizeRejectionThreshold; - private Integer rollingStatisticalWindowInMilliseconds; - private Integer rollingStatisticalWindowBuckets; + private Integer threadPoolRollingNumberStatisticalWindowInMilliseconds; + private Integer threadPoolRollingNumberStatisticalWindowBuckets; private Boolean circuitBreakerEnabled; private Integer circuitBreakerErrorThresholdPercentage; @@ -83,6 +83,11 @@ public class HystrixConfiguration { private Integer circuitBreakerRequestVolumeThreshold; private Integer circuitBreakerSleepWindowInMilliseconds; private Integer executionIsolationSemaphoreMaxConcurrentRequests; + + /** + * Specifies the isolation strategy (thread or semaphore) to use + */ + @UriParam(label = "producer", defaultValue = "THREAD") private String executionIsolationStrategy; private Boolean executionIsolationThreadInterruptOnTimeout; private Integer executionTimeoutInMilliseconds; @@ -94,7 +99,6 @@ public class HystrixConfiguration { private Boolean metricsRollingPercentileEnabled; private Integer metricsRollingPercentileWindowInMilliseconds; private Integer metricsRollingPercentileWindowBuckets; - /* null means it hasn't been overridden */ private Integer metricsRollingStatisticalWindowInMilliseconds; private Integer metricsRollingStatisticalWindowBuckets; private Boolean requestCacheEnabled; @@ -125,12 +129,12 @@ public class HystrixConfiguration { } - public Boolean getPropagateRequestContext() { - return propagateRequestContext; + public Boolean getInitializeRequestContext() { + return initializeRequestContext; } - public void setPropagateRequestContext(Boolean propagateRequestContext) { - this.propagateRequestContext = propagateRequestContext; + public void setInitializeRequestContext(Boolean initializeRequestContext) { + this.initializeRequestContext = initializeRequestContext; } public String getGroupKey() { @@ -157,20 +161,20 @@ public class HystrixConfiguration { this.threadPoolKey = threadPoolKey; } - public Integer getCoreSize() { - return coreSize; + public Integer getCorePoolSize() { + return corePoolSize; } - public void setCoreSize(Integer coreSize) { - this.coreSize = coreSize; + public void setCorePoolSize(Integer corePoolSize) { + this.corePoolSize = corePoolSize; } - public Integer getKeepAliveTimeMinutes() { - return keepAliveTimeMinutes; + public Integer getKeepAliveTime() { + return keepAliveTime; } - public void setKeepAliveTimeMinutes(Integer keepAliveTimeMinutes) { - this.keepAliveTimeMinutes = keepAliveTimeMinutes; + public void setKeepAliveTime(Integer keepAliveTime) { + this.keepAliveTime = keepAliveTime; } public Integer getMaxQueueSize() { @@ -189,20 +193,20 @@ public class HystrixConfiguration { this.queueSizeRejectionThreshold = queueSizeRejectionThreshold; } - public Integer getRollingStatisticalWindowInMilliseconds() { - return rollingStatisticalWindowInMilliseconds; + public Integer getThreadPoolRollingNumberStatisticalWindowInMilliseconds() { + return threadPoolRollingNumberStatisticalWindowInMilliseconds; } - public void setRollingStatisticalWindowInMilliseconds(Integer rollingStatisticalWindowInMilliseconds) { - this.rollingStatisticalWindowInMilliseconds = rollingStatisticalWindowInMilliseconds; + public void setThreadPoolRollingNumberStatisticalWindowInMilliseconds(Integer threadPoolRollingNumberStatisticalWindowInMilliseconds) { + this.threadPoolRollingNumberStatisticalWindowInMilliseconds = threadPoolRollingNumberStatisticalWindowInMilliseconds; } - public Integer getRollingStatisticalWindowBuckets() { - return rollingStatisticalWindowBuckets; + public Integer getThreadPoolRollingNumberStatisticalWindowBuckets() { + return threadPoolRollingNumberStatisticalWindowBuckets; } - public void setRollingStatisticalWindowBuckets(Integer rollingStatisticalWindowBuckets) { - this.rollingStatisticalWindowBuckets = rollingStatisticalWindowBuckets; + public void setThreadPoolRollingNumberStatisticalWindowBuckets(Integer threadPoolRollingNumberStatisticalWindowBuckets) { + this.threadPoolRollingNumberStatisticalWindowBuckets = threadPoolRollingNumberStatisticalWindowBuckets; } public Boolean getCircuitBreakerEnabled() { http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConstants.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConstants.java index 33ba6d4..803635d 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConstants.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConstants.java @@ -17,6 +17,62 @@ package org.apache.camel.component.hystrix; public interface HystrixConstants { - String CAMEL_HYSTRIX_REQUEST_CONTEXT_KEY = "CamelHystrixRequestContextKey"; + + // in message header String CAMEL_HYSTRIX_CLEAR_CACHE_FIRST = "CamelHystrixClearCacheFirst"; + String CAMEL_HYSTRIX_REQUEST_CONTEXT = "CamelHystrixRequestContex"; + String CAMEL_HYSTRIX_GROUP_KEY = "CamelHystrixGroupKey"; + String CAMEL_HYSTRIX_COMMAND_KEY = "CamelHystrixCommandKey"; + String CAMEL_HYSTRIX_THREAD_POOL_KEY = "CamelHystrixThreadPoolKey"; + String CAMEL_HYSTRIX_RUN_ENDPOINT_ID = "CamelHystrixRunEndpointId"; + String CAMEL_HYSTRIX_FALLBACK_ENDPOINT_ID = "CamelHystrixFallbackEndpointId"; + String CAMEL_HYSTRIX_CORE_POOL_SIZE = "CamelHystrixCorePoolSize"; + String CAMEL_HYSTRIX_KEEP_ALIVE_TIME = "CamelHystrixKeepAliveTime"; + String CAMEL_HYSTRIX_MAX_QUEUE_SIZE = "CamelHystrixMaxQueueSize"; + String CAMEL_HYSTRIX_QUEUE_SIZE_REJECTION_THRESHOLD = "CamelHystrixQueueSizeRejectionThreshold"; + String CAMEL_HYSTRIX_THREAD_POOL_ROLLING_NUMBER_STATISTICAL_WINDOW_IN_MILLISECONDS = "CamelHystrixThreadPoolRollingNumberStatisticalWindowInMilliseconds"; + String CAMEL_HYSTRIX_THREAD_POOL_ROLLING_NUMBER_STATISTICAL_WINDOW_BUCKETS = "CamelHystrixThreadPoolRollingNumberStatisticalWindowBuckets"; + String CAMEL_HYSTRIX_CIRCUIT_BREAKER_ENABLED = "CamelHystrixCircuitBreakerEnabled"; + String CAMEL_HYSTRIX_CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE = "CamelHystrixCircuitBreakerErrorThresholdPercentage"; + String CAMEL_HYSTRIX_CIRCUIT_BREAKER_FORCE_CLOSED = "CamelHystrixCircuitBreakerForceClosed"; + String CAMEL_HYSTRIX_CIRCUIT_BREAKER_FORCE_OPEN = "CamelHystrixCircuitBreakerForceOpen"; + String CAMEL_HYSTRIX_CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD = "CamelHystrixCircuitBreakerRequestVolumeThreshold"; + String CAMEL_HYSTRIX_CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS = "CamelHystrixCircuitBreakerSleepWindowInMilliseconds"; + String CAMEL_HYSTRIX_EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS = "CamelHystrixExecutionIsolationSemaphoreMaxConcurrentRequests"; + String CAMEL_HYSTRIX_EXECUTION_ISOLATION_STRATEGY = "CamelHystrixExecutionIsolationStrategy"; + String CAMEL_HYSTRIX_EXECUTION_ISOLATION_THREAD_INTERRUPTION_ON_TIMEOUT = "CamelHystrixExecutionIsolationThreadInterruptOnTimeout"; + String CAMEL_HYSTRIX_EXECUTION_TIMEOUT_IN_MILLISECONDS = "CamelHystrixExecutionTimeoutInMilliseconds"; + String CAMEL_HYSTRIX_EXECUTION_TIMEOUT_ENABLED = "CamelHystrixExecutionTimeoutEnabled"; + String CAMEL_HYSTRIX_FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS = "CamelHystrixFallbackIsolationSemaphoreMaxConcurrentRequests"; + String CAMEL_HYSTRIX_FALLBACK_ENABLED = "CamelHystrixFallbackEnabled"; + String CAMEL_HYSTRIX_METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS = "CamelHystrixMetricsHealthSnapshotIntervalInMilliseconds"; + String CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_BUCKET_SIZE = "CamelHystrixMetricsRollingPercentileBucketSize"; + String CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_ENABLED = "CamelHystrixMetricsRollingPercentileEnabled"; + String CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_WINDOW_IN_MILLISECONDS = "CamelHystrixMetricsRollingPercentileWindowInMilliseconds"; + String CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_WINDOW_BUCKETS = "CamelHystrixMetricsRollingPercentileWindowBuckets"; + String CAMEL_HYSTRIX_METRICS_ROLLING_STATISTICAL_WINDOW_IN_MILLISECONDS = "CamelHystrixMetricsRollingStatisticalWindowInMilliseconds"; + String CAMEL_HYSTRIX_METRICS_ROLLING_STATISTICAL_WINDOW_BUCKETS = "CamelHystrixMetricsRollingStatisticalWindowBuckets"; + String CAMEL_HYSTRIX_REQUEST_CACHE_ENABLED = "CamelHystrixRequestCacheEnabled"; + String CAMEL_HYSTRIX_REQUEST_LOG_ENABLED = "CamelHystrixRequestLogEnabled"; + + //out message headers + String CAMEL_HYSTRIX_COMMAND_METRICS_TOTAL_REQUESTS = "CamelHystrixCommandMetricsTotalRequests"; + String CAMEL_HYSTRIX_COMMAND_METRICS_ERROR_COUNT = "CamelHystrixCommandMetricsErrorCount"; + String CAMEL_HYSTRIX_COMMAND_METRICS_ERROR_PERCENTAGE = "CamelHystrixCommandMetricsErrorPercentage"; + String CAMEL_HYSTRIX_COMMAND_METRICS_CURRENT_CONCURRENT_EXECUTION_COUNT = "CamelHystrixCommandMetricsCurrentConcurrentExecutionCount"; + String CAMEL_HYSTRIX_COMMAND_METRICS_EXECUTION_TIME_MEAN = "CamelHystrixCommandMetricsExecutionTimeMean"; + String CAMEL_HYSTRIX_COMMAND_METRICS_ROLLING_MAX_CONCURRENT_EXECUTIONS = "CamelHystrixCommandMetricsRollingMaxConcurrentExecutions"; + String CAMEL_HYSTRIX_COMMAND_METRICS_TOTAL_TIME_MEAN = "CamelHystrixCommandMetricsTotalTimeMean"; + + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_ACTIVE_COUNT = "CamelHystrixThreadPoolMetricsCurrentActiveCount"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CUMULATIVE_COUNT_THREADS_EXECUTED = "CamelHystrixThreadPoolMetricsCumulativeCountThreadsExecuted"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_COMPLETED_TASK_COUNT = "CamelHystrixThreadPoolMetricsCurrentCompletedTaskCount"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_CORE_POOL_SIZE = "CamelHystrixThreadPoolMetricsCurrentCorePoolSize"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_LARGEST_POOL_SIZE = "CamelHystrixThreadPoolMetricsCurrentLargestPoolSize"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_MAXIMUM_POOL_SIZE = "CamelHystrixThreadPoolMetricsCurrentMaximumPoolSize"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_POOL_SIZE = "CamelHystrixThreadPoolMetricsCurrentPoolSize"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_QUEUE_SIZE = "CamelHystrixThreadPoolMetricsCurrentQueueSize"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_TASK_COUNT = "CamelHystrixThreadPoolMetricsCurrentTaskCount"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_ROLLING_COUNT_THREADS_EXECUTED = "CamelHystrixThreadPoolMetricsRollingCountThreadsExecuted"; + String CAMEL_HYSTRIX_THREAD_POOL_METRICS_ROLLING_MAX_ACTIVE_THREADS = "CamelHystrixThreadPoolMetricsRollingMaxActiveThreads"; } http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProducer.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProducer.java index bafe5c7..851c4cb 100644 --- a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProducer.java +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixProducer.java @@ -19,14 +19,17 @@ package org.apache.camel.component.hystrix; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixCommandProperties; import com.netflix.hystrix.HystrixRequestCache; import com.netflix.hystrix.HystrixThreadPoolKey; +import com.netflix.hystrix.HystrixThreadPoolMetrics; import com.netflix.hystrix.HystrixThreadPoolProperties; import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.util.ExchangeHelper; /** * The Hystrix producer. @@ -42,148 +45,231 @@ public class HystrixProducer extends DefaultProducer { public void process(final Exchange exchange) throws Exception { HystrixCommand.Setter setter = HystrixCommand.Setter.withGroupKey( - HystrixCommandGroupKey.Factory.asKey(configuration.getGroupKey())); - setCommandPropertiesDefaults(setter); - setThreadPoolPropertiesDefaults(setter); + HystrixCommandGroupKey.Factory.asKey(exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_GROUP_KEY, configuration.getGroupKey(), String.class))); + + setCommandPropertiesDefaults(setter, exchange); + setThreadPoolPropertiesDefaults(setter, exchange); + CamelHystrixCommand camelHystrixCommand = new CamelHystrixCommand(setter, exchange, getCacheKey(exchange), - configuration.getRunEndpointId(), configuration.getFallbackEndpointId()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_RUN_ENDPOINT_ID, configuration.getRunEndpointId(), String.class), + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_FALLBACK_ENDPOINT_ID, configuration.getFallbackEndpointId(), String.class)); checkRequestContextPresent(exchange); clearCache(camelHystrixCommand.getCommandKey(), exchange); camelHystrixCommand.execute(); + populateWithMetrics(exchange, camelHystrixCommand); } - private void setCommandPropertiesDefaults(HystrixCommand.Setter setter) { - if (configuration.getCommandKey() != null) { - setter.andCommandKey(HystrixCommandKey.Factory.asKey(configuration.getCommandKey())); + private void setHeader(Exchange exchange, String key, Object value) { + if (ExchangeHelper.isOutCapable(exchange)) { + exchange.getOut().setHeader(key, value); + } else { + exchange.getIn().setHeader(key, value); } + } + private void setCommandPropertiesDefaults(HystrixCommand.Setter setter, Exchange exchange) { HystrixCommandProperties.Setter commandDefaults = HystrixCommandProperties.Setter(); setter.andCommandPropertiesDefaults(commandDefaults); - if (configuration.getCircuitBreakerEnabled() != null) { - commandDefaults.withCircuitBreakerEnabled(configuration.getCircuitBreakerEnabled()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_COMMAND_KEY, configuration.getCommandKey(), String.class) != null) { + setter.andCommandKey(HystrixCommandKey.Factory.asKey( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_COMMAND_KEY, configuration.getCommandKey(), String.class))); + } + + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_ENABLED, configuration.getCircuitBreakerEnabled(), Boolean.class) != null) { + commandDefaults.withCircuitBreakerEnabled( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_ENABLED, configuration.getCircuitBreakerEnabled(), Boolean.class)); } - if (configuration.getCircuitBreakerErrorThresholdPercentage() != null) { + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, + configuration.getCircuitBreakerErrorThresholdPercentage(), Integer.class) != null) { commandDefaults.withCircuitBreakerErrorThresholdPercentage( - configuration.getCircuitBreakerErrorThresholdPercentage()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE, + configuration.getCircuitBreakerErrorThresholdPercentage(), Integer.class)); } - if (configuration.getCircuitBreakerForceClosed() != null) { - commandDefaults.withCircuitBreakerForceClosed(configuration.getCircuitBreakerForceClosed()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_FORCE_CLOSED, + configuration.getCircuitBreakerForceClosed(), Boolean.class) != null) { + commandDefaults.withCircuitBreakerForceClosed( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_FORCE_CLOSED, + configuration.getCircuitBreakerForceClosed(), Boolean.class)); } - if (configuration.getCircuitBreakerForceOpen() != null) { - commandDefaults.withCircuitBreakerForceOpen(configuration.getCircuitBreakerForceOpen()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_FORCE_OPEN, + configuration.getCircuitBreakerForceOpen(), Boolean.class) != null) { + commandDefaults.withCircuitBreakerForceOpen( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_FORCE_OPEN, + configuration.getCircuitBreakerForceOpen(), Boolean.class)); } - if (configuration.getCircuitBreakerRequestVolumeThreshold() != null) { + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, + configuration.getCircuitBreakerRequestVolumeThreshold(), Integer.class) != null) { commandDefaults.withCircuitBreakerRequestVolumeThreshold( - configuration.getCircuitBreakerRequestVolumeThreshold()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_REQUEST_VOLUME_THRESHOLD, + configuration.getCircuitBreakerRequestVolumeThreshold(), Integer.class)); } - if (configuration.getCircuitBreakerSleepWindowInMilliseconds() != null) { + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, + configuration.getCircuitBreakerSleepWindowInMilliseconds(), Integer.class) != null) { commandDefaults.withCircuitBreakerSleepWindowInMilliseconds( - configuration.getCircuitBreakerSleepWindowInMilliseconds()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CIRCUIT_BREAKER_SLEEP_WINDOW_IN_MILLISECONDS, + configuration.getCircuitBreakerSleepWindowInMilliseconds(), Integer.class)); } - if (configuration.getExecutionIsolationSemaphoreMaxConcurrentRequests() != null) { + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, + configuration.getExecutionIsolationSemaphoreMaxConcurrentRequests(), Integer.class) != null) { commandDefaults.withExecutionIsolationSemaphoreMaxConcurrentRequests( - configuration.getExecutionIsolationSemaphoreMaxConcurrentRequests()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, + configuration.getExecutionIsolationSemaphoreMaxConcurrentRequests(), Integer.class)); } - if (configuration.getExecutionIsolationStrategy() != null) { + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_ISOLATION_STRATEGY, + configuration.getExecutionIsolationStrategy(), String.class) != null) { commandDefaults.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.valueOf( - configuration.getExecutionIsolationStrategy())); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_ISOLATION_STRATEGY, + configuration.getExecutionIsolationStrategy(), String.class))); } - if (configuration.getExecutionIsolationThreadInterruptOnTimeout() != null) { + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_ISOLATION_THREAD_INTERRUPTION_ON_TIMEOUT, + configuration.getExecutionIsolationThreadInterruptOnTimeout(), Boolean.class) != null) { commandDefaults.withExecutionIsolationThreadInterruptOnTimeout( - configuration.getExecutionIsolationThreadInterruptOnTimeout()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_ISOLATION_THREAD_INTERRUPTION_ON_TIMEOUT, + configuration.getExecutionIsolationThreadInterruptOnTimeout(), Boolean.class)); } - if (configuration.getExecutionTimeoutInMilliseconds() != null) { - commandDefaults.withExecutionTimeoutInMilliseconds(configuration.getExecutionTimeoutInMilliseconds()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_TIMEOUT_IN_MILLISECONDS, + configuration.getExecutionTimeoutInMilliseconds(), Integer.class) != null) { + commandDefaults.withExecutionTimeoutInMilliseconds( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_TIMEOUT_IN_MILLISECONDS, + configuration.getExecutionTimeoutInMilliseconds(), Integer.class)); } - if (configuration.getExecutionTimeoutEnabled() != null) { - commandDefaults.withExecutionTimeoutEnabled(configuration.getExecutionTimeoutEnabled()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_TIMEOUT_ENABLED, + configuration.getExecutionTimeoutEnabled(), Boolean.class) != null) { + commandDefaults.withExecutionTimeoutEnabled( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_EXECUTION_TIMEOUT_ENABLED, + configuration.getExecutionTimeoutEnabled(), Boolean.class)); } - if (configuration.getFallbackIsolationSemaphoreMaxConcurrentRequests() != null) { + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, + configuration.getFallbackIsolationSemaphoreMaxConcurrentRequests(), Integer.class) != null) { commandDefaults.withFallbackIsolationSemaphoreMaxConcurrentRequests( - configuration.getFallbackIsolationSemaphoreMaxConcurrentRequests()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_FALLBACK_ISOLATION_SEMAPHORE_MAX_CONCURRENT_REQUESTS, + configuration.getFallbackIsolationSemaphoreMaxConcurrentRequests(), Integer.class)); } - if (configuration.getFallbackEnabled() != null) { - commandDefaults.withFallbackEnabled(configuration.getFallbackEnabled()); - } - if (configuration.getMetricsHealthSnapshotIntervalInMilliseconds() != null) { - commandDefaults.withMetricsHealthSnapshotIntervalInMilliseconds(configuration.getMetricsHealthSnapshotIntervalInMilliseconds()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_FALLBACK_ENABLED, + configuration.getFallbackEnabled(), Boolean.class) != null) { + commandDefaults.withFallbackEnabled( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_FALLBACK_ENABLED, + configuration.getFallbackEnabled(), Boolean.class)); } - if (configuration.getMetricsRollingPercentileBucketSize() != null) { - commandDefaults.withMetricsRollingPercentileBucketSize(configuration.getMetricsRollingPercentileBucketSize()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS, + configuration.getMetricsHealthSnapshotIntervalInMilliseconds(), Integer.class) != null) { + commandDefaults.withMetricsHealthSnapshotIntervalInMilliseconds( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_HEALTH_SNAPSHOT_INTERVAL_IN_MILLISECONDS, + configuration.getMetricsHealthSnapshotIntervalInMilliseconds(), Integer.class)); } - if (configuration.getMetricsRollingPercentileEnabled() != null) { - commandDefaults.withMetricsRollingPercentileEnabled(configuration.getMetricsRollingPercentileEnabled()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, + configuration.getMetricsRollingPercentileBucketSize(), Integer.class) != null) { + commandDefaults.withMetricsRollingPercentileBucketSize( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_BUCKET_SIZE, + configuration.getMetricsRollingPercentileBucketSize(), Integer.class)); } - if (configuration.getMetricsRollingPercentileWindowInMilliseconds() != null) { - commandDefaults.withMetricsRollingPercentileWindowInMilliseconds(configuration.getMetricsRollingPercentileWindowInMilliseconds()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_ENABLED, + configuration.getMetricsRollingPercentileEnabled(), Boolean.class) != null) { + commandDefaults.withMetricsRollingPercentileEnabled( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_ENABLED, + configuration.getMetricsRollingPercentileEnabled(), Boolean.class)); } - if (configuration.getMetricsRollingPercentileWindowBuckets() != null) { - commandDefaults.withMetricsRollingPercentileWindowBuckets(configuration.getMetricsRollingPercentileWindowBuckets()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_WINDOW_IN_MILLISECONDS, + configuration.getMetricsRollingPercentileWindowInMilliseconds(), Integer.class) != null) { + commandDefaults.withMetricsRollingPercentileWindowInMilliseconds( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_WINDOW_IN_MILLISECONDS, + configuration.getMetricsRollingPercentileWindowInMilliseconds(), Integer.class)); } - if (configuration.getMetricsRollingStatisticalWindowInMilliseconds() != null) { - commandDefaults.withMetricsRollingStatisticalWindowInMilliseconds(configuration.getMetricsRollingStatisticalWindowInMilliseconds()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_WINDOW_BUCKETS, + configuration.getMetricsRollingPercentileWindowBuckets(), Integer.class) != null) { + commandDefaults.withMetricsRollingPercentileWindowBuckets( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_PERCENTILE_WINDOW_BUCKETS, + configuration.getMetricsRollingPercentileWindowBuckets(), Integer.class)); } - if (configuration.getMetricsRollingStatisticalWindowBuckets() != null) { - commandDefaults.withMetricsRollingStatisticalWindowBuckets(configuration.getMetricsRollingStatisticalWindowBuckets()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_STATISTICAL_WINDOW_IN_MILLISECONDS, + configuration.getMetricsRollingStatisticalWindowInMilliseconds(), Integer.class) != null) { + commandDefaults.withMetricsRollingStatisticalWindowInMilliseconds( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_STATISTICAL_WINDOW_IN_MILLISECONDS, + configuration.getMetricsRollingStatisticalWindowInMilliseconds(), Integer.class)); } - if (configuration.getRequestCacheEnabled() != null) { - commandDefaults.withRequestCacheEnabled(configuration.getRequestCacheEnabled()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_STATISTICAL_WINDOW_BUCKETS, + configuration.getMetricsRollingStatisticalWindowBuckets(), Integer.class) != null) { + commandDefaults.withMetricsRollingStatisticalWindowBuckets( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_METRICS_ROLLING_STATISTICAL_WINDOW_BUCKETS, + configuration.getMetricsRollingStatisticalWindowBuckets(), Integer.class)); } - if (configuration.getRequestLogEnabled() != null) { - commandDefaults.withRequestLogEnabled(configuration.getRequestLogEnabled()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CACHE_ENABLED, configuration.getRequestCacheEnabled(), Boolean.class) != null) { + commandDefaults.withRequestCacheEnabled( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CACHE_ENABLED, configuration.getRequestCacheEnabled(), Boolean.class)); } - } - private void setThreadPoolPropertiesDefaults(HystrixCommand.Setter setter) { - if (configuration.getThreadPoolKey() != null) { - setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(configuration.getThreadPoolKey())); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_REQUEST_LOG_ENABLED, configuration.getRequestLogEnabled(), Boolean.class) != null) { + commandDefaults.withRequestLogEnabled( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_REQUEST_LOG_ENABLED, configuration.getRequestLogEnabled(), Boolean.class)); } + } + private void setThreadPoolPropertiesDefaults(HystrixCommand.Setter setter, Exchange exchange) { HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter(); setter.andThreadPoolPropertiesDefaults(threadPoolProperties); - if (configuration.getCoreSize() != null) { - threadPoolProperties.withCoreSize(configuration.getCoreSize()); + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_KEY, configuration.getThreadPoolKey(), String.class) != null) { + setter.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_KEY, configuration.getThreadPoolKey(), String.class))); } - if (configuration.getKeepAliveTimeMinutes() != null) { - threadPoolProperties.withKeepAliveTimeMinutes(configuration.getKeepAliveTimeMinutes()); + + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CORE_POOL_SIZE, configuration.getCorePoolSize(), Integer.class) != null) { + threadPoolProperties.withCoreSize( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_CORE_POOL_SIZE, configuration.getCorePoolSize(), Integer.class)); } - if (configuration.getMaxQueueSize() != null) { - threadPoolProperties.withMaxQueueSize(configuration.getMaxQueueSize()); + + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_KEEP_ALIVE_TIME, configuration.getKeepAliveTime(), Integer.class) != null) { + threadPoolProperties.withKeepAliveTimeMinutes( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_KEEP_ALIVE_TIME, configuration.getKeepAliveTime(), Integer.class)); + } + + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_MAX_QUEUE_SIZE, configuration.getMaxQueueSize(), Integer.class) != null) { + threadPoolProperties.withMaxQueueSize( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_MAX_QUEUE_SIZE, configuration.getMaxQueueSize(), Integer.class)); } - if (configuration.getQueueSizeRejectionThreshold() != null) { - threadPoolProperties.withQueueSizeRejectionThreshold(configuration.getQueueSizeRejectionThreshold()); + + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_QUEUE_SIZE_REJECTION_THRESHOLD, + configuration.getQueueSizeRejectionThreshold(), Integer.class) != null) { + threadPoolProperties.withQueueSizeRejectionThreshold( + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_QUEUE_SIZE_REJECTION_THRESHOLD, + configuration.getQueueSizeRejectionThreshold(), Integer.class)); } - if (configuration.getMetricsRollingStatisticalWindowInMilliseconds() != null) { + + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_ROLLING_NUMBER_STATISTICAL_WINDOW_IN_MILLISECONDS, + configuration.getThreadPoolRollingNumberStatisticalWindowInMilliseconds(), Integer.class) != null) { threadPoolProperties.withMetricsRollingStatisticalWindowInMilliseconds( - configuration.getMetricsRollingStatisticalWindowInMilliseconds()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_ROLLING_NUMBER_STATISTICAL_WINDOW_IN_MILLISECONDS, + configuration.getThreadPoolRollingNumberStatisticalWindowInMilliseconds(), Integer.class)); } - if (configuration.getMetricsRollingStatisticalWindowBuckets() != null) { + + if (exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_ROLLING_NUMBER_STATISTICAL_WINDOW_BUCKETS, + configuration.getThreadPoolRollingNumberStatisticalWindowBuckets(), Integer.class) != null) { threadPoolProperties.withMetricsRollingStatisticalWindowBuckets( - configuration.getMetricsRollingStatisticalWindowBuckets()); + exchange.getIn().getHeader(HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_ROLLING_NUMBER_STATISTICAL_WINDOW_BUCKETS, + configuration.getThreadPoolRollingNumberStatisticalWindowBuckets(), Integer.class)); } } @@ -195,13 +281,13 @@ public class HystrixProducer extends DefaultProducer { private synchronized void checkRequestContextPresent(Exchange exchange) { if (!HystrixRequestContext.isCurrentThreadInitialized()) { HystrixRequestContext customRequestContext = exchange.getIn() - .getHeader(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CONTEXT_KEY, HystrixRequestContext.class); + .getHeader(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CONTEXT, HystrixRequestContext.class); if (customRequestContext != null) { HystrixRequestContext.setContextOnCurrentThread(customRequestContext); - } else { + } else if (requestContext != null) { HystrixRequestContext.setContextOnCurrentThread(requestContext); - exchange.getIn().setHeader(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CONTEXT_KEY, requestContext); + exchange.getIn().setHeader(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CONTEXT, requestContext); } } } @@ -214,9 +300,34 @@ public class HystrixProducer extends DefaultProducer { } } + private void populateWithMetrics(Exchange exchange, CamelHystrixCommand camelHystrixCommand) { + HystrixCommandMetrics commandMetrics = HystrixCommandMetrics.getInstance(camelHystrixCommand.getCommandKey()); + HystrixThreadPoolMetrics threadPoolMetrics = HystrixThreadPoolMetrics.getInstance(camelHystrixCommand.getThreadPoolKey()); + + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_COMMAND_METRICS_TOTAL_REQUESTS, commandMetrics.getHealthCounts().getTotalRequests()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_COMMAND_METRICS_ERROR_COUNT, commandMetrics.getHealthCounts().getErrorCount()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_COMMAND_METRICS_ERROR_PERCENTAGE, commandMetrics.getHealthCounts().getErrorPercentage()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_COMMAND_METRICS_CURRENT_CONCURRENT_EXECUTION_COUNT, commandMetrics.getCurrentConcurrentExecutionCount()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_COMMAND_METRICS_EXECUTION_TIME_MEAN, commandMetrics.getExecutionTimeMean()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_COMMAND_METRICS_ROLLING_MAX_CONCURRENT_EXECUTIONS, commandMetrics.getRollingMaxConcurrentExecutions()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_COMMAND_METRICS_TOTAL_TIME_MEAN, commandMetrics.getTotalTimeMean()); + + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_ACTIVE_COUNT, threadPoolMetrics.getCurrentActiveCount()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CUMULATIVE_COUNT_THREADS_EXECUTED, threadPoolMetrics.getCumulativeCountThreadsExecuted()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_COMPLETED_TASK_COUNT, threadPoolMetrics.getCurrentCompletedTaskCount()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_CORE_POOL_SIZE, threadPoolMetrics.getCurrentCorePoolSize()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_LARGEST_POOL_SIZE, threadPoolMetrics.getCurrentLargestPoolSize()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_MAXIMUM_POOL_SIZE, threadPoolMetrics.getCurrentMaximumPoolSize()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_POOL_SIZE, threadPoolMetrics.getCurrentPoolSize()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_QUEUE_SIZE, threadPoolMetrics.getCurrentQueueSize()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_CURRENT_TASK_COUNT, threadPoolMetrics.getCurrentTaskCount()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_ROLLING_COUNT_THREADS_EXECUTED, threadPoolMetrics.getRollingCountThreadsExecuted()); + setHeader(exchange, HystrixConstants.CAMEL_HYSTRIX_THREAD_POOL_METRICS_ROLLING_MAX_ACTIVE_THREADS, threadPoolMetrics.getRollingMaxActiveThreads()); + } + @Override protected void doStart() throws Exception { - if (configuration.getPropagateRequestContext() != null && configuration.getPropagateRequestContext()) { + if (configuration.getInitializeRequestContext() != null && configuration.getInitializeRequestContext()) { requestContext = HystrixRequestContext.initializeContext(); } super.doStart(); http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentBase.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentBase.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentBase.java new file mode 100644 index 0000000..dfa163c --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentBase.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.ExpressionBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; + +public class HystrixComponentBase extends CamelTestSupport { + + @Produce(uri = "direct:start") + protected ProducerTemplate template; + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @EndpointInject(uri = "mock:error") + protected MockEndpoint errorEndpoint; + + @Override + protected CamelContext createCamelContext() throws Exception { + SimpleRegistry registry = new SimpleRegistry(); + CamelContext context = new DefaultCamelContext(registry); + registry.put("run", context.getEndpoint("direct:run")); + registry.put("fallback", context.getEndpoint("direct:fallback")); + registry.put("headerExpression", ExpressionBuilder.headerExpression("key")); + return context; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCacheTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCacheTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCacheTest.java new file mode 100644 index 0000000..a279ed1 --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCacheTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +public class HystrixComponentCacheTest extends HystrixComponentBase { + + @Test + public void invokesCachedEndpoint() throws Exception { + resultEndpoint.expectedMessageCount(1); + errorEndpoint.expectedMessageCount(0); + + template.sendBodyAndHeader("body", "key", "cachedKey"); + template.sendBodyAndHeader("body", "key", "cachedKey"); + + assertMockEndpointsSatisfied(); + + resultEndpoint.expectedMessageCount(2); + template.sendBodyAndHeader("body", "key", "cachedKey"); + template.sendBodyAndHeader("body", "key", "differentCachedKey"); + assertMockEndpointsSatisfied(); + } + + @Test + public void invokesCachedEndpointFromDifferentThread() throws Exception { + resultEndpoint.expectedMessageCount(1); + errorEndpoint.expectedMessageCount(0); + + template.sendBodyAndHeader("body", "key", "cachedKey"); + + final CountDownLatch latch = new CountDownLatch(1); + new Thread(new Runnable() { + @Override + public void run() { + template.sendBodyAndHeader("body", "key", "cachedKey"); + latch.countDown(); + } + }).start(); + + latch.await(2, TimeUnit.SECONDS); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + public void configure() { + + from("direct:fallback") + .to("mock:error"); + + from("direct:run") + .to("mock:result"); + + from("direct:start") + .to("hystrix:testKey?runEndpointId=run&fallbackEndpointId=fallback&cacheKeyExpression=#headerExpression&initializeRequestContext=true"); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCircuitBreakerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCircuitBreakerTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCircuitBreakerTest.java index ab11518..332bf1c 100644 --- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCircuitBreakerTest.java +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentCircuitBreakerTest.java @@ -16,30 +16,12 @@ */ package org.apache.camel.component.hystrix; -import org.apache.camel.CamelContext; -import org.apache.camel.EndpointInject; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.Produce; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.builder.ExpressionBuilder; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.SimpleRegistry; -import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class HystrixComponentCircuitBreakerTest extends CamelTestSupport { - - @Produce(uri = "direct:start") - protected ProducerTemplate template; - - @EndpointInject(uri = "mock:result") - protected MockEndpoint resultEndpoint; - - @EndpointInject(uri = "mock:error") - protected MockEndpoint errorEndpoint; +public class HystrixComponentCircuitBreakerTest extends HystrixComponentBase { @Test public void circuitBreakerRejectsWhenTresholdReached() throws Exception { @@ -64,16 +46,6 @@ public class HystrixComponentCircuitBreakerTest extends CamelTestSupport { } @Override - protected CamelContext createCamelContext() throws Exception { - SimpleRegistry registry = new SimpleRegistry(); - CamelContext context = new DefaultCamelContext(registry); - registry.put("run", context.getEndpoint("direct:run")); - registry.put("fallback", context.getEndpoint("direct:fallback")); - registry.put("headerExpression", ExpressionBuilder.headerExpression("key")); - return context; - } - - @Override protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentFallbackTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentFallbackTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentFallbackTest.java new file mode 100644 index 0000000..9ef2161 --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentFallbackTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.ExpressionBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class HystrixComponentFallbackTest extends HystrixComponentBase { + + @Test + public void invokesTargetEndpoint() throws Exception { + resultEndpoint.expectedMessageCount(1); + errorEndpoint.expectedMessageCount(0); + + template.sendBody("test"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void invokesFallbackEndpointExceptionThrown() throws Exception { + resultEndpoint.expectedMessageCount(1); + errorEndpoint.expectedMessageCount(1); + resultEndpoint.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + throw new RuntimeException("blow"); + } + }); + + template.sendBody("test"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void invokesFallbackEndpointExceptionSet() throws Exception { + resultEndpoint.expectedMessageCount(1); + errorEndpoint.expectedMessageCount(1); + resultEndpoint.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setException(new RuntimeException("blow")); + } + }); + + template.sendBody("test"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + public void configure() { + + from("direct:fallback") + .to("mock:error"); + + from("direct:run") + .to("mock:result"); + + from("direct:start") + .to("hystrix:testKey?runEndpointId=run&fallbackEndpointId=fallback&cacheKeyExpression=#headerExpression&initializeRequestContext=true"); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentRequestContextTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentRequestContextTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentRequestContextTest.java index 9286373..7e52c83 100644 --- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentRequestContextTest.java +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentRequestContextTest.java @@ -34,16 +34,7 @@ import org.apache.camel.impl.SimpleRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; -public class HystrixComponentRequestContextTest extends CamelTestSupport { - - @Produce(uri = "direct:start") - protected ProducerTemplate template; - - @EndpointInject(uri = "mock:result") - protected MockEndpoint resultEndpoint; - - @EndpointInject(uri = "mock:error") - protected MockEndpoint errorEndpoint; +public class HystrixComponentRequestContextTest extends HystrixComponentBase { @Test public void invokesCachedEndpointWithCustomRequestContext() throws Exception { @@ -53,7 +44,7 @@ public class HystrixComponentRequestContextTest extends CamelTestSupport { HystrixRequestContext customContext = HystrixRequestContext.initializeContext(); final Map headers = new HashMap<>(); headers.put("key", "cachedKey"); - headers.put(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CONTEXT_KEY, customContext); + headers.put(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CONTEXT, customContext); template.sendBodyAndHeaders("body", headers); @@ -79,7 +70,7 @@ public class HystrixComponentRequestContextTest extends CamelTestSupport { HystrixRequestContext customContext = HystrixRequestContext.initializeContext(); final Map headers = new HashMap<>(); headers.put("key", "cachedKey"); - headers.put(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CONTEXT_KEY, customContext); + headers.put(HystrixConstants.CAMEL_HYSTRIX_REQUEST_CONTEXT, customContext); template.sendBodyAndHeaders("body", headers); @@ -90,16 +81,6 @@ public class HystrixComponentRequestContextTest extends CamelTestSupport { } @Override - protected CamelContext createCamelContext() throws Exception { - SimpleRegistry registry = new SimpleRegistry(); - CamelContext context = new DefaultCamelContext(registry); - registry.put("run", context.getEndpoint("direct:run")); - registry.put("fallback", context.getEndpoint("direct:fallback")); - registry.put("headerExpression", ExpressionBuilder.headerExpression("key")); - return context; - } - - @Override protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentTest.java deleted file mode 100644 index f3b3739..0000000 --- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.hystrix; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.camel.CamelContext; -import org.apache.camel.EndpointInject; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.Produce; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.builder.ExpressionBuilder; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.SimpleRegistry; -import org.apache.camel.test.junit4.CamelTestSupport; -import org.junit.Test; - -public class HystrixComponentTest extends CamelTestSupport { - - @Produce(uri = "direct:start") - protected ProducerTemplate template; - - @EndpointInject(uri = "mock:result") - protected MockEndpoint resultEndpoint; - - @EndpointInject(uri = "mock:error") - protected MockEndpoint errorEndpoint; - - @Test - public void invokesTargetEndpoint() throws Exception { - resultEndpoint.expectedMessageCount(1); - errorEndpoint.expectedMessageCount(0); - - template.sendBody("test"); - - assertMockEndpointsSatisfied(); - } - - @Test - public void invokesFallbackEndpointExceptionThrown() throws Exception { - resultEndpoint.expectedMessageCount(1); - errorEndpoint.expectedMessageCount(1); - resultEndpoint.whenAnyExchangeReceived(new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - throw new RuntimeException("blow"); - } - }); - - template.sendBody("test"); - - assertMockEndpointsSatisfied(); - } - - @Test - public void invokesFallbackEndpointExceptionSet() throws Exception { - resultEndpoint.expectedMessageCount(1); - errorEndpoint.expectedMessageCount(1); - resultEndpoint.whenAnyExchangeReceived(new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - exchange.setException(new RuntimeException("blow")); - } - }); - - template.sendBody("test"); - - assertMockEndpointsSatisfied(); - } - - @Test - public void invokesCachedEndpoint() throws Exception { - resultEndpoint.expectedMessageCount(1); - errorEndpoint.expectedMessageCount(0); - - template.sendBodyAndHeader("body", "key", "cachedKey"); - template.sendBodyAndHeader("body", "key", "cachedKey"); - - assertMockEndpointsSatisfied(); - - resultEndpoint.expectedMessageCount(2); - template.sendBodyAndHeader("body", "key", "cachedKey"); - template.sendBodyAndHeader("body", "key", "differentCachedKey"); - assertMockEndpointsSatisfied(); - } - - @Test - public void invokesCachedEndpointFromDifferentThread() throws Exception { - resultEndpoint.expectedMessageCount(1); - errorEndpoint.expectedMessageCount(0); - - template.sendBodyAndHeader("body", "key", "cachedKey"); - - final CountDownLatch latch = new CountDownLatch(1); - new Thread(new Runnable() { - @Override - public void run() { - template.sendBodyAndHeader("body", "key", "cachedKey"); - latch.countDown(); - } - }).start(); - - latch.await(2, TimeUnit.SECONDS); - - assertMockEndpointsSatisfied(); - } - - @Override - protected CamelContext createCamelContext() throws Exception { - SimpleRegistry registry = new SimpleRegistry(); - CamelContext context = new DefaultCamelContext(registry); - registry.put("run", context.getEndpoint("direct:run")); - registry.put("fallback", context.getEndpoint("direct:fallback")); - registry.put("bodyExpression", ExpressionBuilder.headerExpression("key")); - return context; - } - - @Override - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - - public void configure() { - - from("direct:fallback") - .to("mock:error"); - - from("direct:run") - .to("mock:result"); - - from("direct:start") - .to("hystrix:testKey?runEndpointId=run&fallbackEndpointId=fallback&cacheKeyExpression=#bodyExpression&propagateRequestContext=true"); - } - }; - } -} - http://git-wip-us.apache.org/repos/asf/camel/blob/49678cfb/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentTimeOutTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentTimeOutTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentTimeOutTest.java new file mode 100644 index 0000000..f105c08 --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixComponentTimeOutTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import org.apache.camel.CamelContext; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.ExpressionBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class HystrixComponentTimeOutTest extends CamelTestSupport { + + @Produce(uri = "direct:start") + protected ProducerTemplate template; + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @EndpointInject(uri = "mock:error") + protected MockEndpoint errorEndpoint; + + @Test + public void slowOperationTimesOutAndFallbacks() throws Exception { + resultEndpoint.expectedMessageCount(0); + errorEndpoint.expectedMessageCount(1); + + template.sendBody("test"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void slowOperationSucceedsWithExtendedTimeout() throws Exception { + resultEndpoint.expectedMessageCount(1); + errorEndpoint.expectedMessageCount(0); + + template.sendBodyAndHeader("test", HystrixConstants.CAMEL_HYSTRIX_EXECUTION_TIMEOUT_IN_MILLISECONDS, Integer.valueOf(700)); + + assertMockEndpointsSatisfied(); + } + + @Override + protected CamelContext createCamelContext() throws Exception { + SimpleRegistry registry = new SimpleRegistry(); + CamelContext context = new DefaultCamelContext(registry); + registry.put("run", context.getEndpoint("direct:run")); + registry.put("fallback", context.getEndpoint("direct:fallback")); + registry.put("headerExpression", ExpressionBuilder.headerExpression("key")); + return context; + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + + public void configure() { + + from("direct:fallback") + .to("mock:error"); + + from("direct:run") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Thread.sleep(500); //a slow operation + } + }) + .to("mock:result"); + + from("direct:start") + .to("hystrix:testKey?runEndpointId=run&fallbackEndpointId=fallback&executionTimeoutInMilliseconds=100"); + } + }; + } +} +