[ 
https://issues.apache.org/jira/browse/CAMEL-10612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831536#comment-15831536
 ] 

ASF GitHub Bot commented on CAMEL-10612:
----------------------------------------

GitHub user nicolaferraro opened a pull request:

    https://github.com/apache/camel/pull/1412

    CAMEL-10612: camel-reactive-streams

    
    I am publishing a preview of the reactive-streams component and asking
    for a review.
    
    The purpose of the component is to allow Camel to exchange messages with
    any asynchronous stream processing system compatible with the
    reactive-streams specs (rx-java, akka-streams, vertx, ...; the list will
    grow).
    
    I've included the reactive-streams TCK in the component to ensure
    compliance with the specs.
    All streams produced by Camel are open streams (otherwise called hot
    streams).
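
As a rough illustration of what "hot" means here, below is a minimal sketch built on the JDK's `java.util.concurrent.Flow` API (Java 9+), whose interfaces mirror the reactive-streams ones. It is not the component's code: `SubmissionPublisher` merely stands in for a Camel-backed publisher, and the `HotStreamDemo` and `Collector` names are made up for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class HotStreamDemo {

    /** Collects every item it is given and signals when the stream ends. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Emits 1 and 2 before anyone subscribes, then 3 and 4 afterwards. */
    public static List<Integer> lateSubscriberSees() throws InterruptedException {
        Collector late = new Collector();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.submit(1); // no subscriber yet: nothing is kept for replay
            publisher.submit(2);
            publisher.subscribe(late);
            publisher.submit(3);
            publisher.submit(4);
        } // close() completes the stream once the pending items are delivered
        late.done.await();
        return late.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(lateSubscriberSees());
    }
}
```

The late subscriber only sees the items emitted after it subscribed (3 and 4); nothing is replayed, which is the property the term "hot stream" refers to.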
    
    ## basic usage
    From Camel to an external stream engine:
    ```
    from("timer:tick")
    .setBody().header(Exchange.TIMER_COUNTER)
    .to("reactive-streams:integers");
    
    Publisher<Integer> integers =
        CamelReactiveStreams.get(context).getPublisher("integers", Integer.class);
    // use it eg. in rxjava2
    Observable.fromPublisher(integers)
        .map(...)
        .flatMap(...)
        .subscribe();
    ```
    
    From an external engine to Camel:
    ```
    from("reactive-streams:numbers")
    .to("log:INFO");
    
    Subscriber<Integer> numbers =
        CamelReactiveStreams.get(context).getSubscriber("numbers", Integer.class);
    // use it eg. in rxjava2
    Flowable.range(0, 40)
        .subscribe(numbers); // a stream with numbers from 0 to 39
    ```
    
    Converting a Consumer into a Subscriber and a Producer into a Publisher
    has been straightforward.
    The difficult part has been managing backpressure correctly. I left the
    door open for different implementations of the engine, but they may not
    be easy to implement because of the way backpressure has to interact
    with Camel mechanisms.
    
    Backpressure is a fundamental part of the specs and establishes
    mechanisms by which a Publisher can deal with slow Subscribers. Within a
    subscription, subscribers give constant feedback to the publisher,
    indicating the number of items they are willing to receive (almost like
    the TCP flow control window).
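
The demand signalling described above can be sketched with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which have the same shape as the reactive-streams ones. This is only an illustration under that assumption, not code from the component; `DemandDemo`, `BatchingSubscriber` and the batch size are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class DemandDemo {

    /** Subscriber that tells the publisher how many items it is willing to receive. */
    public static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        public final List<Integer> received = new ArrayList<>();
        public final List<Long> requests = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int batchSize;
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requestBatch(); // nothing is delivered until demand is signalled
        }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (--remainingInBatch == 0) {
                requestBatch(); // batch consumed: open the window again
            }
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        private void requestBatch() {
            remainingInBatch = batchSize;
            requests.add((long) batchSize);
            subscription.request(batchSize);
        }
    }

    /** Publishes 0..items-1 and returns the subscriber once the stream has completed. */
    public static BatchingSubscriber run(int items, int batchSize) throws InterruptedException {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < items; i++) {
                publisher.submit(i);
            }
        }
        subscriber.done.await();
        return subscriber;
    }

    public static void main(String[] args) throws InterruptedException {
        BatchingSubscriber s = run(10, 4);
        System.out.println("received=" + s.received + " requests=" + s.requests);
    }
}
```

The publisher never pushes more than the subscriber has asked for; here the window is reopened only after a whole batch has been consumed, much like a receiver advertising a new TCP window.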
    
    Camel (afaik) has no direct support for this kind of flow control, but
    has similar concepts, like throttling and route policies.
    So here's how I implemented backpressure.
    
    ## backpressure in producer
    (from Camel route to an external subscriber)
    
    Backpressure is handled by an internal buffer that caches exchanges
    before delivering them to the subscribers.
    It is important to avoid a buffer overflow. E.g. a route like
    `from("jms:xx").to("reactive-streams:pub")` can easily cause an
    out-of-memory error if the queue contains a lot of messages (e.g. after
    a shutdown).
    
    To avoid this kind of problem, both throttling and
    ThrottlingInflightRoutePolicy can be used.
    Throttling (`from("..").throttle()..`) just delays messages, so it
    cannot deal with idle/too-slow subscribers.
    A `ThrottlingInflightRoutePolicy` is the preferred way to deal with slow
    subscribers. Exchanges are considered completed only when they are
    received by all subscribers, so users can set a maximum number of
    inflight exchanges in the policy; route suspension/resume will then be
    controlled by backpressure: a slow subscriber will cause periodic route
    suspensions.
    
    A snippet from the JUnit test:
    ```
    ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy();
    policy.setMaxInflightExchanges(10);
    policy.setScope(ThrottlingInflightRoutePolicy.ThrottlingScope.Route);
    
    from("timer:tick?period=50")
    .routePolicy(policy)
    .to("reactive-streams:pub");
    ```
    
    When a subscriber of the `pub` stream becomes idle, about 10 messages
    are accumulated in the internal buffer, then the route is suspended.
    When the subscriber starts processing the messages again, the route is
    resumed.
    It also works with multiple subscribers (in practice, the slowest one
    controls the suspension/resume).
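
The suspend/resume behaviour described above can be modelled in a few lines of plain Java. This is a simplified sketch, not how `ThrottlingInflightRoutePolicy` is implemented: the `InflightGate` class, its method names and the resume threshold of 7 are assumptions made up for the example.

```java
public class InflightGate {
    private final int maxInflight; // suspend once this many exchanges are pending
    private final int resumeAt;    // resume once pending exchanges drop to this level
    private int inflight;
    private boolean suspended;

    public InflightGate(int maxInflight, int resumeAt) {
        this.maxInflight = maxInflight;
        this.resumeAt = resumeAt;
    }

    /** Called when the route wants to emit an exchange; false means "route suspended". */
    public synchronized boolean tryEmit() {
        if (suspended) {
            return false;
        }
        inflight++;
        if (inflight >= maxInflight) {
            suspended = true; // buffer is as full as we allow: stop the route
        }
        return true;
    }

    /** Called when an exchange has been received by all subscribers. */
    public synchronized void onDelivered() {
        if (inflight > 0) {
            inflight--;
        }
        if (suspended && inflight <= resumeAt) {
            suspended = false; // the slow subscriber caught up: restart the route
        }
    }

    public synchronized boolean isSuspended() { return suspended; }
    public synchronized int inflight() { return inflight; }

    public static void main(String[] args) {
        InflightGate gate = new InflightGate(10, 7);
        int accepted = 0;
        for (int tick = 0; tick < 15; tick++) { // subscriber is idle the whole time
            if (gate.tryEmit()) {
                accepted++;
            }
        }
        System.out.println("accepted=" + accepted + " suspended=" + gate.isSuspended());
        for (int i = 0; i < 3; i++) {           // subscriber wakes up and consumes 3 items
            gate.onDelivered();
        }
        System.out.println("inflight=" + gate.inflight() + " suspended=" + gate.isSuspended());
    }
}
```

With several subscribers, `onDelivered()` would only be called once the slowest of them has received the exchange, which is why that subscriber ends up controlling the suspension.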
    
    The `ThrottlingInflightRoutePolicy` should (must) be used e.g. when
    consuming from JMS.
    In some circumstances (e.g. http consumer), suspending the route is not
    the best approach, so it's better to handle backpressure by just
    buffering (the default approach).
    
    ## backpressure in consumer
    (from an external publisher to a Camel route)
    When consuming items from a reactive-streams publisher, the maximum
    number of inflight exchanges can be set as an endpoint option.
    The subscriber associated with the consumer interacts with the publisher
    to keep the number of messages in the route lower than the threshold.
    
    The number of concurrent consumers can also be set as an endpoint
    option. With 1 consumer (the default), exchanges are processed by a
    single thread, so the order of items in the stream is maintained. This
    value can be increased, and the items will then be processed
    concurrently (so the order is not preserved).
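
One way to picture the interaction between the two options is the sketch below, again written against the JDK's `java.util.concurrent.Flow` API (Java 9+) rather than the component itself. `BoundedInflightSubscriber` is a hypothetical class: it requests at most `maxInflight` items up front and asks for one more each time a worker finishes, so the number of items being processed never exceeds the threshold. The 2 ms sleep is a stand-in for routing an exchange.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedInflightSubscriber implements Flow.Subscriber<Integer> {
    private final int maxInflight;
    private final ExecutorService workers;
    private final AtomicInteger inflight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();
    private final AtomicInteger processed = new AtomicInteger();
    private final CountDownLatch streamDone = new CountDownLatch(1);
    private volatile Flow.Subscription subscription;

    public BoundedInflightSubscriber(int concurrentConsumers, int maxInflight) {
        this.maxInflight = maxInflight;
        this.workers = Executors.newFixedThreadPool(concurrentConsumers);
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(maxInflight); // never ask for more than the threshold
    }

    @Override public void onNext(Integer item) {
        peak.accumulateAndGet(inflight.incrementAndGet(), Math::max);
        workers.execute(() -> {
            try {
                Thread.sleep(2); // stand-in for processing the exchange
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                processed.incrementAndGet();
                inflight.decrementAndGet();
                subscription.request(1); // a slot is free again: ask for one more
            }
        });
    }

    @Override public void onError(Throwable t) { streamDone.countDown(); }
    @Override public void onComplete() { streamDone.countDown(); }

    public int processed() { return processed.get(); }
    public int peakInflight() { return peak.get(); }

    /** Publishes 0..items-1 and waits until every item has been processed. */
    public static BoundedInflightSubscriber run(int items, int concurrentConsumers,
                                                int maxInflight) throws InterruptedException {
        BoundedInflightSubscriber sub =
            new BoundedInflightSubscriber(concurrentConsumers, maxInflight);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(sub);
            for (int i = 0; i < items; i++) {
                publisher.submit(i);
            }
        }
        sub.streamDone.await();
        sub.workers.shutdown();
        sub.workers.awaitTermination(30, TimeUnit.SECONDS);
        return sub;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedInflightSubscriber sub = run(50, 5, 10);
        System.out.println("processed=" + sub.processed() + " peak=" + sub.peakInflight());
    }
}
```

With more than one worker the items may finish in a different order than they arrived, which matches the ordering caveat above; with a single worker the pool processes them strictly in sequence.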
    
    A snippet from the tests:
    ```
    
    from("reactive-streams:numbers?concurrentConsumers=5&maxInflightExchanges=10")
    .to("mock:endpoint");
    ```
    
    ## TODOS
    - write proper documentation
    - move some global options to the producer endpoint (need to find a way
      to do it properly)
    - add the possibility to handle "onError" and "onComplete" events in the
      Camel route
    - improve context shutdown by correctly closing all streams
    - test with other frameworks (currently using rxjava2, reactive-streams
      TCK and custom pub/sub)
    - check osgi compliance
    - add the ReactiveStreamService to the spring-boot application context
      in the starter
    - maybe add direct usage of Camel endpoints
    - improve it after feedback
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nicolaferraro/camel CAMEL-10612

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/camel/pull/1412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1412
    
----
commit 1142a4139f6f5930cfa63339ac61fa1f80c5a757
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2016-12-21T17:59:01Z

    CAMEL-10612: Reactive streams producer

commit afc4eee98fcca1a6705fd58901262be453e5736d
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2016-12-27T17:29:22Z

    CAMEL-10612: First complete implementation

commit fd88fe0b97c232161d3d3df7203bcc26d27eb517
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-03T11:46:39Z

    CAMEL-10612: Implemented subscriber verification and rx implementation

commit 91cd0fc981b9616980a9a5befec41b17b487d447
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-03T16:09:05Z

    CAMEL-10612: Support switching to a different implementation

commit f99e7b40f4dbe9e49b0d4208db01594f32d21a26
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-03T17:16:02Z

    CAMEL-10612: Added internal engine configuration

commit 46cf793200ffcde69c9d0dc041020266e8063d9f
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-04T18:59:58Z

    CAMEL-10612: Make sure backpressure can be combined with throttling route policy

commit 807a5bd7b44142ed8f58e19b615efc18ae4fa7cd
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-12T17:36:21Z

    CAMEL-10612: more consistent structure

commit 861340af92916c58852aa75cdb1281fcd4545779
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-13T11:53:55Z

    CAMEL-10612: fixes and test updates

commit 6c45fc46032625cd80dd3852c1257d0d88f8d97a
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-13T12:20:52Z

    CAMEL-10612: enhanced publisher backpressure with mixed slow and fast subscribers

commit 7a6b987500374b268af7cec22ebfdd1b5b2a18e9
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-15T09:32:04Z

    CAMEL-10612: fixed issue with route restart

commit 5f6c24e5d4dfca4be2f0e143de12b4482e9c21d6
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-15T09:40:27Z

    CAMEL-10612: fixed stream completion

commit bd7d4d7f84e408cdd96af785f20933dc1d89ab18
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-19T17:51:56Z

    CAMEL-10650: added backpressure strategy

commit 8124a17be32dcdf90174fdd4d258edc95e40b04f
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-20T00:03:17Z

    CAMEL-10650: added javadoc

commit 9f6ae4fed93c5d11addba88402e340a18caaa3f7
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-20T09:57:38Z

    CAMEL-10650: rebase with master

commit 4f9e9a8208f3a2295b29e442ed9f52185d943a14
Author: Nicola Ferraro <ni.ferr...@gmail.com>
Date:   2017-01-20T10:21:22Z

    CAMEL-10650: source check

----


> camel-reactive-streams - New component
> --------------------------------------
>
>                 Key: CAMEL-10612
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10612
>             Project: Camel
>          Issue Type: New Feature
>            Reporter: Nicola Ferraro
>            Assignee: Nicola Ferraro
>
> We have a component for rx-java v1. rx-java v2 is now out and compatible with
> the reactive-streams specs, like many other reactive frameworks (including
> vert.x). We may want to implement a new component to integrate with any
> framework that complies with the reactive-streams specs (Akka-Streams and Slick
> among others).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
