CAMEL-10807: Create camel-reactor component

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f8f6b698
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f8f6b698
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f8f6b698

Branch: refs/heads/master
Commit: f8f6b698f16640b4806501637b791ace54c50b02
Parents: 2956fca
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Tue May 2 13:56:24 2017 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Fri May 5 13:10:06 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/reactor-component.adoc        |  16 ++
 .../ReactorStreamsServiceBackpressureTest.java  |  34 +--
 .../ReactiveStreamsAutoConfigurationTest.java   |   9 +-
 .../test/ReactiveStreamsNamedEngineTest.java    | 206 +++++++++++++++++++
 .../test/ReactiveStreamsRegistryEngineTest.java | 200 ++++++++++++++++++
 5 files changed, 448 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/components/camel-reactor/src/main/docs/reactor-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-reactor/src/main/docs/reactor-component.adoc b/components/camel-reactor/src/main/docs/reactor-component.adoc
new file mode 100644
index 0000000..25dcf27
--- /dev/null
+++ b/components/camel-reactor/src/main/docs/reactor-component.adoc
@@ -0,0 +1,16 @@
+## Reactor Component
+
+*Available as of Camel version 2.20*
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component:
+
+[source,xml]
+------------------------------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-reactor</artifactId>
+    <version>x.x.x</version>
+    <!-- use the same version as your Camel core version -->
+</dependency>
+------------------------------------------------------------
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
----------------------------------------------------------------------
diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
index 6346530..e85e68a 100644
--- a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
+++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceBackpressureTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.reactor.engine;
 
+import java.time.Duration;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -39,9 +40,9 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
             @Override
             public void configure() throws Exception {
                 from("timer:gen?period=20&repeatCount=20")
-                        .setBody()
-                        .header(Exchange.TIMER_COUNTER)
-                        .to("reactive-streams:integers");
+                    .setBody()
+                    .header(Exchange.TIMER_COUNTER)
+                    .to("reactive-streams:integers");
             }
         });
 
@@ -50,10 +51,10 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
         CountDownLatch latch = new CountDownLatch(1);
 
         Flux.range(0, 50)
-                .zipWith(integers, (l, i) -> i)
-                .timeoutMillis(2000, Flux.empty())
-                .doOnComplete(latch::countDown)
-                .subscribe(queue::add);
+            .zipWith(integers, (l, i) -> i)
+            .timeout(Duration.ofMillis(2000), Flux.empty())
+            .doOnComplete(latch::countDown)
+            .subscribe(queue::add);
 
         context.start();
 
@@ -75,9 +76,9 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
             @Override
             public void configure() throws Exception {
                 from("timer:gen?period=20&repeatCount=20")
-                        .setBody()
-                        .header(Exchange.TIMER_COUNTER)
-                        .to("reactive-streams:integers");
+                    .setBody()
+                    .header(Exchange.TIMER_COUNTER)
+                    .to("reactive-streams:integers");
             }
         });
 
@@ -122,9 +123,9 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
             @Override
             public void configure() throws Exception {
                 from("timer:gen?period=20&repeatCount=20")
-                        .setBody()
-                        .header(Exchange.TIMER_COUNTER)
-                        .to("reactive-streams:integers");
+                    .setBody()
+                    .header(Exchange.TIMER_COUNTER)
+                    .to("reactive-streams:integers");
             }
         });
 
@@ -153,12 +154,13 @@ public class ReactorStreamsServiceBackpressureTest extends ReactorStreamsService
         Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS));
 
         Thread.sleep(200); // add other time to ensure no other items arrive
-        // TODO the chain caches two elements instead of one: change it if you find an EmitterProcessor without prefetch
-//        Assert.assertEquals(2, queue.size());
+
+        // TODO: the chain caches two elements instead of one: change it if you find an EmitterProcessor without prefetch
+        // Assert.assertEquals(2, queue.size());
         Assert.assertEquals(3, queue.size());
 
         int sum = queue.stream().reduce((i, j) -> i + j).get();
-//        Assert.assertEquals(21, sum); // 1 + 20 = 21
+        // Assert.assertEquals(21, sum); // 1 + 20 = 21
         Assert.assertEquals(23, sum); // 1 + 2 + 20 = 23
 
         subscriber.cancel();

http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java
index 5e1c4ba..65c75c3 100644
--- a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java
+++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsAutoConfigurationTest.java
@@ -61,15 +61,22 @@ import static org.junit.Assert.assertTrue;
 public class ReactiveStreamsAutoConfigurationTest {
     @Autowired
     private CamelContext context;
+    @Autowired
+    private CamelReactiveStreamsService reactiveStreamsService;
 
     @Test
-    public void testAutoConfiguration() throws InterruptedException {
+    public void testConfiguration() throws InterruptedException {
         CamelReactiveStreamsService service = CamelReactiveStreams.get(context);
         assertTrue(service instanceof DefaultCamelReactiveStreamsService);
+        assertEquals(service, reactiveStreamsService);
 
         ReactiveStreamsComponent component = context.getComponent(ReactiveStreamsConstants.SCHEME, ReactiveStreamsComponent.class);
         assertEquals("rs-test", component.getInternalEngineConfiguration().getThreadPoolName());
+    }
 
+    @Test
+    public void testService() throws InterruptedException {
+        CamelReactiveStreamsService service = CamelReactiveStreams.get(context);
         CountDownLatch latch = new CountDownLatch(1);
         String[] res = new String[1];
         Throwable[] error = new Throwable[1];

http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsNamedEngineTest.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsNamedEngineTest.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsNamedEngineTest.java
new file mode 100644
index 0000000..8088d15
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsNamedEngineTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.springboot.test;
+
+import java.util.function.Function;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
+import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsComponentAutoConfiguration;
+import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsServiceAutoConfiguration;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.stereotype.Component;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import org.junit.Assert;
+
+@RunWith(SpringRunner.class)
+@SpringBootApplication
+@DirtiesContext
+@SpringBootTest(
+    classes = {
+        ReactiveStreamsServiceAutoConfiguration.class,
+        ReactiveStreamsComponentAutoConfiguration.class,
+        CamelAutoConfiguration.class
+    },
+    properties = {
+        "camel.component.reactive-streams.service-type=my-engine"
+    }
+)
+public class ReactiveStreamsNamedEngineTest {
+    @Autowired
+    private CamelContext context;
+    @Autowired
+    private CamelReactiveStreamsService reactiveStreamsService;
+
+    @Test
+    public void testAutoConfiguration() throws InterruptedException {
+        CamelReactiveStreamsService service = CamelReactiveStreams.get(context);
+        Assert.assertTrue(service instanceof MyEngine);
+        Assert.assertEquals(service, reactiveStreamsService);
+    }
+
+    @Component("my-engine")
+    static class MyEngine implements CamelReactiveStreamsService {
+
+        @Override
+        public Publisher<Exchange> fromStream(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Publisher<T> fromStream(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Subscriber<Exchange> streamSubscriber(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Subscriber<T> streamSubscriber(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Publisher<Exchange> toStream(String s, Object o) {
+            return null;
+        }
+
+        @Override
+        public Function<?, ? extends Publisher<Exchange>> toStream(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Publisher<T> toStream(String s, Object o, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public <T> Function<Object, Publisher<T>> toStream(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Publisher<Exchange> from(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Publisher<T> from(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Subscriber<Exchange> subscriber(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Subscriber<T> subscriber(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Publisher<Exchange> to(String s, Object o) {
+            return null;
+        }
+
+        @Override
+        public Function<Object, Publisher<Exchange>> to(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Publisher<T> to(String s, Object o, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public <T> Function<Object, Publisher<T>> to(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public void process(String s, Function<? super Publisher<Exchange>, ?> function) {
+
+        }
+
+        @Override
+        public <T> void process(String s, Class<T> aClass, Function<? super Publisher<T>, ?> function) {
+
+        }
+
+        @Override
+        public void attachCamelProducer(String s, ReactiveStreamsProducer producer) {
+
+        }
+
+        @Override
+        public void detachCamelProducer(String s) {
+
+        }
+
+        @Override
+        public void sendCamelExchange(String s, Exchange exchange) {
+
+        }
+
+        @Override
+        public CamelSubscriber attachCamelConsumer(String s, ReactiveStreamsConsumer consumer) {
+            return null;
+        }
+
+        @Override
+        public void detachCamelConsumer(String s) {
+
+        }
+
+        @Override
+        public void start() throws Exception {
+
+        }
+
+        @Override
+        public void stop() throws Exception {
+
+        }
+
+        @Override
+        public String getId() {
+            return "my-engine";
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/f8f6b698/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsRegistryEngineTest.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsRegistryEngineTest.java b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsRegistryEngineTest.java
new file mode 100644
index 0000000..421033f
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-reactive-streams-starter/src/test/java/org/apache/camel/component/reactive/streams/springboot/test/ReactiveStreamsRegistryEngineTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.reactive.streams.springboot.test;
+
+import java.util.function.Function;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
+import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsComponentAutoConfiguration;
+import org.apache.camel.component.reactive.streams.springboot.ReactiveStreamsServiceAutoConfiguration;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.stereotype.Component;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootApplication
+@DirtiesContext
+@SpringBootTest(
+    classes = {
+        ReactiveStreamsServiceAutoConfiguration.class,
+        ReactiveStreamsComponentAutoConfiguration.class,
+        CamelAutoConfiguration.class
+    }
+)
+public class ReactiveStreamsRegistryEngineTest {
+    @Autowired
+    private CamelContext context;
+    @Autowired
+    private CamelReactiveStreamsService reactiveStreamsService;
+
+    @Test
+    public void testAutoConfiguration() throws InterruptedException {
+        CamelReactiveStreamsService service = CamelReactiveStreams.get(context);
+        Assert.assertTrue(service instanceof MyEngine);
+        Assert.assertEquals(service, reactiveStreamsService);
+    }
+
+    @Component("my-engine")
+    static class MyEngine implements CamelReactiveStreamsService {
+
+        @Override
+        public Publisher<Exchange> fromStream(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Publisher<T> fromStream(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Subscriber<Exchange> streamSubscriber(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Subscriber<T> streamSubscriber(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Publisher<Exchange> toStream(String s, Object o) {
+            return null;
+        }
+
+        @Override
+        public Function<?, ? extends Publisher<Exchange>> toStream(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Publisher<T> toStream(String s, Object o, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public <T> Function<Object, Publisher<T>> toStream(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Publisher<Exchange> from(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Publisher<T> from(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Subscriber<Exchange> subscriber(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Subscriber<T> subscriber(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public Publisher<Exchange> to(String s, Object o) {
+            return null;
+        }
+
+        @Override
+        public Function<Object, Publisher<Exchange>> to(String s) {
+            return null;
+        }
+
+        @Override
+        public <T> Publisher<T> to(String s, Object o, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public <T> Function<Object, Publisher<T>> to(String s, Class<T> aClass) {
+            return null;
+        }
+
+        @Override
+        public void process(String s, Function<? super Publisher<Exchange>, ?> function) {
+
+        }
+
+        @Override
+        public <T> void process(String s, Class<T> aClass, Function<? super Publisher<T>, ?> function) {
+
+        }
+
+        @Override
+        public void attachCamelProducer(String s, ReactiveStreamsProducer producer) {
+
+        }
+
+        @Override
+        public void detachCamelProducer(String s) {
+
+        }
+
+        @Override
+        public void sendCamelExchange(String s, Exchange exchange) {
+
+        }
+
+        @Override
+        public CamelSubscriber attachCamelConsumer(String s, ReactiveStreamsConsumer consumer) {
+            return null;
+        }
+
+        @Override
+        public void detachCamelConsumer(String s) {
+
+        }
+
+        @Override
+        public void start() throws Exception {
+
+        }
+
+        @Override
+        public void stop() throws Exception {
+
+        }
+
+        @Override
+        public String getId() {
+            return "my-engine";
+        }
+    }
+}
+

Reply via email to