This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push: new 6b35f06d15 Fix gRPC plugin not working for server side in some case (#457) 6b35f06d15 is described below commit 6b35f06d1510e2be6184f3cc87331d9226365718 Author: Kanro <hi...@live.cn> AuthorDate: Wed Feb 15 16:38:24 2023 +0800 Fix gRPC plugin not working for server side in some case (#457) --- CHANGES.md | 1 + .../AbstractServerImplBuilderInstrumentation.java | 7 ++--- .../AbstractServerImplBuilderInterceptor.java | 10 ++++-- .../plugin/grpc/v1/server/ServerInterceptor.java | 36 ++++++++++++++++++++-- .../plugin/grpc/v1/server/TracingServerCall.java | 4 ++- .../grpc/v1/server/TracingServerCallListener.java | 30 +++--------------- 6 files changed, 53 insertions(+), 35 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e0a9a6bc76..23f4db54dc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,6 +15,7 @@ Release Notes. * Add plugin to support ClickHouse JDBC driver (0.3.2.*). * Refactor kotlin coroutine plugin with CoroutineContext. * Fix OracleURLParser ignoring actual port when :SID is absent. +* Change gRPC instrumentation point to fix plugin not working for server side. #### Documentation * Update docs of Tracing APIs, reorganize the API docs into six parts. diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java index e0823d9301..f32987f4bc 100644 --- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/define/AbstractServerImplBuilderInstrumentation.java @@ -26,15 +26,14 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInst import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; import static net.bytebuddy.matcher.ElementMatchers.named; -import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; +import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; public class AbstractServerImplBuilderInstrumentation extends ClassInstanceMethodsEnhancePluginDefine { public static final String ENHANCE_CLASS = "io.grpc.internal.AbstractServerImplBuilder"; - public static final String ENHANCE_METHOD = "addService"; + public static final String ENHANCE_METHOD = "build"; public static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.grpc.v1.server.AbstractServerImplBuilderInterceptor"; - public static final String ARGUMENT_TYPE = "io.grpc.ServerServiceDefinition"; @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() { @@ -47,7 +46,7 @@ public class AbstractServerImplBuilderInstrumentation extends ClassInstanceMetho new InstanceMethodsInterceptPoint() { @Override public ElementMatcher<MethodDescription> getMethodsMatcher() { - return named(ENHANCE_METHOD).and(takesArgumentWithType(0, ARGUMENT_TYPE)); + return named(ENHANCE_METHOD).and(takesNoArguments()); } @Override diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java index 605c785bd6..c02cf566de 100644 --- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/AbstractServerImplBuilderInterceptor.java @@ -18,8 +18,7 @@ package org.apache.skywalking.apm.plugin.grpc.v1.server; -import io.grpc.ServerInterceptors; -import io.grpc.ServerServiceDefinition; +import io.grpc.ServerBuilder; import java.lang.reflect.Method; @@ -34,7 +33,12 @@ public class AbstractServerImplBuilderInterceptor implements InstanceMethodsArou @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) { - allArguments[0] = ServerInterceptors.intercept((ServerServiceDefinition) allArguments[0], new ServerInterceptor()); + if (objInst.getSkyWalkingDynamicField() == null) { + ServerBuilder<?> builder = (ServerBuilder) objInst; + ServerInterceptor interceptor = new ServerInterceptor(); + builder.intercept(interceptor); + objInst.setSkyWalkingDynamicField(interceptor); + } } @Override diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java index e5c94fbf6b..f970d813f0 100644 --- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java @@ -18,14 +18,27 @@ package org.apache.skywalking.apm.plugin.grpc.v1.server; +import io.grpc.Context; +import io.grpc.Contexts; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import org.apache.skywalking.apm.agent.core.context.CarrierItem; import org.apache.skywalking.apm.agent.core.context.ContextCarrier; +import org.apache.skywalking.apm.agent.core.context.ContextManager; +import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; +import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; +import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; +import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; import org.apache.skywalking.apm.util.StringUtil; +import static org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil.formatOperationName; + public class ServerInterceptor implements io.grpc.ServerInterceptor { + + static final Context.Key<ContextSnapshot> CONTEXT_SNAPSHOT_KEY = Context.key("skywalking-grpc-context-snapshot"); + static final Context.Key<AbstractSpan> ACTIVE_SPAN_KEY = Context.key("skywalking-grpc-active-span"); + @Override public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> call, Metadata headers, ServerCallHandler<REQUEST, RESPONSE> handler) { @@ -38,7 +51,26 @@ public class ServerInterceptor implements io.grpc.ServerInterceptor { next.setHeadValue(contextValue); } } - return new TracingServerCallListener<>(handler.startCall(new TracingServerCall<>(call), headers), call - .getMethodDescriptor(), contextCarrier); + + final AbstractSpan span = ContextManager + .createEntrySpan(formatOperationName(call.getMethodDescriptor()), contextCarrier); + span.setComponent(ComponentsDefine.GRPC); + span.setLayer(SpanLayer.RPC_FRAMEWORK); + ContextSnapshot contextSnapshot = ContextManager.capture(); + AbstractSpan asyncSpan = span.prepareForAsync(); + + Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan); + + ServerCall.Listener<REQUEST> listener = Contexts.interceptCall( + context, + new TracingServerCall<>(call), + headers, + (serverCall, metadata) -> new TracingServerCallListener<>( + handler.startCall(serverCall, metadata), + serverCall.getMethodDescriptor() + ) + ); + ContextManager.stopSpan(asyncSpan); + return listener; } } diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java index 66935421b6..3361c4b7db 100644 --- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java +++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java @@ -49,7 +49,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - + ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); try { super.sendMessage(message); } catch (Throwable t) { @@ -68,6 +68,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_CLOSE_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); + ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); switch (status.getCode()) { case OK: break; @@ -93,6 +94,7 @@ public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.S break; } Tags.RPC_RESPONSE_STATUS_CODE.set(span, status.getCode().name()); + ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish(); try { super.close(status, trailers); diff --git a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java index 3b0c830a94..cb19e86c6c 100644 --- a/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java +++ b/apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java @@ -21,9 +21,7 @@ package org.apache.skywalking.apm.plugin.grpc.v1.server; import io.grpc.ForwardingServerCallListener; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; -import org.apache.skywalking.apm.agent.core.context.ContextCarrier; import org.apache.skywalking.apm.agent.core.context.ContextManager; -import org.apache.skywalking.apm.agent.core.context.ContextSnapshot; import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan; import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer; import org.apache.skywalking.apm.network.trace.component.ComponentsDefine; @@ -36,18 +34,11 @@ import org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil; public class TracingServerCallListener<REQUEST> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<REQUEST> { private final MethodDescriptor.MethodType methodType; private final String operationPrefix; - private final String operation; - private final ContextCarrier contextCarrier; - private AbstractSpan asyncSpan; - private ContextSnapshot contextSnapshot; - - protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor, ContextCarrier contextCarrier) { + protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor) { super(delegate); this.methodType = descriptor.getType(); this.operationPrefix = OperationNameFormatUtil.formatOperationName(descriptor) + SERVER; - this.operation = OperationNameFormatUtil.formatOperationName(descriptor); - this.contextCarrier = contextCarrier; } @Override @@ -57,7 +48,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_MESSAGE_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - ContextManager.continued(contextSnapshot); + ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); try { super.onMessage(message); } catch (Throwable t) { @@ -73,13 +64,10 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList @Override public void onCancel() { - if (contextSnapshot == null) { - return; - } final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_CANCEL_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - ContextManager.continued(contextSnapshot); + ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); try { super.onCancel(); } catch (Throwable t) { @@ -87,7 +75,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList throw t; } finally { ContextManager.stopSpan(); - asyncSpan.asyncFinish(); + ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish(); } } @@ -96,7 +84,7 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_HALF_CLOSE_OPERATION_NAME); span.setComponent(ComponentsDefine.GRPC); span.setLayer(SpanLayer.RPC_FRAMEWORK); - ContextManager.continued(contextSnapshot); + ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get()); try { super.onHalfClose(); } catch (Throwable t) { @@ -110,18 +98,10 @@ public class TracingServerCallListener<REQUEST> extends ForwardingServerCallList @Override public void onComplete() { super.onComplete(); - asyncSpan.asyncFinish(); } @Override public void onReady() { - final AbstractSpan span = ContextManager.createEntrySpan(operation, contextCarrier); - span.setComponent(ComponentsDefine.GRPC); - span.setLayer(SpanLayer.RPC_FRAMEWORK); - contextSnapshot = ContextManager.capture(); - asyncSpan = span.prepareForAsync(); - ContextManager.stopSpan(asyncSpan); - super.onReady(); } } \ No newline at end of file