This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new 7af3f06713 Fix the thread safety bug of finishing operation for the span named "SpringCloudGateway/sendRequest" (#555)
7af3f06713 is described below

commit 7af3f0671399bd56ea03ecf2fa6327ff9e8ca459
Author: gzlicanyi <[email protected]>
AuthorDate: Fri Jun 16 22:25:50 2023 +0800

    Fix the thread safety bug of finishing operation for the span named "SpringCloudGateway/sendRequest" (#555)
---
 CHANGES.md                                         |  1 +
 .../gateway/v20x/HttpClientRequestInterceptor.java | 66 ++++++++++------
 ...> AbstractGateway200EnhancePluginDefineV2.java} | 22 ++----
 .../gateway/v20x/define/EnhanceCacheObject.java    |  9 +++
 .../v20x/define/HttpClientInstrumentation.java     | 13 ++--
 .../v20x/HttpClientRequestInterceptorTest.java     | 91 ++++++++++++++++++++++
 6 files changed, 158 insertions(+), 44 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ccff2d786c..1e32c43a88 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
 * Support Jetty 11.x plugin
 * Fix the scenario of using the HBase plugin with spring-data-hadoop.
 * Add RocketMQ 5.x plugin
+* Fix the thread safety bug of finishing operation for the span named "SpringCloudGateway/sendRequest"
 
 #### Documentation
 
diff --git 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java
 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java
index 00613b6024..0bed218b24 100644
--- 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java
+++ 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+
 import org.apache.skywalking.apm.agent.core.context.CarrierItem;
 import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
 import org.apache.skywalking.apm.agent.core.context.ContextManager;
@@ -28,8 +29,8 @@ import org.apache.skywalking.apm.agent.core.context.tag.Tags;
 import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
-import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
-import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
 import 
org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define.EnhanceCacheObject;
 import org.reactivestreams.Publisher;
 import reactor.core.publisher.Mono;
@@ -38,13 +39,14 @@ import reactor.ipc.netty.http.client.HttpClientResponse;
 
 import static 
org.apache.skywalking.apm.network.trace.component.ComponentsDefine.SPRING_CLOUD_GATEWAY;
 
-public class HttpClientRequestInterceptor implements 
InstanceMethodsAroundInterceptor {
+public class HttpClientRequestInterceptor implements 
InstanceMethodsAroundInterceptorV2 {
+
     @Override
     public void beforeMethod(final EnhancedInstance objInst,
                              final Method method,
                              final Object[] allArguments,
                              final Class<?>[] argumentsTypes,
-                             final MethodInterceptResult result) throws 
Throwable {
+                             final MethodInvocationContext context) throws 
Throwable {
         
         /*
           In this plug-in, the HttpClientFinalizerSendInterceptor depends on 
the NettyRoutingFilterInterceptor
@@ -54,13 +56,13 @@ public class HttpClientRequestInterceptor implements 
InstanceMethodsAroundInterc
         if (!ContextManager.isActive()) {
             return;
         }
-        
+
         AbstractSpan span = ContextManager.activeSpan();
 
         URL url = new URL((String) allArguments[1]);
         ContextCarrier contextCarrier = new ContextCarrier();
         AbstractSpan abstractSpan = ContextManager.createExitSpan(
-            "SpringCloudGateway/sendRequest", contextCarrier, getPeer(url));
+                "SpringCloudGateway/sendRequest", contextCarrier, 
getPeer(url));
         abstractSpan.prepareForAsync();
         Tags.URL.set(abstractSpan, String.valueOf(allArguments[1]));
         abstractSpan.setLayer(SpanLayer.HTTP);
@@ -80,7 +82,7 @@ public class HttpClientRequestInterceptor implements 
InstanceMethodsAroundInterc
             }
         };
 
-        objInst.setSkyWalkingDynamicField(new EnhanceCacheObject(span, 
abstractSpan));
+        context.setContext(new EnhanceCacheObject(span, abstractSpan));
     }
 
     @Override
@@ -88,28 +90,45 @@ public class HttpClientRequestInterceptor implements 
InstanceMethodsAroundInterc
                               final Method method,
                               final Object[] allArguments,
                               final Class<?>[] argumentsTypes,
-                              final Object ret) {
-        EnhanceCacheObject enhanceCacheObject = (EnhanceCacheObject) 
objInst.getSkyWalkingDynamicField();
+                              final Object ret,
+                              MethodInvocationContext context) {
+        EnhanceCacheObject enhanceCacheObject = (EnhanceCacheObject) 
context.getContext();
         Mono<HttpClientResponse> responseMono = (Mono<HttpClientResponse>) ret;
         return responseMono.doAfterSuccessOrError(new 
BiConsumer<HttpClientResponse, Throwable>() {
             @Override
             public void accept(final HttpClientResponse httpClientResponse, 
final Throwable throwable) {
+                doAfterSuccessOrError(httpClientResponse, throwable, 
enhanceCacheObject);
+            }
+        });
+    }
 
-                AbstractSpan abstractSpan = enhanceCacheObject.getSendSpan();
-                if (abstractSpan != null) {
-                    if (throwable != null) {
-                        abstractSpan.log(throwable);
-                    } else if (httpClientResponse.status().code() > 400) {
-                        abstractSpan.errorOccurred();
-                    }
-                    Tags.HTTP_RESPONSE_STATUS_CODE.set(abstractSpan, 
httpClientResponse.status().code());
-                    abstractSpan.asyncFinish();
-                }
+    void doAfterSuccessOrError(HttpClientResponse httpClientResponse, 
Throwable throwable, EnhanceCacheObject enhanceCacheObject) {
+        try {
+            //When executing the beforeMethod method, if the ContextManager is 
inactive, the enhanceCacheObject will be null.
+            if (enhanceCacheObject == null) {
+                return;
+            }
 
-                objInst.setSkyWalkingDynamicField(null);
-                enhanceCacheObject.getFilterSpan().asyncFinish();
+            //The doAfterSuccessOrError method may be executed multiple times.
+            if (enhanceCacheObject.isSpanFinish()) {
+                return;
             }
-        });
+
+            AbstractSpan abstractSpan = enhanceCacheObject.getSendSpan();
+            if (throwable != null) {
+                abstractSpan.log(throwable);
+            } else if (httpClientResponse.status().code() > 400) {
+                abstractSpan.errorOccurred();
+            }
+            Tags.HTTP_RESPONSE_STATUS_CODE.set(abstractSpan, 
httpClientResponse.status().code());
+
+            abstractSpan.asyncFinish();
+            enhanceCacheObject.getFilterSpan().asyncFinish();
+
+            enhanceCacheObject.setSpanFinish(true);
+        } catch (Throwable e) {
+            //Catch unknown exceptions to avoid interrupting business 
processes.
+        }
     }
 
     private String getPeer(URL url) {
@@ -121,7 +140,8 @@ public class HttpClientRequestInterceptor implements 
InstanceMethodsAroundInterc
                                       final Method method,
                                       final Object[] allArguments,
                                       final Class<?>[] argumentsTypes,
-                                      final Throwable t) {
+                                      final Throwable t,
+                                      MethodInvocationContext context) {
 
     }
 }
diff --git 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/AbstractGateway200EnhancePluginDefineV2.java
similarity index 64%
copy from 
apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
copy to 
apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/AbstractGateway200EnhancePluginDefineV2.java
index 14964a3053..1f07c0ba89 100644
--- 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
+++ 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/AbstractGateway200EnhancePluginDefineV2.java
@@ -17,22 +17,14 @@
 
 package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define;
 
-import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
 
-public class EnhanceCacheObject {
-    private final AbstractSpan filterSpan;
-    private final AbstractSpan sendSpan;
+public abstract class AbstractGateway200EnhancePluginDefineV2 extends 
ClassInstanceMethodsEnhancePluginDefineV2 {
 
-    public EnhanceCacheObject(final AbstractSpan filterSpan, final 
AbstractSpan sendSpan) {
-        this.filterSpan = filterSpan;
-        this.sendSpan = sendSpan;
-    }
-
-    public AbstractSpan getFilterSpan() {
-        return filterSpan;
-    }
-
-    public AbstractSpan getSendSpan() {
-        return sendSpan;
+    @Override
+    protected String[] witnessClasses() {
+        return new String[] {
+            
"org.springframework.cloud.gateway.config.GatewayAutoConfiguration$1"
+        };
     }
 }
diff --git 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
index 14964a3053..e5070ab7a3 100644
--- 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
+++ 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java
@@ -22,6 +22,7 @@ import 
org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
 public class EnhanceCacheObject {
     private final AbstractSpan filterSpan;
     private final AbstractSpan sendSpan;
+    private volatile boolean spanFinish = false;
 
     public EnhanceCacheObject(final AbstractSpan filterSpan, final 
AbstractSpan sendSpan) {
         this.filterSpan = filterSpan;
@@ -35,4 +36,12 @@ public class EnhanceCacheObject {
     public AbstractSpan getSendSpan() {
         return sendSpan;
     }
+
+    public boolean isSpanFinish() {
+        return spanFinish;
+    }
+
+    public void setSpanFinish(boolean spanFinish) {
+        this.spanFinish = spanFinish;
+    }
 }
diff --git 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java
 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java
index 04031cfd96..b73eaca3f2 100644
--- 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java
+++ 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java
@@ -20,13 +20,13 @@ package 
org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define;
 import net.bytebuddy.description.method.MethodDescription;
 import net.bytebuddy.matcher.ElementMatcher;
 import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
-import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import 
org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
 import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static 
org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
 
-public class HttpClientInstrumentation extends 
AbstractGateway200EnhancePluginDefine {
+public class HttpClientInstrumentation extends 
AbstractGateway200EnhancePluginDefineV2 {
 
     @Override
     protected ClassMatch enhanceClass() {
@@ -39,16 +39,16 @@ public class HttpClientInstrumentation extends 
AbstractGateway200EnhancePluginDe
     }
 
     @Override
-    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() 
{
-        return new InstanceMethodsInterceptPoint[] {
-            new InstanceMethodsInterceptPoint() {
+    public InstanceMethodsInterceptV2Point[] 
getInstanceMethodsInterceptV2Points() {
+        return new InstanceMethodsInterceptV2Point[] {
+            new InstanceMethodsInterceptV2Point() {
                 @Override
                 public ElementMatcher<MethodDescription> getMethodsMatcher() {
                     return named("request");
                 }
 
                 @Override
-                public String getMethodsInterceptor() {
+                public String getMethodsInterceptorV2() {
                     return Constants.REQUEST_INTERCEPTOR;
                 }
 
@@ -59,4 +59,5 @@ public class HttpClientInstrumentation extends 
AbstractGateway200EnhancePluginDe
             }
         };
     }
+
 }
diff --git 
a/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptorTest.java
 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptorTest.java
new file mode 100644
index 0000000000..5ec786e6af
--- /dev/null
+++ 
b/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptorTest.java
@@ -0,0 +1,91 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import 
org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define.EnhanceCacheObject;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import reactor.ipc.netty.http.client.HttpClientResponse;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+@RunWith(TracingSegmentRunner.class)
+public class HttpClientRequestInterceptorTest {
+
+    private HttpClientRequestInterceptor httpClientRequestInterceptor = new 
HttpClientRequestInterceptor();
+
+    @Rule
+    public AgentServiceRule serviceRule = new AgentServiceRule();
+    @Rule
+    public MockitoRule rule = MockitoJUnit.rule();
+
+    @SegmentStoragePoint
+    private SegmentStorage segmentStorage;
+
+    private HttpClientResponse httpClientResponse;
+
+    @Before
+    public void setUp() throws Exception {
+
+        httpClientResponse = Mockito.mock(HttpClientResponse.class);
+        HttpResponseStatus httpResponseStatus = 
Mockito.mock(HttpResponseStatus.class);
+
+        Mockito.when(httpResponseStatus.code()).thenReturn(200);
+        
Mockito.when(httpClientResponse.status()).thenReturn(httpResponseStatus);
+    }
+
+    @Test
+    public void testDoAfterSuccessOrError() {
+        AbstractSpan filterSpan = 
ContextManager.createLocalSpan("mockFilterSpan");
+        filterSpan.prepareForAsync();
+        ContextManager.stopSpan(filterSpan);
+
+        AbstractSpan sendSpan = ContextManager.createExitSpan("SpringCloudGateway/sendRequest", "http://127.0.0.1:80");
+        sendSpan.prepareForAsync();
+        ContextManager.stopSpan(sendSpan);
+
+        EnhanceCacheObject enhanceCacheObject = new 
EnhanceCacheObject(filterSpan, sendSpan);
+        enhanceCacheObject = spy(enhanceCacheObject);
+
+        //Test the ContextManager is inactive.
+        httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, 
null, null);
+        verify(enhanceCacheObject, Mockito.times(0)).setSpanFinish(true);
+
+        //Test normal scenario.
+        httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, 
null, enhanceCacheObject);
+        verify(enhanceCacheObject, Mockito.times(1)).setSpanFinish(true);
+
+        //Test the doAfterSuccessOrError method is executed multiple times.
+        httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, 
null, enhanceCacheObject);
+        verify(enhanceCacheObject, Mockito.times(1)).setSpanFinish(true);
+    }
+
+}
\ No newline at end of file

Reply via email to