Repository: cxf Updated Branches: refs/heads/master 8595d5ce7 -> 26f430253
[CXF-6881] Prototyping the boilerplate CompletionStage invoker code Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/26f43025 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/26f43025 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/26f43025 Branch: refs/heads/master Commit: 26f43025348e101e981e3f64ecc3ddf483ad7fb3 Parents: 8595d5c Author: Sergey Beryozkin <[email protected]> Authored: Tue Sep 27 23:33:07 2016 +0100 Committer: Sergey Beryozkin <[email protected]> Committed: Tue Sep 27 23:33:07 2016 +0100 ---------------------------------------------------------------------- .../cxf/jaxrs/client/JaxrsClientCallback.java | 233 ++++++++++++++++++- .../org/apache/cxf/jaxrs/client/WebClient.java | 174 ++++++++++++++ .../client/spec/InvocationBuilderImpl.java | 3 +- .../cxf/systest/jaxrs/JAXRSAsyncClientTest.java | 24 ++ 4 files changed, 429 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/26f43025/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java index 390ab23..a1e7049 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/JaxrsClientCallback.java @@ -22,10 +22,17 @@ package org.apache.cxf.jaxrs.client; import java.lang.reflect.Type; import java.util.Map; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; import javax.ws.rs.client.InvocationCallback; @@ -65,7 +72,11 @@ class JaxrsClientCallback<T> extends ClientCallback { } public Future<T> createFuture() { - return new JaxrsResponseCallback<T>(this); + return new JaxrsResponseFuture<T>(this); + } + + public CompletionStage<T> createCompletionStage() { + return null; } @@ -97,9 +108,9 @@ class JaxrsClientCallback<T> extends ClientCallback { - static class JaxrsResponseCallback<T> implements Future<T> { + static class JaxrsResponseFuture<T> implements Future<T> { JaxrsClientCallback<T> callback; - JaxrsResponseCallback(JaxrsClientCallback<T> cb) { + JaxrsResponseFuture(JaxrsClientCallback<T> cb) { callback = cb; } @@ -148,4 +159,220 @@ class JaxrsClientCallback<T> extends ClientCallback { return callback.isDone(); } } + static class JaxrsResponseStage<T> implements CompletionStage<T> { + JaxrsClientCallback<T> callback; + JaxrsResponseStage(JaxrsClientCallback<T> cb) { + callback = cb; + } + @Override + public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> thenAccept(Consumer<? super T> action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> thenRun(Runnable action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> thenRunAsync(Runnable action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, + BiFunction<? super T, ? super U, ? extends V> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, + BiFunction<? super T, ? super U, ? extends V> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, + BiFunction<? super T, ? super U, ? extends V> fn, + Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, + BiConsumer<? super T, ? super U> action, + Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, + Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, + Function<? super T, U> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, + Function<? super T, U> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, + Function<? super T, U> fn, Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, + Consumer<? super T> action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, + Consumer<? super T> action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, + Consumer<? super T> action, Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, + Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, + Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, + Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) { + // TODO Auto-generated method stub + return null; + } + @Override + public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, + Executor executor) { + // TODO Auto-generated method stub + return null; + } + @Override + public CompletableFuture<T> toCompletableFuture() { + // TODO Auto-generated method stub + return null; + } + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cxf/blob/26f43025/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java index 892ce20..356edfe 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/WebClient.java @@ -28,12 +28,14 @@ import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import javax.ws.rs.HttpMethod; import javax.ws.rs.ProcessingException; import javax.ws.rs.WebApplicationException; import javax.ws.rs.client.AsyncInvoker; +import javax.ws.rs.client.CompletionStageRxInvoker; import javax.ws.rs.client.Entity; import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.SyncInvoker; @@ -946,6 +948,44 @@ public class WebClient extends AbstractClient { return cb.createFuture(); } + + protected <T> CompletionStage<T> doInvokeAsyncStage(String httpMethod, + Object body, + Class<?> requestClass, + Type inType, + Class<?> respClass, + Type outType) { + Annotation[] inAnns = null; + if (body instanceof Entity) { + Entity<?> entity = (Entity<?>)body; + setEntityHeaders(entity); + body = entity.getEntity(); + requestClass = body.getClass(); + inType = body.getClass(); + inAnns = entity.getAnnotations(); + } + + if (body instanceof GenericEntity) { + GenericEntity<?> genericEntity = (GenericEntity<?>)body; + body = genericEntity.getEntity(); + requestClass = genericEntity.getRawType(); + inType = genericEntity.getType(); + } + + MultivaluedMap<String, String> headers = prepareHeaders(respClass, body); + resetResponse(); + + Message m = finalizeMessage(httpMethod, headers, body, requestClass, inType, + inAnns, respClass, outType, null, null); + + m.getExchange().setSynchronous(false); + JaxrsClientCallback<T> cb = new JaxrsClientCallback<T>(null, respClass, outType); + m.getExchange().put(JaxrsClientCallback.class, cb); + + doRunInterceptorChain(m); + + return cb.createCompletionStage(); + } private MultivaluedMap<String, String> prepareHeaders(Class<?> responseClass, Object body) { @@ -1244,6 +1284,11 @@ public class WebClient extends AbstractClient { return new SyncInvokerImpl(); } + // Link to JAX-RS 2.1 CompletionStageRxInvoker + public CompletionStageRxInvoker rx() { + return new CompletionStageRxInvokerImpl(); + } + private void setEntityHeaders(Entity<?> entity) { type(entity.getMediaType()); if (entity.getLanguage() != null) { @@ -1567,4 +1612,133 @@ public class WebClient extends AbstractClient { return invoke(method, entity, genericType); } } + + class CompletionStageRxInvokerImpl implements CompletionStageRxInvoker { + + @Override + public CompletionStage<Response> get() { + return get(Response.class); + } + + @Override + public <T> CompletionStage<T> get(Class<T> responseType) { + return method(HttpMethod.GET, responseType); + } + + @Override + public <T> CompletionStage<T> get(GenericType<T> responseType) { + return method(HttpMethod.GET, responseType); + } + + @Override + public CompletionStage<Response> put(Entity<?> entity) { + return put(entity, Response.class); + } + + @Override + public <T> CompletionStage<T> put(Entity<?> entity, Class<T> responseType) { + return method(HttpMethod.PUT, entity, responseType); + } + + @Override + public <T> CompletionStage<T> put(Entity<?> entity, GenericType<T> responseType) { + return method(HttpMethod.PUT, entity, responseType); + } + + @Override + public CompletionStage<Response> post(Entity<?> entity) { + return post(entity, Response.class); + } + + @Override + public <T> CompletionStage<T> post(Entity<?> entity, Class<T> responseType) { + return method(HttpMethod.POST, entity, responseType); + } + + @Override + public <T> CompletionStage<T> post(Entity<?> entity, GenericType<T> responseType) { + return method(HttpMethod.POST, entity, responseType); + } + + @Override + public CompletionStage<Response> delete() { + return delete(Response.class); + } + + @Override + public <T> CompletionStage<T> delete(Class<T> responseType) { + return method(HttpMethod.DELETE, responseType); + } + + @Override + public <T> CompletionStage<T> delete(GenericType<T> responseType) { + return method(HttpMethod.DELETE, responseType); + } + + @Override + public CompletionStage<Response> head() { + return method(HttpMethod.HEAD); + } + + @Override + public CompletionStage<Response> options() { + return options(Response.class); + } + + @Override + public <T> CompletionStage<T> options(Class<T> responseType) { + return method(HttpMethod.OPTIONS, responseType); + } + + @Override + public <T> CompletionStage<T> options(GenericType<T> responseType) { + return method(HttpMethod.OPTIONS, responseType); + } + + @Override + public CompletionStage<Response> trace() { + return trace(Response.class); + } + + @Override + public <T> CompletionStage<T> trace(Class<T> responseType) { + return method("TRACE", responseType); + } + + @Override + public <T> CompletionStage<T> trace(GenericType<T> responseType) { + return method("TRACE", responseType); + } + + @Override + public CompletionStage<Response> method(String name) { + return method(name, Response.class); + } + + @Override + public CompletionStage<Response> method(String name, Entity<?> entity) { + return method(name, entity, Response.class); + } + + @Override + public <T> CompletionStage<T> method(String name, Entity<?> entity, Class<T> responseType) { + return doInvokeAsyncStage(name, entity, null, null, responseType, responseType); + } + + @Override + public <T> CompletionStage<T> method(String name, Entity<?> entity, GenericType<T> responseType) { + return doInvokeAsyncStage(name, entity, null, null, responseType.getRawType(), responseType.getType()); + } + + @Override + public <T> CompletionStage<T> method(String name, Class<T> responseType) { + return doInvokeAsyncStage(name, null, null, null, responseType, responseType); + } + + @Override + public <T> CompletionStage<T> method(String name, GenericType<T> responseType) { + return doInvokeAsyncStage(name, null, null, null, responseType.getRawType(), responseType.getType()); + } + + } } http://git-wip-us.apache.org/repos/asf/cxf/blob/26f43025/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java ---------------------------------------------------------------------- diff --git a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java index 10f5d9b..899f1a8 100644 --- a/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java +++ b/rt/rs/client/src/main/java/org/apache/cxf/jaxrs/client/spec/InvocationBuilderImpl.java @@ -377,8 +377,7 @@ public class InvocationBuilderImpl implements Invocation.Builder { @Override public CompletionStageRxInvoker rx() { - // TODO: Implementation required (JAX-RS 2.1) - return null; + return webClient.rx(); } @Override http://git-wip-us.apache.org/repos/asf/cxf/blob/26f43025/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java ---------------------------------------------------------------------- diff --git a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java index 8713229..8f14c8c 100644 --- a/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java +++ b/systests/jaxrs/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientTest.java @@ -27,8 +27,10 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.Consumer; import javax.ws.rs.Consumes; import javax.ws.rs.NotFoundException; @@ -59,6 +61,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; public class JAXRSAsyncClientTest extends AbstractBusClientServerTestBase { @@ -264,6 +267,7 @@ public class JAXRSAsyncClientTest extends AbstractBusClientServerTestBase { assertTrue(((GenericInvocationCallback)callback).getResult().readEntity(Boolean.class)); } + @Test public void testAsyncProxyPrimitiveResponse() throws Exception { String address = "http://localhost:" + PORT; @@ -365,6 +369,26 @@ public class JAXRSAsyncClientTest extends AbstractBusClientServerTestBase { } + @Test + @Ignore + public void testGetBookAsyncStage() throws Exception { + String address = "http://localhost:" + PORT + "/bookstore/books"; + WebClient wc = createWebClient(address); + CompletionStage<Book> stage = wc.path("123").rx().get(Book.class); + Holder<Book> h = new Holder<Book>(); + Consumer<Book> action = new Consumer<Book>() { + + @Override + public void accept(Book t) { + h.value = t; + + } + + }; + stage.thenAccept(action); + assertEquals(123L, h.value.getId()); + } + private WebClient createWebClient(String address) { List<Object> providers = new ArrayList<Object>(); return WebClient.create(address, providers);
