Repository: camel
Updated Branches:
  refs/heads/master 33e7087fb -> d85f2f0c5


CAMEL-7833 Added CamelOperator for lift()


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/081e8a73
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/081e8a73
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/081e8a73

Branch: refs/heads/master
Commit: 081e8a73195ce5776b3581b31ed15cbcadc68624
Parents: 33e7087
Author: Jyrki Ruuskanen <yur...@kotikone.fi>
Authored: Wed Apr 1 17:54:26 2015 +0300
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Thu Apr 2 15:48:03 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/CamelOperator.java | 79 ++++++++++++++++++++
 1 file changed, 79 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/081e8a73/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
new file mode 100644
index 0000000..c218776
--- /dev/null
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -0,0 +1,79 @@
+package org.apache.camel.rx;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Producer;
+import org.apache.camel.util.ServiceHelper;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * An {@link Observable.Operator}, intended for use with {@code Observable.lift()}, that sends
+ * every {@link Message} it receives through a Camel {@link Producer} and emits the message
+ * found on the exchange afterwards.
+ * <p>
+ * The producer is created and started once, in the constructor, and is stopped and cleared as
+ * soon as a subscriber returned by {@link #call(Subscriber)} sees a terminal event (completion
+ * or error). An instance is therefore effectively single-use: once a stream has terminated,
+ * further processing fails because the producer reference is gone.
+ */
+public class CamelOperator implements Observable.Operator<Message, Message> {
+
+    // Not final: cleared once the stream terminates (see stopProducer()).
+    private Producer producer;
+
+    /**
+     * Creates an operator that sends messages to the endpoint resolved from the given URI.
+     *
+     * @param context the Camel context used to resolve the endpoint
+     * @param uri     the endpoint URI
+     * @throws Exception if the producer cannot be created or started
+     */
+    public CamelOperator(CamelContext context, String uri) throws Exception {
+        this(context.getEndpoint(uri));
+    }
+
+    /**
+     * Creates an operator that sends messages to the given endpoint. The producer is created
+     * and started immediately.
+     *
+     * @param endpoint the endpoint to create the producer from
+     * @throws Exception if the producer cannot be created or started
+     */
+    public CamelOperator(Endpoint endpoint) throws Exception {
+        this.producer = endpoint.createProducer();
+        ServiceHelper.startService(producer);
+    }
+
+    /**
+     * Wraps the downstream subscriber so that each item is processed by the producer before it
+     * is passed on, and so that the producer is stopped when the stream terminates.
+     *
+     * @param s the downstream subscriber
+     * @return the subscriber to hand to the upstream observable
+     */
+    @Override
+    public Subscriber<? super Message> call(final Subscriber<? super Message> s) {
+        return new Subscriber<Message>(s) {
+            @Override
+            public void onCompleted() {
+                // A failure to stop the producer is thrown to the caller; in that case the
+                // downstream subscriber is not notified (unchanged behaviour).
+                stopProducer();
+                if (!s.isUnsubscribed()) {
+                    s.onCompleted();
+                }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                try {
+                    // Let the endpoint see the failure as an exchange carrying the exception.
+                    Exchange exchange = producer.createExchange();
+                    exchange.setException(e);
+                    process(exchange);
+                } finally {
+                    // Whatever happened above, the stream is over: release the producer and
+                    // make sure the downstream subscriber still receives the original error.
+                    // Any exception raised while doing so still propagates to the caller.
+                    try {
+                        stopProducer();
+                    } finally {
+                        if (!s.isUnsubscribed()) {
+                            s.onError(e);
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void onNext(Message item) {
+                if (!s.isUnsubscribed()) {
+                    s.onNext(process(item));
+                }
+            }
+        };
+    }
+
+    /**
+     * Stops the producer and clears the reference, even when stopping fails.
+     *
+     * @throws RuntimeCamelRxException if stopping the producer throws
+     */
+    private void stopProducer() {
+        try {
+            ServiceHelper.stopService(producer);
+        } catch (Exception e) {
+            throw new RuntimeCamelRxException(e);
+        } finally {
+            producer = null;
+        }
+    }
+
+    /**
+     * Sends the exchange through the producer. If the producer set an OUT message, it is moved
+     * to IN so that callers only ever need to read {@code getIn()}.
+     *
+     * @param exchange the exchange to process
+     * @return the same exchange instance, after processing
+     * @throws RuntimeCamelRxException if the producer throws
+     */
+    private Exchange process(Exchange exchange) {
+        try {
+            producer.process(exchange);
+            if (exchange.hasOut()) {
+                exchange.setIn(exchange.getOut());
+                exchange.setOut(null);
+            }
+        } catch (Exception e) {
+            throw new RuntimeCamelRxException(e);
+        }
+        return exchange;
+    }
+
+    /**
+     * Wraps the message in a new exchange, processes it and returns the resulting IN message.
+     *
+     * @param message the message to send to the endpoint
+     * @return the IN message of the exchange after processing
+     */
+    private Message process(Message message) {
+        Exchange exchange = producer.createExchange();
+        exchange.setIn(message);
+        return process(exchange).getIn();
+    }
+}

Reply via email to