This is an automated email from the ASF dual-hosted git repository.

sergeyb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 169427ac954fc606329eac7f18fccd6acff98a07
Author: Sergey Beryozkin <[email protected]>
AuthorDate: Fri Dec 22 15:32:34 2017 +0000

    [CXF-7535] Preparing ReactorInvoker to handle Flowable sequence
---
 .../cxf/jaxrs/reactor/server/ReactorInvoker.java   | 27 ++++++++++++----------
 .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java    | 18 +++++++++++++--
 .../jaxrs/reactive/JAXRSRxJava2FlowableTest.java   | 11 +++++++++
 .../jaxrs/reactive/RxJava2FlowableService.java     | 16 ++++++++++++-
 .../cxf/systest/jaxrs/reactor/FluxReactorTest.java |  3 +++
 .../cxf/systest/jaxrs/reactor/FluxService.java     |  9 ++++++++
 6 files changed, 69 insertions(+), 15 deletions(-)

diff --git 
a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
 
b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
index 6779e04..79b0428 100644
--- 
a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
+++ 
b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java
@@ -18,10 +18,12 @@
  */
 package org.apache.cxf.jaxrs.reactor.server;
 
-import java.util.function.Consumer;
+import java.util.concurrent.CancellationException;
+
 import org.apache.cxf.jaxrs.JAXRSInvoker;
 import org.apache.cxf.jaxrs.impl.AsyncResponseImpl;
 import org.apache.cxf.message.Message;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -32,25 +34,26 @@ public class ReactorInvoker extends JAXRSInvoker {
             final Flux<?> flux = (Flux<?>) result;
             final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
             flux.doOnNext(asyncResponse::resume)
-                    .doOnError(asyncResponse::resume)
-                    .doOnComplete(asyncResponse::onComplete)
+                    .doOnError(t -> handleThrowable(asyncResponse, t))
                     .subscribe();
             return asyncResponse;
         } else if (result instanceof Mono) {
-            // mono is only 0 or 1 element, so when something comes in need to 
complete the async
             final Mono<?> flux = (Mono<?>) result;
             final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
-            flux.doOnNext((Consumer<Object>) o -> {
-                asyncResponse.resume(o);
-                asyncResponse.onComplete();
-            })
-            .doOnError((Consumer<Throwable>) throwable -> {
-                asyncResponse.resume(throwable);
-                asyncResponse.onComplete();
-            })
+            flux.doOnNext(asyncResponse::resume)
+                .doOnError(t -> handleThrowable(asyncResponse, t))
                 .subscribe();
             return asyncResponse;
         }
         return null;
     }
+    
+    private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable 
t) {
+        if (t instanceof CancellationException) {
+            asyncResponse.cancel();
+        } else {
+            asyncResponse.resume(t);
+        }
+        return null;
+    }
 }
diff --git 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
index 44ac9c7..6c99a2c 100644
--- 
a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
+++ 
b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.cxf.jaxrs.rx2.server;
 
+import java.util.concurrent.CancellationException;
+
 import javax.ws.rs.core.MediaType;
 
 import org.apache.cxf.jaxrs.JAXRSInvoker;
@@ -27,6 +29,7 @@ import org.apache.cxf.message.Message;
 
 import io.reactivex.Flowable;
 import io.reactivex.Observable;
+import io.reactivex.Single;
 
 //Work in Progress
 public class ReactiveIOInvoker extends JAXRSInvoker {
@@ -34,6 +37,8 @@ public class ReactiveIOInvoker extends JAXRSInvoker {
     protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object 
result) {
         if (result instanceof Flowable) {
             return handleFlowable(inMessage, (Flowable<?>)result);
+        } else if (result instanceof Single) {
+            return handleSingle(inMessage, (Single<?>)result);
         } else if (result instanceof Observable) {
             return handleObservable(inMessage, (Observable<?>)result);
         } else {
@@ -41,6 +46,12 @@ public class ReactiveIOInvoker extends JAXRSInvoker {
         }
     }
     
+    protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> 
single) {
+        final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
+        single.subscribe(v -> asyncResponse.resume(v), t -> 
handleThrowable(asyncResponse, t));
+        return asyncResponse;
+    }
+
     protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> 
f) {
         final AsyncResponseImpl asyncResponse = new 
AsyncResponseImpl(inMessage);
         if (isUseStreamingSubscriberIfPossible() && isJsonResponse(inMessage)) 
{
@@ -62,8 +73,11 @@ public class ReactiveIOInvoker extends JAXRSInvoker {
     }
 
     private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable 
t) {
-        //TODO: if it is a Cancelation exception => asyncResponse.cancel(); 
-        asyncResponse.resume(t);
+        if (t instanceof CancellationException) {
+            asyncResponse.cancel();
+        } else {
+            asyncResponse.resume(t);
+        }
         return null;
     }
 
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
index 479f95b..ddfa9f4 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2FlowableTest.java
@@ -111,6 +111,17 @@ public class JAXRSRxJava2FlowableTest extends 
AbstractBusClientServerTestBase {
     }
     
     @Test
+    public void testGetHelloWorldJsonSingle() throws Exception {
+        String address = "http://localhost:"; + PORT + 
"/rx22/flowable/textJsonSingle";
+        WebClient wc = WebClient.create(address,
+                                        Collections.singletonList(new 
JacksonJsonProvider()));
+    
+        HelloWorldBean bean = 
wc.accept("application/json").get(HelloWorldBean.class);
+        assertEquals("Hello", bean.getGreeting());
+        assertEquals("World", bean.getAudience());
+    }
+    
+    @Test
     public void testGetHelloWorldAsyncObservable() throws Exception {
         String address = "http://localhost:"; + PORT + 
"/rx2/flowable/textAsync";
         WebClient wc = WebClient.create(address,
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
index d4be1ea..b89f368 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableService.java
@@ -22,6 +22,7 @@ package org.apache.cxf.systest.jaxrs.reactive;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -34,6 +35,7 @@ import 
org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber;
 
 import io.reactivex.BackpressureStrategy;
 import io.reactivex.Flowable;
+import io.reactivex.Single;
 import io.reactivex.schedulers.Schedulers;
 
 
@@ -71,7 +73,7 @@ public class RxJava2FlowableService {
     @Path("textJsonImplicitListAsyncStream")
     public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) 
{
         Flowable.just("Hello", "Ciao")
-            .map(s -> new HelloWorldBean(s))
+            .map(HelloWorldBean::new)
             .subscribeOn(Schedulers.computation())
             .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar));
     }
@@ -92,6 +94,18 @@ public class RxJava2FlowableService {
         }, BackpressureStrategy.MISSING);
     }
     
+    @GET
+    @Produces("application/json")
+    @Path("textJsonSingle")
+    public Single<HelloWorldBean> getJsonSingle() {
+        CompletableFuture<HelloWorldBean> completableFuture = CompletableFuture
+            .supplyAsync(() -> {
+                sleep();
+                return new HelloWorldBean("Hello");
+            });
+        return Single.fromFuture(completableFuture);
+    }
+    
     private static void sleep() {
         try {
             Thread.sleep(1000);
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
index f6ac2ac..3065dd5 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java
@@ -62,6 +62,9 @@ public class FluxReactorTest extends 
AbstractBusClientServerTestBase {
     @Test
     public void testTextJsonImplicitListAsyncStream() throws Exception {
         String address = "http://localhost:"; + PORT + 
"/reactor/flux/textJsonImplicitListAsyncStream";
+        doTestTextJsonImplicitListAsyncStream(address);
+    }
+    private void doTestTextJsonImplicitListAsyncStream(String address) throws 
Exception {
         List<HelloWorldBean> holder = new ArrayList<>();
         ClientBuilder.newClient()
                 .register(new JacksonJsonProvider())
diff --git 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
index 79a6c7f..d96d713 100644
--- 
a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
+++ 
b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java
@@ -47,4 +47,13 @@ public class FluxService {
                 .subscribeOn(Schedulers.parallel())
                 .subscribe(new JsonStreamingAsyncSubscriber<>(ar));
     }
+    
+    @GET
+    @Produces("application/json")
+    @Path("textJsonImplicitListAsyncStream2")
+    public Flux<HelloWorldBean> getJsonImplicitListStreamingAsync2() {
+        return Flux.just("Hello", "Ciao")
+                .map(HelloWorldBean::new)
+                .subscribeOn(Schedulers.parallel());
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to