Repository: cxf Updated Branches: refs/heads/master a55635031 -> 0c5d4032d
[CXF-7487] Basic RxJava2 Observable support, Flowable to follow later Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0c5d4032 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0c5d4032 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0c5d4032 Branch: refs/heads/master Commit: 0c5d4032d3ba7837e662abc192a27ae2814b21c9 Parents: a556350 Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Mon Aug 28 17:36:37 2017 +0100 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Mon Aug 28 17:36:37 2017 +0100 ---------------------------------------------------------------------- parent/pom.xml | 10 +- rt/rs/extensions/rx/pom.xml | 6 + .../jaxrs/rx2/client/ObservableRxInvoker.java | 107 +++++++++++ .../rx2/client/ObservableRxInvokerImpl.java | 180 +++++++++++++++++++ .../rx2/client/ObservableRxInvokerProvider.java | 45 +++++ .../cxf/jaxrs/rx2/server/ObservableInvoker.java | 43 +++++ systests/jaxrs/pom.xml | 7 +- .../jaxrs/reactive/JAXRSObservableTest.java | 141 --------------- .../jaxrs/reactive/JAXRSRxJava2Test.java | 68 +++++++ .../systest/jaxrs/reactive/JAXRSRxJavaTest.java | 141 +++++++++++++++ .../jaxrs/reactive/ObservableServer.java | 79 -------- .../jaxrs/reactive/ObservableService.java | 122 ------------- .../reactive/RxJava2ObservableService.java | 43 +++++ .../systest/jaxrs/reactive/RxJava2Server.java | 73 ++++++++ .../jaxrs/reactive/RxJavaObservableService.java | 122 +++++++++++++ .../systest/jaxrs/reactive/RxJavaServer.java | 79 ++++++++ 16 files changed, 921 insertions(+), 345 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 424e28a..8d8e77b 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -101,7 +101,8 @@ <cxf.log4j.version>1.2.17</cxf.log4j.version> <cxf.lucene.version>4.9.0</cxf.lucene.version> <cxf.mina.version>2.0.14</cxf.mina.version> - <cxf.rx.java.version>1.2.7</cxf.rx.java.version> + <cxf.rxjava.version>1.3.0</cxf.rxjava.version> + <cxf.rxjava2.version>2.1.3</cxf.rxjava2.version> <cxf.javax.annotation-api.version>1.3</cxf.javax.annotation-api.version> <cxf.jcache.version>1.0.0</cxf.jcache.version> <cxf.geronimo.jms.version>1.1.1</cxf.geronimo.jms.version> @@ -819,7 +820,12 @@ <dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> - <version>${cxf.rx.java.version}</version> + <version>${cxf.rxjava.version}</version> + </dependency> + <dependency> + <groupId>io.reactivex.rxjava2</groupId> + <artifactId>rxjava</artifactId> + <version>${cxf.rxjava2.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/pom.xml ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/pom.xml b/rt/rs/extensions/rx/pom.xml index 13bf166..d6b0360 100644 --- a/rt/rs/extensions/rx/pom.xml +++ b/rt/rs/extensions/rx/pom.xml @@ -48,6 +48,12 @@ <optional>true</optional> </dependency> <dependency> + <groupId>io.reactivex.rxjava2</groupId> + <artifactId>rxjava</artifactId> + <scope>provided</scope> + <optional>true</optional> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java new file mode 100644 index 0000000..41d1ec9 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvoker.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx2.client; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.RxInvoker; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Response; + +import io.reactivex.Observable; + + +@SuppressWarnings("rawtypes") +public interface ObservableRxInvoker extends RxInvoker<Observable> { + + @Override + Observable<Response> get(); + + @Override + <T> Observable<T> get(Class<T> responseType); + + @Override + <T> Observable<T> get(GenericType<T> responseType); + + @Override + Observable<Response> put(Entity<?> entity); + + @Override + <T> Observable<T> put(Entity<?> entity, Class<T> clazz); + + @Override + <T> Observable<T> put(Entity<?> entity, GenericType<T> type); + + @Override + Observable<Response> post(Entity<?> entity); + + @Override + <T> Observable<T> post(Entity<?> entity, Class<T> clazz); + + @Override + <T> Observable<T> post(Entity<?> entity, GenericType<T> type); + + @Override + Observable<Response> delete(); + + @Override + <T> Observable<T> delete(Class<T> responseType); + + @Override + <T> Observable<T> delete(GenericType<T> responseType); + + @Override + Observable<Response> head(); + + @Override + Observable<Response> options(); + + @Override + <T> Observable<T> options(Class<T> responseType); + + @Override + <T> Observable<T> options(GenericType<T> responseType); + + @Override + Observable<Response> trace(); + + @Override + <T> Observable<T> trace(Class<T> responseType); + + @Override + <T> Observable<T> trace(GenericType<T> responseType); + + @Override + Observable<Response> method(String name); + + @Override + <T> Observable<T> method(String name, Class<T> responseType); + + @Override + <T> Observable<T> method(String name, GenericType<T> responseType); + + @Override + Observable<Response> method(String name, Entity<?> entity); + + @Override + <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType); + + @Override + <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType); +} + http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java new file mode 100644 index 0000000..2c1f966 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerImpl.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx2.client; + +import java.util.concurrent.ExecutorService; + +import javax.ws.rs.HttpMethod; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.Response; + +import org.apache.cxf.jaxrs.client.WebClient; + +import io.reactivex.Observable; +import io.reactivex.Scheduler; +import io.reactivex.schedulers.Schedulers; + + +public class ObservableRxInvokerImpl implements ObservableRxInvoker { + private Scheduler sc; + private WebClient wc; + public ObservableRxInvokerImpl(WebClient wc, ExecutorService ex) { + this.wc = wc; + this.sc = ex == null ? null : Schedulers.from(ex); + } + + @Override + public Observable<Response> get() { + return get(Response.class); + } + + @Override + public <T> Observable<T> get(Class<T> responseType) { + return method(HttpMethod.GET, responseType); + } + + @Override + public <T> Observable<T> get(GenericType<T> responseType) { + return method(HttpMethod.GET, responseType); + } + + @Override + public Observable<Response> put(Entity<?> entity) { + return put(entity, Response.class); + } + + @Override + public <T> Observable<T> put(Entity<?> entity, Class<T> responseType) { + return method(HttpMethod.PUT, entity, responseType); + } + + @Override + public <T> Observable<T> put(Entity<?> entity, GenericType<T> responseType) { + return method(HttpMethod.PUT, entity, responseType); + } + + @Override + public Observable<Response> post(Entity<?> entity) { + return post(entity, Response.class); + } + + @Override + public <T> Observable<T> post(Entity<?> entity, Class<T> responseType) { + return method(HttpMethod.POST, entity, responseType); + } + + @Override + public <T> Observable<T> post(Entity<?> entity, GenericType<T> responseType) { + return method(HttpMethod.POST, entity, responseType); + } + + @Override + public Observable<Response> delete() { + return delete(Response.class); + } + + @Override + public <T> Observable<T> delete(Class<T> responseType) { + return method(HttpMethod.DELETE, responseType); + } + + @Override + public <T> Observable<T> delete(GenericType<T> responseType) { + return method(HttpMethod.DELETE, responseType); + } + + @Override + public Observable<Response> head() { + return method(HttpMethod.HEAD); + } + + @Override + public Observable<Response> options() { + return options(Response.class); + } + + @Override + public <T> Observable<T> options(Class<T> responseType) { + return method(HttpMethod.OPTIONS, responseType); + } + + @Override + public <T> Observable<T> options(GenericType<T> responseType) { + return method(HttpMethod.OPTIONS, responseType); + } + + @Override + public Observable<Response> trace() { + return trace(Response.class); + } + + @Override + public <T> Observable<T> trace(Class<T> responseType) { + return method("TRACE", responseType); + } + + @Override + public <T> Observable<T> trace(GenericType<T> responseType) { + return method("TRACE", responseType); + } + + @Override + public Observable<Response> method(String name) { + return method(name, Response.class); + } + + @Override + public Observable<Response> method(String name, Entity<?> entity) { + return method(name, entity, Response.class); + } + + @Override + public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) { + if (sc == null) { + return Observable.fromFuture(wc.async().method(name, entity, responseType)); + } + return Observable.fromFuture(wc.async().method(name, entity, responseType), sc); + } + + @Override + public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) { + if (sc == null) { + return Observable.fromFuture(wc.async().method(name, entity, responseType)); + } + return Observable.fromFuture(wc.async().method(name, entity, responseType), sc); + } + + @Override + public <T> Observable<T> method(String name, Class<T> responseType) { + if (sc == null) { + return Observable.fromFuture(wc.async().method(name, responseType)); + } + return Observable.fromFuture(wc.async().method(name, responseType), sc); + } + + @Override + public <T> Observable<T> method(String name, GenericType<T> responseType) { + if (sc == null) { + return Observable.fromFuture(wc.async().method(name, responseType)); + } + return Observable.fromFuture(wc.async().method(name, responseType), sc); + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java new file mode 100644 index 0000000..221bc48 --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/client/ObservableRxInvokerProvider.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx2.client; + +import java.util.concurrent.ExecutorService; + +import javax.ws.rs.client.RxInvokerProvider; +import javax.ws.rs.client.SyncInvoker; +import javax.ws.rs.ext.Provider; + +import org.apache.cxf.jaxrs.client.SyncInvokerImpl; + +@Provider +public class ObservableRxInvokerProvider implements RxInvokerProvider<ObservableRxInvoker> { + + @Override + public ObservableRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) { + // TODO: At the moment we still delegate if possible to the async HTTP conduit. + // Investigate if letting the RxJava thread pool deal with the sync invocation + // is indeed more effective + return new ObservableRxInvokerImpl(((SyncInvokerImpl)syncInvoker).getWebClient(), executorService); + } + + @Override + public boolean isProviderFor(Class<?> rxCls) { + return ObservableRxInvoker.class == rxCls; + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java new file mode 100644 index 0000000..8047c6a --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx2.server; + +import org.apache.cxf.jaxrs.JAXRSInvoker; +import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; +import org.apache.cxf.message.Message; + +import io.reactivex.Observable; + +public class ObservableInvoker extends JAXRSInvoker { + protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { + if (result instanceof Observable) { + final Observable<?> obs = (Observable<?>)result; + final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); + obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + return asyncResponse; + } + return null; + } + + private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { + //TODO: if it is a Cancelation exception => asyncResponse.cancel(); + asyncResponse.resume(t); + return null; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/pom.xml ---------------------------------------------------------------------- diff --git a/systests/jaxrs/pom.xml b/systests/jaxrs/pom.xml index b515011..542201a 100644 --- a/systests/jaxrs/pom.xml +++ b/systests/jaxrs/pom.xml @@ -60,7 +60,12 @@ <dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava</artifactId> - <version>${cxf.rx.java.version}</version> + <version>${cxf.rxjava.version}</version> + </dependency> + <dependency> + <groupId>io.reactivex.rxjava2</groupId> + <artifactId>rxjava</artifactId> + <version>${cxf.rxjava2.version}</version> </dependency> <!-- <dependency> http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java deleted file mode 100644 index 39d8fd5..0000000 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSObservableTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.systest.jaxrs.reactive; - -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutionException; - -import javax.ws.rs.NotFoundException; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.Invocation; -import javax.ws.rs.core.GenericType; - -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; - -import org.apache.cxf.jaxrs.client.WebClient; -import org.apache.cxf.jaxrs.model.AbstractResourceInfo; -import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker; -import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider; -import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; - -import org.junit.BeforeClass; -import org.junit.Test; - -import rx.Observable; - -public class JAXRSObservableTest extends AbstractBusClientServerTestBase { - public static final String PORT = ObservableServer.PORT; - @BeforeClass - public static void startServers() throws Exception { - AbstractResourceInfo.clearAllMaps(); - assertTrue("server did not launch correctly", - launchServer(ObservableServer.class, true)); - createStaticBus(); - } - @Test - public void testGetHelloWorldText() throws Exception { - String address = "http://localhost:" + PORT + "/observable/text"; - WebClient wc = WebClient.create(address); - String text = wc.accept("text/plain").get(String.class); - assertEquals("Hello, world!", text); - } - @Test - public void testGetHelloWorldAsyncText() throws Exception { - String address = "http://localhost:" + PORT + "/observable/textAsync"; - WebClient wc = WebClient.create(address); - String text = wc.accept("text/plain").get(String.class); - assertEquals("Hello, world!", text); - } - - @Test - public void testGetHelloWorldJson() throws Exception { - String address = "http://localhost:" + PORT + "/observable/textJson"; - WebClient wc = WebClient.create(address, - Collections.singletonList(new JacksonJsonProvider())); - HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class); - assertEquals("Hello", bean.getGreeting()); - assertEquals("World", bean.getAudience()); - } - @Test - public void testGetHelloWorldJsonList() throws Exception { - String address = "http://localhost:" + PORT + "/observable/textJsonList"; - doTestGetHelloWorldJsonList(address); - } - @Test - public void testGetHelloWorldJsonImplicitListAsync() throws Exception { - String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync"; - doTestGetHelloWorldJsonList(address); - } - @Test - public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception { - String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsyncStream"; - doTestGetHelloWorldJsonList(address); - } - private void doTestGetHelloWorldJsonList(String address) throws Exception { - WebClient wc = WebClient.create(address, - Collections.singletonList(new JacksonJsonProvider())); - WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000); - GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() { - }; - - List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType); - assertEquals(2, beans.size()); - assertEquals("Hello", beans.get(0).getGreeting()); - assertEquals("World", beans.get(0).getAudience()); - assertEquals("Ciao", beans.get(1).getGreeting()); - assertEquals("World", beans.get(1).getAudience()); - } - - @Test - public void testGetHelloWorldAsyncObservable() throws Exception { - String address = "http://localhost:" + PORT + "/observable/textAsync"; - WebClient wc = WebClient.create(address, - Collections.singletonList(new ObservableRxInvokerProvider())); - Observable<String> obs = wc.accept("text/plain") - .rx(ObservableRxInvoker.class) - .get(String.class); - obs.map(s -> { - return s + s; - }); - - Thread.sleep(3000); - - obs.subscribe(s -> assertDuplicateResponse(s)); - } - @Test - public void testGetHelloWorldAsyncObservable404() throws Exception { - String address = "http://localhost:" + PORT + "/observable/textAsync404"; - Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider()) - .target(address).request(); - b.rx(ObservableRxInvoker.class).get(String.class).subscribe( - s -> { - fail("Exception expected"); - }, - t -> validateT((ExecutionException)t)); - } - - private void validateT(ExecutionException t) { - assertTrue(t.getCause() instanceof NotFoundException); - } - private void assertDuplicateResponse(String s) { - assertEquals("Hello, world!Hello, world!", s); - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java new file mode 100644 index 0000000..ded2799 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJava2Test.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import java.util.LinkedList; +import java.util.List; + +import javax.xml.ws.Holder; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.jaxrs.client.WebClient; +import org.apache.cxf.jaxrs.model.AbstractResourceInfo; +import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvoker; +import org.apache.cxf.jaxrs.rx2.client.ObservableRxInvokerProvider; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; + +import org.junit.BeforeClass; +import org.junit.Test; + +import io.reactivex.Observable; + +public class JAXRSRxJava2Test extends AbstractBusClientServerTestBase { + public static final String PORT = RxJava2Server.PORT; + @BeforeClass + public static void startServers() throws Exception { + AbstractResourceInfo.clearAllMaps(); + assertTrue("server did not launch correctly", + launchServer(RxJava2Server.class, true)); + createStaticBus(); + } + @Test + public void testGetHelloWorldJson() throws Exception { + String address = "http://localhost:" + PORT + "/observable2/textJson"; + List<Object> providers = new LinkedList<>(); + providers.add(new JacksonJsonProvider()); + providers.add(new ObservableRxInvokerProvider()); + WebClient wc = WebClient.create(address, providers); + Observable<HelloWorldBean> obs = wc.accept("application/json") + .rx(ObservableRxInvoker.class) + .get(HelloWorldBean.class); + + Holder<HelloWorldBean> holder = new Holder<HelloWorldBean>(); + obs.subscribe(v -> { + holder.value = v; + }); + Thread.sleep(3000); + assertEquals("Hello", holder.value.getGreeting()); + assertEquals("World", holder.value.getAudience()); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java new file mode 100644 index 0000000..9f197d8 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/JAXRSRxJavaTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import javax.ws.rs.NotFoundException; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.GenericType; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.jaxrs.client.WebClient; +import org.apache.cxf.jaxrs.model.AbstractResourceInfo; +import org.apache.cxf.jaxrs.rx.client.ObservableRxInvoker; +import org.apache.cxf.jaxrs.rx.client.ObservableRxInvokerProvider; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; + +import org.junit.BeforeClass; +import org.junit.Test; + +import rx.Observable; + +public class JAXRSRxJavaTest extends AbstractBusClientServerTestBase { + public static final String PORT = RxJavaServer.PORT; + @BeforeClass + public static void startServers() throws Exception { + AbstractResourceInfo.clearAllMaps(); + assertTrue("server did not launch correctly", + launchServer(RxJavaServer.class, true)); + createStaticBus(); + } + @Test + public void testGetHelloWorldText() throws Exception { + String address = "http://localhost:" + PORT + "/observable/text"; + WebClient wc = WebClient.create(address); + String text = wc.accept("text/plain").get(String.class); + assertEquals("Hello, world!", text); + } + @Test + public void testGetHelloWorldAsyncText() throws Exception { + String address = "http://localhost:" + PORT + "/observable/textAsync"; + WebClient wc = WebClient.create(address); + String text = wc.accept("text/plain").get(String.class); + assertEquals("Hello, world!", text); + } + + @Test + public void testGetHelloWorldJson() throws Exception { + String address = "http://localhost:" + PORT + "/observable/textJson"; + WebClient wc = WebClient.create(address, + Collections.singletonList(new JacksonJsonProvider())); + HelloWorldBean bean = wc.accept("application/json").get(HelloWorldBean.class); + assertEquals("Hello", bean.getGreeting()); + assertEquals("World", bean.getAudience()); + } + @Test + public void testGetHelloWorldJsonList() throws Exception { + String address = "http://localhost:" + PORT + "/observable/textJsonList"; + doTestGetHelloWorldJsonList(address); + } + @Test + public void testGetHelloWorldJsonImplicitListAsync() throws Exception { + String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsync"; + doTestGetHelloWorldJsonList(address); + } + @Test + public void testGetHelloWorldJsonImplicitListAsyncStream() throws Exception { + String address = "http://localhost:" + PORT + "/observable/textJsonImplicitListAsyncStream"; + doTestGetHelloWorldJsonList(address); + } + private void doTestGetHelloWorldJsonList(String address) throws Exception { + WebClient wc = WebClient.create(address, + Collections.singletonList(new JacksonJsonProvider())); + WebClient.getConfig(wc).getHttpConduit().getClient().setReceiveTimeout(10000000); + GenericType<List<HelloWorldBean>> genericResponseType = new GenericType<List<HelloWorldBean>>() { + }; + + List<HelloWorldBean> beans = wc.accept("application/json").get(genericResponseType); + assertEquals(2, beans.size()); + assertEquals("Hello", beans.get(0).getGreeting()); + assertEquals("World", beans.get(0).getAudience()); + assertEquals("Ciao", beans.get(1).getGreeting()); + assertEquals("World", beans.get(1).getAudience()); + } + + @Test + public void testGetHelloWorldAsyncObservable() throws Exception { + String address = "http://localhost:" + PORT + "/observable/textAsync"; + WebClient wc = WebClient.create(address, + Collections.singletonList(new ObservableRxInvokerProvider())); + Observable<String> obs = wc.accept("text/plain") + .rx(ObservableRxInvoker.class) + .get(String.class); + obs.map(s -> { + return s + s; + }); + + Thread.sleep(3000); + + obs.subscribe(s -> assertDuplicateResponse(s)); + } + @Test + public void testGetHelloWorldAsyncObservable404() throws Exception { + String address = "http://localhost:" + PORT + "/observable/textAsync404"; + Invocation.Builder b = ClientBuilder.newClient().register(new ObservableRxInvokerProvider()) + .target(address).request(); + b.rx(ObservableRxInvoker.class).get(String.class).subscribe( + s -> { + fail("Exception expected"); + }, + t -> validateT((ExecutionException)t)); + } + + private void validateT(ExecutionException t) { + assertTrue(t.getCause() instanceof NotFoundException); + } + private void assertDuplicateResponse(String s) { + assertEquals("Hello, world!Hello, world!", s); + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java deleted file mode 100644 index 03f89ef..0000000 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableServer.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.systest.jaxrs.reactive; - -import java.util.Collections; - -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; - -import org.apache.cxf.Bus; -import org.apache.cxf.BusFactory; -import org.apache.cxf.ext.logging.LoggingOutInterceptor; -import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; -import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; -import org.apache.cxf.jaxrs.rx.server.ObservableInvoker; -import org.apache.cxf.testutil.common.AbstractBusTestServerBase; - - -public class ObservableServer extends AbstractBusTestServerBase { - public static final String PORT = allocatePort(ObservableServer.class); - - org.apache.cxf.endpoint.Server server; - public ObservableServer() { - } - - protected void run() { - Bus bus = BusFactory.getDefaultBus(); - // Make sure default JSONProvider is not loaded - bus.setProperty("skip.default.json.provider.registration", true); - JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - sf.setInvoker(new ObservableInvoker()); - sf.setProvider(new JacksonJsonProvider()); - StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>(); - streamProvider.setProduceMediaTypes(Collections.singletonList("application/json")); - sf.setProvider(streamProvider); - sf.getOutInterceptors().add(new LoggingOutInterceptor()); - sf.setResourceClasses(ObservableService.class); - sf.setResourceProvider(ObservableService.class, - new SingletonResourceProvider(new ObservableService(), true)); - sf.setAddress("http://localhost:" + PORT + "/"); - server = sf.create(); - } - - public void tearDown() throws Exception { - server.stop(); - server.destroy(); - server = null; - } - - public static void main(String[] args) { - try { - ObservableServer s = new ObservableServer(); - s.start(); - } catch (Exception ex) { - ex.printStackTrace(); - System.exit(-1); - } finally { - System.out.println("done!"); - } - } - -} http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java deleted file mode 100644 index 00783fd..0000000 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/ObservableService.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.systest.jaxrs.reactive; - - -import java.util.Arrays; -import java.util.List; - -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.container.Suspended; - -import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber; -import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber; -import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber; - -import rx.Observable; -import rx.schedulers.Schedulers; - - -@Path("/observable") -public class ObservableService { - - @GET - @Produces("text/plain") - @Path("text") - public Observable<String> getText() { - return Observable.just("Hello, world!"); - } - - @GET - @Produces("text/plain") - @Path("textAsync") - public void getTextAsync(@Suspended final AsyncResponse ar) { - Observable.just("Hello, ").map(s -> s + "world!") - .subscribe(new StringAsyncSubscriber(ar)); - - } - - @GET - @Produces("application/json") - @Path("textJson") - public Observable<HelloWorldBean> getJson() { - return Observable.just(new HelloWorldBean()); - } - - @GET - @Produces("application/json") - @Path("textJsonImplicitListAsync") - public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) { - final HelloWorldBean bean1 = new HelloWorldBean(); - final HelloWorldBean bean2 = new HelloWorldBean("Ciao"); - new Thread(new Runnable() { - public void run() { - try { - Thread.sleep(2000); - } catch (InterruptedException ex) { - // ignore - } - Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar)); - } - }).start(); - - } - @GET - @Produces("application/json") - @Path("textJsonImplicitListAsyncStream") - public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) { - Observable.just("Hello", "Ciao") - .map(s -> new HelloWorldBean(s)) - .subscribeOn(Schedulers.computation()) - .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar)); - } - @GET - @Produces("application/json") - @Path("textJsonList") - public Observable<List<HelloWorldBean>> getJsonList() { - HelloWorldBean bean1 = new HelloWorldBean(); - HelloWorldBean bean2 = new HelloWorldBean(); - bean2.setGreeting("Ciao"); - return Observable.just(Arrays.asList(bean1, bean2)); - } - - private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> { - - private StringBuilder sb = new StringBuilder(); - StringAsyncSubscriber(AsyncResponse ar) { - super(ar); - } - @Override - public void onCompleted() { - super.resume(sb.toString()); - } - - @Override - public void onNext(String s) { - sb.append(s); - } - - } -} - - http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java new file mode 100644 index 0000000..28d053e --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableService.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; + +import io.reactivex.Observable; + + +@Path("/observable2") +public class RxJava2ObservableService { + + + @GET + @Produces("application/json") + @Path("textJson") + public Observable<HelloWorldBean> getJson() { + return Observable.just(new HelloWorldBean()); + } + +} + + http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java new file mode 100644 index 0000000..f9ab3ae --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2Server.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.ext.logging.LoggingOutInterceptor; +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; + + +public class RxJava2Server extends AbstractBusTestServerBase { + public static final String PORT = allocatePort(RxJava2Server.class); + + org.apache.cxf.endpoint.Server server; + public RxJava2Server() { + } + + protected void run() { + Bus bus = BusFactory.getDefaultBus(); + // Make sure default JSONProvider is not loaded + bus.setProperty("skip.default.json.provider.registration", true); + JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); + sf.setInvoker(new ObservableInvoker()); + sf.setProvider(new JacksonJsonProvider()); + sf.getOutInterceptors().add(new LoggingOutInterceptor()); + sf.setResourceClasses(RxJava2ObservableService.class); + sf.setResourceProvider(RxJava2ObservableService.class, + new SingletonResourceProvider(new RxJava2ObservableService(), true)); + sf.setAddress("http://localhost:" + PORT + "/"); + server = sf.create(); + } + + public void tearDown() throws Exception { + server.stop(); + server.destroy(); + server = null; + } + + public static void main(String[] args) { + try { + RxJava2Server s = new RxJava2Server(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + +} http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java new file mode 100644 index 0000000..de0f91f --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaObservableService.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + + +import java.util.Arrays; +import java.util.List; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; + +import org.apache.cxf.jaxrs.rx.server.AbstractAsyncSubscriber; +import org.apache.cxf.jaxrs.rx.server.JsonStreamingAsyncSubscriber; +import org.apache.cxf.jaxrs.rx.server.ListAsyncSubscriber; + +import rx.Observable; +import rx.schedulers.Schedulers; + + +@Path("/observable") +public class RxJavaObservableService { + + @GET + @Produces("text/plain") + @Path("text") + public Observable<String> getText() { + return Observable.just("Hello, world!"); + } + + @GET + @Produces("text/plain") + @Path("textAsync") + public void getTextAsync(@Suspended final AsyncResponse ar) { + Observable.just("Hello, ").map(s -> s + "world!") + .subscribe(new StringAsyncSubscriber(ar)); + + } + + @GET + @Produces("application/json") + @Path("textJson") + public Observable<HelloWorldBean> getJson() { + return Observable.just(new HelloWorldBean()); + } + + @GET + @Produces("application/json") + @Path("textJsonImplicitListAsync") + public void getJsonImplicitListAsync(@Suspended AsyncResponse ar) { + final HelloWorldBean bean1 = new HelloWorldBean(); + final HelloWorldBean bean2 = new HelloWorldBean("Ciao"); + new Thread(new Runnable() { + public void run() { + try { + Thread.sleep(2000); + } catch (InterruptedException ex) { + // ignore + } + Observable.just(bean1, bean2).subscribe(new ListAsyncSubscriber<HelloWorldBean>(ar)); + } + }).start(); + + } + @GET + @Produces("application/json") + @Path("textJsonImplicitListAsyncStream") + public void getJsonImplicitListStreamingAsync(@Suspended AsyncResponse ar) { + Observable.just("Hello", "Ciao") + .map(s -> new HelloWorldBean(s)) + .subscribeOn(Schedulers.computation()) + .subscribe(new JsonStreamingAsyncSubscriber<HelloWorldBean>(ar)); + } + @GET + @Produces("application/json") + @Path("textJsonList") + public Observable<List<HelloWorldBean>> getJsonList() { + HelloWorldBean bean1 = new HelloWorldBean(); + HelloWorldBean bean2 = new HelloWorldBean(); + bean2.setGreeting("Ciao"); + return Observable.just(Arrays.asList(bean1, bean2)); + } + + private class StringAsyncSubscriber extends AbstractAsyncSubscriber<String> { + + private StringBuilder sb = new StringBuilder(); + StringAsyncSubscriber(AsyncResponse ar) { + super(ar); + } + @Override + public void onCompleted() { + super.resume(sb.toString()); + } + + @Override + public void onNext(String s) { + sb.append(s); + } + + } +} + + http://git-wip-us.apache.org/repos/asf/cxf/blob/0c5d4032/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java new file mode 100644 index 0000000..70f58b3 --- /dev/null +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJavaServer.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs.reactive; + +import java.util.Collections; + +import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.ext.logging.LoggingOutInterceptor; +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; +import org.apache.cxf.jaxrs.rx.server.ObservableInvoker; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; + + +public class RxJavaServer extends AbstractBusTestServerBase { + public static final String PORT = allocatePort(RxJavaServer.class); + + org.apache.cxf.endpoint.Server server; + public RxJavaServer() { + } + + protected void run() { + Bus bus = BusFactory.getDefaultBus(); + // Make sure default JSONProvider is not loaded + bus.setProperty("skip.default.json.provider.registration", true); + JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); + sf.setInvoker(new ObservableInvoker()); + sf.setProvider(new JacksonJsonProvider()); + StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>(); + streamProvider.setProduceMediaTypes(Collections.singletonList("application/json")); + sf.setProvider(streamProvider); + sf.getOutInterceptors().add(new LoggingOutInterceptor()); + sf.setResourceClasses(RxJavaObservableService.class); + sf.setResourceProvider(RxJavaObservableService.class, + new SingletonResourceProvider(new RxJavaObservableService(), true)); + sf.setAddress("http://localhost:" + PORT + "/"); + server = sf.create(); + } + + public void tearDown() throws Exception { + server.stop(); + server.destroy(); + server = null; + } + + public static void main(String[] args) { + try { + RxJavaServer s = new RxJavaServer(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + +}