Camel RX Asciidoc documentation
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3bfefb4f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3bfefb4f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3bfefb4f Branch: refs/heads/master Commit: 3bfefb4ffa2170d84b1c5457ccb6000058139c34 Parents: ce42784 Author: Antonin Stefanutti <[email protected]> Authored: Wed Jan 27 11:10:26 2016 +0100 Committer: Andrea Cosentino <[email protected]> Committed: Wed Jan 27 13:03:33 2016 +0100 ---------------------------------------------------------------------- components/camel-rx/src/main/docs/rx.adoc | 192 +++++++++++++++++++++++++ docs/user-manual/en/SUMMARY.md | 2 +- 2 files changed, 193 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3bfefb4f/components/camel-rx/src/main/docs/rx.adoc ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/docs/rx.adoc b/components/camel-rx/src/main/docs/rx.adoc new file mode 100644 index 0000000..6c37663 --- /dev/null +++ b/components/camel-rx/src/main/docs/rx.adoc @@ -0,0 +1,192 @@ +[[ConfluenceContent]] +[[RX-CamelRX]] +Camel RX +-------- + +*Available as of Camel 2.11* + +The camel-rx library provides Camel support for the +https://rx.codeplex.com/[Reactive Extensions]Â (RX) using the +https://github.com/Netflix/RxJava/wiki[RxJava] library so that: + +* Camel users can use the +http://netflix.github.com/RxJava/javadoc/[RxJava API] for processing +messages on endpoints using a typesafe composable API +* https://github.com/Netflix/RxJava/wiki[RxJava] users get to use all of +the link:components.html[Camel transports and protocols] from within the +http://netflix.github.com/RxJava/javadoc/[RxJava API] + +[[RX-BackgroundonRX]] +Background on RX +~~~~~~~~~~~~~~~~ + +For a more in depth background on RX check out +http://reactivex.io/documentation/observable.html[the RxJava wiki on +Observable and the Reactive pattern] or the +https://rx.codeplex.com/[Microsoft RX documentation]. + +You can think of RX as providing an API similar to Java 8 / Groovy / +Scala collections (methods like filter, forEach, map, reduce, zip etc) - +but which operates on an asynchronous stream of events rather than a +collection. So you could think of RX as like working with asynchronous +push based collections (rather than the traditional synchronous pull +based collections). + +In RX you work with an +http://netflix.github.com/RxJava/javadoc/rx/Observable.html[Observable<T>] +which behaves quite like a Collection<T> in Java 8 so you can +filter/map/concat and so forth. The Observable<T> then acts as a +typesafe composable API for working with asynchronous events in a +collection-like way. + +Once you have an +http://netflix.github.com/RxJava/javadoc/rx/Observable.html[Observable<T>] +you can then + +* https://github.com/Netflix/RxJava/wiki/Filtering-Operators[filter +events] +* https://github.com/Netflix/RxJava/wiki/Transformative-Operators[transform +events] +* https://github.com/Netflix/RxJava/wiki/Combinatorial-Operators[combine +event streams] +* https://github.com/Netflix/RxJava/wiki/Utility-Operators[other utility +methods] + +[[RX-ObservingeventsonCamelendpoints]] +Observing events on Camel endpoints +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +You can create an Observable<Message> from any endpoint using the +ReactiveCamel helper class and the *toObservable()* method. + +[source,java] +---- +import org.apache.camel.rx.*; + +ReactiveCamel rx = new ReactiveCamel(camelContext); +Observable<Message> observable = rx.toObservable("activemq:MyMessages"); + +// we can now call filter/map/concat etc +filtered = observable.filter(m -> m.getHeader("foo") != null).map(m -> "Hello " + m.getBody()); +---- + +If you know the type of the message payload (its body), you can use an +overloaded version of toObservable() to pass in the class and get a +typesafe Observable<T> back: + +[source,java] +---- +import org.apache.camel.rx.*; + +ReactiveCamel rx = new ReactiveCamel(camelContext); +Observable<Order> observable = rx.toObservable("seda:orders", Order.class); + +// now lets filter and map using Java 7 +Observable<String> largeOrderIds = observable.filter(new Func1<Order, Boolean>() { + public Boolean call(Order order) { + return order.getAmount() > 100.0; + } +}).map(new Func1<Order, String>() { + public String call(Order order) { + return order.getId(); + } +}); +---- + +[[RX-SendingObservableEventstoCamelendpoints]] +Sending Observable<T> events to Camel endpoints +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If you have an +http://netflix.github.com/RxJava/javadoc/rx/Observable.html[Observable<T>] +from some other library; or have created one from a +http://netflix.github.com/RxJava/javadoc/rx/Observable.html#toObservable(java.util.concurrent.Future)[Future<T> +using RxJava] and you wish to send the events on the observable to a +Camel endpoint you can use the *sendTo()* method on ReactiveCamel: + +[source,java] +---- +import org.apache.camel.rx.*; + +// take some observable from somewhere +Observable<T> observable = ...; +ReactiveCamel rx = new ReactiveCamel(camelContext); + +// lets send the events to a message queue +rx.sendTo(observable, "activemq:MyQueue"); +---- + +[[RX-EmbeddingsomeRxJavaprocessinginsideaCamelroute]] +Embedding some RxJava processing inside a Camel route +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sometimes you may wish to use a Camel route to consume messages, perform +content based routing, transformation, deal with data format marshalling +and so forth and then within the route invoke some typesafe RxJava event +processing. + +One approach is to just send messages from inside the Camel route to an +endpoint; then use the *toObservable()* method to bind the endpoint to +an Observable<T>. + +However if you prefer to embed the RxJava processing of messages inside +your route there are 2 helper classes which can be used to wrap up the +RxJava processing as a Camel Processor that can be easily embed into a +Camel route. + +You can use the *ObservableMessage* or *ObservableBody* classes which +both have an abstract *configure()* method like RouteBuilder. In the +configure method you can then process the Observable<T> for the Camel +Message or the message body. + +e.g. + +[source,java] +---- + public class MyObservableBody extends ObservableBody<String> { + public MyObservableBody() { + super(String.class); + } + + protected void configure(Observable<String> observable) { + // lets process the messages using the RX API + observable.map(new Func1<String, String>() { + public String call(String body) { + return "Hello " + body; + } + }).subscribe(new Action1<String>() { + public void call(String body) { + template.sendBody(resultEndpoint, body); + } + }); + } + } + ... + // now lets use this inside a route... + from("seda:foo").process(new MyObservableBody()); +---- + +Another approach, if you are consuming directly from Camel using the +link:bean-integration.html[Bean Integration] is to just use the RxJava +Subject directly: + +[source,java] +---- +import rx.subjects.Subject; + +public class MyThing { + private final Subject<String> observable = Subject.create(); + + public MyThing() { + // now process the observable somehow.... + } + + @Consume(uri="activemq:myqueue") + public void onMessageBody(String body) { + subject.onNext(body); + } +} +---- + +Though using the *toObservable* on *ReactiveCamel* is maybe a little +simpler. http://git-wip-us.apache.org/repos/asf/camel/blob/3bfefb4f/docs/user-manual/en/SUMMARY.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md index cd69358..820ef82 100644 --- a/docs/user-manual/en/SUMMARY.md +++ b/docs/user-manual/en/SUMMARY.md @@ -9,6 +9,7 @@ * [Exchange](exchange.adoc) * [Exchange Pattern](exchange-pattern.adoc) * [Registry](registry.adoc) + * [RX](rx.adoc) <!-- * [Dozer Type Conversion](dozer-type-conversion.adoc) @@ -52,7 +53,6 @@ * [Security](.adoc) * [UuidGenerator](.adoc) * [Binding](.adoc) - * [RX](.adoc) * [CEP](.adoc) * [BacklogTracer](.adoc) * [Endpoint Annotations](.adoc)
