This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new b8d6fae camel-reactive-stream: cleanup b8d6fae is described below commit b8d6faeb09ffe6ba9f5c02d930833bfecc89bc0a Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Fri Jul 10 17:56:50 2020 +0200 camel-reactive-stream: cleanup --- .../reactive/streams/ReactiveStreamsCamelSubscriber.java | 4 ++-- .../component/reactive/streams/ReactiveStreamsHelper.java | 1 - .../reactive/streams/api/CamelReactiveStreamsService.java | 3 ++- .../component/reactive/streams/engine/CamelPublisher.java | 8 ++++---- .../reactive/streams/engine/CamelSubscription.java | 12 ++++++------ .../streams/engine/DefaultCamelReactiveStreamsService.java | 5 +++++ .../reactive/streams/engine/DelayedMonoPublisher.java | 12 ++++++------ .../reactive/streams/engine/UnwrappingPublisher.java | 2 +- .../component/reactive/streams/util/BodyConverter.java | 11 ++--------- .../reactive/streams/util/ConvertingPublisher.java | 6 +++--- .../reactive/streams/util/ConvertingSubscriber.java | 6 +++--- .../component/reactive/streams/util/MonoPublisher.java | 2 +- .../streams/support/ReactiveStreamsTestService.java | 6 ++++++ .../component/reactor/engine/ReactorStreamsService.java | 5 +++++ .../engine/ReactorStreamsServiceBackpressureTest.java | 4 ++-- .../reactor/engine/ReactorStreamsServiceTest.java | 14 +++++++------- .../component/rxjava/engine/RxJavaCamelProcessor.java | 2 +- .../component/rxjava/engine/RxJavaStreamsService.java | 5 +++++ .../component/rxjava/engine/RxJavaStreamsServiceTest.java | 10 ++++++---- 19 files changed, 67 insertions(+), 51 deletions(-) diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java index 0d0ffe7..875df93 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsCamelSubscriber.java @@ -37,12 +37,12 @@ public class ReactiveStreamsCamelSubscriber implements Subscriber<Exchange>, Clo */ private static final long UNBOUNDED_REQUESTS = Long.MAX_VALUE; + private final String name; + private ReactiveStreamsConsumer consumer; private Subscription subscription; - private String name; - private long requested; private long inflightCount; diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java index a380934..a9752c4 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsHelper.java @@ -110,7 +110,6 @@ public final class ReactiveStreamsHelper { } } - @SuppressWarnings("unchecked") public static CamelReactiveStreamsServiceFactory resolveServiceFactory(CamelContext context, String serviceType) { try { FactoryFinder finder = context.adapt(ExtendedCamelContext.class).getFactoryFinder(ReactiveStreamsConstants.SERVICE_PATH); diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java index baea25f..c0d31f0 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java @@ -23,6 +23,7 @@ import org.apache.camel.Service; import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer; +import org.apache.camel.spi.HasCamelContext; import org.apache.camel.spi.HasId; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -30,7 +31,7 @@ import org.reactivestreams.Subscriber; /** * The interface to which any implementation of the reactive-streams engine should comply. */ -public interface CamelReactiveStreamsService extends Service, HasId { +public interface CamelReactiveStreamsService extends Service, HasId, HasCamelContext { /* * Main API methods. diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java index 445cdd7..28327cf 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java @@ -45,13 +45,13 @@ public class CamelPublisher implements Publisher<Exchange>, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(CamelPublisher.class); - private ExecutorService workerPool; + private final ExecutorService workerPool; - private String name; + private final String name; - private ReactiveStreamsBackpressureStrategy backpressureStrategy; + private final List<CamelSubscription> subscriptions = new CopyOnWriteArrayList<>(); - private List<CamelSubscription> subscriptions = new CopyOnWriteArrayList<>(); + private ReactiveStreamsBackpressureStrategy backpressureStrategy; private ReactiveStreamsProducer producer; diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java index ff60d4f..3b9f0e9 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java @@ -43,17 +43,17 @@ public class CamelSubscription implements Subscription { private static final Logger LOG = LoggerFactory.getLogger(CamelSubscription.class); - private String id; + private final String id; - private ExecutorService workerPool; + private final ExecutorService workerPool; - private String streamName; + private final String streamName; - private CamelPublisher publisher; + private final CamelPublisher publisher; - private ReactiveStreamsBackpressureStrategy backpressureStrategy; + private final Subscriber<? super Exchange> subscriber; - private Subscriber<? super Exchange> subscriber; + private ReactiveStreamsBackpressureStrategy backpressureStrategy; /** * The lock is used just for the time necessary to read/write shared variables. diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java index b34cd76..a9238e4 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java @@ -88,6 +88,11 @@ public class DefaultCamelReactiveStreamsService extends ServiceSupport implement } @Override + public CamelContext getCamelContext() { + return context; + } + + @Override protected void doInit() { if (this.workerPool == null) { this.workerPool = context.getExecutorServiceManager().newThreadPool( diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java index 74a937c..b929166 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DelayedMonoPublisher.java @@ -36,15 +36,15 @@ public class DelayedMonoPublisher<T> implements Publisher<T> { private static final Logger LOG = LoggerFactory.getLogger(DelayedMonoPublisher.class); - private ExecutorService workerPool; + private final ExecutorService workerPool; - private volatile T data; + private final List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>(); - private volatile Throwable exception; + private final AtomicBoolean flushing = new AtomicBoolean(false); - private List<MonoSubscription> subscriptions = new CopyOnWriteArrayList<>(); + private volatile T data; - private AtomicBoolean flushing = new AtomicBoolean(false); + private volatile Throwable exception; public DelayedMonoPublisher(ExecutorService workerPool) { this.workerPool = workerPool; @@ -130,7 +130,7 @@ public class DelayedMonoPublisher<T> implements Publisher<T> { private volatile boolean requested; - private Subscriber<? super T> subscriber; + private final Subscriber<? super T> subscriber; private MonoSubscription(Subscriber<? super T> subscriber) { this.subscriber = subscriber; diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java index 30dac80..6910c90 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.java @@ -30,7 +30,7 @@ import org.reactivestreams.Subscription; * It calls the dispatch callback if defined. */ public class UnwrappingPublisher implements Publisher<Exchange> { - private Publisher<Exchange> delegate; + private final Publisher<Exchange> delegate; public UnwrappingPublisher(Publisher<Exchange> delegate) { Objects.requireNonNull(delegate, "delegate publisher cannot be null"); diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java index 2f11562..5dc7bd8 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/BodyConverter.java @@ -33,17 +33,10 @@ public final class BodyConverter<T> implements Function<Exchange, T> { @Override public T apply(Exchange exchange) { - T answer; - - if (exchange.hasOut()) { - answer = exchange.getOut().getBody(type); - } else { - answer = exchange.getIn().getBody(type); - } - - return answer; + return exchange.getMessage().getBody(type); } + @SuppressWarnings("unchecked") public static <C> BodyConverter<C> forType(Class<C> type) { return BodyConverter.class.cast( CACHE.computeIfAbsent(type, BodyConverter::new) diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java index 1e94b13..154b08a 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java @@ -34,10 +34,10 @@ public class ConvertingPublisher<R> implements Publisher<R> { private static final Logger LOG = LoggerFactory.getLogger(ConvertingPublisher.class); - private Publisher<Exchange> delegate; + private final Publisher<Exchange> delegate; - private Class<R> type; - private BodyConverter<R> converter; + private final Class<R> type; + private final BodyConverter<R> converter; public ConvertingPublisher(Publisher<Exchange> delegate, Class<R> type) { Objects.requireNonNull(delegate, "delegate publisher cannot be null"); diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java index 90e830a..bd25706 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingSubscriber.java @@ -29,11 +29,11 @@ import org.reactivestreams.Subscription; */ public class ConvertingSubscriber<R> implements Subscriber<R> { - private Class<R> type; + private final Class<R> type; - private Subscriber<Exchange> delegate; + private final Subscriber<Exchange> delegate; - private CamelContext context; + private final CamelContext context; public ConvertingSubscriber(Subscriber<Exchange> delegate, CamelContext context, Class<R> type) { Objects.requireNonNull(delegate, "delegate subscriber cannot be null"); diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java index 4775232..3040889 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java @@ -27,7 +27,7 @@ import org.reactivestreams.Subscription; */ public class MonoPublisher<T> implements Publisher<T> { - private T item; + private final T item; public MonoPublisher(T item) { this.item = item; diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java index 65e54e2..eaf72e4 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java @@ -18,6 +18,7 @@ package org.apache.camel.component.reactive.streams.support; import java.util.function.Function; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; @@ -38,6 +39,11 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService { } @Override + public CamelContext getCamelContext() { + return null; + } + + @Override public void start() { } diff --git a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java index 7640274..4c76e9c 100644 --- a/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java +++ b/components/camel-reactor/src/main/java/org/apache/camel/component/reactor/engine/ReactorStreamsService.java @@ -64,6 +64,11 @@ final class ReactorStreamsService extends ServiceSupport implements CamelReactiv return ReactorStreamsConstants.SERVICE_NAME; } + @Override + public CamelContext getCamelContext() { + return context; + } + // ****************************************** // Lifecycle // ****************************************** diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java index f0b9c2b..ad0e128 100644 --- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java +++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java @@ -108,7 +108,7 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService Thread.sleep(200); // add other time to ensure no other items arrive Assert.assertEquals(2, queue.size()); - int sum = queue.stream().reduce((i, j) -> i + j).get(); + int sum = queue.stream().reduce(Integer::sum).get(); Assert.assertEquals(3, sum); // 1 + 2 = 3 subscriber.cancel(); @@ -158,7 +158,7 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService // Assert.assertEquals(2, queue.size()); Assert.assertEquals(3, queue.size()); - int sum = queue.stream().reduce((i, j) -> i + j).get(); + int sum = queue.stream().reduce(Integer::sum).get(); // Assert.assertEquals(21, sum); // 1 + 20 = 21 Assert.assertEquals(23, sum); // 1 + 2 + 20 = 23 diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java index 690c578..6ef2a2f 100644 --- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java +++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTest.java @@ -291,7 +291,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport Flux.just(1, 2, 3) .flatMap(e -> crs.to("bean:hello", e, String.class)) - .doOnNext(res -> values.add(res)) + .doOnNext(values::add) .doOnNext(res -> latch.countDown()) .subscribe(); @@ -308,9 +308,9 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport Flux.just(1, 2, 3) .flatMap(e -> crs.to("bean:hello", e)) - .map(e -> e.getMessage()) + .map(Exchange::getMessage) .map(e -> e.getBody(String.class)) - .doOnNext(res -> values.add(res)) + .doOnNext(values::add) .doOnNext(res -> latch.countDown()) .subscribe(); @@ -328,7 +328,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport Flux.just(1, 2, 3) .flatMap(fun) - .doOnNext(res -> values.add(res)) + .doOnNext(values::add) .doOnNext(res -> latch.countDown()) .subscribe(); @@ -346,9 +346,9 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport Flux.just(1, 2, 3) .flatMap(fun) - .map(e -> e.getMessage()) + .map(Exchange::getMessage) .map(e -> e.getBody(String.class)) - .doOnNext(res -> values.add(res)) + .doOnNext(values::add) .doOnNext(res -> latch.countDown()) .subscribe(); @@ -380,7 +380,7 @@ public class ReactorStreamsServiceTest extends ReactorStreamsServiceTestSupport int idx = 1; for (Exchange ex : mock.getExchanges()) { - Assert.assertEquals(new Integer(idx++), ex.getIn().getBody(Integer.class)); + Assert.assertEquals(Integer.valueOf(idx++), ex.getIn().getBody(Integer.class)); } } diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java index 6110ff4..1088c17 100644 --- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java +++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaCamelProcessor.java @@ -38,7 +38,7 @@ final class RxJavaCamelProcessor implements Closeable { private final String name; private final RxJavaStreamsService service; private final AtomicReference<FlowableEmitter<Exchange>> camelEmitter; - private FlowableProcessor<Exchange> publisher; + private final FlowableProcessor<Exchange> publisher; private ReactiveStreamsProducer camelProducer; RxJavaCamelProcessor(RxJavaStreamsService service, String name) { diff --git a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java index 75e8ec9..860a055 100644 --- a/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java +++ b/components/camel-rxjava/src/main/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsService.java @@ -64,6 +64,11 @@ final class RxJavaStreamsService extends ServiceSupport implements CamelReactive return RxJavaStreamsConstants.SERVICE_NAME; } + @Override + public CamelContext getCamelContext() { + return context; + } + // ****************************************** // Lifecycle // ****************************************** diff --git a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java index cc98a98..dd068f6 100644 --- a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java +++ b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTest.java @@ -162,7 +162,9 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { CountDownLatch latch = new CountDownLatch(3); Flowable.fromPublisher(timer).map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class)) - .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).doOnNext(res -> latch.countDown()).subscribe(); + .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())) + .doOnNext(res -> latch.countDown()) + .subscribe(); Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); } @@ -255,7 +257,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { AtomicInteger value = new AtomicInteger(0); CountDownLatch latch = new CountDownLatch(1); - Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e)).map(e -> e.getMessage()).map(e -> e.getBody(String.class)) + Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e)).map(Exchange::getMessage).map(e -> e.getBody(String.class)) .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe(); Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); @@ -282,7 +284,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { CountDownLatch latch = new CountDownLatch(1); Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello"); - Flowable.just(1, 2, 3).flatMap(fun::apply).map(e -> e.getMessage()).map(e -> e.getBody(String.class)) + Flowable.just(1, 2, 3).flatMap(fun::apply).map(Exchange::getMessage).map(e -> e.getBody(String.class)) .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe(); Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); @@ -310,7 +312,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { int idx = 1; for (Exchange ex : mock.getExchanges()) { - Assert.assertEquals(new Integer(idx++), ex.getIn().getBody(Integer.class)); + Assert.assertEquals(Integer.valueOf(idx++), ex.getIn().getBody(Integer.class)); } }