Author: jstrachan
Date: Mon Mar  4 08:26:40 2013
New Revision: 1452207

URL: http://svn.apache.org/r1452207
Log:
added a helper method for sending events on any Observable<T> to a camel 
endpoint

Added:
    
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
   (with props)
    
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
   (with props)
Modified:
    
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java

Modified: 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java?rev=1452207&r1=1452206&r2=1452207&view=diff
==============================================================================
--- 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
 (original)
+++ 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
 Mon Mar  4 08:26:40 2013
@@ -23,6 +23,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.rx.support.EndpointObservable;
 import org.apache.camel.rx.support.EndpointSubscription;
+import org.apache.camel.rx.support.ProducerObserver;
+import org.apache.camel.util.CamelContextHelper;
 
 import rx.Observable;
 import rx.Observer;
@@ -45,7 +47,7 @@ public class ReactiveCamel {
      * to be processed using  <a href="https://rx.codeplex.com/";>Reactive 
Extensions</a>
      */
     public Observable<Message> toObservable(String uri) {
-        return toObservable(camelContext.getEndpoint(uri));
+        return toObservable(endpoint(uri));
     }
 
     /**
@@ -54,7 +56,7 @@ public class ReactiveCamel {
      * to be processed using  <a href="https://rx.codeplex.com/";>Reactive 
Extensions</a>
      */
     public <T> Observable<T> toObservable(String uri, final Class<T> bodyType) 
{
-        return toObservable(camelContext.getEndpoint(uri), bodyType);
+        return toObservable(endpoint(uri), bodyType);
     }
 
 
@@ -87,10 +89,33 @@ public class ReactiveCamel {
         });
     }
 
+    /**
+     * Sends events on the given {@link Observable} to the given camel endpoint
+     */
+    public <T> void sendTo(Observable<T> observable, String endpointUri) {
+        sendTo(observable, endpoint(endpointUri));
+    }
+    /**
+     * Sends events on the given {@link Observable} to the given camel endpoint
+     */
+    public <T> void sendTo(Observable<T> observable, Endpoint endpoint) {
+        try {
+            ProducerObserver observer = new ProducerObserver(endpoint);
+            observable.subscribe(observer);
+        } catch (Exception e) {
+            throw new RuntimeCamelRxException(e);
+        }
+    }
+
+
     public CamelContext getCamelContext() {
         return camelContext;
     }
 
+    public Endpoint endpoint(String endpointUri) {
+        return CamelContextHelper.getMandatoryEndpoint(camelContext, 
endpointUri);
+    }
+
 
     /**
      * Returns a newly created {@link Observable} given a function which 
converts
@@ -106,4 +131,5 @@ public class ReactiveCamel {
         };
         return new EndpointObservable(endpoint, func);
     }
+
 }

Added: 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java?rev=1452207&view=auto
==============================================================================
--- 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
 (added)
+++ 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
 Mon Mar  4 08:26:40 2013
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Producer;
+import org.apache.camel.rx.RuntimeCamelRxException;
+
+import rx.Observer;
+
+/**
+ */
+public class ProducerObserver implements Observer {
+    private Endpoint endpoint;
+    private Producer producer;
+
+    public ProducerObserver(Endpoint endpoint) throws Exception {
+        this.endpoint = endpoint;
+        this.producer = endpoint.createProducer();
+        this.producer.start();
+    }
+
+    public void onCompleted() {
+        if (producer != null) {
+            try {
+                producer.stop();
+            } catch (Exception e) {
+                throw new RuntimeCamelRxException(e);
+            } finally {
+                producer = null;
+            }
+        }
+    }
+
+    public void onError(Exception e) {
+        Exchange exchange = producer.createExchange();
+        exchange.setException(e);
+        send(exchange);
+    }
+
+    public void onNext(Object o) {
+        Exchange exchange = producer.createExchange();
+        exchange.getIn().setBody(o);
+        send(exchange);
+    }
+
+    protected void send(Exchange exchange) {
+        try {
+            producer.process(exchange);
+        } catch (Exception e) {
+            throw new RuntimeCamelRxException(e);
+        }
+    }
+
+}

Propchange: 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java?rev=1452207&view=auto
==============================================================================
--- 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
 (added)
+++ 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
 Mon Mar  4 08:26:40 2013
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import rx.Observable;
+
+/**
+ */
+public class SendToTest extends RxTestSupport {
+    @Test
+    public void testSendObservableToEndpoint() throws Exception {
+        Order[] expectedBodies = {new Order("o1", 1.10), new Order("o2", 
2.20), new Order("o3", 3.30)};
+        Observable<Order> someObservable = 
Observable.toObservable(expectedBodies);
+
+        final MockEndpoint mockEndpoint = 
camelContext.getEndpoint("mock:results", MockEndpoint.class);
+        mockEndpoint.expectedBodiesReceived(expectedBodies);
+
+        // lets send events on the observable to the camel endpoint
+        reactiveCamel.sendTo(someObservable, "mock:results");
+
+        mockEndpoint.assertIsSatisfied();
+
+    }
+}

Propchange: 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to