CAMEL-7833 Added convenience method ReactiveCamel.to(...).
Changed test to highlight the idea of creating routes with RX.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3c3b0edd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3c3b0edd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3c3b0edd

Branch: refs/heads/master
Commit: 3c3b0edddfe555e1bfd247258c4f1aa4806d3b38
Parents: c67392f
Author: Jyrki Ruuskanen <yur...@kotikone.fi>
Authored: Thu Apr 2 22:38:55 2015 +0300
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Fri Apr 3 09:20:31 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/ReactiveCamel.java    | 14 ++++++++++++++
 .../org/apache/camel/rx/CamelOperatorTest.java     | 17 +++++++++++++----
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3c3b0edd/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
index 368cb53..678c4e8 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
@@ -92,6 +92,20 @@ public class ReactiveCamel {
         }
     }
 
+    /**
+     * Convenience method for creating CamelOperator instances
+     */
+    public CamelOperator to(String uri) throws Exception {
+        return new CamelOperator(camelContext, uri);
+    }
+
+    /**
+     * Convenience method for creating CamelOperator instances
+     */
+    public CamelOperator to(Endpoint endpoint) throws Exception {
+        return new CamelOperator(endpoint);
+    }
+
     public CamelContext getCamelContext() {
         return camelContext;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3c3b0edd/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
index 8667e3b..f0bacc3 100644
--- a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -24,6 +24,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
+import rx.Subscription;
+import rx.observables.ConnectableObservable;
 
 /**
  */
@@ -39,12 +41,16 @@ public class CamelOperatorTest extends RxTestSupport {
         mockEndpoint2.expectedMessageCount(1);
         mockEndpoint3.expectedMessageCount(1);
 
-        Observable<Message> result = reactiveCamel.toObservable("direct:start")
-            .lift(new CamelOperator(camelContext, "mock:results1"))
+        ConnectableObservable<Message> route = 
reactiveCamel.toObservable("direct:start")
+            .lift(new CamelOperator(mockEndpoint1))
             .lift(new CamelOperator(camelContext, "log:foo"))
             .debounce(1, TimeUnit.SECONDS)
-            .lift(new CamelOperator(mockEndpoint2));
-        reactiveCamel.sendTo(result, "mock:results3");
+            .lift(reactiveCamel.to(mockEndpoint2))
+            .lift(reactiveCamel.to("mock:results3"))
+            .publish();
+
+        // Start the route
+        Subscription routeSubscription = route.connect();
 
         // Send two test messages
         producerTemplate.sendBody("direct:start", "<test/>");
@@ -53,5 +59,8 @@ public class CamelOperatorTest extends RxTestSupport {
         mockEndpoint1.assertIsSatisfied();
         mockEndpoint2.assertIsSatisfied();
         mockEndpoint3.assertIsSatisfied();
+
+        // Stop the route
+        routeSubscription.unsubscribe();
     }
 }

Reply via email to