Repository: camel Updated Branches: refs/heads/feature/camel-hystrix [created] 07f38b3b2
CAMEL-9098 camel-hystrix initial implementation (work in progress). Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/07f38b3b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/07f38b3b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/07f38b3b Branch: refs/heads/feature/camel-hystrix Commit: 07f38b3b2bbaf0ab96e18697bcf1d7f30b24c26b Parents: cdbdfb5 Author: Raul Kripalani <ra...@apache.org> Authored: Sun Aug 23 23:36:10 2015 +0100 Committer: Raul Kripalani <ra...@apache.org> Committed: Tue Aug 25 01:59:25 2015 +0100 ---------------------------------------------------------------------- components/camel-hystrix/pom.xml | 90 ++++++ .../component/hystrix/HystrixComponent.java | 41 +++ .../component/hystrix/HystrixConfiguration.java | 287 +++++++++++++++++++ .../hystrix/HystrixDelegateEndpoint.java | 92 ++++++ .../hystrix/HystrixDelegateProcessor.java | 90 ++++++ .../hystrix/HystrixDelegateProducer.java | 58 ++++ .../camel/component/hystrix/HystrixHelper.java | 85 ++++++ .../ProcessorInvokingObservableCommand.java | 201 +++++++++++++ .../hystrix/builders/HystrixWrappers.java | 146 ++++++++++ .../src/main/resources/META-INF/LICENSE.txt | 203 +++++++++++++ .../src/main/resources/META-INF/NOTICE.txt | 11 + .../services/org/apache/camel/component/hystrix | 18 ++ .../component/hystrix/AbstractHystrixTest.java | 57 ++++ .../component/hystrix/HystrixEndpointTest.java | 42 +++ .../component/hystrix/HystrixProcessorTest.java | 235 +++++++++++++++ .../src/test/resources/log4j.properties | 37 +++ components/pom.xml | 1 + parent/pom.xml | 1 + 18 files changed, 1695 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/pom.xml b/components/camel-hystrix/pom.xml new file mode 
100644 index 0000000..c3fa90c --- /dev/null +++ b/components/camel-hystrix/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.16-SNAPSHOT</version> + </parent> + + <artifactId>camel-hystrix</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Hystrix</name> + <description>Camel Hystrix component</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.hystrix.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=hystrix</camel.osgi.export.service> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + + <dependency> + <groupId>com.netflix.hystrix</groupId> + <artifactId>hystrix-core</artifactId> + <version>${hystrix-version}</version> + </dependency> + + <dependency> 
+ <groupId>io.reactivex</groupId> + <artifactId>rxjava</artifactId> + <version>1.0.13</version> + </dependency> + + <dependency> + <groupId>com.netflix.archaius</groupId> + <artifactId>archaius-core</artifactId> + <version>0.4.1</version> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.netflix.hystrix</groupId> + <artifactId>hystrix-servo-metrics-publisher</artifactId> + <version>1.1.2</version> + <scope>test</scope> + </dependency> + </dependencies> + + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixComponent.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixComponent.java new file mode 100644 index 0000000..12994db --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixComponent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.component.hystrix.builders.HystrixWrappers; +import org.apache.camel.impl.DefaultComponent; + +/** + * Created by raul on 16/08/15. + */ +public class HystrixComponent extends DefaultComponent { + + public static final String HYSTRIX_REQUEST_CONTEXT_HEADER_NAME = "CamelHystrixRequestContext"; + + public HystrixWrappers wrapper() { + return new HystrixWrappers(getCamelContext()); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + throw new IllegalArgumentException("URI support for Hystrix endpoints not implemented. Use fluent builders."); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java new file mode 100644 index 0000000..8467336 --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixConfiguration.java @@ -0,0 +1,287 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import java.util.HashSet; +import java.util.Set; + +import com.netflix.hystrix.HystrixObservableCommand; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.Processor; +import org.apache.camel.util.ExchangeHelper; +import rx.functions.Action2; + +/** + * Created by raul on 23/08/15. + */ +public class HystrixConfiguration { + + /** + * Enumeration to indicate which type of fallback we want. + */ + public enum FallbackType { + + /** + * The fallback is a {@link Processor}. Use {@link #setFallbackProcessor(Processor)} to set it. + */ + processor, + + /** + * The fallback is an {@link Endpoint}. Use {@link #setFallbackEndpoint(Endpoint)} to set it. + */ + endpoint, + + none + } + + /** + * A Hystrix {@link HystrixObservableCommand.Setter} setting the command group and command + * key. + */ + protected HystrixObservableCommand.Setter setter; + + /** + * The fallback type. + */ + protected FallbackType fallbackType = FallbackType.none; + + /** + * A fallback processor (optional) to invoke in case the target fails. 
+ */ + protected Processor fallbackProcessor; + + /** + * A fallback endpoint (optional) to invoke in case the target fails. + */ + protected Endpoint fallbackEndpoint; + + /** + * The actual fallback resolved by {@link #validateAndInit()}, cached. + */ + protected Processor actualFallback; + + /** + * An {@link Expression} (optional) to calculate a cache key in case the user wants to enable caching. + */ + protected Expression cacheKeyExpression; + + /** + * A strategy for merging responses from the cache. If not set, Camel will use + * {@link ExchangeHelper#copyResults(Exchange, Exchange)} to copy the cached {@link Exchange} into the incoming + * <tt>Exchange</tt>, which will override the body, message headers and Exchange properties. + * <p> + * Setting a custom merge strategy allows the user to decide exactly what information has to be copied over. + * <p> + * The custom merge strategy is an {@link Action2} function that takes the incoming original <tt>Exchange</tt> as + * its first argument and the cached <tt>Exchange</tt> as the second argument. + */ + protected Action2<Exchange, Exchange> cacheMergeStrategy; + + /** + * Whether or not to propagate the {@link HystrixRequestContext}. + */ + protected boolean propagateRequestContext = true; + + /** + * Set of exceptions to ignore and wrap in a {@link com.netflix.hystrix.exception.HystrixBadRequestException}, so + * that they do not trigger a fallback. + */ + protected Set<Class<? extends Throwable>> exceptionsSuppressingFallback = new HashSet<>(); + + /** + * Default constructor. + * Takes no arguments; all options are supplied through the setters. + */ + public HystrixConfiguration() { + + } + + /** + * Validates that the provided options are correct and coherent. + * @throws IllegalArgumentException if options are invalid or incoherent. 
+ */ + public void validateAndInit() throws Exception { + if (fallbackType == FallbackType.endpoint && fallbackEndpoint == null) { + throw new IllegalArgumentException("Fallback type is 'endpoint' but no fallback endpoint was provided"); + } + + if (fallbackType == FallbackType.processor && fallbackProcessor == null) { + throw new IllegalArgumentException("Fallback type is 'processor' but no fallback processor was provided"); + } + + if (fallbackType == FallbackType.none && (fallbackEndpoint != null || fallbackProcessor != null)) { + throw new IllegalArgumentException("Fallback type is 'none' but a fallback processor or endpoint was provided"); + } + + if (fallbackType == FallbackType.none) { + actualFallback = null; + } + + if (fallbackType == FallbackType.endpoint) { + actualFallback = fallbackEndpoint.createProducer(); + } else if (fallbackType == FallbackType.processor) { + actualFallback = fallbackProcessor; + } + } + + /** + * @return The Expression that evaluates the cache key, if any. Else, <tt>null</tt>. + */ + public Expression getCacheKeyExpression() { + return cacheKeyExpression; + } + + /** + * Sets an {@link Expression} to extract a cache key from the incoming {@link Exchange}. It must evaluate to a + * <tt>String</tt>. If set, Hystrix will enable response caching. + * + * @param cacheKeyExpression + */ + public void setCacheKeyExpression(Expression cacheKeyExpression) { + this.cacheKeyExpression = cacheKeyExpression; + } + + /** + * @return The current cache merge strategy, if any. + */ + public Action2<Exchange, Exchange> getCacheMergeStrategy() { + return cacheMergeStrategy; + } + + /** + * A strategy for merging responses from the cache. If not set, Camel will use + * {@link ExchangeHelper#copyResults(Exchange, Exchange)} to copy the cached {@link Exchange} into the incoming + * <tt>Exchange</tt>, which will override the body, message headers and Exchange properties. 
+ * <p> + * Setting a custom merge strategy allows the user to decide exactly what information has to be copied over. + * <p> + * The custom merge strategy is an {@link Action2} function that takes the incoming original <tt>Exchange</tt> as + * its first argument and the cached <tt>Exchange</tt> as the second argument. + + * @param cacheMergeStrategy + */ + public void setCacheMergeStrategy(Action2<Exchange, Exchange> cacheMergeStrategy) { + this.cacheMergeStrategy = cacheMergeStrategy; + } + + /** + * @return The current {@link HystrixRequestContext} propagation policy. + */ + public boolean isPropagateRequestContext() { + return propagateRequestContext; + } + + /** + * Sets whether or not to propagate the {@link HystrixRequestContext} from other threads via a message header. By + * default the value is <tt>true</tt> (enabled). + * @param propagateRequestContext + */ + public void setPropagateRequestContext(boolean propagateRequestContext) { + this.propagateRequestContext = propagateRequestContext; + } + + /** + * @return The Setter. + */ + public HystrixObservableCommand.Setter getSetter() { + return setter; + } + + /** + * Hystrix Setter for HystrixObservableCommand properties. + * @param setter + */ + public void setSetter(HystrixObservableCommand.Setter setter) { + this.setter = setter; + } + + /** + * @return Ignored exceptions. + */ + public Set<Class<? extends Throwable>> getExceptionsSuppressingFallback() { + return exceptionsSuppressingFallback; + } + + /** + * Sets the exceptions to ignore and wrap in a {@link com.netflix.hystrix.exception.HystrixBadRequestException}, so + * that fallbacks are bypassed. + * + * @param exceptionsSuppressingFallback + */ + public void setExceptionsSuppressingFallback(Set<Class<? extends Throwable>> exceptionsSuppressingFallback) { + this.exceptionsSuppressingFallback = exceptionsSuppressingFallback; + } + + /** + * @return The fallback processor, if one was set. Else, <tt>null</tt>. 
+ */ + public Processor getFallbackProcessor() { + return fallbackProcessor; + } + + /** + * Sets the fallback processor (optional). If a fallback processor is set, Hystrix will call it when the target + * processor fails with an Exception. + * + * @param fallbackProcessor + */ + public void setFallbackProcessor(Processor fallbackProcessor) { + this.fallbackProcessor = fallbackProcessor; + } + + /** + * @return The fallback type. By default, + * {@link org.apache.camel.component.hystrix.HystrixConfiguration.FallbackType#none}. + */ + public FallbackType getFallbackType() { + return fallbackType; + } + + /** + * Sets the fallback type. By default, + * {@link org.apache.camel.component.hystrix.HystrixConfiguration.FallbackType#none}. + * <p> + * Camel will validate if the {@link Processor} or {@link Endpoint} has been provided; else it will throw + * an {@link IllegalArgumentException} from {@link #validateAndInit()}. + * @param fallbackType + */ + public void setFallbackType(FallbackType fallbackType) { + this.fallbackType = fallbackType; + } + + /** + * @return The fallback endpoint, if one was set. Else, <tt>null</tt>. + */ + public Endpoint getFallbackEndpoint() { + return fallbackEndpoint; + } + + /** + * @param fallbackEndpoint + */ + public void setFallbackEndpoint(Endpoint fallbackEndpoint) { + this.fallbackEndpoint = fallbackEndpoint; + } + + public Processor getActualFallback() { + return actualFallback; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateEndpoint.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateEndpoint.java new file mode 100644 index 0000000..d139172 --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateEndpoint.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.util.ObjectHelper; + +/** + * Created by raul on 16/08/15. + */ +public class HystrixDelegateEndpoint extends DefaultEndpoint { + + /** + * Hystrix common configuration. + */ + private final HystrixConfiguration configuration; + + /** + * Target endpoint to invoke. + */ + private Endpoint target; + + /** + * Producer of the target endpoint, created in {@link #createProducer()}. 
+ */ + private Producer producer; + + public HystrixDelegateEndpoint(Endpoint target, HystrixConfiguration configuration) { + this.target = target; + this.configuration = configuration; + } + + @Override + public Producer createProducer() throws Exception { + ObjectHelper.notNull(target, "target endpoint"); + producer = target.createProducer(); + return new HystrixDelegateProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("The camel-hystrix component does not allow consumers yet."); + } + + @Override + public boolean isSingleton() { + return false; + } + + @Override + protected String createEndpointUri() { + return "dummy"; + } + + public Endpoint getTarget() { + return target; + } + + public void setTarget(Endpoint target) { + this.target = target; + } + + public Producer getProducer() { + return producer; + } + + public void setProducer(Producer producer) { + this.producer = producer; + } + + public HystrixConfiguration getConfiguration() { + return configuration; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateProcessor.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateProcessor.java new file mode 100644 index 0000000..265e382 --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateProcessor.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.AsyncProcessorHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by raul on 22/08/15. + */ +public class HystrixDelegateProcessor extends ServiceSupport implements AsyncProcessor { + + protected static final Logger LOG = LoggerFactory.getLogger(HystrixDelegateProcessor.class); + + /** + * Hystrix common configuration. + */ + private HystrixConfiguration configuration; + + /** + * The target processor to invoke. + */ + private Processor target; + + /** + * Constructor. 
+ * @param target + * @param config + */ + public HystrixDelegateProcessor(Processor target, HystrixConfiguration config) { + this.configuration = config; + this.target = target; + } + + /** + * {@inheritDoc} + * @param incoming + * @param callback + * @return + */ + @Override + public boolean process(final Exchange incoming, final AsyncCallback callback) { + HystrixHelper.ensureRequestContextPresent(incoming, configuration); + + ProcessorInvokingObservableCommand command = new ProcessorInvokingObservableCommand(target, incoming, configuration); + command.observe().subscribe(command.generateSubscriber(callback)); + + return false; + } + + /** + * {@inheritDoc} + * @param exchange + * @throws Exception + */ + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + + @Override + protected void doStart() throws Exception { + configuration.validateAndInit(); + } + + @Override + protected void doStop() throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateProducer.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateProducer.java new file mode 100644 index 0000000..adc01c9 --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixDelegateProducer.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by raul on 24/08/15. + */ +public class HystrixDelegateProducer extends DefaultAsyncProducer { + + protected static final Logger LOG = LoggerFactory.getLogger(HystrixDelegateProducer.class); + + private final HystrixDelegateEndpoint endpoint; + + private final HystrixConfiguration configuration; + + public HystrixDelegateProducer(Endpoint endpoint) { + super(endpoint); + this.endpoint = (HystrixDelegateEndpoint) endpoint; + this.configuration = this.endpoint.getConfiguration(); + } + + @Override + public boolean process(Exchange incoming, AsyncCallback callback) { + HystrixHelper.ensureRequestContextPresent(incoming, configuration); + + ProcessorInvokingObservableCommand command = new ProcessorInvokingObservableCommand(endpoint.getProducer(), incoming, configuration); + command.observe().subscribe(command.generateSubscriber(callback)); + + return false; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + endpoint.getConfiguration().validateAndInit(); + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixHelper.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixHelper.java new file mode 100644 index 0000000..85f942d --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/HystrixHelper.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import java.util.Set; + +import com.netflix.hystrix.exception.HystrixBadRequestException; +import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * Hystrix helpers. + */ +public final class HystrixHelper { + + private HystrixHelper() { } + + /** + * Creates a {@link Processor} to initialize a new {@link HystrixRequestContext}. + * + * @param force Whether to initialize a new context even if one already exists. 
+ * @return The resulting processor. + */ + public static Processor initializeContext(final boolean force) { + return new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + if (!HystrixRequestContext.isCurrentThreadInitialized() || force) { + HystrixRequestContext.initializeContext(); + } + } + }; + } + + /** + * Unless the current thread already has one, either initializes a new {@link HystrixRequestContext}, or, if allowed by {@link HystrixConfiguration#isPropagateRequestContext()}, + * injects an existing HystrixRequestContext from the Camel {@link HystrixComponent#HYSTRIX_REQUEST_CONTEXT_HEADER_NAME} message header. + * <p> + * If propagation is enabled and a new context is initialized, it is injected in the above mentioned header for subsequent + * propagation if necessary. + * <p> + * This allows the context to be propagated across threads within the same Exchange. + */ + public static void ensureRequestContextPresent(Exchange incoming, HystrixConfiguration configuration) { + if (HystrixRequestContext.getContextForCurrentThread() != null) { + return; + } + + if (configuration.isPropagateRequestContext()) { + HystrixRequestContext hRq = incoming.getIn().getHeader(HystrixComponent.HYSTRIX_REQUEST_CONTEXT_HEADER_NAME, HystrixRequestContext.class); + if (hRq != null) { + HystrixRequestContext.setContextOnCurrentThread(hRq); + } else { + hRq = HystrixRequestContext.initializeContext(); + incoming.getIn().setHeader(HystrixComponent.HYSTRIX_REQUEST_CONTEXT_HEADER_NAME, hRq); + } + } else { + HystrixRequestContext.initializeContext(); + } + } + + public static Throwable maybeConvertThrowable(Throwable t, Set<Class<? extends Throwable>> suppressed) { + return suppressed.contains(t.getClass()) ? new HystrixBadRequestException("Exception suppressed for fallback", t) : t; + } + + public static Throwable unwrapThrowable(Throwable t) { + return t instanceof HystrixBadRequestException ? 
t.getCause() : t; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/ProcessorInvokingObservableCommand.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/ProcessorInvokingObservableCommand.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/ProcessorInvokingObservableCommand.java new file mode 100644 index 0000000..aaf9279 --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/ProcessorInvokingObservableCommand.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.hystrix; + +import java.util.concurrent.atomic.AtomicBoolean; + +import com.netflix.hystrix.HystrixObservableCommand; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.util.ExchangeHelper; +import rx.Observable; +import rx.Subscriber; + +/** + * {@link HystrixObservableCommand} that wraps the call to the target processor and applies a fallback if provided. + */ +public class ProcessorInvokingObservableCommand extends HystrixObservableCommand<Exchange> { + + /** + * The {@link Exchange}. + */ + private Exchange exchange; + + /** + * The target processor/endpoint. + */ + private Processor target; + + /** + * The fallback processor/endpoint. + */ + private Processor fallback; + + /** + * The Hystrix configuration. + */ + private HystrixConfiguration configuration; + + /** + * Whether a subscriber has already been generated for this command or not. + */ + private AtomicBoolean subscriberGenerated = new AtomicBoolean(false); + + /** + * Constructor. + * @param target + * @param exchange + * @param configuration + */ + public ProcessorInvokingObservableCommand(Processor target, Exchange exchange, HystrixConfiguration configuration) { + super(configuration.getSetter()); + this.exchange = exchange; + this.target = target; + this.fallback = configuration.getActualFallback(); + this.configuration = configuration; + } + + /** + * {@inheritDoc} + */ + @Override + protected Observable<Exchange> construct() { + return Observable.create(new CamelOnSubscribeFunction(target)); + } + + /** + * Uses the <tt>fallback</tt> as a fallback if the user has set one. 
+ */ + @Override + protected Observable<Exchange> resumeWithFallback() { + if (fallback != null) { + return Observable.create(new CamelOnSubscribeFunction(fallback)); + } + return super.resumeWithFallback(); + } + + /** + * Evaluates the cache key if the user has set a cache key expression. + */ + @Override + protected String getCacheKey() { + return configuration.getCacheKeyExpression() == null ? super.getCacheKey() + : configuration.getCacheKeyExpression().evaluate(exchange, String.class); + } + + /** + * @return The Exchange. + */ + public Exchange getExchange() { + return exchange; + } + + /** + * @return The HystrixConfiguration. + */ + public HystrixConfiguration getConfiguration() { + return configuration; + } + + public Subscriber<Exchange> generateSubscriber(final AsyncCallback callback) { + // control that only one subscriber can be generated for this command + if (!subscriberGenerated.compareAndSet(false, true)) { + throw new RuntimeCamelException("A Subscriber had already been generated for a Hystrix Command. 
Check your code for bugs."); + } + + return new Subscriber<Exchange>() { + /** + * {@inheritDoc} + */ + @Override + public void onCompleted() { + callback.done(false); + } + + /** + * {@inheritDoc} + */ + @Override + public void onError(Throwable e) { + exchange.setException(HystrixHelper.unwrapThrowable(e)); + callback.done(false); + } + + /** + * {@inheritDoc} + */ + @Override + public void onNext(Exchange result) { + // do nothing, as the exchange would have been modified by the Processor already + // unless this is a cached response in which case we can invoke the cacheMergeStrategy + if (!ProcessorInvokingObservableCommand.this.isResponseFromCache()) { + return; + } + + if (configuration.getCacheMergeStrategy() == null) { + ExchangeHelper.copyResults(exchange, result); + } else { + configuration.getCacheMergeStrategy().call(exchange, result); + } + } + }; + } + + private final class CamelOnSubscribeFunction implements Observable.OnSubscribe<Exchange> { + + private final Processor processor; + + private CamelOnSubscribeFunction(Processor processor) { + this.processor = processor; + } + + @Override + public void call(Subscriber<? super Exchange> subscriber) { + if (subscriber.isUnsubscribed()) { + return; + } + + try { + // now let's invoke the processor; we don't care if this is an AsyncProcessor because at this + // point we're already in a thread pool managed by Hystrix and there's no further optimisation + // possible + processor.process(exchange); + + // if the processor set an Exception on the exchange, let's notify the subscriber; we also + // clear the exception from the Exchange because it will be set by the Subscriber + if (exchange.getException() != null) { + Throwable t = exchange.getException(); + exchange.setException(null); + subscriber.onError(HystrixHelper.maybeConvertThrowable(t, configuration.getExceptionsSuppressingFallback())); + return; + } + + // else, we signal an OK and feed the same Exchange onto the Subscriber. 
+ // note: processors or the fallback processors have already modified the original Exchange, and + // that's what Camel will see. However, we need to return a COPY of the Exchange to Hystrix for + // caching purposes, as we don't want it to mutate between the time it was stored and the time + // it's requested again. + subscriber.onNext(exchange.copy()); + subscriber.onCompleted(); + } catch (Throwable t) { + subscriber.onError(HystrixHelper.maybeConvertThrowable(t, configuration.getExceptionsSuppressingFallback())); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/builders/HystrixWrappers.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/builders/HystrixWrappers.java b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/builders/HystrixWrappers.java new file mode 100644 index 0000000..493f731 --- /dev/null +++ b/components/camel-hystrix/src/main/java/org/apache/camel/component/hystrix/builders/HystrixWrappers.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix.builders; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import com.netflix.hystrix.HystrixObservableCommand; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.Processor; +import org.apache.camel.component.hystrix.HystrixComponent; +import org.apache.camel.component.hystrix.HystrixConfiguration; +import org.apache.camel.component.hystrix.HystrixDelegateEndpoint; +import org.apache.camel.component.hystrix.HystrixDelegateProcessor; +import org.apache.camel.util.CamelContextHelper; +import rx.functions.Action2; + +/** + * Fluent DSL for building Hystrix Wrappers around {@link Processor}s, {@link Endpoint}s, etc. + * <p> + * The starting point to construct a wrapper is via {@link HystrixComponent#wrapper()}. Do not use the constructor + * of this class. 
+ * + */ +public final class HystrixWrappers { + + private CamelContext context; + + public HystrixWrappers(CamelContext context) { + this.context = context; + } + + public HystrixStaticEndpointWrapperBuilder forStaticEndpoint(String uri, HystrixObservableCommand.Setter setter) { + Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(context, uri); + return forStaticEndpoint(endpoint, setter); + } + + public HystrixStaticEndpointWrapperBuilder forStaticEndpoint(Endpoint endpoint, HystrixObservableCommand.Setter setter) { + HystrixStaticEndpointWrapperBuilder answer = new HystrixStaticEndpointWrapperBuilder(endpoint); + answer.configuration.setSetter(setter); + return answer; + } + + public HystrixProcessorWrapperBuilder forProcessor(Processor processor, HystrixObservableCommand.Setter setter) { + HystrixProcessorWrapperBuilder answer = new HystrixProcessorWrapperBuilder(processor); + answer.configuration.setSetter(setter); + return answer; + } + + public abstract class AbstractHystrixWrapperBuilder<T extends AbstractHystrixWrapperBuilder> { + + protected HystrixConfiguration configuration = new HystrixConfiguration(); + + public abstract T thisBuilder(); + + public T withCacheKey(Expression expression) { + configuration.setCacheKeyExpression(expression); + return thisBuilder(); + } + + public T withCacheMergeStrategy(Action2<Exchange, Exchange> strategy) { + configuration.setCacheMergeStrategy(strategy); + return thisBuilder(); + } + + public T withPropagateRequestContext(boolean propagate) { + configuration.setPropagateRequestContext(propagate); + return thisBuilder(); + } + + public T withFallbackProcessor(Processor fallback) { + configuration.setFallbackType(HystrixConfiguration.FallbackType.processor); + configuration.setFallbackProcessor(fallback); + return thisBuilder(); + } + + public T withFallbackEndpoint(Endpoint fallback) { + configuration.setFallbackType(HystrixConfiguration.FallbackType.endpoint); + configuration.setFallbackEndpoint(fallback); + 
return thisBuilder(); + } + + public T withSuppressedExceptions(Class<? extends Throwable>... throwables) { + Set<Class<? extends Throwable>> t = new HashSet<>(Arrays.asList(throwables)); + configuration.setExceptionsSuppressingFallback(t); + return thisBuilder(); + } + + } + + public class HystrixStaticEndpointWrapperBuilder extends AbstractHystrixWrapperBuilder<HystrixStaticEndpointWrapperBuilder> { + + private HystrixDelegateEndpoint answer; + + public HystrixStaticEndpointWrapperBuilder(Endpoint target) { + this.answer = new HystrixDelegateEndpoint(target, configuration); + } + + public HystrixDelegateEndpoint build() { + return answer; + } + + @Override + public HystrixStaticEndpointWrapperBuilder thisBuilder() { + return this; + } + + } + + public class HystrixProcessorWrapperBuilder extends AbstractHystrixWrapperBuilder<HystrixProcessorWrapperBuilder> { + + private HystrixDelegateProcessor answer; + + public HystrixProcessorWrapperBuilder(Processor processor) { + answer = new HystrixDelegateProcessor(processor, configuration); + } + + public HystrixDelegateProcessor build() { + return answer; + } + + @Override + public HystrixProcessorWrapperBuilder thisBuilder() { + return this; + } + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/resources/META-INF/LICENSE.txt ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/resources/META-INF/LICENSE.txt b/components/camel-hystrix/src/main/resources/META-INF/LICENSE.txt new file mode 100644 index 0000000..6b0b127 --- /dev/null +++ b/components/camel-hystrix/src/main/resources/META-INF/LICENSE.txt @@ -0,0 +1,203 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/resources/META-INF/NOTICE.txt ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/resources/META-INF/NOTICE.txt b/components/camel-hystrix/src/main/resources/META-INF/NOTICE.txt new file mode 100644 index 0000000..2e215bf --- /dev/null +++ b/components/camel-hystrix/src/main/resources/META-INF/NOTICE.txt @@ -0,0 +1,11 @@ + ========================================================================= + == NOTICE file corresponding to the section 4 d of == + == the Apache License, Version 2.0, == + == in this case for the Apache Camel distribution. 
== + ========================================================================= + + This product includes software developed by + The Apache Software Foundation (http://www.apache.org/). + + Please read the different LICENSE files present in the licenses directory of + this distribution. http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/main/resources/META-INF/services/org/apache/camel/component/hystrix ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/main/resources/META-INF/services/org/apache/camel/component/hystrix b/components/camel-hystrix/src/main/resources/META-INF/services/org/apache/camel/component/hystrix new file mode 100644 index 0000000..8fc3ece --- /dev/null +++ b/components/camel-hystrix/src/main/resources/META-INF/services/org/apache/camel/component/hystrix @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +class=org.apache.camel.component.hystrix.HystrixComponent http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/AbstractHystrixTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/AbstractHystrixTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/AbstractHystrixTest.java new file mode 100644 index 0000000..785c764 --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/AbstractHystrixTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandKey; +import com.netflix.hystrix.HystrixObservableCommand; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.test.junit4.CamelTestSupport; + +/** + * Abstract Hystrix test. 
+ */ +public abstract class AbstractHystrixTest extends CamelTestSupport { + + protected HystrixComponent hystrix; + + protected HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter + .withGroupKey(HystrixCommandGroupKey.Factory.asKey("cameltest")) + .andCommandKey(HystrixCommandKey.Factory.asKey("cameltest")); + + protected Processor target = new Processor() { + int i; + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World " + i++); + } + }; + + protected Processor fallback = new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello Fallback"); + } + }; + + @Override + protected void doPostSetup() throws Exception { + hystrix = context.getComponent("hystrix", HystrixComponent.class); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixEndpointTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixEndpointTest.java new file mode 100644 index 0000000..ddabbb3 --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixEndpointTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import org.junit.Test; + +/** + * Tests for Hystrix endpoints. + */ +public class HystrixEndpointTest extends AbstractHystrixTest { + + @Test + public void testEndpointOk() throws Exception { + HystrixDelegateEndpoint hystrixE = hystrix.wrapper() + .forStaticEndpoint("mock:test", setter) + .build(); + hystrixE.setCamelContext(context); + + getMockEndpoint("mock:test").whenAnyExchangeReceived(target); + getMockEndpoint("mock:test").message(0).body().isEqualTo("Hello World"); + + assertEquals("Hello World 0", template.requestBody(hystrixE, "Hello World")); + + assertMockEndpointsSatisfied(); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixProcessorTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixProcessorTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixProcessorTest.java new file mode 100644 index 0000000..b3e6b9a --- /dev/null +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/HystrixProcessorTest.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hystrix; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelException; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.ExpressionBuilder; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.util.ServiceHelper; +import org.junit.Test; +import rx.functions.Action2; + +/** + * Tests for Processors. 
+ */
+public class HystrixProcessorTest extends AbstractHystrixTest {
+
+    /** Upper bound, in seconds, that a test waits for its worker thread to finish. */
+    private static final long WORKER_TIMEOUT_SECONDS = 5;
+
+    /**
+     * Wraps the shared target processor with no extra options and checks the
+     * message body after a single invocation.
+     */
+    @Test
+    public void testProcessorOk() throws Exception {
+        HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(target, setter)
+                .build();
+
+        Exchange exchange = new DefaultExchange(context);
+        hystrixP.process(exchange);
+        assertEquals("Hello World 0", exchange.getIn().getBody());
+    }
+
+    /**
+     * The wrapped processor throws a CamelException; with a fallback processor
+     * configured, the body is expected to be the fallback's "Hello Fallback".
+     */
+    @Test
+    public void testProcessorFallbackExceptionThrown() throws Exception {
+        HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(new Processor() {
+                    @Override
+                    public void process(Exchange exchange) throws Exception {
+                        throw new CamelException("Bang!");
+                    }
+                }, setter)
+                .withFallbackProcessor(fallback).build();
+
+        ServiceHelper.startService(hystrixP);
+
+        Exchange exchange = new DefaultExchange(context);
+        hystrixP.process(exchange);
+        assertEquals("Hello Fallback", exchange.getIn().getBody());
+    }
+
+    /**
+     * Same as {@link #testProcessorFallbackExceptionThrown()}, except that the
+     * wrapped processor records the CamelException on the exchange instead of
+     * throwing it.
+     */
+    @Test
+    public void testProcessorFallbackExceptionSet() throws Exception {
+        HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(new Processor() {
+                    @Override
+                    public void process(Exchange exchange) throws Exception {
+                        exchange.setException(new CamelException("Bang!"));
+                    }
+                }, setter)
+                .withFallbackProcessor(fallback).build();
+
+        ServiceHelper.startService(hystrixP);
+
+        Exchange exchange = new DefaultExchange(context);
+        hystrixP.process(exchange);
+        assertEquals("Hello Fallback", exchange.getIn().getBody());
+    }
+
+    /**
+     * The wrapped processor throws a DummyException that is registered through
+     * withSuppressedExceptions; the fallback body must not appear and the
+     * exchange must carry the DummyException itself.
+     */
+    @Test
+    public void testProcessorNoFallbackSuppressedExceptionThrown() throws Exception {
+        HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(new Processor() {
+                    @Override
+                    public void process(Exchange exchange) throws Exception {
+                        throw new DummyException("Bang!");
+                    }
+                }, setter)
+                .withFallbackProcessor(fallback)
+                .withSuppressedExceptions(DummyException.class)
+                .build();
+
+        ServiceHelper.startService(hystrixP);
+
+        Exchange exchange = new DefaultExchange(context);
+        hystrixP.process(exchange);
+
+        // assert fallback not invoked and exception present and unwrapped
+        assertNotEquals("Hello Fallback", exchange.getIn().getBody());
+        assertEquals(DummyException.class, exchange.getException().getClass());
+    }
+
+    /**
+     * Same as {@link #testProcessorNoFallbackSuppressedExceptionThrown()}, except
+     * that the DummyException is set on the exchange instead of being thrown.
+     */
+    @Test
+    public void testProcessorNoFallbackSuppressedExceptionSet() throws Exception {
+        HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(new Processor() {
+                    @Override
+                    public void process(Exchange exchange) throws Exception {
+                        exchange.setException(new DummyException("Bang!"));
+                    }
+                }, setter)
+                .withFallbackProcessor(fallback)
+                .withSuppressedExceptions(DummyException.class)
+                .build();
+
+        ServiceHelper.startService(hystrixP);
+
+        Exchange exchange = new DefaultExchange(context);
+        hystrixP.process(exchange);
+
+        // assert fallback not invoked and exception present and unwrapped
+        assertNotEquals("Hello Fallback", exchange.getIn().getBody());
+        assertEquals(DummyException.class, exchange.getException().getClass());
+    }
+
+    /**
+     * Uses the message body as cache key and processes the same exchange twice
+     * with the body "Hello"; the body after the second call is expected to be
+     * "Hello World 0".
+     */
+    @Test
+    public void testProcessorWithCache() throws Exception {
+        HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(target, setter)
+                .withCacheKey(ExpressionBuilder.bodyExpression())
+                .build();
+
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello");
+        hystrixP.process(exchange);
+        exchange.getIn().setBody("Hello");
+        hystrixP.process(exchange);
+
+        // NOTE(review): presumably the suffix stays at 0 because the second call is
+        // answered from the cache; the target processor is defined in AbstractHystrixTest.
+        assertEquals("Hello World 0", exchange.getIn().getBody());
+    }
+
+    /**
+     * Like {@link #testProcessorWithCache()}, with a custom merge strategy that
+     * writes "MERGED: " followed by the result body into the incoming exchange.
+     */
+    @Test
+    public void testProcessorWithCacheAndCustomMergeFunction() throws Exception {
+        HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(target, setter)
+                .withCacheKey(ExpressionBuilder.bodyExpression())
+                .withCacheMergeStrategy(new Action2<Exchange, Exchange>() {
+                    @Override
+                    public void call(Exchange incoming, Exchange result) {
+                        incoming.getIn().setBody("MERGED: " + result.getIn().getBody(String.class));
+                    }
+                })
+                .build();
+
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello");
+        hystrixP.process(exchange);
+        exchange.getIn().setBody("Hello");
+        hystrixP.process(exchange);
+
+        assertEquals("MERGED: Hello World 0", exchange.getIn().getBody());
+    }
+
+    /**
+     * With request-context propagation disabled, a second request issued from
+     * another thread is expected to produce "Hello World 1".
+     */
+    @Test
+    public void testProcessorWithNoRequestPropagation() throws Exception {
+        final HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(target, setter)
+                .withCacheKey(ExpressionBuilder.bodyExpression())
+                .withPropagateRequestContext(false)
+                .build();
+
+        final Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello");
+        hystrixP.process(exchange);
+
+        // now run the second request in a different thread
+        processInSeparateThread(hystrixP, exchange);
+        assertEquals("Hello World 1", exchange.getIn().getBody());
+    }
+
+    /**
+     * With request-context propagation enabled, a second request issued from
+     * another thread is expected to produce "Hello World 0" again.
+     */
+    @Test
+    public void testProcessorWithRequestPropagation() throws Exception {
+        final HystrixDelegateProcessor hystrixP = hystrix.wrapper()
+                .forProcessor(target, setter)
+                .withCacheKey(ExpressionBuilder.bodyExpression())
+                .withPropagateRequestContext(true)
+                .build();
+
+        final Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody("Hello");
+        hystrixP.process(exchange);
+
+        // now run the second request in a different thread
+        processInSeparateThread(hystrixP, exchange);
+        assertEquals("Hello World 0", exchange.getIn().getBody());
+    }
+
+    /**
+     * Sets the body of the given exchange to "Hello" and runs it through the
+     * processor on a freshly started thread, blocking until that thread is done.
+     *
+     * @param processor the processor to invoke from the worker thread
+     * @param exchange  the exchange handed to the processor
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws AssertionError if the worker does not finish in time or its processing fails
+     */
+    private void processInSeparateThread(final HystrixDelegateProcessor processor, final Exchange exchange)
+        throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        // Written by the worker before countDown() and read only after a successful
+        // await(), so the latch provides the required happens-before ordering.
+        final Throwable[] failure = new Throwable[1];
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                exchange.getIn().setBody("Hello");
+                try {
+                    processor.process(exchange);
+                } catch (Throwable t) {
+                    failure[0] = t;
+                } finally {
+                    // always release the waiting test, even when processing failed
+                    latch.countDown();
+                }
+            }
+        }).start();
+
+        if (!latch.await(WORKER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+            throw new AssertionError("Worker thread did not finish within " + WORKER_TIMEOUT_SECONDS + " seconds");
+        }
+        if (failure[0] != null) {
+            AssertionError error = new AssertionError("Processing in the worker thread failed: " + failure[0]);
+            error.initCause(failure[0]);
+            throw error;
+        }
+    }
+
+    /** Marker exception used to exercise the suppressed-exceptions option. */
+    private static class DummyException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+
+        public DummyException(String message) {
+            super(message);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/camel-hystrix/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-hystrix/src/test/resources/log4j.properties b/components/camel-hystrix/src/test/resources/log4j.properties
new file mode 100644
index 0000000..cb64298
--- /dev/null
+++ b/components/camel-hystrix/src/test/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=INFO, file
+# change the logging level of this category to increase verbosity of the Hystrix component
+log4j.category.org.apache.camel.component.hystrix=INFO, file
+log4j.additivity.org.apache.camel.component.hystrix=false
+
+# uncomment the following line to turn on Camel debugging
+#log4j.logger.org.apache.camel=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-hystrix-test.log

http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index f79d07d..2f72891 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -122,6 +122,7 @@
     <module>camel-hdfs2</module>
     <module>camel-hipchat</module>
     <module>camel-hl7</module>
+    <module>camel-hystrix</module>
     <module>camel-ibatis</module>
     <module>camel-ical</module>
     <module>camel-infinispan</module>

http://git-wip-us.apache.org/repos/asf/camel/blob/07f38b3b/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index d9bc3f7..169bbe1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -224,6 +224,7 @@
     <httpclient4-version>4.4.1</httpclient4-version>
     <httpasyncclient-version>4.1</httpasyncclient-version>
     <httpclient-version>3.1</httpclient-version>
+    <hystrix-version>1.4.14</hystrix-version>
     <ibatis-bundle-version>2.3.4.726_4</ibatis-bundle-version>
     <ibatis-version>2.3.4.726</ibatis-version>
     <ical4j-version>1.0.6</ical4j-version>