[ https://issues.apache.org/jira/browse/CAMEL-10612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15769508#comment-15769508 ]
Nicola Ferraro edited comment on CAMEL-10612 at 12/22/16 8:54 AM: ------------------------------------------------------------------ Yeah, I can work on it. I'm evaluating some options: 1. We can upgrade rx-java to v.2 and expose the new rx-java api plus the reactive streams api through the camel-rx component. In this case the implementation of Publisher and Subscriber will be backed by rx Observable and Observer. 2. We can create a module that is independent from rx and uses a simple implementation of the specs. I thought it was difficult to follow this path but a Camel Producer is more or less a reactive streams Publisher, while a Camel Consumer is equivalent to a Subscriber. I created a basic Publisher from a Camel Producer, adding a pool of worker threads for pushing data downstream, and it was easy to pass the reactive streams TCK for active publishers. So this can be done easily. What I don't like about the current camel-rx module is that it is non-standard. It adds support for rx but it does not contain a component. The suggested way of using it is like: {code} ReactiveCamel rx = new ReactiveCamel(camelContext); Observable<Message> observable = rx.toObservable("activemq:MyMessages"); {code} The way a component should work (my opinion) is: {code} // builder from("activemq:MyMessages") .transform().xxx().yyy() .to("reactive-streams:incoming"); // Rx-java, Akka streams, ... Publisher pub = CamelStreams.get(context).getPublisher("incoming"); // subscribe and do anything with the publisher {code} I know you probably can use this latter pattern with the current camel-rx impl by creating an Observable from a "direct:xxx" endpoint, but it's a bit odd. Maybe I'm missing some pieces. Thoughts? was (Author: nferraro): Yeah, I can work on it. I'm evaluating some options: 1. We can upgrade rx-java to v.2 and expose the new rx-java api plus the reactive streams api through the camel-rx component. In this case the implementation of Publisher and Subscriber will be backed by rx Observable and Observer. 2. We can create a module that is independent from rx and uses a simple implementation of the specs. I thought it was difficult to follow this path but a Camel Producer is more or less a reactive streams Publisher, while a Camel Consumer is equivalent to a Subscriber. I created a basic Publisher from a Camel Consumer, adding a pool of worker threads for pushing data downstream, and it was easy to pass the reactive streams TCK for active publishers. So this can be done easily. What I don't like about the current camel-rx module is that it is non-standard. It adds support for rx but it does not contain a component. The suggested way of using it is like: {code} ReactiveCamel rx = new ReactiveCamel(camelContext); Observable<Message> observable = rx.toObservable("activemq:MyMessages"); {code} The way a component should work (my opinion) is: {code} // builder from("activemq:MyMessages") .transform().xxx().yyy() .to("reactive-streams:incoming"); // Rx-java, Akka streams, ... Publisher pub = CamelStreams.get(context).getPublisher("incoming"); // subscribe and do anything with the publisher {code} I know you probably can use this latter pattern with the current camel-rx impl by creating an Observable from a "direct:xxx" endpoint, but it's a bit odd. Maybe I'm missing some pieces. Thoughts? > camel-reactive-streams - New component > -------------------------------------- > > Key: CAMEL-10612 > URL: https://issues.apache.org/jira/browse/CAMEL-10612 > Project: Camel > Issue Type: New Feature > Reporter: Nicola Ferraro > Assignee: Nicola Ferraro > > We have a component for rx-java v1. rx-java v2 now is out and compatible with > the reactive-stream specs, like many other reactive frameworks (including > vert.x). We may want to implement a new component to integrate with any > framework that comply with the reactive-streams specs (Akka-Streams and Slick > among the others). -- This message was sent by Atlassian JIRA (v6.3.4#6332)