Repository: cxf Updated Branches: refs/heads/master 0f3e34689 -> 18dd0e1c7
Having a single invoker only for RxJava2 Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/18dd0e1c Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/18dd0e1c Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/18dd0e1c Branch: refs/heads/master Commit: 18dd0e1c754fd30c9ab479cf123db5ab8f6810e7 Parents: 0f3e346 Author: Sergey Beryozkin <[email protected]> Authored: Fri Sep 1 11:00:53 2017 +0100 Committer: Sergey Beryozkin <[email protected]> Committed: Fri Sep 1 11:00:53 2017 +0100 ---------------------------------------------------------------------- .../cxf/jaxrs/rx2/server/FlowableInvoker.java | 43 --------------- .../cxf/jaxrs/rx2/server/ObservableInvoker.java | 43 --------------- .../cxf/jaxrs/rx2/server/ReactiveIOInvoker.java | 57 ++++++++++++++++++++ .../jaxrs/reactive/RxJava2FlowableServer.java | 4 +- .../jaxrs/reactive/RxJava2ObservableServer.java | 4 +- 5 files changed, 61 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java deleted file mode 100644 index 1ff7491..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/FlowableInvoker.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx2.server; - -import org.apache.cxf.jaxrs.JAXRSInvoker; -import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; -import org.apache.cxf.message.Message; - -import io.reactivex.Flowable; - -public class FlowableInvoker extends JAXRSInvoker { - protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { - if (result instanceof Flowable) { - final Flowable<?> f = (Flowable<?>)result; - final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); - return asyncResponse; - } - return null; - } - - private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { - //TODO: if it is a Cancelation exception => asyncResponse.cancel(); - asyncResponse.resume(t); - return null; - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java deleted file mode 100644 index 8047c6a..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ObservableInvoker.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.rx2.server; - -import org.apache.cxf.jaxrs.JAXRSInvoker; -import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; -import org.apache.cxf.message.Message; - -import io.reactivex.Observable; - -public class ObservableInvoker extends JAXRSInvoker { - protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { - if (result instanceof Observable) { - final Observable<?> obs = (Observable<?>)result; - final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); - obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); - return asyncResponse; - } - return null; - } - - private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { - //TODO: if it is a Cancelation exception => asyncResponse.cancel(); - asyncResponse.resume(t); - return null; - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java new file mode 100644 index 0000000..c529d4a --- /dev/null +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx2/server/ReactiveIOInvoker.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.jaxrs.rx2.server; + +import org.apache.cxf.jaxrs.JAXRSInvoker; +import org.apache.cxf.jaxrs.impl.AsyncResponseImpl; +import org.apache.cxf.message.Message; + +import io.reactivex.Flowable; +import io.reactivex.Observable; + +//Work in Progress +public class ReactiveIOInvoker extends JAXRSInvoker { + protected AsyncResponseImpl checkFutureResponse(Message inMessage, Object result) { + if (result instanceof Flowable) { + return handleFlowable(inMessage, (Flowable<?>)result); + } else if (result instanceof Observable) { + return handleObservable(inMessage, (Observable<?>)result); + } else { + return null; + } + } + + protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) { + final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); + f.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + return asyncResponse; + } + + protected AsyncResponseImpl handleObservable(Message inMessage, Observable<?> obs) { + final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage); + obs.subscribe(v -> asyncResponse.resume(v), t -> handleThrowable(asyncResponse, t)); + return asyncResponse; + } + + private Object handleThrowable(AsyncResponseImpl asyncResponse, Throwable t) { + //TODO: if it is a Cancelation exception => asyncResponse.cancel(); + asyncResponse.resume(t); + return null; + } +} http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java index fe41958..8558bed 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2FlowableServer.java @@ -29,7 +29,7 @@ import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; import org.apache.cxf.jaxrs.provider.StreamingResponseProvider; -import org.apache.cxf.jaxrs.rx2.server.FlowableInvoker; +import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; @@ -45,7 +45,7 @@ public class RxJava2FlowableServer extends AbstractBusTestServerBase { // Make sure default JSONProvider is not loaded bus.setProperty("skip.default.json.provider.registration", true); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - sf.setInvoker(new FlowableInvoker()); + sf.setInvoker(new ReactiveIOInvoker()); sf.setProvider(new JacksonJsonProvider()); StreamingResponseProvider<HelloWorldBean> streamProvider = new StreamingResponseProvider<HelloWorldBean>(); streamProvider.setProduceMediaTypes(Collections.singletonList("application/json")); http://git-wip-us.apache.org/repos/asf/cxf/blob/18dd0e1c/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java index 48df030..a8849d1 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/reactive/RxJava2ObservableServer.java @@ -26,7 +26,7 @@ import org.apache.cxf.BusFactory; import org.apache.cxf.ext.logging.LoggingOutInterceptor; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; -import org.apache.cxf.jaxrs.rx2.server.ObservableInvoker; +import org.apache.cxf.jaxrs.rx2.server.ReactiveIOInvoker; import org.apache.cxf.testutil.common.AbstractBusTestServerBase; @@ -42,7 +42,7 @@ public class RxJava2ObservableServer extends AbstractBusTestServerBase { // Make sure default JSONProvider is not loaded bus.setProperty("skip.default.json.provider.registration", true); JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); - sf.setInvoker(new ObservableInvoker()); + sf.setInvoker(new ReactiveIOInvoker()); sf.setProvider(new JacksonJsonProvider()); sf.getOutInterceptors().add(new LoggingOutInterceptor()); sf.setResourceClasses(RxJava2ObservableService.class);
