Author: jstrachan
Date: Wed Mar  6 09:26:48 2013
New Revision: 1453234

URL: http://svn.apache.org/r1453234
Log:
Added the ObservableMessage and ObservableBody helper classes, which are
Processors that make it easy to embed RX processing code for messages or
bodies inside an existing Camel route, e.g. it's handy if you want to do
filtering, marshalling or transforming before hitting the RX code. Also
refactored some useful functions and classes into the support package and
renamed the test cases to be more descriptive.
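
As a rough illustration of the idea in the log message, the sketch below mimics the configure() hook pattern with plain Java. It is not the camel-rx code: MiniSubject and MiniObservableBody are hypothetical stand-ins for the RX subject and for ObservableBody, kept tiny so the example runs without Camel or RxJava on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ObservableBodySketch {
    /** Hypothetical stand-in for an RX subject: fans each value out to its subscribers. */
    static class MiniSubject<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }

        /** Returns a derived subject that emits fn(value) for every value seen here. */
        <R> MiniSubject<R> map(Function<T, R> fn) {
            MiniSubject<R> mapped = new MiniSubject<>();
            subscribe(value -> mapped.onNext(fn.apply(value)));
            return mapped;
        }
    }

    /** Hypothetical stand-in for ObservableBody: subclasses wire up the stream in configure(). */
    abstract static class MiniObservableBody<T> {
        private final MiniSubject<T> subject = new MiniSubject<>();

        /** Plays the role of doStart(): hands the stream to the subclass once. */
        void start() {
            configure(subject);
        }

        /** Plays the role of process(Exchange): pushes one body into the stream. */
        void process(T body) {
            subject.onNext(body);
        }

        protected abstract void configure(MiniSubject<T> observable);
    }

    /** Sends two bodies through a greeter and returns what the subscriber received. */
    static List<String> run() {
        List<String> results = new ArrayList<>();
        MiniObservableBody<String> greeter = new MiniObservableBody<String>() {
            @Override
            protected void configure(MiniSubject<String> observable) {
                // Transform each body, then collect it (the real tests send to a mock endpoint).
                observable.map(body -> "Hello " + body).subscribe(results::add);
            }
        };
        greeter.start();
        greeter.process("James");
        greeter.process("Claus");
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Hello James, Hello Claus]
    }
}
```

In the real classes the route calls process(Exchange) and a Func1 extracts the body or message first; the sketch skips that step and pushes the body directly.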

Added:
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java   (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java   (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java   (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java   (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java   (with props)
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java   (contents, props changed)
      - copied, changed from r1453182, camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java   (with props)
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java   (with props)
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java   (with props)
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java   (contents, props changed)
      - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java   (contents, props changed)
      - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java   (contents, props changed)
      - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
Removed:
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java
    camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java
Modified:
    camel/trunk/components/camel-rx/pom.xml
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
    camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java

Modified: camel/trunk/components/camel-rx/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/pom.xml?rev=1453234&r1=1453233&r2=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/pom.xml (original)
+++ camel/trunk/components/camel-rx/pom.xml Wed Mar  6 09:26:48 2013
@@ -54,7 +54,7 @@
     <!-- test dependencies -->
     <dependency>
       <groupId>org.apache.camel</groupId>
-      <artifactId>camel-test-spring</artifactId>      
+      <artifactId>camel-test</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java (added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java Wed Mar  6 09:26:48 2013
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.rx.support.ExchangeToBodyFunc1;
+import org.apache.camel.rx.support.ObservableProcessor;
+
+/**
+ * A base class for a {@link Processor} which allows you to process
+ * messages using an {@link Observable< org.apache.camel.Message>} by implementing the
+ * abstract {@link org.apache.camel.rx.support.ObservableProcessor#configure(rx.Observable)} method.
+ */
+public abstract class ObservableBody<T> extends ObservableProcessor<T> {
+    private final Class<T> bodyType;
+
+    public ObservableBody(Class<T> bodyType) {
+        super(new ExchangeToBodyFunc1(bodyType));
+        this.bodyType = bodyType;
+    }
+
+    public String toString() {
+        return "ObservableBody[" + bodyType.getName() + "]";
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java (added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java Wed Mar  6 09:26:48 2013
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.Message;
+import org.apache.camel.rx.support.ExchangeToMessageFunc1;
+import org.apache.camel.rx.support.ObservableProcessor;
+
+/**
+ * A base class for a {@link Processor} which allows you to process
+ * messages using an {@link Observable<Message>} by implementing the
+ * abstract {@link org.apache.camel.rx.support.ObservableProcessor#configure(rx.Observable)} method.
+ */
+public abstract class ObservableMessage extends ObservableProcessor<Message> {
+    public ObservableMessage() {
+        super(ExchangeToMessageFunc1.getInstance());
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java?rev=1453234&r1=1453233&r2=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java (original)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java Wed Mar  6 09:26:48 2013
@@ -23,7 +23,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.rx.support.EndpointObservable;
 import org.apache.camel.rx.support.EndpointSubscription;
-import org.apache.camel.rx.support.ProducerObserver;
+import org.apache.camel.rx.support.ExchangeToBodyFunc1;
+import org.apache.camel.rx.support.ExchangeToMessageFunc1;
+import org.apache.camel.rx.support.ObserverSender;
 import org.apache.camel.util.CamelContextHelper;
 
 import rx.Observable;
@@ -66,12 +68,7 @@ public class ReactiveCamel {
     * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
      */
     public Observable<Message> toObservable(Endpoint endpoint) {
-        return createEndpointObservable(endpoint, new Func1<Exchange, Message>() {
-            @Override
-            public Message call(Exchange exchange) {
-                return exchange.getIn();
-            }
-        });
+        return createEndpointObservable(endpoint, ExchangeToMessageFunc1.getInstance());
     }
 
     /**
@@ -80,13 +77,7 @@ public class ReactiveCamel {
     * to be processed using  <a href="https://rx.codeplex.com/">Reactive Extensions</a>
     */
    public <T> Observable<T> toObservable(Endpoint endpoint, final Class<T> bodyType) {
-        return createEndpointObservable(endpoint, new Func1<Exchange, T>() {
-            @Override
-            public T call(Exchange exchange) {
-                Message in = exchange.getIn();
-                return in.getBody(bodyType);
-            }
-        });
+        return createEndpointObservable(endpoint, new ExchangeToBodyFunc1<T>(bodyType));
     }
 
     /**
@@ -100,7 +91,7 @@ public class ReactiveCamel {
      */
     public <T> void sendTo(Observable<T> observable, Endpoint endpoint) {
         try {
-            ProducerObserver observer = new ProducerObserver(endpoint);
+            ObserverSender observer = new ObserverSender(endpoint);
             observable.subscribe(observer);
         } catch (Exception e) {
             throw new RuntimeCamelRxException(e);

Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java?rev=1453234&r1=1453233&r2=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java (original)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java Wed Mar  6 09:26:48 2013
@@ -40,13 +40,7 @@ public class EndpointSubscription<T> imp
         this.observer = observer;
 
         // lets create the consumer
-        Processor processor = new Processor() {
-            @Override
-            public void process(Exchange exchange) throws Exception {
-                T value = func.call(exchange);
-                observer.onNext(value);
-            }
-        };
+        Processor processor = new ProcessorToObserver<T>(func, observer);
         try {
             this.consumer = endpoint.createConsumer(processor);
             this.consumer.start();
@@ -81,4 +75,5 @@ public class EndpointSubscription<T> imp
     public Observer<T> getObserver() {
         return observer;
     }
+
 }

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java (added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java Wed Mar  6 09:26:48 2013
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+import rx.util.functions.Func1;
+
+/**
+ * A simple {@link Func1} to convert an {@link Exchange} to the given type using the
+ * IN {@link Message}'s body
+ */
+public class ExchangeToBodyFunc1<T> implements Func1<Exchange, T> {
+    private final Class<T> bodyType;
+
+    public ExchangeToBodyFunc1(Class<T> bodyType) {
+        this.bodyType = bodyType;
+    }
+
+    @Override
+    public T call(Exchange exchange) {
+        Message in = exchange.getIn();
+        return in.getBody(bodyType);
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java (added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java Wed Mar  6 09:26:48 2013
@@ -0,0 +1,39 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+import rx.util.functions.Func1;
+
+/**
+ * A simple {@link Func1} to convert an {@link Exchange} to its IN {@link Message}
+ */
+public class ExchangeToMessageFunc1 implements Func1<Exchange, Message> {
+    private static ExchangeToMessageFunc1 instance = new ExchangeToMessageFunc1();
+
+    public static ExchangeToMessageFunc1 getInstance() {
+        return instance;
+    }
+
+    @Override
+    public Message call(Exchange exchange) {
+        return exchange.getIn();
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java (added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java Wed Mar  6 09:26:48 2013
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.support.ServiceSupport;
+
+import rx.Observable;
+import rx.subjects.Subject;
+import rx.util.functions.Func1;
+
+/**
+ * A base class for implementing a {@link Processor} which provides access to an {@link Observable}
+ * so that the messages can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a>
+ */
+public abstract class ObservableProcessor<T> extends ServiceSupport implements Processor {
+    private final Subject<T> observable = Subject.create();
+    private final ProcessorToObserver processor;
+
+    protected ObservableProcessor(Func1<Exchange, T> func) {
+        this.processor = new ProcessorToObserver(func, observable);
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        processor.process(exchange);
+    }
+
+    /**
+     * Returns the {@link Observable} for this {@link Processor} so that the messages that are received
+     * can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a>
+     */
+    public Observable<T> getObservable() {
+        return observable;
+    }
+
+    /**
+     * Provides the configuration hook so that derived classes can process the observable
+     * using whatever RX methods they wish to handle the incoming events
+     * @param observable
+     */
+    protected abstract void configure(Observable<T> observable);
+
+    protected void doStart() throws Exception {
+        configure(getObservable());
+    }
+
+
+    protected void doStop() throws Exception {
+        observable.onCompleted();
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java (from r1453182, camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java?p2=camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java&p1=camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java&r1=1453182&r2=1453234&rev=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java (original)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java Wed Mar  6 09:26:48 2013
@@ -26,12 +26,13 @@ import org.apache.camel.rx.RuntimeCamelR
 import rx.Observer;
 
 /**
+ * An {@link Observer} which sends events to a given {@link Endpoint}
  */
-public class ProducerObserver implements Observer {
+public class ObserverSender implements Observer {
     private Endpoint endpoint;
     private Producer producer;
 
-    public ProducerObserver(Endpoint endpoint) throws Exception {
+    public ObserverSender(Endpoint endpoint) throws Exception {
         this.endpoint = endpoint;
         this.producer = endpoint.createProducer();
         this.producer.start();

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java (added)
+++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java Wed Mar  6 09:26:48 2013
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx.support;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import rx.Observer;
+import rx.util.functions.Func1;
+
+/**
+ * A {@link Processor} which invokes an underlying {@link Observer} as messages
+ * arrive using the given function to convert the {@link Exchange} to the required
+ * object
+ */
+public class ProcessorToObserver<T> implements Processor {
+    private final Func1<Exchange, T> func;
+    private final Observer<T> observer;
+
+    public ProcessorToObserver(Func1<Exchange, T> func, Observer<T> observer) {
+        this.func = func;
+        this.observer = observer;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        Exception exception = null;
+        if (exchange.isFailed()) {
+            exception = exchange.getException();
+        }
+        if (exception != null) {
+            observer.onError(exception);
+        } else {
+            T value = func.call(exchange);
+            observer.onNext(value);
+        }
+    }
+}
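
The branch in process() above can be sketched in isolation as follows. This is a simplified illustration, not the committed class: MiniExchange and MiniObserver are hypothetical stand-ins for Camel's Exchange and rx.Observer, and the stand-in treats "failed" as "has an exception", which is narrower than Exchange.isFailed().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ProcessorToObserverSketch {
    /** Hypothetical stand-in for a Camel Exchange: a body plus an optional failure. */
    static class MiniExchange {
        final String body;
        final Exception exception;

        MiniExchange(String body, Exception exception) {
            this.body = body;
            this.exception = exception;
        }

        boolean isFailed() {
            return exception != null;
        }
    }

    /** Hypothetical stand-in for rx.Observer, reduced to the two callbacks used here. */
    interface MiniObserver<T> {
        void onNext(T value);

        void onError(Exception e);
    }

    /** Failures are routed to onError; everything else is converted and sent to onNext. */
    static <T> void dispatch(MiniExchange exchange, Function<MiniExchange, T> func, MiniObserver<T> observer) {
        if (exchange.isFailed()) {
            observer.onError(exchange.exception);
        } else {
            observer.onNext(func.apply(exchange));
        }
    }

    /** Dispatches one successful and one failed exchange and records the callbacks. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniObserver<String> recorder = new MiniObserver<String>() {
            @Override
            public void onNext(String value) {
                events.add("next:" + value);
            }

            @Override
            public void onError(Exception e) {
                events.add("error:" + e.getMessage());
            }
        };
        Function<MiniExchange, String> toBody = exchange -> exchange.body;
        dispatch(new MiniExchange("James", null), toBody, recorder);
        dispatch(new MiniExchange(null, new IllegalStateException("boom")), toBody, recorder);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:James, error:boom]
    }
}
```

Keeping the conversion function separate from the dispatch is what lets the same processor serve both the body and the message variants.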

Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java (added)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java Wed Mar  6 09:26:48 2013
@@ -0,0 +1,78 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+public class ObservableBodyTest extends CamelTestSupport {
+    protected MyObservableBody observableBody = new MyObservableBody();
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate template;
+
+    @Test
+    public void testUseObservableInRoute() throws Exception {
+        resultEndpoint.expectedBodiesReceived("Hello James", "Hello Claus");
+
+        template.sendBody("James");
+        template.sendBody("Claus");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public class MyObservableBody extends ObservableBody<String> {
+        public MyObservableBody() {
+            super(String.class);
+        }
+
+        protected void configure(Observable<String> observable) {
+            // lets process the messages using the RX API
+            observable.map(new Func1<String, String>() {
+                public String call(String body) {
+                    return "Hello " + body;
+                }
+            }).subscribe(new Action1<String>() {
+                public void call(String body) {
+                    template.sendBody(resultEndpoint, body);
+                }
+            });
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").process(observableBody);
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java?rev=1453234&view=auto
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java (added)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java Wed Mar  6 09:26:48 2013
@@ -0,0 +1,75 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Message;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import rx.Observable;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+public class ObservableMessageTest extends CamelTestSupport {
+    protected MyObservableMessage observableMessage = new MyObservableMessage();
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate template;
+
+    @Test
+    public void testUseObservableInRoute() throws Exception {
+        resultEndpoint.expectedBodiesReceived("Hello James", "Hello Claus");
+
+        template.sendBody("James");
+        template.sendBody("Claus");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public class MyObservableMessage extends ObservableMessage {
+        protected void configure(Observable<Message> observable) {
+            // lets process the messages using the RX API
+            observable.map(new Func1<Message, String>() {
+                public String call(Message message) {
+                    return "Hello " + message.getBody(String.class);
+                }
+            }).subscribe(new Action1<String>() {
+                public void call(String body) {
+                    template.sendBody(resultEndpoint, body);
+                }
+            });
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").process(observableMessage);
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java (from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java (original)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java Wed Mar  6 09:26:48 2013
@@ -29,8 +29,8 @@ import rx.util.functions.Func1;
 
 /**
  */
-public class ObservableMessageMapTest extends RxTestSupport {
-    private static final transient Logger LOG = LoggerFactory.getLogger(ObservableMessageMapTest.class);
+public class ToObservableAndMapTest extends RxTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableAndMapTest.class);
 
     @Test
     public void testConsume() throws Exception {

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java (from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java (original)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java Wed Mar  6 09:26:48 2013
@@ -28,8 +28,8 @@ import rx.util.functions.Func1;
 
 /**
  */
-public class ObservableBodyTest extends RxTestSupport {
-    private static final transient Logger LOG = LoggerFactory.getLogger(ObservableBodyTest.class);
+public class ToObservableBodyTest extends RxTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableBodyTest.class);
 
     @Test
     public void testConsume() throws Exception {

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java (from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff
==============================================================================
--- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java (original)
+++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java Wed Mar  6 09:26:48 2013
@@ -28,8 +28,8 @@ import rx.util.functions.Action1;
 
 /**
  */
-public class ObservableMessageTest extends RxTestSupport {
-    private static final transient Logger LOG = LoggerFactory.getLogger(ObservableMessageTest.class);
+public class ToObservableTest extends RxTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableTest.class);
 
     @Test
     public void testConsume() throws Exception {

Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

