Repository: cxf
Updated Branches:
  refs/heads/master fea260561 -> dc6fe383b
CXF-6360: Integration with Apache HTrace. Fixing asynchronous invocation flow by detaching span in request filter (work in progress) Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/dc6fe383 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/dc6fe383 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/dc6fe383 Branch: refs/heads/master Commit: dc6fe383b095c6823e5de707b1c091556c9c378a Parents: fea2605 Author: reta <[email protected]> Authored: Tue Aug 11 19:23:23 2015 -0400 Committer: reta <[email protected]> Committed: Tue Aug 11 19:23:23 2015 -0400 ---------------------------------------------------------------------- .../java/demo/jaxrs/tracing/server/Catalog.java | 14 ++++-- .../tracing/htrace/AbstractHTraceProvider.java | 53 +++++++++++++++++--- .../cxf/systest/jaxrs/tracing/BookStore.java | 26 ++++++++++ .../jaxrs/tracing/htrace/HTraceTracingTest.java | 24 +++++++++ 4 files changed, 106 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/dc6fe383/distribution/src/main/release/samples/jax_rs/tracing_htrace/src/main/java/demo/jaxrs/tracing/server/Catalog.java ---------------------------------------------------------------------- diff --git a/distribution/src/main/release/samples/jax_rs/tracing_htrace/src/main/java/demo/jaxrs/tracing/server/Catalog.java b/distribution/src/main/release/samples/jax_rs/tracing_htrace/src/main/java/demo/jaxrs/tracing/server/Catalog.java index 1761bb8..d761d1a 100644 --- a/distribution/src/main/release/samples/jax_rs/tracing_htrace/src/main/java/demo/jaxrs/tracing/server/Catalog.java +++ b/distribution/src/main/release/samples/jax_rs/tracing_htrace/src/main/java/demo/jaxrs/tracing/server/Catalog.java @@ -22,12 +22,12 @@ package demo.jaxrs.tracing.server; import java.io.IOException; import java.util.UUID; +import java.util.concurrent.Callable; import 
java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.json.Json; -import javax.json.JsonArray; import javax.json.JsonObject; import javax.ws.rs.DELETE; import javax.ws.rs.FormParam; @@ -37,6 +37,8 @@ import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -97,8 +99,14 @@ public class Catalog { @GET @Produces(MediaType.APPLICATION_JSON) - public JsonArray getBooks() throws IOException { - return store.scan(); + public void getBooks(@Suspended final AsyncResponse response) throws IOException { + executor.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + response.resume(store.scan()); + return null; + } + }); } @GET http://git-wip-us.apache.org/repos/asf/cxf/blob/dc6fe383/integration/tracing/tracing-htrace/src/main/java/org/apache/cxf/tracing/htrace/AbstractHTraceProvider.java ---------------------------------------------------------------------- diff --git a/integration/tracing/tracing-htrace/src/main/java/org/apache/cxf/tracing/htrace/AbstractHTraceProvider.java b/integration/tracing/tracing-htrace/src/main/java/org/apache/cxf/tracing/htrace/AbstractHTraceProvider.java index dd9f3ed..fba817b 100644 --- a/integration/tracing/tracing-htrace/src/main/java/org/apache/cxf/tracing/htrace/AbstractHTraceProvider.java +++ b/integration/tracing/tracing-htrace/src/main/java/org/apache/cxf/tracing/htrace/AbstractHTraceProvider.java @@ -18,11 +18,16 @@ */ package org.apache.cxf.tracing.htrace; +import java.lang.annotation.Annotation; import java.util.List; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import javax.ws.rs.container.ResourceInfo; +import javax.ws.rs.container.Suspended; +import 
javax.ws.rs.core.Context; + import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.helpers.CastUtils; import org.apache.cxf.tracing.AbstractTracingProvider; @@ -38,6 +43,7 @@ public abstract class AbstractHTraceProvider extends AbstractTracingProvider { protected static final String TRACE_SPAN = "org.apache.cxf.tracing.htrace.span"; private final Sampler< ? > sampler; + @Context private ResourceInfo resourceInfo; public AbstractHTraceProvider(final Sampler< ? > sampler) { this.sampler = sampler; @@ -54,15 +60,24 @@ public abstract class AbstractHTraceProvider extends AbstractTracingProvider { final long spanId = getFirstValueOrDefault(requestHeaders, getSpanIdHeader(), Tracer.DONT_TRACE.spanId); + TraceScope traceScope = null; if (traceId == Tracer.DONT_TRACE.traceId || spanId == Tracer.DONT_TRACE.spanId) { - return Trace.startSpan(path, (Sampler< TraceInfo >)sampler); - } + traceScope = Trace.startSpan(path, (Sampler< TraceInfo >)sampler); + } else { + traceScope = Trace.startSpan(path, new MilliSpan + .Builder() + .spanId(spanId) + .traceId(traceId) + .build()); + } - return Trace.startSpan(path, new MilliSpan - .Builder() - .spanId(spanId) - .traceId(traceId) - .build()); + // If the JAX-RS resource is using asynchronous processing mode, the trace + // scope will be closed in another thread and as such should be detached. + if (isAsyncResponse()) { + traceScope.detach(); + } + + return traceScope; } protected void stopTraceSpan(final Map<String, List<String>> requestHeaders, @@ -78,10 +93,32 @@ public abstract class AbstractHTraceProvider extends AbstractTracingProvider { } if (span != null) { - span.close(); + // If the JAX-RS resource is using asynchronous processing mode, the trace + // scope has been created in another thread and should be re-attached to the current + // one. 
+ if (span.isDetached()) { + final TraceScope continueSpan = Trace.continueSpan(span.getSpan()); + continueSpan.close(); + } else { + span.close(); + } } } + private boolean isAsyncResponse() { + if (resourceInfo != null) { + for (final Annotation[] annotations: resourceInfo.getResourceMethod().getParameterAnnotations()) { + for (final Annotation annotation: annotations) { + if (annotation.annotationType().equals(Suspended.class)) { + return true; + } + } + } + } + + return false; + } + private static Long getFirstValueOrDefault(final Map<String, List<String>> headers, final String header, final long defaultValue) { List<String> value = headers.get(header); http://git-wip-us.apache.org/repos/asf/cxf/blob/dc6fe383/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/BookStore.java ---------------------------------------------------------------------- diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/BookStore.java b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/BookStore.java index 161202c..186b364 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/BookStore.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/BookStore.java @@ -29,6 +29,8 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -55,6 +57,30 @@ public class BookStore { } @GET + @Path("/books/async") + @Produces(MediaType.APPLICATION_JSON) + public void getBooksAsync(@Suspended final AsyncResponse response) { + executor.submit( + tracer.wrap("Processing books", new Traceable<Void>() { + @Override + public Void call(final TracerContext context) throws Exception { + // Simulate some running job + Thread.sleep(100); + + response.resume( + 
Arrays.asList( + new Book("Apache CXF in Action", UUID.randomUUID().toString()), + new Book("Mastering Apache CXF", UUID.randomUUID().toString()) + ) + ); + + return null; + } + }) + ); + } + + @GET @Path("/book/{id}") @Produces(MediaType.APPLICATION_JSON) public Book getBook(@PathParam("id") final String id) { http://git-wip-us.apache.org/repos/asf/cxf/blob/dc6fe383/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java ---------------------------------------------------------------------- diff --git a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java index 4e9ff85..35938a4 100644 --- a/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java +++ b/systests/tracing/src/test/java/org/apache/cxf/systest/jaxrs/tracing/htrace/HTraceTracingTest.java @@ -160,6 +160,30 @@ public class HTraceTracingTest extends AbstractBusClientServerTestBase { assertTrue(r.getHeaders().containsKey(TracerHeaders.DEFAULT_HEADER_SPAN_ID)); } + @Test + public void testThatNewInnerSpanIsCreatedUsingAsyncInvocation() { + final Response r = createWebClient("/bookstore/books/async") + .header(TracerHeaders.DEFAULT_HEADER_TRACE_ID, 10L) + .header(TracerHeaders.DEFAULT_HEADER_SPAN_ID, 20L) + .get(); + assertEquals(Status.OK.getStatusCode(), r.getStatus()); + + assertThat(TestSpanReceiver.getAllSpans().size(), equalTo(1)); + assertThat(TestSpanReceiver.getAllSpans().get(0).getDescription(), equalTo("bookstore/books/async")); + + assertThat((String)r.getHeaders().getFirst(TracerHeaders.DEFAULT_HEADER_TRACE_ID), equalTo("10")); + assertThat((String)r.getHeaders().getFirst(TracerHeaders.DEFAULT_HEADER_SPAN_ID), equalTo("20")); + } + + @Test + public void testThatNewSpanIsCreatedUsingAsyncInvocation() { + final Response r = createWebClient("/bookstore/books/async").get(); + 
assertEquals(Status.OK.getStatusCode(), r.getStatus()); + + assertThat(TestSpanReceiver.getAllSpans().size(), equalTo(1)); + assertThat(TestSpanReceiver.getAllSpans().get(0).getDescription(), equalTo("bookstore/books/async")); + } + protected WebClient createWebClient(final String url, final Object ... providers) { return WebClient .create("http://localhost:" + PORT + url, Arrays.asList(providers))
