This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.2.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 8825ecfeeeb7448cb9fd3ad4555b07b60de82ff5 Author: Andriy Redko <drr...@gmail.com> AuthorDate: Sun Jul 19 09:07:34 2020 -0400 CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory (#683) CXF-8303: MP: Context propagation impossible using AsyncInvocationInterceptorFactory (cherry picked from commit 2244024ba6d438f44adffd56b1dc8e02cb45cf71) --- .../microprofile/client/MPRestClientCallback.java | 15 +- .../client/MicroProfileClientFactoryBean.java | 2 +- .../org/apache/cxf/microprofile/client/Utils.java | 133 ++++++++++ .../client/proxy/MicroProfileClientProxyImpl.java | 10 + systests/microprofile/client/async/pom.xml | 6 + .../rest/client/AsyncThreadingTest.java | 291 +++++++++++++++++++++ 6 files changed, 442 insertions(+), 15 deletions(-) diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java index a37dde9..83226db 100644 --- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java +++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MPRestClientCallback.java @@ -20,10 +20,6 @@ package org.apache.cxf.microprofile.client; import java.lang.reflect.Type; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import javax.ws.rs.client.InvocationCallback; @@ -32,25 +28,16 @@ import org.apache.cxf.jaxrs.client.JaxrsClientCallback; import org.apache.cxf.message.Message; public class MPRestClientCallback<T> extends JaxrsClientCallback<T> { - private final ExecutorService executor; - public MPRestClientCallback(InvocationCallback<T> handler, Message outMessage, Class<?> responseClass, Type outGenericType) { super(handler, responseClass, outGenericType); - ExecutorService es = outMessage.get(ExecutorService.class); - if (es == null) { - es = AccessController.doPrivileged((PrivilegedAction<ExecutorService>)() -> { - return ForkJoinPool.commonPool(); - }); - } - executor = es; } @SuppressWarnings("unchecked") @Override public Future<T> createFuture() { - return delegate.thenApplyAsync(res -> (T)res[0], executor); + return delegate.thenApply(res -> (T)res[0]); } } \ No newline at end of file diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java index 479cabb..7d19a7a 100644 --- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java +++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/MicroProfileClientFactoryBean.java @@ -56,7 +56,7 @@ public class MicroProfileClientFactoryBean extends JAXRSClientFactoryBean { super(); this.configuration = configuration.getConfiguration(); this.comparator = MicroProfileClientProviderFactory.createComparator(this); - this.executorService = executorService; + this.executorService = (executorService == null) ? Utils.defaultExecutorService() : executorService; super.setAddress(baseUri); super.setServiceClass(aClass); super.setProviderComparator(comparator); diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java new file mode 100644 index 0000000..7b1edad --- /dev/null +++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/Utils.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.microprofile.client; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.cxf.jaxrs.client.AbstractClient; +import org.apache.cxf.jaxrs.ext.MessageContext; + +public final class Utils { + + private Utils() { + } + + public static ExecutorService getExecutorService(MessageContext mc) { + ExecutorService es = (ExecutorService) mc.get(AbstractClient.EXECUTOR_SERVICE_PROPERTY); + if (es == null) { + es = getCommonPool(); + } + return es; + } + + public static ExecutorService defaultExecutorService() { + return new LazyForkJoinExecutor(); + } + + private static class LazyForkJoinExecutor implements ExecutorService { + @Override + public void execute(Runnable command) { + getCommonPool().execute(command); + } + + @Override + public void shutdown() { + getCommonPool().shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + return getCommonPool().shutdownNow(); + } + + @Override + public boolean isShutdown() { + return getCommonPool().isShutdown(); + } + + @Override + public boolean isTerminated() { + return getCommonPool().isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return getCommonPool().awaitTermination(timeout, unit); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return getCommonPool().submit(task); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return getCommonPool().submit(task, result); + } + + @Override + public Future<?> submit(Runnable task) { + return getCommonPool().submit(task); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { + return getCommonPool().invokeAll(tasks); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, + TimeUnit unit) throws InterruptedException { + return getCommonPool().invokeAll(tasks, timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + return getCommonPool().invokeAny(tasks); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, + TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return getCommonPool().invokeAny(tasks, timeout, unit); + } + } + + private static ExecutorService getCommonPool() { + if (System.getSecurityManager() != null) { + return AccessController.doPrivileged((PrivilegedAction<ExecutorService>) () -> { + return ForkJoinPool.commonPool(); + }); + } else { + return ForkJoinPool.commonPool(); + } + } +} \ No newline at end of file diff --git a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java index 2bd02ee..8c584b4 100644 --- a/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java +++ b/rt/rs/microprofile-client/src/main/java/org/apache/cxf/microprofile/client/proxy/MicroProfileClientProxyImpl.java @@ -63,6 +63,11 @@ public class MicroProfileClientProxyImpl extends ClientProxyImpl { boolean isRoot, boolean inheritHeaders, ExecutorService executorService, Configuration configuration, Object... varValues) { super(new LocalClientState(baseURI, configuration.getProperties()), loader, cri, isRoot, inheritHeaders, varValues); + + if (executorService == null) { + throw new IllegalArgumentException("The executorService is required and must be provided"); + } + cfg.getRequestContext().put(EXECUTOR_SERVICE_PROPERTY, executorService); cfg.getRequestContext().putAll(configuration.getProperties()); } @@ -71,6 +76,11 @@ public class MicroProfileClientProxyImpl extends ClientProxyImpl { boolean isRoot, boolean inheritHeaders, ExecutorService executorService, Configuration configuration, Object... varValues) { super(initialState, loader, cri, isRoot, inheritHeaders, varValues); + + if (executorService == null) { + throw new IllegalArgumentException("The executorService is required and must be provided"); + } + cfg.getRequestContext().put(EXECUTOR_SERVICE_PROPERTY, executorService); cfg.getRequestContext().putAll(configuration.getProperties()); } diff --git a/systests/microprofile/client/async/pom.xml b/systests/microprofile/client/async/pom.xml index b52826f..43dc497 100644 --- a/systests/microprofile/client/async/pom.xml +++ b/systests/microprofile/client/async/pom.xml @@ -88,6 +88,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jsonb_1.0_spec</artifactId> + <version>1.1</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>javax.json</groupId> <artifactId>javax.json-api</artifactId> <version>${cxf.json.api.1.1.version}</version> diff --git a/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java b/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java new file mode 100644 index 0000000..e6ee0d4 --- /dev/null +++ b/systests/microprofile/client/async/src/test/java/org/apache/cxf/systest/microprofile/rest/client/AsyncThreadingTest.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.microprofile.rest.client; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; + +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.junit.WireMockRule; + +import org.apache.johnzon.jaxrs.jsonb.jaxrs.JsonbJaxrsProvider; +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptor; +import org.eclipse.microprofile.rest.client.ext.AsyncInvocationInterceptorFactory; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertThrows; + +@RunWith(Parameterized.class) +public class AsyncThreadingTest { + private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>(); + + @Rule + public WireMockRule wireMockRule = new WireMockRule(WireMockConfiguration.options().dynamicPort()); + + private final ExecutorService executorService; + private final String prefix; + private EchoResource echo; + + public AsyncThreadingTest(final ExecutorService executorService, final String prefix) { + this.executorService = executorService; + this.prefix = prefix; + } + + @Parameters(name = "Using pool: {1}") + public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] + { + {cachedExecutor(), "mp-async-"}, + {null, "ForkJoinPool.commonPool-worker-"} + }); + } + + @Before + public void setUp() { + final RestClientBuilder builder = RestClientBuilder + .newBuilder() + .register(JsonbJaxrsProvider.class) + .register(AsyncInvocationInterceptorFactoryImpl.class) + .baseUri(getBaseUri()); + + if (executorService == null /* use default one */) { + echo = builder.build(EchoResource.class); + } else { + echo = builder.executorService(executorService).build(EchoResource.class); + } + } + + @After + public void tearDown() { + CONTEXT.remove(); + } + + @Test + public void testAsynchronousNotFoundCall() throws Exception { + wireMockRule.stubFor(get(urlEqualTo("/echo")) + .willReturn(aResponse() + .withStatus(404))); + + final CompletableFuture<Echo> future = echo + .getAsync() + .toCompletableFuture() + .handle((r, ex) -> { + try { + Thread.sleep(500); + assertThat(Thread.currentThread().getName(), startsWith(prefix)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + if (ex instanceof CompletionException) { + throw (CompletionException)ex; + } else { + return r; + } + }); + + // Simulate some processing pause + assertNull(future.getNow(null)); + + final ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get(5L, TimeUnit.SECONDS)); + assertEquals(WebApplicationException.class, ex.getCause().getClass()); + } + + @Test + public void testAsynchronousCall() throws Exception { + wireMockRule.stubFor(get(urlEqualTo("/echo")) + .willReturn(aResponse() + .withHeader("Content-Type", MediaType.APPLICATION_JSON) + .withBody("{ \"message\": \"echo\" }"))); + + final CompletableFuture<Echo> future = echo + .getAsync() + .toCompletableFuture() + .thenApply(s -> { + try { + Thread.sleep(500); + assertThat(Thread.currentThread().getName(), startsWith(prefix)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return s; + }); + + assertNull(future.getNow(null)); + + final Echo result = future.get(5L, TimeUnit.SECONDS); + assertThat(result.getMessage(), equalTo("echo")); + } + + @Test + public void testAsynchronousCallAndContextPropagation() throws Exception { + wireMockRule.stubFor(get(urlEqualTo("/echo")) + .willReturn(aResponse() + .withHeader("Content-Type", MediaType.APPLICATION_JSON) + .withBody("{ \"message\": \"echo\" }"))); + + CONTEXT.set("context-value"); + + final CompletableFuture<Echo> future = echo + .getAsync() + .toCompletableFuture() + .thenApply(s -> { + try { + Thread.sleep(500); + assertThat(Thread.currentThread().getName(), startsWith(prefix)); + assertThat(CONTEXT.get(), equalTo("context-value")); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return s; + }); + + final Echo result = future.get(5L, TimeUnit.SECONDS); + assertThat(result.getMessage(), equalTo("echo")); + } + + @Test + public void testAsynchronousCallMany() throws InterruptedException, ExecutionException, TimeoutException { + wireMockRule.stubFor(get(urlEqualTo("/echo")) + .willReturn(aResponse() + .withHeader("Content-Type", MediaType.APPLICATION_JSON) + .withBody("{ \"message\": \"echo\" }"))); + + final Collection<CompletableFuture<Echo>> futures = new ArrayList<>(); + for (int i = 0; i < 20; ++i) { + futures.add( + echo + .getAsync() + .toCompletableFuture() + .thenApply(s -> { + try { + Thread.sleep(500); + assertThat(Thread.currentThread().getName(), startsWith(prefix)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return s; + }) + ); + } + + CompletableFuture + .allOf(futures.toArray(new CompletableFuture[0])) + .join(); + + for (final CompletableFuture<Echo> future: futures) { + assertThat(future.get().getMessage(), equalTo("echo")); + } + } + + private URI getBaseUri() { + return URI.create("http://localhost:" + wireMockRule.port() + "/echo"); + } + + public static class Echo { + private String message; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + } + + @Path("/") + public interface EchoResource { + @GET + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + CompletionStage<Echo> getAsync(); + } + + public static class AsyncInvocationInterceptorFactoryImpl implements AsyncInvocationInterceptorFactory { + @Override + public AsyncInvocationInterceptor newInterceptor() { + return new AsyncInvocationInterceptorImpl(); + } + } + + public static class AsyncInvocationInterceptorImpl implements AsyncInvocationInterceptor { + private String context; + + @Override + public void prepareContext() { + context = CONTEXT.get(); + } + + @Override + public void applyContext() { + CONTEXT.set(context); + } + } + + private static ExecutorService cachedExecutor() { + return Executors.newCachedThreadPool(new ThreadFactory() { + private AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "mp-async-" + counter.incrementAndGet()); + } + }); + } +}