Repository: camel Updated Branches: refs/heads/master 33e7087fb -> d85f2f0c5
CAMEL-7833 Added CamelOperator for lift() Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/081e8a73 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/081e8a73 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/081e8a73 Branch: refs/heads/master Commit: 081e8a73195ce5776b3581b31ed15cbcadc68624 Parents: 33e7087 Author: Jyrki Ruuskanen <yur...@kotikone.fi> Authored: Wed Apr 1 17:54:26 2015 +0300 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Thu Apr 2 15:48:03 2015 +0800 ---------------------------------------------------------------------- .../java/org/apache/camel/rx/CamelOperator.java | 79 ++++++++++++++++++++ 1 file changed, 79 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/081e8a73/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java new file mode 100644 index 0000000..c218776 --- /dev/null +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java @@ -0,0 +1,79 @@ +package org.apache.camel.rx; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.Producer; +import org.apache.camel.util.ServiceHelper; +import rx.Observable; +import rx.Subscriber; + +public class CamelOperator implements Observable.Operator<Message, Message> { + + private Producer producer; + + public CamelOperator(CamelContext context, String uri) throws Exception { + this(context.getEndpoint(uri)); + } + + public CamelOperator(Endpoint endpoint) throws Exception { + this.producer = endpoint.createProducer(); + ServiceHelper.startService(producer); + } + + @Override + public Subscriber<? super Message> call(final Subscriber<? super Message> s) { + return new Subscriber<Message>(s) { + @Override + public void onCompleted() { + try { + ServiceHelper.stopService(producer); + } catch (Exception e) { + throw new RuntimeCamelRxException(e); + } finally { + producer = null; + } + if (!s.isUnsubscribed()) { + s.onCompleted(); + } + } + + @Override + public void onError(Throwable e) { + Exchange exchange = producer.createExchange(); + exchange.setException(e); + process(exchange); + if (!s.isUnsubscribed()) { + s.onError(e); + } + } + + @Override + public void onNext(Message item) { + if (!s.isUnsubscribed()) { + s.onNext(process(item)); + } + } + }; + } + + private Exchange process(Exchange exchange) { + try { + producer.process(exchange); + if (exchange.hasOut()) { + exchange.setIn(exchange.getOut()); + exchange.setOut(null); + } + } catch (Exception e) { + throw new RuntimeCamelRxException(e); + } + return exchange; + } + + private Message process(Message message) { + Exchange exchange = producer.createExchange(); + exchange.setIn(message); + return process(exchange).getIn(); + } +}