CAMEL-7833: Added convenience methods ReactiveCamel.to(String) and ReactiveCamel.to(Endpoint) for creating CamelOperator instances. Changed the test to highlight the idea of creating routes with Rx.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3c3b0edd Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3c3b0edd Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3c3b0edd Branch: refs/heads/master Commit: 3c3b0edddfe555e1bfd247258c4f1aa4806d3b38 Parents: c67392f Author: Jyrki Ruuskanen <yur...@kotikone.fi> Authored: Thu Apr 2 22:38:55 2015 +0300 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Fri Apr 3 09:20:31 2015 +0800 ---------------------------------------------------------------------- .../java/org/apache/camel/rx/ReactiveCamel.java | 14 ++++++++++++++ .../org/apache/camel/rx/CamelOperatorTest.java | 17 +++++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3c3b0edd/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java index 368cb53..678c4e8 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java @@ -92,6 +92,20 @@ public class ReactiveCamel { } } + /** + * Convenience method for creating CamelOperator instances + */ + public CamelOperator to(String uri) throws Exception { + return new CamelOperator(camelContext, uri); + } + + /** + * Convenience method for creating CamelOperator instances + */ + public CamelOperator to(Endpoint endpoint) throws Exception { + return new CamelOperator(endpoint); + } + public CamelContext getCamelContext() { return camelContext; } 
http://git-wip-us.apache.org/repos/asf/camel/blob/3c3b0edd/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java index 8667e3b..f0bacc3 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java @@ -24,6 +24,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; +import rx.Subscription; +import rx.observables.ConnectableObservable; /** */ @@ -39,12 +41,16 @@ public class CamelOperatorTest extends RxTestSupport { mockEndpoint2.expectedMessageCount(1); mockEndpoint3.expectedMessageCount(1); - Observable<Message> result = reactiveCamel.toObservable("direct:start") - .lift(new CamelOperator(camelContext, "mock:results1")) + ConnectableObservable<Message> route = reactiveCamel.toObservable("direct:start") + .lift(new CamelOperator(mockEndpoint1)) .lift(new CamelOperator(camelContext, "log:foo")) .debounce(1, TimeUnit.SECONDS) - .lift(new CamelOperator(mockEndpoint2)); - reactiveCamel.sendTo(result, "mock:results3"); + .lift(reactiveCamel.to(mockEndpoint2)) + .lift(reactiveCamel.to("mock:results3")) + .publish(); + + // Start the route + Subscription routeSubscription = route.connect(); // Send two test messages producerTemplate.sendBody("direct:start", "<test/>"); @@ -53,5 +59,8 @@ public class CamelOperatorTest extends RxTestSupport { mockEndpoint1.assertIsSatisfied(); mockEndpoint2.assertIsSatisfied(); mockEndpoint3.assertIsSatisfied(); + + // Stop the route + routeSubscription.unsubscribe(); } }