CAMEL-10807: Create camel-reactor component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f8f6b698 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f8f6b698 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f8f6b698 Branch: refs/heads/master Commit: f8f6b698f16640b4806501637b791ace54c50b02 Parents: 2956fca Author: lburgazzoli <lburgazz...@gmail.com> Authored: Tue May 2 13:56:24 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Fri May 5 13:10:06 2017 +0200 ---------------------------------------------------------------------- .../src/main/docs/reactor-component.adoc | 16 ++ .../ReactorStreamsServiceBackpressureTest.java | 34 +-- .../ReactiveStreamsAutoConfigurationTest.java | 9 +- .../test/ReactiveStreamsNamedEngineTest.java | 206 +++++++++++++++++++ .../test/ReactiveStreamsRegistryEngineTest.java | 200 ++++++++++++++++++ 5 files changed, 448 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/components/camel-reactor/src/main/docs/reactor-component.adoc ---------------------------------------------------------------------- diff --git a/components/camel-reactor/src/main/docs/reactor-component.adoc b/components/camel-reactor/src/main/docs/reactor-component.adoc new file mode 100644 index 0000000..25dcf27 --- /dev/null +++ b/components/camel-reactor/src/main/docs/reactor-component.adoc @@ -0,0 +1,16 @@ +## Reactor Component + +*Available as of Camel version 2.20* + +Maven users will need to add the following dependency to their `pom.xml` +for this component: + +[source,xml] +------------------------------------------------------------ +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-reactor</artifactId> + <version>x.x.x</version> + <!-- use the same version as your Camel core version --> +</dependency> +------------------------------------------------------------ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java index 6346530..e85e68a 100644 --- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java +++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.reactor.engine; +import java.time.Duration; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,9 +40,9 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService @Override public void configure() throws Exception { from("timer:gen?period=20&repeatCount=20") - .setBody() - .header(Exchange.TIMER_COUNTER) - .to("reactive-streams:integers"); + .setBody() + .header(Exchange.TIMER_COUNTER) + .to("reactive-streams:integers"); } }); @@ -50,10 +51,10 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService CountDownLatch latch = new CountDownLatch(1); Flux.range(0, 50) - .zipWith(integers, (l, i) -> i) - .timeoutMillis(2000, Flux.empty()) - .doOnComplete(latch::countDown) - .subscribe(queue::add); + .zipWith(integers, (l, i) -> i) + .timeout(Duration.ofMillis(2000), Flux.empty()) + .doOnComplete(latch::countDown) + .subscribe(queue::add); context.start(); @@ -75,9 +76,9 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService @Override public void configure() throws Exception { from("timer:gen?period=20&repeatCount=20") - .setBody() - .header(Exchange.TIMER_COUNTER) - .to("reactive-streams:integers"); + .setBody() + .header(Exchange.TIMER_COUNTER) + .to("reactive-streams:integers"); } }); @@ -122,9 +123,9 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService @Override public void configure() throws Exception { from("timer:gen?period=20&repeatCount=20") - .setBody() - .header(Exchange.TIMER_COUNTER) - .to("reactive-streams:integers"); + .setBody() + .header(Exchange.TIMER_COUNTER) + .to("reactive-streams:integers"); } }); @@ -153,12 +154,13 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS)); Thread.sleep(200); // add other time to ensure no other items arrive - // TODO the chain caches two elements instead of one: change it if you find an EmitterProcessor without prefetch -// Assert.assertEquals(2, queue.size()); + + // TODO: the chain caches two elements instead of one: change it if you find an EmitterProcessor without prefetch + // Assert.assertEquals(2, queue.size()); Assert.assertEquals(3, queue.size()); int sum = queue.stream().reduce((i, j) -> i + j).get(); -// Assert.assertEquals(21, sum); // 1 + 20 = 21 + // Assert.assertEquals(21, sum); // 1 + 20 = 21 Assert.assertEquals(23, sum); // 1 + 2 + 20 = 23 subscriber.cancel(); http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java index 5e1c4ba..65c75c3 100644 --- a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java +++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java @@ -61,15 +61,22 @@ import static org.junit.Assert.assertTrue; public class ReactiveStreamsAutoConfigurationTest { @Autowired private CamelContext context; + @Autowired + private CamelReactiveStreamsService reactiveStreamsService; @Test - public void testAutoConfiguration() throws InterruptedException { + public void testConfiguration() throws InterruptedException { CamelReactiveStreamsService service = CamelReactiveStreams.get(context); assertTrue(service instanceof DefaultCamelReactiveStreamsService); + assertEquals(service, reactiveStreamsService); ReactiveStreamsComponent component = context.getComponent(ReactiveStreamsConstants.SCHEME, ReactiveStreamsComponent.class); assertEquals("rs-test", component.getInternalEngineConfiguration().getThreadPoolName()); + } + @Test + public void testService() throws InterruptedException { + CamelReactiveStreamsService service = CamelReactiveStreams.get(context); CountDownLatch latch = new CountDownLatch(1); String[] res = new String[1]; Throwable[] error = new Throwable[1]; http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsNamedEngineTest.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsNamedEngineTest.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsNamedEngineTest.java new file mode 100644 index 0000000..8088d15 --- /dev/null +++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsNamedEngineTest.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams.springboot.test; + +import java.util.function.Function; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; +import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.apache.camel.component.reactive.streams.engine.CamelSubscriber; +import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService; +import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsComponentAutoConfiguration; +import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsServiceAutoConfiguration; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +import org.junit.Assert; + +@RunWith(SpringRunner.class) +@SpringBootApplication +@DirtiesContext +@SpringBootTest( + classes = { + ReactiveStreamsServiceAutoConfiguration.class, + ReactiveStreamsComponentAutoConfiguration.class, + CamelAutoConfiguration.class + }, + properties = { + "camel.component.reactive-streams.service-type=my-engine" + } +) +public class ReactiveStreamsNamedEngineTest { + @Autowired + private CamelContext context; + @Autowired + private CamelReactiveStreamsService reactiveStreamsService; + + @Test + public void testAutoConfiguration() throws InterruptedException { + CamelReactiveStreamsService service = CamelReactiveStreams.get(context); + Assert.assertTrue(service instanceof MyEngine); + Assert.assertEquals(service, reactiveStreamsService); + } + + @Component("my-engine") + static class MyEngine implements CamelReactiveStreamsService { + + @Override + public Publisher<Exchange> fromStream(String s) { + return null; + } + + @Override + public <T> Publisher<T> fromStream(String s, Class<T> aClass) { + return null; + } + + @Override + public Subscriber<Exchange> streamSubscriber(String s) { + return null; + } + + @Override + public <T> Subscriber<T> streamSubscriber(String s, Class<T> aClass) { + return null; + } + + @Override + public Publisher<Exchange> toStream(String s, Object o) { + return null; + } + + @Override + public Function<?, ? extends Publisher<Exchange>> toStream(String s) { + return null; + } + + @Override + public <T> Publisher<T> toStream(String s, Object o, Class<T> aClass) { + return null; + } + + @Override + public <T> Function<Object, Publisher<T>> toStream(String s, Class<T> aClass) { + return null; + } + + @Override + public Publisher<Exchange> from(String s) { + return null; + } + + @Override + public <T> Publisher<T> from(String s, Class<T> aClass) { + return null; + } + + @Override + public Subscriber<Exchange> subscriber(String s) { + return null; + } + + @Override + public <T> Subscriber<T> subscriber(String s, Class<T> aClass) { + return null; + } + + @Override + public Publisher<Exchange> to(String s, Object o) { + return null; + } + + @Override + public Function<Object, Publisher<Exchange>> to(String s) { + return null; + } + + @Override + public <T> Publisher<T> to(String s, Object o, Class<T> aClass) { + return null; + } + + @Override + public <T> Function<Object, Publisher<T>> to(String s, Class<T> aClass) { + return null; + } + + @Override + public void process(String s, Function<? super Publisher<Exchange>, ?> function) { + + } + + @Override + public <T> void process(String s, Class<T> aClass, Function<? super Publisher<T>, ?> function) { + + } + + @Override + public void attachCamelProducer(String s, ReactiveStreamsProducer producer) { + + } + + @Override + public void detachCamelProducer(String s) { + + } + + @Override + public void sendCamelExchange(String s, Exchange exchange) { + + } + + @Override + public CamelSubscriber attachCamelConsumer(String s, ReactiveStreamsConsumer consumer) { + return null; + } + + @Override + public void detachCamelConsumer(String s) { + + } + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public String getId() { + return "my-engine"; + } + } +} + http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsRegistryEngineTest.java ---------------------------------------------------------------------- diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsRegistryEngineTest.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsRegistryEngineTest.java new file mode 100644 index 0000000..421033f --- /dev/null +++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsRegistryEngineTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams.springboot.test; + +import java.util.function.Function; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; +import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.apache.camel.component.reactive.streams.engine.CamelSubscriber; +import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsComponentAutoConfiguration; +import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsServiceAutoConfiguration; +import org.apache.camel.spring.boot.CamelAutoConfiguration; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootApplication +@DirtiesContext +@SpringBootTest( + classes = { + ReactiveStreamsServiceAutoConfiguration.class, + ReactiveStreamsComponentAutoConfiguration.class, + CamelAutoConfiguration.class + } +) +public class ReactiveStreamsRegistryEngineTest { + @Autowired + private CamelContext context; + @Autowired + private CamelReactiveStreamsService reactiveStreamsService; + + @Test + public void testAutoConfiguration() throws InterruptedException { + CamelReactiveStreamsService service = CamelReactiveStreams.get(context); + Assert.assertTrue(service instanceof MyEngine); + Assert.assertEquals(service, reactiveStreamsService); + } + + @Component("my-engine") + static class MyEngine implements CamelReactiveStreamsService { + + @Override + public Publisher<Exchange> fromStream(String s) { + return null; + } + + @Override + public <T> Publisher<T> fromStream(String s, Class<T> aClass) { + return null; + } + + @Override + public Subscriber<Exchange> streamSubscriber(String s) { + return null; + } + + @Override + public <T> Subscriber<T> streamSubscriber(String s, Class<T> aClass) { + return null; + } + + @Override + public Publisher<Exchange> toStream(String s, Object o) { + return null; + } + + @Override + public Function<?, ? extends Publisher<Exchange>> toStream(String s) { + return null; + } + + @Override + public <T> Publisher<T> toStream(String s, Object o, Class<T> aClass) { + return null; + } + + @Override + public <T> Function<Object, Publisher<T>> toStream(String s, Class<T> aClass) { + return null; + } + + @Override + public Publisher<Exchange> from(String s) { + return null; + } + + @Override + public <T> Publisher<T> from(String s, Class<T> aClass) { + return null; + } + + @Override + public Subscriber<Exchange> subscriber(String s) { + return null; + } + + @Override + public <T> Subscriber<T> subscriber(String s, Class<T> aClass) { + return null; + } + + @Override + public Publisher<Exchange> to(String s, Object o) { + return null; + } + + @Override + public Function<Object, Publisher<Exchange>> to(String s) { + return null; + } + + @Override + public <T> Publisher<T> to(String s, Object o, Class<T> aClass) { + return null; + } + + @Override + public <T> Function<Object, Publisher<T>> to(String s, Class<T> aClass) { + return null; + } + + @Override + public void process(String s, Function<? super Publisher<Exchange>, ?> function) { + + } + + @Override + public <T> void process(String s, Class<T> aClass, Function<? super Publisher<T>, ?> function) { + + } + + @Override + public void attachCamelProducer(String s, ReactiveStreamsProducer producer) { + + } + + @Override + public void detachCamelProducer(String s) { + + } + + @Override + public void sendCamelExchange(String s, Exchange exchange) { + + } + + @Override + public CamelSubscriber attachCamelConsumer(String s, ReactiveStreamsConsumer consumer) { + return null; + } + + @Override + public void detachCamelConsumer(String s) { + + } + + @Override + public void start() throws Exception { + + } + + @Override + public void stop() throws Exception { + + } + + @Override + public String getId() { + return "my-engine"; + } + } +} +