CAMEL-7833 Use ProducerTemplate to send message and pass the exception to the subscriber
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d85f2f0c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d85f2f0c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d85f2f0c

Branch: refs/heads/master
Commit: d85f2f0c57021066ec0d674b9c6b4b8be2ac3344
Parents: 0d9928a
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Thu Apr 2 15:47:48 2015 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Thu Apr 2 15:48:04 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/CamelOperator.java | 48 ++++++++++++--------
 1 file changed, 29 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/d85f2f0c/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index f965388..2a6fa3a 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -21,21 +21,26 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Producer;
+import org.apache.camel.ProducerTemplate;
 import org.apache.camel.util.ServiceHelper;
 import rx.Observable;
 import rx.Subscriber;
 
 public class CamelOperator implements Observable.Operator<Message, Message> {
 
-    private Producer producer;
+    private ProducerTemplate producerTemplate;
+    private Endpoint endpoint;
 
     public CamelOperator(CamelContext context, String uri) throws Exception {
-        this(context.getEndpoint(uri));
+        producerTemplate = context.createProducerTemplate();
+        endpoint = context.getEndpoint(uri);
+        ServiceHelper.startService(producerTemplate);
     }
 
     public CamelOperator(Endpoint endpoint) throws Exception {
-        this.producer = endpoint.createProducer();
-        ServiceHelper.startService(producer);
+        this.producerTemplate = endpoint.getCamelContext().createProducerTemplate();
+        this.endpoint = endpoint;
+        ServiceHelper.startService(producerTemplate);
     }
 
     @Override
@@ -44,11 +49,11 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
             @Override
             public void onCompleted() {
                 try {
-                    ServiceHelper.stopService(producer);
+                    ServiceHelper.stopService(producerTemplate);
                 } catch (Exception e) {
                     throw new RuntimeCamelRxException(e);
                 } finally {
-                    producer = null;
+                    producerTemplate = null;
                 }
                 if (!s.isUnsubscribed()) {
                     s.onCompleted();
@@ -57,9 +62,8 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
 
             @Override
             public void onError(Throwable e) {
-                Exchange exchange = producer.createExchange();
-                exchange.setException(e);
-                process(exchange);
+                // producer cannot handler the exception
+                // so we just pass the exchange to the subscriber
                 if (!s.isUnsubscribed()) {
                     s.onError(e);
                 }
@@ -68,7 +72,17 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
             @Override
             public void onNext(Message item) {
                 if (!s.isUnsubscribed()) {
-                    s.onNext(process(item));
+                    Exchange exchange = process(item);
+                    if (exchange.getException() != null) {
+                        s.onError(exchange.getException());
+                    } else {
+                        if (exchange.hasOut()) {
+                            s.onNext(exchange.getOut());
+                        } else {
+                            s.onNext(exchange.getIn());
+                        }
+                    }
+
                 }
             }
         };
@@ -76,20 +90,16 @@ public class CamelOperator implements Observable.Operator<Message, Message> {
 
     private Exchange process(Exchange exchange) {
         try {
-            producer.process(exchange);
-            if (exchange.hasOut()) {
-                exchange.setIn(exchange.getOut());
-                exchange.setOut(null);
-            }
+            exchange = producerTemplate.send(endpoint, exchange);
         } catch (Exception e) {
-            throw new RuntimeCamelRxException(e);
+            exchange.setException(e);
         }
         return exchange;
     }
 
-    private Message process(Message message) {
-        Exchange exchange = producer.createExchange();
+    private Exchange process(Message message) {
+        Exchange exchange = endpoint.createExchange();
         exchange.setIn(message);
-        return process(exchange).getIn();
+        return process(exchange);
     }
 }