This is an automated email from the ASF dual-hosted git repository. sergeyb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 8efd3625123ea0eb9c864d2064757756c9290ad7 Author: Sergey Beryozkin <[email protected]> AuthorDate: Fri Dec 22 17:33:11 2017 +0000 [CXF-7535] Introducing the common invoker --- .../server/AbstractReactiveInvoker.java} | 44 ++++++++++------------ rt/rs/extensions/reactor/pom.xml | 7 ++++ .../cxf/jaxrs/reactor/server/ReactorInvoker.java | 26 ++++++------- .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java | 30 +-------------- .../cxf/systest/jaxrs/reactor/FluxReactorTest.java | 5 +++ .../cxf/systest/jaxrs/reactor/FluxService.java | 2 +- .../cxf/systest/jaxrs/reactor/MonoService.java | 2 +- .../cxf/systest/jaxrs/reactor/ReactorServer.java | 34 +++++++++++++---- 8 files changed, 74 insertions(+), 76 deletions(-) diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java similarity index 51% copy from rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java copy to rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java index 79b0428..237a4dc 100644 --- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java +++ b/rt/rs/extensions/reactivestreams/src/main/java/org/apache/cxf/jaxrs/reactivestreams/server/AbstractReactiveInvoker.java @@ -16,39 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.cxf.jaxrs.reactor.server; +package org.apache.cxf.jaxrs.reactivestreams.server; import java.util.concurrent.CancellationException; +import javax.ws.rs.core.MediaType; + import org.apache.cxf.jaxrs.JAXRSInvoker; import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; import org.apache.cxf.message.Message; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -public class ReactorInvoker extends JAXRSInvoker { - @Override - protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { - if (result instanceof Flux) { - final Flux<?> flux = (Flux<?>) result; - final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - flux.doOnNext(asyncResponse::resume) - .doOnError(t -> handleThrowable(asyncResponse, t)) - .subscribe(); - return asyncResponse; - } else if (result instanceof Mono) { - final Mono<?> flux = (Mono<?>) result; - final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - flux.doOnNext(asyncResponse::resume) - .doOnError(t -> handleThrowable(asyncResponse, t)) - .subscribe(); - return asyncResponse; - } - return null; - } +public abstract class AbstractReactiveInvoker extends JAXRSInvoker { + private boolean useStreamingSubscriberIfPossible; + - private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { + protected Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { if (t instanceof CancellationException) { asyncResponse.cancel(); } else { @@ -56,4 +39,17 @@ public class ReactorInvoker extends JAXRSInvoker { } return null; } + + protected boolean isJsonResponse(Message inMessage) { + return MediaType.APPLICATION_JSON.equals(inMessage.getExchange().get(Message.CONTENT_TYPE)); + } + + + public boolean isUseStreamingSubscriberIfPossible() { + return useStreamingSubscriberIfPossible; + } + + public void setUseStreamingSubscriberIfPossible(boolean useStreamingSubscriberIfPossible) { + this.useStreamingSubscriberIfPossible = useStreamingSubscriberIfPossible; + } } diff --git a/rt/rs/extensions/reactor/pom.xml b/rt/rs/extensions/reactor/pom.xml index 40de637..952f73b 100644 --- a/rt/rs/extensions/reactor/pom.xml +++ b/rt/rs/extensions/reactor/pom.xml @@ -43,6 +43,13 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.cxf</groupId> + <artifactId>cxf-rt-rs-extension-reactivestreams</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <optional>true</optional> + </dependency> + <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> diff --git a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java index 79b0428..9e204a0 100644 --- a/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java +++ b/rt/rs/extensions/reactor/src/main/java/org/apache/cxf/jaxrs/reactor/server/ReactorInvoker.java @@ -18,29 +18,33 @@ */ package org.apache.cxf.jaxrs.reactor.server; -import java.util.concurrent.CancellationException; - -import org.apache.cxf.jaxrs.JAXRSInvoker; import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; +import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker; +import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; import org.apache.cxf.message.Message; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class ReactorInvoker extends JAXRSInvoker { +public class ReactorInvoker extends AbstractReactiveInvoker { + @Override protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { if (result instanceof Flux) { final Flux<?> flux = (Flux<?>) result; final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - flux.doOnNext(asyncResponse::resume) + if (isUseStreamingSubscriberIfPossible() && isJsonResponse(inMessage)) { + flux.subscribe(new JsonStreamingAsyncSubscriber<>(asyncResponse)); + } else { + flux.doOnNext(asyncResponse::resume) .doOnError(t -> handleThrowable(asyncResponse, t)) .subscribe(); + } return asyncResponse; } else if (result instanceof Mono) { - final Mono<?> flux = (Mono<?>) result; + final Mono<?> mono = (Mono<?>) result; final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - flux.doOnNext(asyncResponse::resume) + mono.doOnNext(asyncResponse::resume) .doOnError(t -> handleThrowable(asyncResponse, t)) .subscribe(); return asyncResponse; @@ -48,12 +52,4 @@ public class ReactorInvoker extends JAXRSInvoker { return null; } - private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { - if (t instanceof CancellationException) { - asyncResponse.cancel(); - } else { - asyncResponse.resume(t); - } - return null; - } } diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java index 6c99a2c..e113d40 100644 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java @@ -18,12 +18,8 @@ */ package org.apache.cxf.jaxrs.rx2.server; -import java.util.concurrent.CancellationException; - -import javax.ws.rs.core.MediaType; - -import org.apache.cxf.jaxrs.JAXRSInvoker; import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; +import org.apache.cxf.jaxrs.reactivestreams.server.AbstractReactiveInvoker; import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; import org.apache.cxf.message.Message; @@ -31,9 +27,7 @@ import io.reactivex.Flowable; import io.reactivex.Observable; import io.reactivex.Single; -//Work in Progress -public class ReactiveIOInvoker extends JAXRSInvoker { - private boolean useStreamingSubscriberIfPossible; +public class ReactiveIOInvoker extends AbstractReactiveInvoker { protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { if (result instanceof Flowable) { return handleFlowable(inMessage, (Flowable<?>)result); @@ -62,30 +56,10 @@ public class ReactiveIOInvoker extends JAXRSInvoker { return asyncResponse; } - protected boolean isJsonResponse(Message inMessage) { - return MediaType.APPLICATION_JSON.equals(inMessage.getExchange().get(Message.CONTENT_TYPE)); - } - protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) { final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); return asyncResponse; } - private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { - if (t instanceof CancellationException) { - asyncResponse.cancel(); - } else { - asyncResponse.resume(t); - } - return null; - } - - public boolean isUseStreamingSubscriberIfPossible() { - return useStreamingSubscriberIfPossible; - } - - public void setUseStreamingSubscriberIfPossible(boolean useStreamingSubscriberIfPossible) { - this.useStreamingSubscriberIfPossible = useStreamingSubscriberIfPossible; - } } diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java index 3065dd5..46a83b1 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxReactorTest.java @@ -64,6 +64,11 @@ public class FluxReactorTest extends AbstractBusClientServerTestBase { String address = "http://localhost:" + PORT + "/reactor/flux/textJsonImplicitListAsyncStream"; doTestTextJsonImplicitListAsyncStream(address); } + @Test + public void testTextJsonImplicitListAsyncStream2() throws Exception { + String address = "http://localhost:" + PORT + "/reactor2/flux/textJsonImplicitListAsyncStream2"; + doTestTextJsonImplicitListAsyncStream(address); + } private void doTestTextJsonImplicitListAsyncStream(String address) throws Exception { List<HelloWorldBean> holder = new ArrayList<>(); ClientBuilder.newClient() diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java index d96d713..fb8d12b 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/FluxService.java @@ -28,7 +28,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; -@Path("/reactor/flux") +@Path("/flux") public class FluxService { @GET diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java index 8ded540..1eab9f4 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/MonoService.java @@ -29,7 +29,7 @@ import org.apache.cxf.jaxrs.reactivestreams.server.JsonStreamingAsyncSubscriber; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -@Path("/reactor/mono") +@Path("/mono") public class MonoService { @GET diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java index 831d903..c056dbc 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactor/ReactorServer.java @@ -33,8 +33,9 @@ import org.apache.cxf.testutil.common.AbstractBusTestServerBase; public class ReactorServer extends AbstractBusTestServerBase { public static final String PORT = allocatePort(ReactorServer.class); - org.apache.cxf.endpoint.Server server; - + org.apache.cxf.endpoint.Server server1; + org.apache.cxf.endpoint.Server server2; + @Override protected void run() { Bus bus = BusFactory.getDefaultBus(); @@ -52,15 +53,34 @@ public class ReactorServer extends AbstractBusTestServerBase { new SingletonResourceProvider(new FluxService(), true)); sf.setResourceProvider(MonoService.class, new SingletonResourceProvider(new MonoService(), true)); - sf.setAddress("http://localhost:" + PORT + "/"); - server = sf.create(); + sf.setAddress("http://localhost:" + PORT + "/reactor"); + server1 = sf.create(); + + JAXRSServerFactoryBean sf2 = new JAXRSServerFactoryBean(); + ReactorInvoker invoker2 = new ReactorInvoker(); + invoker2.setUseStreamingSubscriberIfPossible(true); + sf2.setInvoker(invoker2); + StreamingResponseProvider<HelloWorldBean> streamProvider2 = new StreamingResponseProvider<HelloWorldBean>(); + streamProvider2.setProduceMediaTypes(Collections.singletonList("application/json")); + sf2.setProvider(streamProvider2); + sf2.setProvider(new JacksonJsonProvider()); + sf2.getOutInterceptors().add(new LoggingOutInterceptor()); + sf2.setResourceClasses(FluxService.class); + sf2.setResourceProvider(FluxService.class, + new SingletonResourceProvider(new FluxService(), true)); + sf2.setAddress("http://localhost:" + PORT + "/reactor2"); + server2 = sf2.create(); } @Override public void tearDown() throws Exception { - server.stop(); - server.destroy(); - server = null; + server1.stop(); + server1.destroy(); + server1 = null; + + server2.stop(); + server2.destroy(); + server2 = null; } public static void main(String[] args) { -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
