Repository: cxf
Updated Branches:
  refs/heads/master 1116dff4e -> 6958418ec


[CXF-6889] Continuing prototyping CompletionStage invoker code


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6958418e
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6958418e
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6958418e

Branch: refs/heads/master
Commit: 6958418ec4989c4391389ee6bfd1d9055ec127a5
Parents: 1116dff
Author: Sergey Beryozkin <sberyoz...@gmail.com>
Authored: Mon Oct 10 13:05:16 2016 +0100
Committer: Sergey Beryozkin <sberyoz...@gmail.com>
Committed: Mon Oct 10 13:05:16 2016 +0100

----------------------------------------------------------------------
 .../cxf/jaxrs/client/JaxrsClientCallback.java   | 228 -------------------
 .../jaxrs/client/JaxrsClientStageCallback.java  |  91 ++++++++
 .../org/apache/cxf/jaxrs/client/WebClient.java  |  27 ++-
 .../client/spec/InvocationBuilderImpl.java      |   5 +-
 .../cxf/systest/jaxrs/JAXRSAsyncClientTest.java |  18 +-
 5 files changed, 114 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
index a1e7049..0d10ad5 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java
@@ -22,17 +22,10 @@ package org.apache.cxf.jaxrs.client;
 import java.lang.reflect.Type;
 import java.util.Map;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 import javax.ws.rs.client.InvocationCallback;
 
@@ -75,11 +68,6 @@ class JaxrsClientCallback<T> extends ClientCallback {
         return new JaxrsResponseFuture<T>(this);
     }
     
-    public CompletionStage<T> createCompletionStage() {
-        return null;
-    }
-    
-    
     @SuppressWarnings("unchecked")
     public void handleResponse(Map<String, Object> ctx, Object[] res) {
         context = ctx;
@@ -159,220 +147,4 @@ class JaxrsClientCallback<T> extends ClientCallback {
             return callback.isDone();
         }
     }
-    static class JaxrsResponseStage<T> implements CompletionStage<T> {
-        JaxrsClientCallback<T> callback;
-        JaxrsResponseStage(JaxrsClientCallback<T> cb) {
-            callback = cb;
-        }
-        @Override
-        public <U> CompletionStage<U> thenApply(Function<? super T, ? extends 
U> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? 
extends U> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? 
extends U> fn, Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> thenAccept(Consumer<? super T> action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> 
action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> 
action, Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> thenRun(Runnable action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> thenRunAsync(Runnable action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> thenRunAsync(Runnable action, Executor 
executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends 
U> other,
-                                                     BiFunction<? super T, ? 
super U, ? extends V> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? 
extends U> other,
-                                                          BiFunction<? super 
T, ? super U, ? extends V> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? 
extends U> other,
-                                                          BiFunction<? super 
T, ? super U, ? extends V> fn,
-                                                          Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? 
extends U> other,
-                                                        BiConsumer<? super T, 
? super U> action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? 
extends U> other,
-                                                             BiConsumer<? 
super T, ? super U> action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? 
extends U> other,
-                                                             BiConsumer<? 
super T, ? super U> action,
-                                                             Executor 
executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, 
Runnable action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> 
other, Runnable action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> 
other, Runnable action,
-                                                       Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> applyToEither(CompletionStage<? extends 
T> other,
-                                                    Function<? super T, U> fn) 
{
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? 
extends T> other,
-                                                         Function<? super T, 
U> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? 
extends T> other,
-                                                         Function<? super T, 
U> fn, Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> acceptEither(CompletionStage<? extends T> 
other,
-                                                  Consumer<? super T> action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> acceptEitherAsync(CompletionStage<? 
extends T> other,
-                                                       Consumer<? super T> 
action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> acceptEitherAsync(CompletionStage<? 
extends T> other,
-                                                       Consumer<? super T> 
action, Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> runAfterEither(CompletionStage<?> other, 
Runnable action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> 
other, Runnable action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> 
other, Runnable action,
-                                                         Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> thenCompose(Function<? super T, ? 
extends CompletionStage<U>> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? 
extends CompletionStage<U>> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? 
extends CompletionStage<U>> fn,
-                                                       Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<T> exceptionally(Function<Throwable, ? extends 
T> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super 
Throwable> action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? 
super Throwable> action) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? 
super Throwable> action,
-                                                    Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, 
? extends U> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> handleAsync(BiFunction<? super T, 
Throwable, ? extends U> fn) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public <U> CompletionStage<U> handleAsync(BiFunction<? super T, 
Throwable, ? extends U> fn,
-                                                  Executor executor) {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        @Override
-        public CompletableFuture<T> toCompletableFuture() {
-            // TODO Auto-generated method stub
-            return null;
-        }
-        
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
new file mode 100644
index 0000000..822d235
--- /dev/null
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.jaxrs.client;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+
+class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T>  {
+    private CompletableFuture<T> cf;
+    
+    JaxrsClientStageCallback(Class<?> responseClass, 
+                             Type outGenericType,
+                             Executor ex) {
+        super(null, responseClass, outGenericType);
+        
+        Supplier<T> supplier = new SupplierImpl();
+        cf = ex == null ? CompletableFuture.supplyAsync(supplier) 
+            : CompletableFuture.supplyAsync(supplier, ex);
+    }
+    
+    public CompletionStage<T> getCompletionStage() {
+        return cf;
+    }
+    
+    @Override
+    public void handleResponse(Map<String, Object> ctx, Object[] res) {
+        context = ctx;
+        result = res;
+        //consumer.accept((T)res[0]);
+        done = true;
+        synchronized (this) {
+            notifyAll();
+        }
+    }
+
+    @Override
+    public void handleException(Map<String, Object> ctx, final Throwable ex) {
+        context = ctx;
+        exception = ex;
+        //handler.failed(exception);
+        done = true;
+        synchronized (this) {
+            notifyAll();
+        }
+    }
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        boolean result = super.cancel(mayInterruptIfRunning);
+        if (result) {
+            //handler.failed(new CancellationException());
+        }
+        return result;
+    }
+
+    private class SupplierImpl implements Supplier<T> {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public T get() {
+            try {
+                return (T)JaxrsClientStageCallback.this.get()[0];
+            } catch (Exception ex) {
+                //handler.failed((InterruptedException)ex);
+                //throw ex;
+                return null;
+            }
+        }
+        
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
index 356edfe..e522de2 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java
@@ -29,6 +29,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import javax.ws.rs.HttpMethod;
@@ -954,7 +955,8 @@ public class WebClient extends AbstractClient {
                                           Class<?> requestClass,
                                           Type inType,
                                           Class<?> respClass,
-                                          Type outType) {
+                                          Type outType,
+                                          ExecutorService ex) {
         Annotation[] inAnns = null;
         if (body instanceof Entity) {
             Entity<?> entity = (Entity<?>)body;
@@ -979,12 +981,12 @@ public class WebClient extends AbstractClient {
                                     inAnns, respClass, outType, null, null);
         
         m.getExchange().setSynchronous(false);
-        JaxrsClientCallback<T> cb = new JaxrsClientCallback<T>(null, 
respClass, outType);
+        JaxrsClientStageCallback<T> cb = new 
JaxrsClientStageCallback<T>(respClass, outType, ex);
         m.getExchange().put(JaxrsClientCallback.class, cb);
         
         doRunInterceptorChain(m);
         
-        return cb.createCompletionStage();
+        return cb.getCompletionStage();
     }
 
     
@@ -1286,7 +1288,10 @@ public class WebClient extends AbstractClient {
     
     // Link to JAX-RS 2.1 CompletionStageRxInvoker
     public CompletionStageRxInvoker rx() {
-        return new CompletionStageRxInvokerImpl();
+        return new CompletionStageRxInvokerImpl(null);
+    }
+    public CompletionStageRxInvoker rx(ExecutorService ex) {
+        return new CompletionStageRxInvokerImpl(ex);
     }
     
     private void setEntityHeaders(Entity<?> entity) {
@@ -1614,7 +1619,11 @@ public class WebClient extends AbstractClient {
     }
     
     class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker {
-
+        private ExecutorService ex;
+        CompletionStageRxInvokerImpl(ExecutorService ex) {
+            this.ex = ex;
+        }
+        
         @Override
         public CompletionStage<Response> get() {
             return get(Response.class);
@@ -1722,22 +1731,22 @@ public class WebClient extends AbstractClient {
 
         @Override
         public <T> CompletionStage<T> method(String name, Entity<?> entity, 
Class<T> responseType) {
-            return doInvokeAsyncStage(name, entity, null, null, responseType, 
responseType);
+            return doInvokeAsyncStage(name, entity, null, null, responseType, 
responseType, ex);
         }
 
         @Override
         public <T> CompletionStage<T> method(String name, Entity<?> entity, 
GenericType<T> responseType) {
-            return doInvokeAsyncStage(name, entity, null, null, 
responseType.getRawType(), responseType.getType());
+            return doInvokeAsyncStage(name, entity, null, null, 
responseType.getRawType(), responseType.getType(), ex);
         }
 
         @Override
         public <T> CompletionStage<T> method(String name, Class<T> 
responseType) {
-            return doInvokeAsyncStage(name, null, null, null, responseType, 
responseType);
+            return doInvokeAsyncStage(name, null, null, null, responseType, 
responseType, ex);
         }
 
         @Override
         public <T> CompletionStage<T> method(String name, GenericType<T> 
responseType) {
-            return doInvokeAsyncStage(name, null, null, null, 
responseType.getRawType(), responseType.getType());
+            return doInvokeAsyncStage(name, null, null, null, 
responseType.getRawType(), responseType.getType(), ex);
         }
              
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
----------------------------------------------------------------------
diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
index 899f1a8..04980ee 100644
--- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
+++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java
@@ -382,16 +382,17 @@ public class InvocationBuilderImpl implements 
Invocation.Builder {
 
     @Override
     public CompletionStageRxInvoker rx(ExecutorService executorService) {
-        // TODO: Implementation required (JAX-RS 2.1)
-        return null;
+        return webClient.rx(executorService);
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public <T extends RxInvoker> T rx(Class<T> clazz) {
         // TODO: Implementation required (JAX-RS 2.1)
         return null;
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
     public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService 
executorService) {
         // TODO: Implementation required (JAX-RS 2.1)

http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java
----------------------------------------------------------------------
diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java
index 8f14c8c..568e520 100644
--- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java
+++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.function.Consumer;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.NotFoundException;
@@ -61,7 +60,6 @@ import 
org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
 
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class JAXRSAsyncClientTest extends AbstractBusClientServerTestBase {
@@ -370,25 +368,13 @@ public class JAXRSAsyncClientTest extends 
AbstractBusClientServerTestBase {
     
     
     @Test
-    @Ignore
     public void testGetBookAsyncStage() throws Exception {
         String address = "http://localhost:" + PORT + "/bookstore/books";
         WebClient wc = createWebClient(address);
         CompletionStage<Book> stage = wc.path("123").rx().get(Book.class);
-        Holder<Book> h = new Holder<Book>();
-        Consumer<Book> action = new Consumer<Book>() {
-
-            @Override
-            public void accept(Book t) {
-                h.value = t;
-                
-            }
-            
-        };
-        stage.thenAccept(action);
-        assertEquals(123L, h.value.getId());
+        Book book = stage.toCompletableFuture().join();
+        assertEquals(123L, book.getId());
     }
-    
     private WebClient createWebClient(String address) {
         List<Object> providers = new ArrayList<Object>();
         return WebClient.create(address, providers);

Reply via email to