This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.0.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit f05914da2c2ee99eb0929c32c786093e0db87558 Author: Nicola Ferraro <ni.ferr...@gmail.com> AuthorDate: Wed Nov 27 05:01:02 2019 +0100 CAMEL-14219: enforce type conversion on reactive-streams subscriber (#3362) --- .../streams/engine/DefaultCamelReactiveStreamsService.java | 4 ++-- .../component/reactive/streams/util/ConvertingSubscriber.java | 8 ++++++-- .../camel/component/reactor/engine/ReactorStreamsService.java | 4 ++-- .../camel/component/rxjava/engine/RxJavaStreamsService.java | 4 ++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java index 96ca0ed..19d50a6 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java @@ -139,7 +139,7 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement return (Subscriber<T>) streamSubscriber(name); } - return new ConvertingSubscriber<>(streamSubscriber(name), context); + return new ConvertingSubscriber<>(streamSubscriber(name), context, type); } @Override @@ -249,7 +249,7 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement @Override public <T> Subscriber<T> subscriber(String uri, Class<T> type) { - return new ConvertingSubscriber<>(subscriber(uri), context); + return new ConvertingSubscriber<>(subscriber(uri), context, type); } @Override diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java index 3e4b375..90e830a 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java @@ -29,14 +29,18 @@ import org.reactivestreams.Subscription; */ public class ConvertingSubscriber<R> implements Subscriber<R> { + private Class<R> type; + private Subscriber<Exchange> delegate; private CamelContext context; - public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context) { + public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context, Class<R> type) { Objects.requireNonNull(delegate, "delegate subscriber cannot be null"); + Objects.requireNonNull(type, "type cannot be null"); this.delegate = delegate; this.context = context; + this.type = type; } @Override @@ -55,7 +59,7 @@ public class ConvertingSubscriber<R> implements Subscriber<R> { } Exchange exchange = new DefaultExchange(context); - exchange.getIn().setBody(r); + exchange.getIn().setBody(r, type); delegate.onNext(exchange); } diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java index c1ec616..fee248d 100644 --- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java +++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java @@ -115,7 +115,7 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv return Subscriber.class.cast(subscriber); } - return new ConvertingSubscriber<>(subscriber, context); + return new ConvertingSubscriber<>(subscriber, context, type); } @Override @@ -190,7 +190,7 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv @Override public <T> Subscriber<T> subscriber(String uri, Class<T> type) { - return new ConvertingSubscriber<>(subscriber(uri), context); + return new ConvertingSubscriber<>(subscriber(uri), context, type); } @Override diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java index b367ec9..50e83a1 100644 --- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java +++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java @@ -115,7 +115,7 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive return Subscriber.class.cast(subscriber); } - return new ConvertingSubscriber<>(subscriber, context); + return new ConvertingSubscriber<>(subscriber, context, type); } @Override @@ -185,7 +185,7 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive @Override public <T> Subscriber<T> subscriber(String uri, Class<T> type) { - return new ConvertingSubscriber<>(subscriber(uri), context); + return new ConvertingSubscriber<>(subscriber(uri), context, type); } @Override