Repository: cxf
Updated Branches:
  refs/heads/master bd6852d25 -> 18720623e


[CXF-6889] Simplifying the rx client code


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/18720623
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/18720623
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/18720623

Branch: refs/heads/master
Commit: 18720623e9d86e68566346981cebaefc7995d910
Parents: bd6852d
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Mon Jul 10 16:22:26 2017 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Mon Jul 10 16:22:26 2017 +0100

----------------------------------------------------------------------
 .../apache/cxf/jaxrs/client/AsyncClient.java    | 32 -------
 .../client/CompletionStageRxInvokerImpl.java    | 30 ++++++-
 .../jaxrs/client/JaxrsClientStageCallback.java  | 89 --------------------
 .../org/apache/cxf/jaxrs/client/WebClient.java  | 16 +---
 .../client/JaxrsClientObservableCallback.java   | 47 -----------
 .../rx/client/ObservableRxInvokerImpl.java      | 46 +++++-----
 6 files changed, 55 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
deleted file mode 100644
index 2e1075e..0000000
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/AsyncClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.jaxrs.client;
-
-import java.lang.reflect.Type;
-
-//Work in progress. May be removed once the Rx client work is finalized
-public interface AsyncClient {
-    void prepareAsyncClient(String httpMethod,
-                            Object body,
-                            Class<?> requestClass,
-                            Type inType,
-                            Class<?> respClass,
-                            Type outType,
-                            JaxrsClientCallback<?> cb);
-}

http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
index 528d840..381966c 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/CompletionStageRxInvokerImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.cxf.jaxrs.client;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutorService;
 
@@ -142,22 +143,43 @@ public class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker {
 
     @Override
     public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) {
-        return wc.doInvokeAsyncStage(name, entity, responseType, responseType, ex);
+        if (ex == null) {
+            return CompletableFuture.supplyAsync(() -> wc.sync().method(name, entity, responseType));
+        } else {
+            return CompletableFuture.supplyAsync(() -> wc.sync().method(name, entity, responseType), ex);
+        }
+        
+        //return wc.doInvokeAsyncStage(name, entity, responseType, responseType, ex);
     }
 
     @Override
     public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
-        return wc.doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex);
+        if (ex == null) {
+            return CompletableFuture.supplyAsync(() -> wc.sync().method(name, entity, responseType));
+        } else {
+            return CompletableFuture.supplyAsync(() -> wc.sync().method(name, entity, responseType), ex);
+        }
+        //return wc.doInvokeAsyncStage(name, entity, responseType.getRawType(), responseType.getType(), ex);
     }
 
     @Override
     public <T> CompletionStage<T> method(String name, Class<T> responseType) {
-        return wc.doInvokeAsyncStage(name, null, responseType, responseType, ex);
+        if (ex == null) {
+            return CompletableFuture.supplyAsync(() -> wc.sync().method(name, responseType));
+        } else {
+            return CompletableFuture.supplyAsync(() -> wc.sync().method(name, responseType), ex);
+        }
+        //return wc.doInvokeAsyncStage(name, null, responseType, responseType, ex);
     }
 
     @Override
     public <T> CompletionStage<T> method(String name, GenericType<T> responseType) {
-        return wc.doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex);
+        if (ex == null) {
+            return CompletableFuture.supplyAsync(() -> wc.sync().method(name, responseType));
+        } else {
+            return CompletableFuture.supplyAsync(() -> wc.sync().method(name, responseType), ex);
+        }
+        //return wc.doInvokeAsyncStage(name, null, responseType.getRawType(), responseType.getType(), ex);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
deleted file mode 100644
index 578ce12..0000000
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.jaxrs.client;
-
-import java.lang.reflect.Type;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.function.Supplier;
-
-public class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T>  {
-    private CompletableFuture<T> cf;
-
-    public JaxrsClientStageCallback(Class<?> responseClass,
-                             Type outGenericType,
-                             Executor ex) {
-        super(null, responseClass, outGenericType);
-
-        Supplier<T> supplier = new SupplierImpl();
-        cf = ex == null ? CompletableFuture.supplyAsync(supplier)
-            : CompletableFuture.supplyAsync(supplier, ex);
-    }
-
-    public CompletionStage<T> getCompletionStage() {
-        return cf;
-    }
-
-    @Override
-    public void handleResponse(Map<String, Object> ctx, Object[] res) {
-        context = ctx;
-        result = res;
-        done = true;
-        synchronized (this) {
-            notifyAll();
-        }
-    }
-
-    @Override
-    public void handleException(Map<String, Object> ctx, final Throwable ex) {
-        context = ctx;
-        exception = ex;
-        cf.completeExceptionally(ex);
-        done = true;
-        synchronized (this) {
-            notifyAll();
-        }
-    }
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-        boolean result = super.cancel(mayInterruptIfRunning);
-        if (result) {
-            cf.cancel(mayInterruptIfRunning);
-        }
-        return result;
-    }
-
-    private class SupplierImpl implements Supplier<T> {
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public T get() {
-            try {
-                return (T)JaxrsClientStageCallback.this.get()[0];
-            } catch (Exception ex) {
-                cf.completeExceptionally(ex);
-                return null;
-            }
-        }
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
index 44de0da..92f9714 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
@@ -28,7 +28,6 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
@@ -82,7 +81,7 @@ import org.apache.cxf.message.Message;
  * Http-centric web client
  *
  */
-public class WebClient extends AbstractClient implements AsyncClient {
+public class WebClient extends AbstractClient {
     private static final String REQUEST_CLASS = "request.class";
     private static final String REQUEST_TYPE = "request.type";
     private static final String REQUEST_ANNS = "request.annotations";
@@ -930,18 +929,7 @@ public class WebClient extends AbstractClient implements AsyncClient {
         return cb.createFuture();
     }
 
-    protected <T> CompletionStage<T> doInvokeAsyncStage(String httpMethod,
-                                          Object body,
-                                          Class<?> respClass,
-                                          Type outType,
-                                          ExecutorService ex) {
-        JaxrsClientStageCallback<T> cb = new JaxrsClientStageCallback<T>(respClass, outType, ex);
-        prepareAsyncClient(httpMethod, body, null, null, respClass, outType, cb);
-        return cb.getCompletionStage();
-    }
-
-    @Override
-    public void prepareAsyncClient(String httpMethod,
+    protected void prepareAsyncClient(String httpMethod,
                                    Object body,
                                    Class<?> requestClass,
                                    Type inType,

http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java
deleted file mode 100644
index e25ce2e..0000000
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/JaxrsClientObservableCallback.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.jaxrs.rx.client;
-
-import java.lang.reflect.Type;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-
-import org.apache.cxf.jaxrs.client.JaxrsClientCallback;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-public class JaxrsClientObservableCallback<T> extends JaxrsClientCallback<T> {
-    private Observable<T> observable;
-
-    public JaxrsClientObservableCallback(Class<?> responseClass,
-                                  Type outGenericType,
-                                  Executor ex) {
-        super(null, responseClass, outGenericType);
-        Future<T> f = super.createFuture();
-        observable = ex == null ? Observable.from(f)
-            : Observable.from(f, Schedulers.from(ex));
-    }
-
-    public Observable<T> getObservable() {
-        return observable;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/18720623/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
index 7487070..3a79226 100644
--- a/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
+++ b/rt/rs/extensions/rx/src/main/java/org/apache/cxf/jaxrs/rx/client/ObservableRxInvokerImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.cxf.jaxrs.rx.client;
 
-import java.lang.reflect.Type;
 import java.util.concurrent.ExecutorService;
 
 import javax.ws.rs.HttpMethod;
@@ -26,16 +25,18 @@ import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.Response;
 
-import org.apache.cxf.jaxrs.client.AsyncClient;
+import org.apache.cxf.jaxrs.client.WebClient;
 
 import rx.Observable;
+import rx.Scheduler;
+import rx.schedulers.Schedulers;
 
 public class ObservableRxInvokerImpl implements ObservableRxInvoker {
-    private ExecutorService ex;
-    private AsyncClient wc;
-    public ObservableRxInvokerImpl(AsyncClient wc, ExecutorService ex) {
+    private Scheduler sc;
+    private WebClient wc;
+    public ObservableRxInvokerImpl(WebClient wc, ExecutorService ex) {
         this.wc = wc;
-        this.ex = ex;
+        this.sc = ex == null ? null : Schedulers.from(ex);
     }
 
     @Override
@@ -145,31 +146,38 @@ public class ObservableRxInvokerImpl implements ObservableRxInvoker {
 
     @Override
     public <T> Observable<T> method(String name, Entity<?> entity, Class<T> responseType) {
-        return doInvokeAsync(name, entity, responseType, responseType);
+        if (sc == null) {
+            return Observable.from(wc.async().method(name, entity, responseType));
+        } else {
+            return Observable.from(wc.async().method(name, entity, responseType), sc);
+        }
     }
 
     @Override
     public <T> Observable<T> method(String name, Entity<?> entity, GenericType<T> responseType) {
-        return doInvokeAsync(name, entity, responseType.getRawType(), responseType.getType());
+        if (sc == null) {
+            return Observable.from(wc.async().method(name, entity, responseType));
+        } else {
+            return Observable.from(wc.async().method(name, entity, responseType), sc);
+        }
     }
 
     @Override
     public <T> Observable<T> method(String name, Class<T> responseType) {
-        return doInvokeAsync(name, null, responseType, responseType);
+        if (sc == null) {
+            return Observable.from(wc.async().method(name, responseType));
+        } else {
+            return Observable.from(wc.async().method(name, responseType), sc);
+        }
     }
 
     @Override
     public <T> Observable<T> method(String name, GenericType<T> responseType) {
-        return doInvokeAsync(name, null, responseType.getRawType(), responseType.getType());
-    }
-
-    protected <T> Observable<T> doInvokeAsync(String httpMethod,
-                                              Object body,
-                                              Class<?> respClass,
-                                              Type outType) {
-        JaxrsClientObservableCallback<T> cb = new JaxrsClientObservableCallback<T>(respClass, outType, ex);
-        wc.prepareAsyncClient(httpMethod, body, null, null, respClass, outType, cb);
-        return cb.getObservable();
+        if (sc == null) {
+            return Observable.from(wc.async().method(name, responseType));
+        } else {
+            return Observable.from(wc.async().method(name, responseType), sc);
+        }
     }
 
 }

Reply via email to