Repository: cxf Updated Branches: refs/heads/master bd6852d25 -> 18720623e
[CXF-6889] Simplifying the rx client code Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/18720623 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/18720623 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/18720623 Branch: refs/heads/master Commit: 18720623e9d86e68566346981cebaefc7995d910 Parents: bd6852d Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Mon Jul 10 16:22:26 2017 +0100 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Mon Jul 10 16:22:26 2017 +0100 ---------------------------------------------------------------------- .../apache/cxf/jaxrs/client/AsyncClient.java | 32 ------- .../client/CompletionStageRxInvokerImpl.java | 30 ++++++- .../jaxrs/client/JaxrsClientStageCallback.java | 89 -------------------- .../org/apache/cxf/jaxrs/client/WebClient.java | 16 +--- .../client/JaxrsClientObservableCallback.java | 47 ----------- .../rx/client/ObservableRxInvokerImpl.java | 46 +++++----- 6 files changed, 55 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java deleted file mode 100644 index 2e1075e..0000000 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.jaxrs.client; - -import java.lang.reflect.Type; - -//Work in progress. May be removed once the Rx client work is finalized -public interface AsyncClient { - void prepareAsyncClient(String httpMethod, - Object body, - Class<?> requestClass, - Type inType, - Class<?> respClass, - Type outType, - JaxrsClientCallback<?> cb); -} http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java index 528d840..381966c 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java @@ -18,6 +18,7 @@ */ package org.apache.cxf.jaxrs.client; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; @@ -142,22 +143,43 @@ public class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker { @Override public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) { - return wc.doInvokeAsyncStage(name, entity, responseType, responseType, ex); + if (ex == null) { + return CompletableFuture.supplyAsync(() -> wc.sync().method(name, entity, responseType)); + } else { + return CompletableFuture.supplyAsync(() -> wc.sync().method(name, entity, responseType), ex); + } + + //return wc.doInvokeAsyncStage(name, entity, responseType, responseType, ex); } @Override public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) { - return wc.doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex); + if (ex == null) { + return CompletableFuture.supplyAsync(() -> wc.sync().method(name, entity, responseType)); + } else { + return CompletableFuture.supplyAsync(() -> wc.sync().method(name, entity, responseType), ex); + } + //return wc.doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex); } @Override public <T> CompletionStage<T> method(String name, Class<T> responseType) { - return wc.doInvokeAsyncStage(name, null, responseType, responseType, ex); + if (ex == null) { + return CompletableFuture.supplyAsync(() -> wc.sync().method(name, responseType)); + } else { + return CompletableFuture.supplyAsync(() -> wc.sync().method(name, responseType), ex); + } + //return wc.doInvokeAsyncStage(name, null, responseType, responseType, ex); } @Override public <T> CompletionStage<T> method(String name, GenericType<T> responseType) { - return wc.doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex); + if (ex == null) { + return CompletableFuture.supplyAsync(() -> wc.sync().method(name, responseType)); + } else { + return CompletableFuture.supplyAsync(() -> wc.sync().method(name, responseType), ex); + } + //return wc.doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java deleted file mode 100644 index 578ce12..0000000 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.jaxrs.client; - -import java.lang.reflect.Type; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; -import java.util.function.Supplier; - -public class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T> { - private CompletableFuture<T> cf; - - public JaxrsClientStageCallback(Class<?> responseClass, - Type outGenericType, - Executor ex) { - super(null, responseClass, outGenericType); - - Supplier<T> supplier = new SupplierImpl(); - cf = ex == null ? CompletableFuture.supplyAsync(supplier) - : CompletableFuture.supplyAsync(supplier, ex); - } - - public CompletionStage<T> getCompletionStage() { - return cf; - } - - @Override - public void handleResponse(Map<String, Object> ctx, Object[] res) { - context = ctx; - result = res; - done = true; - synchronized (this) { - notifyAll(); - } - } - - @Override - public void handleException(Map<String, Object> ctx, final Throwable ex) { - context = ctx; - exception = ex; - cf.completeExceptionally(ex); - done = true; - synchronized (this) { - notifyAll(); - } - } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean result = super.cancel(mayInterruptIfRunning); - if (result) { - cf.cancel(mayInterruptIfRunning); - } - return result; - } - - private class SupplierImpl implements Supplier<T> { - - @SuppressWarnings("unchecked") - @Override - public T get() { - try { - return (T)JaxrsClientStageCallback.this.get()[0]; - } catch (Exception ex) { - cf.completeExceptionally(ex); - return null; - } - } - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java index 44de0da..92f9714 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java @@ -28,7 +28,6 @@ import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -82,7 +81,7 @@ import org.apache.cxf.message.Message; * Http-centric web client * */ -public class WebClient extends AbstractClient implements AsyncClient { +public class WebClient extends AbstractClient { private static final String REQUEST_CLASS = "request.class"; private static final String REQUEST_TYPE = "request.type"; private static final String REQUEST_ANNS = "request.annotations"; @@ -930,18 +929,7 @@ public class WebClient extends AbstractClient implements AsyncClient { return cb.createFuture(); } - protected <T> CompletionStage<T> doInvokeAsyncStage(String httpMethod, - Object body, - Class<?> respClass, - Type outType, - ExecutorService ex) { - JaxrsClientStageCallback<T> cb = new JaxrsClientStageCallback<T>(respClass, outType, ex); - prepareAsyncClient(httpMethod, body, null, null, respClass, outType, cb); - return cb.getCompletionStage(); - } - - @Override - public void prepareAsyncClient(String httpMethod, + protected void prepareAsyncClient(String httpMethod, Object body, Class<?> requestClass, Type inType, http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java deleted file mode 100644 index e25ce2e..0000000 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cxf.jaxrs.rx.client; - -import java.lang.reflect.Type; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; - -import org.apache.cxf.jaxrs.client.JaxrsClientCallback; - -import rx.Observable; -import rx.schedulers.Schedulers; - -public class JaxrsClientObservableCallback<T> extends JaxrsClientCallback<T> { - private Observable<T> observable; - - public JaxrsClientObservableCallback(Class<?> responseClass, - Type outGenericType, - Executor ex) { - super(null, responseClass, outGenericType); - Future<T> f = super.createFuture(); - observable = ex == null ? Observable.from(f) - : Observable.from(f, Schedulers.from(ex)); - } - - public Observable<T> getObservable() { - return observable; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java index 7487070..3a79226 100644 --- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java +++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java @@ -18,7 +18,6 @@ */ package org.apache.cxf.jaxrs.rx.client; -import java.lang.reflect.Type; import java.util.concurrent.ExecutorService; import javax.ws.rs.HttpMethod; @@ -26,16 +25,18 @@ import javax.ws.rs.client.Entity; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.Response; -import org.apache.cxf.jaxrs.client.AsyncClient; +import org.apache.cxf.jaxrs.client.WebClient; import rx.Observable; +import rx.Scheduler; +import rx.schedulers.Schedulers; public class ObservableRxInvokerImpl implements ObservableRxInvoker { - private ExecutorService ex; - private AsyncClient wc; - public ObservableRxInvokerImpl(AsyncClient wc, ExecutorService ex) { + private Scheduler sc; + private WebClient wc; + public ObservableRxInvokerImpl(WebClient wc, ExecutorService ex) { this.wc = wc; - this.ex = ex; + this.sc = ex == null ? null : Schedulers.from(ex); } @Override @@ -145,31 +146,38 @@ public class ObservableRxInvokerImpl implements ObservableRxInvoker { @Override public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) { - return doInvokeAsync(name, entity, responseType, responseType); + if (sc == null) { + return Observable.from(wc.async().method(name, entity, responseType)); + } else { + return Observable.from(wc.async().method(name, entity, responseType), sc); + } } @Override public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) { - return doInvokeAsync(name, entity, responseType.getRawType(), responseType.getType()); + if (sc == null) { + return Observable.from(wc.async().method(name, entity, responseType)); + } else { + return Observable.from(wc.async().method(name, entity, responseType), sc); + } } @Override public <T> Observable<T> method(String name, Class<T> responseType) { - return doInvokeAsync(name, null, responseType, responseType); + if (sc == null) { + return Observable.from(wc.async().method(name, responseType)); + } else { + return Observable.from(wc.async().method(name, responseType), sc); + } } @Override public <T> Observable<T> method(String name, GenericType<T> responseType) { - return doInvokeAsync(name, null, responseType.getRawType(), responseType.getType()); - } - - protected <T> Observable<T> doInvokeAsync(String httpMethod, - Object body, - Class<?> respClass, - Type outType) { - JaxrsClientObservableCallback<T> cb = new JaxrsClientObservableCallback<T>(respClass, outType, ex); - wc.prepareAsyncClient(httpMethod, body, null, null, respClass, outType, cb); - return cb.getObservable(); + if (sc == null) { + return Observable.from(wc.async().method(name, responseType)); + } else { + return Observable.from(wc.async().method(name, responseType), sc); + } } }