Author: jstrachan Date: Wed Mar 6 09:26:48 2013 New Revision: 1453234 URL: http://svn.apache.org/r1453234 Log: Added the ObservableMessage and ObservableBody helper classes, which are Processors and make it easy to embed some RX processing code to handle messages / bodies inside an existing Camel route; e.g. it's handy if you want to do filtering, marshalling or transforming before hitting the RX code. Also refactored some useful functions and classes into the support package, and renamed the test cases to be more descriptive.
Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java (with props) camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java (with props) camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java (with props) camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java (with props) camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java (with props) camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java (contents, props changed) - copied, changed from r1453182, camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java (with props) camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java (with props) camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java (with props) camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java (contents, props changed) - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java (contents, props changed) - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java (contents, props changed) - copied, changed from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java Removed: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java Modified: camel/trunk/components/camel-rx/pom.xml camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java Modified: camel/trunk/components/camel-rx/pom.xml URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/pom.xml?rev=1453234&r1=1453233&r2=1453234&view=diff ============================================================================== --- camel/trunk/components/camel-rx/pom.xml (original) +++ camel/trunk/components/camel-rx/pom.xml Wed Mar 6 09:26:48 2013 @@ -54,7 +54,7 @@ <!-- test dependencies --> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-test-spring</artifactId> + <artifactId>camel-test</artifactId> <scope>test</scope> </dependency> <dependency> Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java?rev=1453234&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java (added) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java Wed Mar 6 09:26:48 2013 @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.rx; + +import org.apache.camel.rx.support.ExchangeToBodyFunc1; +import org.apache.camel.rx.support.ObservableProcessor; + +/** + * A base class for a {@link Processor} which allows you to process + * messages using an {@link Observable< org.apache.camel.Message>} by implementing the + * abstract {@link org.apache.camel.rx.support.ObservableProcessor#configure(rx.Observable} method. + */ +public abstract class ObservableBody<T> extends ObservableProcessor<T> { + private final Class<T> bodyType; + + public ObservableBody(Class<T> bodyType) { + super(new ExchangeToBodyFunc1(bodyType)); + this.bodyType = bodyType; + } + + public String toString() { + return "ObservableBody[" + bodyType.getName() + "]"; + } +} Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableBody.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java?rev=1453234&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java (added) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java Wed Mar 6 09:26:48 2013 @@ -0,0 +1,33 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.rx; + +import org.apache.camel.Message; +import org.apache.camel.rx.support.ExchangeToMessageFunc1; +import org.apache.camel.rx.support.ObservableProcessor; + +/** + * A base class for a {@link Processor} which allows you to process + * messages using an {@link Observable<Message>} by implementing the + * abstract {@link org.apache.camel.rx.support.ObservableProcessor#configure(rx.Observable} method. 
+ */ +public abstract class ObservableMessage extends ObservableProcessor<Message> { + public ObservableMessage() { + super(ExchangeToMessageFunc1.getInstance()); + } +} Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ObservableMessage.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java?rev=1453234&r1=1453233&r2=1453234&view=diff ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java (original) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/ReactiveCamel.java Wed Mar 6 09:26:48 2013 @@ -23,7 +23,9 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.rx.support.EndpointObservable; import org.apache.camel.rx.support.EndpointSubscription; -import org.apache.camel.rx.support.ProducerObserver; +import org.apache.camel.rx.support.ExchangeToBodyFunc1; +import org.apache.camel.rx.support.ExchangeToMessageFunc1; +import org.apache.camel.rx.support.ObserverSender; import org.apache.camel.util.CamelContextHelper; import rx.Observable; @@ -66,12 +68,7 @@ public class ReactiveCamel { * to be processed using <a href="https://rx.codeplex.com/">Reactive Extensions</a> */ public Observable<Message> toObservable(Endpoint endpoint) { - return createEndpointObservable(endpoint, new Func1<Exchange, Message>() { - @Override - public Message call(Exchange exchange) { - return exchange.getIn(); - } - }); + return createEndpointObservable(endpoint, ExchangeToMessageFunc1.getInstance()); } /** @@ -80,13 +77,7 @@ public class ReactiveCamel { * to be processed using <a href="https://rx.codeplex.com/">Reactive Extensions</a> 
*/ public <T> Observable<T> toObservable(Endpoint endpoint, final Class<T> bodyType) { - return createEndpointObservable(endpoint, new Func1<Exchange, T>() { - @Override - public T call(Exchange exchange) { - Message in = exchange.getIn(); - return in.getBody(bodyType); - } - }); + return createEndpointObservable(endpoint, new ExchangeToBodyFunc1<T>(bodyType)); } /** @@ -100,7 +91,7 @@ public class ReactiveCamel { */ public <T> void sendTo(Observable<T> observable, Endpoint endpoint) { try { - ProducerObserver observer = new ProducerObserver(endpoint); + ObserverSender observer = new ObserverSender(endpoint); observable.subscribe(observer); } catch (Exception e) { throw new RuntimeCamelRxException(e); Modified: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java?rev=1453234&r1=1453233&r2=1453234&view=diff ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java (original) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java Wed Mar 6 09:26:48 2013 @@ -40,13 +40,7 @@ public class EndpointSubscription<T> imp this.observer = observer; // lets create the consumer - Processor processor = new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - T value = func.call(exchange); - observer.onNext(value); - } - }; + Processor processor = new ProcessorToObserver<T>(func, observer); try { this.consumer = endpoint.createConsumer(processor); this.consumer.start(); @@ -81,4 +75,5 @@ public class EndpointSubscription<T> imp public Observer<T> getObserver() { return observer; } + } Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java?rev=1453234&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java (added) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java Wed Mar 6 09:26:48 2013 @@ -0,0 +1,41 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.rx.support; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; + +import rx.util.functions.Func1; + +/** + * A simple {@link Func1} to convert an {@link Exchange} to the given type using the + * IN {@link Message}'s body + */ +public class ExchangeToBodyFunc1<T> implements Func1<Exchange, T> { + private final Class<T> bodyType; + + public ExchangeToBodyFunc1(Class<T> bodyType) { + this.bodyType = bodyType; + } + + @Override + public T call(Exchange exchange) { + Message in = exchange.getIn(); + return in.getBody(bodyType); + } +} Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToBodyFunc1.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java?rev=1453234&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java (added) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java Wed Mar 6 09:26:48 2013 @@ -0,0 +1,39 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.rx.support; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; + +import rx.util.functions.Func1; + +/** + * A simple {@link Func1} to convert an {@link Exchange} to its IN {@link Message} + */ +public class ExchangeToMessageFunc1 implements Func1<Exchange, Message> { + private static ExchangeToMessageFunc1 instance = new ExchangeToMessageFunc1(); + + public static ExchangeToMessageFunc1 getInstance() { + return instance; + } + + @Override + public Message call(Exchange exchange) { + return exchange.getIn(); + } +} Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ExchangeToMessageFunc1.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java?rev=1453234&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java (added) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java Wed Mar 6 09:26:48 2013 @@ -0,0 +1,67 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.rx.support; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.support.ServiceSupport; + +import rx.Observable; +import rx.subjects.Subject; +import rx.util.functions.Func1; + +/** + * A base class for implementing a {@link Processor} which provides access to an {@link Observable} + * so that the messages can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a> + */ +public abstract class ObservableProcessor<T> extends ServiceSupport implements Processor { + private final Subject<T> observable = Subject.create(); + private final ProcessorToObserver processor; + + protected ObservableProcessor(Func1<Exchange, T> func) { + this.processor = new ProcessorToObserver(func, observable); + } + + public void process(Exchange exchange) throws Exception { + processor.process(exchange); + } + + /** + * Returns the {@link Observable} for this {@link Processor} so that the messages that are received + * can be processed using the <a href="https://github.com/Netflix/RxJava/wiki">RX Java API</a> + */ + public Observable<T> getObservable() { + return observable; + } + + /** + * Provides the configuration hook so that derived classes can process the observable + * to use whatever RX 
methods they wish to process the incoming events + * @param observable + */ + protected abstract void configure(Observable<T> observable); + + protected void doStart() throws Exception { + configure(getObservable()); + } + + + protected void doStop() throws Exception { + observable.onCompleted(); + } +} Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObservableProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java (from r1453182, camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java?p2=camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java&p1=camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java&r1=1453182&r2=1453234&rev=1453234&view=diff ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProducerObserver.java (original) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java Wed Mar 6 09:26:48 2013 @@ -26,12 +26,13 @@ import org.apache.camel.rx.RuntimeCamelR import rx.Observer; /** + * An {@link Observer} which sends events to a given {@link Endpoint} */ -public class ProducerObserver implements Observer { +public class ObserverSender implements Observer { private Endpoint endpoint; private Producer producer; - public ProducerObserver(Endpoint endpoint) throws Exception { + public ObserverSender(Endpoint endpoint) throws Exception { this.endpoint = endpoint; this.producer = endpoint.createProducer(); this.producer.start(); Propchange: 
camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java?rev=1453234&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java (added) +++ camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java Wed Mar 6 09:26:48 2013 @@ -0,0 +1,53 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.rx.support; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +import rx.Observer; +import rx.util.functions.Func1; + +/** + * A {@link Processor} which invokes an underling {@link Observer} as messages + * arrive using hte given function to convert the {@link Exchange} to the required + * object + */ +public class ProcessorToObserver<T> implements Processor { + private final Func1<Exchange, T> func; + private final Observer<T> observer; + + public ProcessorToObserver(Func1<Exchange, T> func, Observer<T> observer) { + this.func = func; + this.observer = observer; + } + + @Override + public void process(Exchange exchange) throws Exception { + Exception exception = null; + if (exchange.isFailed()) { + exception = exchange.getException(); + } + if (exception != null) { + observer.onError(exception); + } else { + T value = func.call(exchange); + observer.onNext(value); + } + } +} Propchange: camel/trunk/components/camel-rx/src/main/java/org/apache/camel/rx/support/ProcessorToObserver.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java?rev=1453234&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java (added) +++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java Wed Mar 6 09:26:48 2013 @@ -0,0 +1,78 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.rx; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import rx.Observable; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class ObservableBodyTest extends CamelTestSupport { + protected MyObservableBody observableBody = new MyObservableBody(); + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:start") + protected ProducerTemplate template; + + @Test + public void testUseObservableInRoute() throws Exception { + resultEndpoint.expectedBodiesReceived("Hello James", "Hello Claus"); + + template.sendBody("James"); + template.sendBody("Claus"); + + assertMockEndpointsSatisfied(); + } + + public class MyObservableBody extends ObservableBody<String> { + public MyObservableBody() { + super(String.class); + } + + protected void configure(Observable<String> observable) { + // lets process the messages using the RX API + observable.map(new Func1<String, String>() { + public String call(String body) { + return "Hello " + body; + } + }).subscribe(new Action1<String>() { + public void call(String body) { + 
template.sendBody(resultEndpoint, body); + } + }); + } + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start").process(observableBody); + } + }; + } +} Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java?rev=1453234&view=auto ============================================================================== --- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java (added) +++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java Wed Mar 6 09:26:48 2013 @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.rx; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Message; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import rx.Observable; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class ObservableMessageTest extends CamelTestSupport { + protected MyObservableMessage observableMessage = new MyObservableMessage(); + + @EndpointInject(uri = "mock:result") + protected MockEndpoint resultEndpoint; + + @Produce(uri = "direct:start") + protected ProducerTemplate template; + + @Test + public void testUseObservableInRoute() throws Exception { + resultEndpoint.expectedBodiesReceived("Hello James", "Hello Claus"); + + template.sendBody("James"); + template.sendBody("Claus"); + + assertMockEndpointsSatisfied(); + } + + public class MyObservableMessage extends ObservableMessage { + protected void configure(Observable<Message> observable) { + // lets process the messages using the RX API + observable.map(new Func1<Message, String>() { + public String call(Message message) { + return "Hello " + message.getBody(String.class); + } + }).subscribe(new Action1<String>() { + public void call(String body) { + template.sendBody(resultEndpoint, body); + } + }); + } + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start").process(observableMessage); + } + }; + } +} Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java (from r1453182, 
camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff ============================================================================== --- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageMapTest.java (original) +++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java Wed Mar 6 09:26:48 2013 @@ -29,8 +29,8 @@ import rx.util.functions.Func1; /** */ -public class ObservableMessageMapTest extends RxTestSupport { - private static final transient Logger LOG = LoggerFactory.getLogger(ObservableMessageMapTest.class); +public class ToObservableAndMapTest extends RxTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableAndMapTest.class); @Test public void testConsume() throws Exception { Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableAndMapTest.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java (from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff 
============================================================================== --- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableBodyTest.java (original) +++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java Wed Mar 6 09:26:48 2013 @@ -28,8 +28,8 @@ import rx.util.functions.Func1; /** */ -public class ObservableBodyTest extends RxTestSupport { - private static final transient Logger LOG = LoggerFactory.getLogger(ObservableBodyTest.class); +public class ToObservableBodyTest extends RxTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableBodyTest.class); @Test public void testConsume() throws Exception { Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableBodyTest.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java (from r1453182, camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java?p2=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java&p1=camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java&r1=1453182&r2=1453234&rev=1453234&view=diff ============================================================================== --- camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ObservableMessageTest.java (original) +++ camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java Wed Mar 6 09:26:48 2013 @@ -28,8 +28,8 @@ import rx.util.functions.Action1; /** */ -public class ObservableMessageTest extends RxTestSupport { - private static final transient Logger LOG = 
LoggerFactory.getLogger(ObservableMessageTest.class); +public class ToObservableTest extends RxTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(ToObservableTest.class); @Test public void testConsume() throws Exception { Propchange: camel/trunk/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableTest.java ------------------------------------------------------------------------------ svn:eol-style = native