[ 
https://issues.apache.org/jira/browse/CAMEL-10612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15769508#comment-15769508
 ] 

Nicola Ferraro edited comment on CAMEL-10612 at 12/22/16 8:54 AM:
------------------------------------------------------------------

Yeah, I can work on it. I'm evaluating some options: 

1. We can upgrade rx-java to v2 and expose the new rx-java API plus the 
reactive streams API through the camel-rx module. In this case the 
implementations of Publisher and Subscriber will be backed by rx Observable and 
Observer.

2. We can create a module that is independent of rx and uses a simple 
implementation of the specs. I initially thought this path would be difficult, 
but a Camel Producer is more or less a reactive streams Publisher, while a 
Camel Consumer is equivalent to a Subscriber. I created a basic Publisher from 
a Camel Producer, adding a pool of worker threads for pushing data downstream, 
and it was easy to pass the reactive streams TCK for active publishers. So 
this can be done easily.
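
As a rough illustration of option 2 (not the actual prototype), the sketch below shows a producer-like object that pushes each message it receives to downstream subscribers through a pool of worker threads. It makes several assumptions: `java.util.concurrent.Flow` (Java 9+) stands in for the `org.reactivestreams` interfaces, which it mirrors one-to-one; message bodies are plain strings rather than Camel exchanges; and the names `StreamProducerSketch`, `CollectingSubscriber` and `StreamSketchDemo` are hypothetical. It has not been run against the TCK.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the producer side of a "reactive-streams:<name>" endpoint.
final class StreamProducerSketch implements AutoCloseable {
    // Pool of daemon worker threads that deliver items to subscribers.
    private final ExecutorService workers = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "stream-worker");
        t.setDaemon(true); // idle workers never block JVM shutdown
        return t;
    });
    private final SubmissionPublisher<String> publisher =
            new SubmissionPublisher<>(workers, Flow.defaultBufferSize());

    // Called once per message by the route, similar in spirit to a Camel Producer.
    void process(String body) {
        publisher.submit(body); // blocks while a subscriber's buffer is full
    }

    Flow.Publisher<String> getPublisher() {
        return publisher;
    }

    @Override
    public void close() {
        publisher.close(); // subscribers get onComplete after buffered items
    }
}

// Subscriber that requests one item at a time and records what it receives.
final class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }
    @Override public void onNext(String item) {
        received.add(item);
        subscription.request(1); // signal demand for the next item
    }
    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }
}

final class StreamSketchDemo {
    // Subscribes first, then feeds the bodies through the producer and waits for completion.
    static List<String> roundTrip(List<String> bodies) {
        CollectingSubscriber sub = new CollectingSubscriber();
        try (StreamProducerSketch producer = new StreamProducerSketch()) {
            producer.getPublisher().subscribe(sub);
            bodies.forEach(producer::process);
        }
        try {
            sub.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sub.received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(List.of("a", "b", "c")));
    }
}
```

The subscriber has to be attached before messages are sent: in this sketch, items submitted while nobody is subscribed are simply dropped.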

What I don't like about the current camel-rx module is that it is non-standard. 
It adds support for rx but it does not contain a component.

The suggested way of using it is:
{code}
ReactiveCamel rx = new ReactiveCamel(camelContext);
Observable<Message> observable = rx.toObservable("activemq:MyMessages");
{code}

The way a component should work (in my opinion) is:
{code}
// builder
from("activemq:MyMessages")
.transform().xxx().yyy()
.to("reactive-streams:incoming");

// Rx-java, Akka streams, ... 
Publisher pub = CamelStreams.get(context).getPublisher("incoming");
// subscribe and do anything with the publisher
{code}

I know you can probably use this latter pattern with the current camel-rx 
implementation by creating an Observable from a "direct:xxx" endpoint, but it's 
a bit odd.
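
The name-based lookup in the proposed pattern could work roughly as follows: the route side and the application side meet through a shared stream name such as "incoming". This is only a sketch under stated assumptions: `NamedStreamsSketch` and `NamedStreamsDemo` are hypothetical names, `java.util.concurrent.Flow` (Java 9+) stands in for the `org.reactivestreams` interfaces, and bodies are plain strings instead of Camel messages.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical registry: one publisher per stream name, shared by the
// route side (publish) and the application side (getPublisher).
final class NamedStreamsSketch {
    private final Map<String, SubmissionPublisher<String>> streams = new ConcurrentHashMap<>();

    private SubmissionPublisher<String> stream(String name) {
        return streams.computeIfAbsent(name, n -> new SubmissionPublisher<>());
    }

    // What the application would call, e.g. getPublisher("incoming").
    Flow.Publisher<String> getPublisher(String name) {
        return stream(name);
    }

    // What a "reactive-streams:<name>" endpoint would call for each message.
    void publish(String name, String body) {
        stream(name).submit(body);
    }

    // Completes every stream; subscribers see onComplete after pending items.
    void close() {
        streams.values().forEach(SubmissionPublisher::close);
    }
}

final class NamedStreamsDemo {
    static List<String> run() {
        NamedStreamsSketch streams = new NamedStreamsSketch();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        streams.getPublisher("incoming").subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        streams.publish("incoming", "order-1");
        streams.publish("other", "ignored"); // different stream name, no subscriber
        streams.publish("incoming", "order-2");
        streams.close();

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```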

Maybe I'm missing some pieces. Thoughts?



> camel-reactive-streams - New component
> --------------------------------------
>
>                 Key: CAMEL-10612
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10612
>             Project: Camel
>          Issue Type: New Feature
>            Reporter: Nicola Ferraro
>            Assignee: Nicola Ferraro
>
> We have a component for rx-java v1. rx-java v2 is now out and is compatible 
> with the reactive-streams specs, like many other reactive frameworks 
> (including vert.x). We may want to implement a new component to integrate 
> with any framework that complies with the reactive-streams specs 
> (Akka-Streams and Slick among others).



