Repository: cxf Updated Branches: refs/heads/master 1116dff4e -> 6958418ec
[CXF-6889] Continuing prototyping CompletionStage invoker code Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6958418e Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6958418e Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6958418e Branch: refs/heads/master Commit: 6958418ec4989c4391389ee6bfd1d9055ec127a5 Parents: 1116dff Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Mon Oct 10 13:05:16 2016 +0100 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Mon Oct 10 13:05:16 2016 +0100 ---------------------------------------------------------------------- .../cxf/jaxrs/client/JaxrsClientCallback.java | 228 ------------------- .../jaxrs/client/JaxrsClientStageCallback.java | 91 ++++++++ .../org/apache/cxf/jaxrs/client/WebClient.java | 27 ++- .../client/spec/InvocationBuilderImpl.java | 5 +- .../cxf/systest/jaxrs/JAXRSAsyncClientTest.java | 18 +- 5 files changed, 114 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java index a1e7049..0d10ad5 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java @@ -22,17 +22,10 @@ package org.apache.cxf.jaxrs.client; import java.lang.reflect.Type; import java.util.Map; import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; import javax.ws.rs.client.InvocationCallback; @@ -75,11 +68,6 @@ class JaxrsClientCallback<T> extends ClientCallback { return new JaxrsResponseFuture<T>(this); } - public CompletionStage<T> createCompletionStage() { - return null; - } - - @SuppressWarnings("unchecked") public void handleResponse(Map<String, Object> ctx, Object[] res) { context = ctx; @@ -159,220 +147,4 @@ class JaxrsClientCallback<T> extends ClientCallback { return callback.isDone(); } } - static class JaxrsResponseStage<T> implements CompletionStage<T> { - JaxrsClientCallback<T> callback; - JaxrsResponseStage(JaxrsClientCallback<T> cb) { - callback = cb; - } - @Override - public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> thenAccept(Consumer<? super T> action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> thenRun(Runnable action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> thenRunAsync(Runnable action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, - BiFunction<? super T, ? super U, ? extends V> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, - BiFunction<? super T, ? super U, ? extends V> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, - BiFunction<? super T, ? super U, ? extends V> fn, - Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, - BiConsumer<? super T, ? super U> action) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, - BiConsumer<? super T, ? super U> action) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, - BiConsumer<? super T, ? super U> action, - Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, - Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, - Function<? super T, U> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, - Function<? super T, U> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, - Function<? super T, U> fn, Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, - Consumer<? super T> action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, - Consumer<? super T> action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, - Consumer<? super T> action, Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, - Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, - Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, - Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) { - // TODO Auto-generated method stub - return null; - } - @Override - public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, - Executor executor) { - // TODO Auto-generated method stub - return null; - } - @Override - public CompletableFuture<T> toCompletableFuture() { - // TODO Auto-generated method stub - return null; - } - - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java new file mode 100644 index 0000000..822d235 --- /dev/null +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientStageCallback.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.jaxrs.client; + +import java.lang.reflect.Type; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.function.Supplier; + +class JaxrsClientStageCallback<T> extends JaxrsClientCallback<T> { + private CompletableFuture<T> cf; + + JaxrsClientStageCallback(Class<?> responseClass, + Type outGenericType, + Executor ex) { + super(null, responseClass, outGenericType); + + Supplier<T> supplier = new SupplierImpl(); + cf = ex == null ? CompletableFuture.supplyAsync(supplier) + : CompletableFuture.supplyAsync(supplier, ex); + } + + public CompletionStage<T> getCompletionStage() { + return cf; + } + + @Override + public void handleResponse(Map<String, Object> ctx, Object[] res) { + context = ctx; + result = res; + //consumer.accept((T)res[0]); + done = true; + synchronized (this) { + notifyAll(); + } + } + + @Override + public void handleException(Map<String, Object> ctx, final Throwable ex) { + context = ctx; + exception = ex; + //handler.failed(exception); + done = true; + synchronized (this) { + notifyAll(); + } + } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean result = super.cancel(mayInterruptIfRunning); + if (result) { + //handler.failed(new CancellationException()); + } + return result; + } + + private class SupplierImpl implements Supplier<T> { + + @SuppressWarnings("unchecked") + @Override + public T get() { + try { + return (T)JaxrsClientStageCallback.this.get()[0]; + } catch (Exception ex) { + //handler.failed((InterruptedException)ex); + //throw ex; + return null; + } + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java index 356edfe..e522de2 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java @@ -29,6 +29,7 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import javax.ws.rs.HttpMethod; @@ -954,7 +955,8 @@ public class WebClient extends AbstractClient { Class<?> requestClass, Type inType, Class<?> respClass, - Type outType) { + Type outType, + ExecutorService ex) { Annotation[] inAnns = null; if (body instanceof Entity) { Entity<?> entity = (Entity<?>)body; @@ -979,12 +981,12 @@ public class WebClient extends AbstractClient { inAnns, respClass, outType, null, null); m.getExchange().setSynchronous(false); - JaxrsClientCallback<T> cb = new JaxrsClientCallback<T>(null, respClass, outType); + JaxrsClientStageCallback<T> cb = new JaxrsClientStageCallback<T>(respClass, outType, ex); m.getExchange().put(JaxrsClientCallback.class, cb); doRunInterceptorChain(m); - return cb.createCompletionStage(); + return cb.getCompletionStage(); } @@ -1286,7 +1288,10 @@ public class WebClient extends AbstractClient { // Link to JAX-RS 2.1 CompletionStageRxInvoker public CompletionStageRxInvoker rx() { - return new CompletionStageRxInvokerImpl(); + return new CompletionStageRxInvokerImpl(null); + } + public CompletionStageRxInvoker rx(ExecutorService ex) { + return new CompletionStageRxInvokerImpl(ex); } private void setEntityHeaders(Entity<?> entity) { @@ -1614,7 +1619,11 @@ public class WebClient extends AbstractClient { } class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker { - + private ExecutorService ex; + CompletionStageRxInvokerImpl(ExecutorService ex) { + this.ex = ex; + } + @Override public CompletionStage<Response> get() { return get(Response.class); @@ -1722,22 +1731,22 @@ public class WebClient extends AbstractClient { @Override public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) { - return doInvokeAsyncStage(name, entity, null, null, responseType, responseType); + return doInvokeAsyncStage(name, entity, null, null, responseType, responseType, ex); } @Override public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) { - return doInvokeAsyncStage(name, entity, null, null, responseType.getRawType(), responseType.getType()); + return doInvokeAsyncStage(name, entity, null, null, responseType.getRawType(), responseType.getType(), ex); } @Override public <T> CompletionStage<T> method(String name, Class<T> responseType) { - return doInvokeAsyncStage(name, null, null, null, responseType, responseType); + return doInvokeAsyncStage(name, null, null, null, responseType, responseType, ex); } @Override public <T> CompletionStage<T> method(String name, GenericType<T> responseType) { - return doInvokeAsyncStage(name, null, null, null, responseType.getRawType(), responseType.getType()); + return doInvokeAsyncStage(name, null, null, null, responseType.getRawType(), responseType.getType(), ex); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java index 899f1a8..04980ee 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java @@ -382,16 +382,17 @@ public class InvocationBuilderImpl implements Invocation.Builder { @Override public CompletionStageRxInvoker rx(ExecutorService executorService) { - // TODO: Implementation required (JAX-RS 2.1) - return null; + return webClient.rx(executorService); } + @SuppressWarnings("rawtypes") @Override public <T extends RxInvoker> T rx(Class<T> clazz) { // TODO: Implementation required (JAX-RS 2.1) return null; } + @SuppressWarnings("rawtypes") @Override public <T extends RxInvoker> T rx(Class<T> clazz, ExecutorService executorService) { // TODO: Implementation required (JAX-RS 2.1) http://git-wip-us.apache.org/repos/asf/cxf/blob/6958418e/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java index 8f14c8c..568e520 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.function.Consumer; import javax.ws.rs.Consumes; import javax.ws.rs.NotFoundException; @@ -61,7 +60,6 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; public class JAXRSAsyncClientTest extends AbstractBusClientServerTestBase { @@ -370,25 +368,13 @@ public class JAXRSAsyncClientTest extends AbstractBusClientServerTestBase { @Test - @Ignore public void testGetBookAsyncStage() throws Exception { String address = "http://localhost:" + PORT + "/bookstore/books"; WebClient wc = createWebClient(address); CompletionStage<Book> stage = wc.path("123").rx().get(Book.class); - Holder<Book> h = new Holder<Book>(); - Consumer<Book> action = new Consumer<Book>() { - - @Override - public void accept(Book t) { - h.value = t; - - } - - }; - stage.thenAccept(action); - assertEquals(123L, h.value.getId()); + Book book = stage.toCompletableFuture().join(); + assertEquals(123L, book.getId()); } - private WebClient createWebClient(String address) { List<Object> providers = new ArrayList<Object>(); return WebClient.create(address, providers);