[ https://issues.apache.org/jira/browse/CAMEL-10612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831536#comment-15831536 ]
ASF GitHub Bot commented on CAMEL-10612: ---------------------------------------- GitHub user nicolaferraro opened a pull request: https://github.com/apache/camel/pull/1412 CAMEL-10612: camel-reactive-streams I publish a preview of the reactive-stream component, asking for a review. The purpose of the component is to allow Camel to exchange messages with any asynchronous stream processing system compatible with the reactive-streams specs (rx-java, akka-streams, vertx, ..., the list will grow). I've included in the component the reactive-streams TCK to ensure compliance with the specs. All streams produced by Camel are open streams (otherwise called hot streams). ## basic usage From Camel to an external stream engine: ``` from("timer:tick") .setBody().header(Exchange.TIMER_COUNTER) .to("reactive-streams:integers"); Publisher<Integer> integers = CamelReactiveStreams.get(context).getPublisher("integers", Integer.class) // use it eg. in rxjava2 Observable.fromPublisher(integers) .map(...) .flatMap(...) .subscribe(); ``` From an external engine to Camel: ``` from("reactive-streams:numbers") .to("log:INFO"); Subscriber<Integer> numbers = CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class); // use it eg. in rxjava2 Flowable.range(0, 40) .subscribe(numbers); // a stream with numbers from 0 to 39 ``` Converting a Consumer into a Subscriber and a Producer into a Publisher has been straightforward. The difficult part has been managing backpressure correctly. I left the door open for different implementations of the engine, but it may be not so easy to implement them because of the way backpressure should interact with Camel mechanisms. Backpressure is a fundamental part of the specs and establishes mechanisms by which a Publisher can deal with slow Subscribers. Whithin a suscription, subscribers give constant feedbacks to the publishers indicating the number of items they are willing to receive (almost like TCP flow control window). Camel (afaik) has not direct support for this kind of flow control, but has similar concepts, like throttling and route policy. So here's how implemented backpressure. ## backpressure in producer (from Camel route to an external subscriber) Backpressure is handled by an internal buffer that caches exchanges before delivering them to the subscribers. It is important to avoid a buffer overflow. Eg. a route like `from("jms:xx").to("reactive-streams:pub")` can cause easily a out-of-memory error if the queue contains a lot of messages (eg. after a shutdown). To avoid this kind of problems, both throttling and ThrottlingInflightRoutePolicy can be used. Throttling (`from("..").throttle()..`) just delays messages, so it cannot deal with idle/too-slow subscribers. A `ThrottlingInflightRoutePolicy` is the preferred way to deal with slow subscribers. Exchanges are considered completed only when they are received by all subscribers, so users can set a maximum number of inflight exchanges in the policy then route suspension/resume will be controlled by backpressure: a slow subscriber will cause periodic route suspensions. A snippet from the Junit test: ``` ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); policy.setMaxInflightExchanges(10); policy.setScope(ThrottlingInflightRoutePolicy.ThrottlingScope.Route); from("timer:tick?period=50") .routePolicy(policy) .to("reactive-streams:pub"); ``` When a subscriber of the `pub` stream becomes idle, about 10 messages are accumulated in the internal buffer, then the route is suspended. When the subscriber starts processing the messages again, the route is resumed. It works also with multiple subscribers (in practice, the slowest one controls the suspension/resume). The `ThrottlingInflightRoutePolicy` should (must) be used eg. when consuming from JMS. In some circumstances (eg. http consumer), suspending the route is not the best approach, so it's better handling backpressure by just buffering (the default approach). ## backpressure in consumer (from an external publisher to a Camel route) When consuming items from a reactive-streams publisher, the maximum number of inflight exchanges can be set as endpoint option. The subscriber associated with the consumer interact with the publisher to keep the number of messages in the route lower than the threshold. The number of concurrent consumers can also be set as endpoint option. By setting 1 consumer (the default) exchanges are processed by a single thread, so the order of items in the stream is maintained. This value can be increased and the items will be processed concurrently (so not preserving the order). A snippet from the tests: ``` from("reactive-streams:numbers?concurrentConsumers=5&maxInflightExchanges=10") .to("mock:endpoint"); ``` ## TODOS - write proper documentation - move some global options to the producer endpoint (need to find a way to do it properly) - add the possibility to handle "onError" and "onComplete" events in the Camel route - improve context shutdown by closing correctly all streams - test with other frameworks (currently using rxjava2, reactive-streams TCK and custom pub/sub) - check osgi compliance - add the ReactiveStreamService to the spring-boot application context in the starter - maybe add direct usage of Camel endpoints directly - improve it after feedbacks You can merge this pull request into a Git repository by running: $ git pull https://github.com/nicolaferraro/camel CAMEL-10612 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/camel/pull/1412.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1412 ---- commit 1142a4139f6f5930cfa63339ac61fa1f80c5a757 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2016-12-21T17:59:01Z CAMEL-10612: Reactive streams producer commit afc4eee98fcca1a6705fd58901262be453e5736d Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2016-12-27T17:29:22Z CAMEL-10612: First complete implementation commit fd88fe0b97c232161d3d3df7203bcc26d27eb517 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-03T11:46:39Z CAMEL-10612: Implemented subscriber verification and rx implementation commit 91cd0fc981b9616980a9a5befec41b17b487d447 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-03T16:09:05Z CAMEL-10612: Support switching to a different implementation commit f99e7b40f4dbe9e49b0d4208db01594f32d21a26 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-03T17:16:02Z CAMEL-10612: Added internal engine configuration commit 46cf793200ffcde69c9d0dc041020266e8063d9f Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-04T18:59:58Z CAMEL-10612: Make sure backpressure can be combined with throttling route policy commit 807a5bd7b44142ed8f58e19b615efc18ae4fa7cd Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-12T17:36:21Z CAMEL-10612: more consistent structure commit 861340af92916c58852aa75cdb1281fcd4545779 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-13T11:53:55Z CAMEL-10612: fixes and test updates commit 6c45fc46032625cd80dd3852c1257d0d88f8d97a Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-13T12:20:52Z CAMEL-10612: enhanced publisher backpressure with mixed slow and fast subscribers commit 7a6b987500374b268af7cec22ebfdd1b5b2a18e9 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-15T09:32:04Z CAMEL-10612: fixed issue with route restart commit 5f6c24e5d4dfca4be2f0e143de12b4482e9c21d6 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-15T09:40:27Z CAMEL-10612: fixed stream completion commit bd7d4d7f84e408cdd96af785f20933dc1d89ab18 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-19T17:51:56Z CAMEL-10650: added backpressure strategy commit 8124a17be32dcdf90174fdd4d258edc95e40b04f Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-20T00:03:17Z CAMEL-10650: added javadoc commit 9f6ae4fed93c5d11addba88402e340a18caaa3f7 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-20T09:57:38Z CAMEL-10650: rebase with master commit 4f9e9a8208f3a2295b29e442ed9f52185d943a14 Author: Nicola Ferraro <ni.ferr...@gmail.com> Date: 2017-01-20T10:21:22Z CAMEL-10650: source check ---- > camel-reactive-streams - New component > -------------------------------------- > > Key: CAMEL-10612 > URL: https://issues.apache.org/jira/browse/CAMEL-10612 > Project: Camel > Issue Type: New Feature > Reporter: Nicola Ferraro > Assignee: Nicola Ferraro > > We have a component for rx-java v1. rx-java v2 now is out and compatible with > the reactive-stream specs, like many other reactive frameworks (including > vert.x). We may want to implement a new component to integrate with any > framework that comply with the reactive-streams specs (Akka-Streams and Slick > among the others). -- This message was sent by Atlassian JIRA (v6.3.4#6332)