This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git


The following commit(s) were added to refs/heads/main by this push:
     new 30fb81a  fix rocketmq message header properties garbled characters issue (#54)
30fb81a is described below

commit 30fb81a6de65162ccbba9f689bad0d29496ce0e8
Author: cnScarb <[email protected]>
AuthorDate: Wed Oct 27 09:16:27 2021 +0800

    fix rocketmq message header properties garbled characters issue (#54)
---
 CHANGES.md                                         |  1 +
 .../plugin/rocketMQ/v4/MessageSendInterceptor.java |  5 +-
 .../rocketMQ/v4/MessageSendInterceptorTest.java    | 54 ++++++++++++++++++++--
 3 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 46455b0..bf03ed0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -39,6 +39,7 @@ Release Notes.
 * Fix instrumentation v2 API doesn't work for constructor instrumentation.
 * Add plugin to support okhttp 2.x
 * Optimize okhttp 3.x 4.x plugin to get span time cost precisely
+* Adapt message header properties of RocketMQ 4.9.x
 
 #### Documentation
 
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
index 50b8d19..b27e438 100644
--- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
@@ -68,10 +68,13 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor
         while (next.hasNext()) {
             next = next.next();
             if (!StringUtil.isEmpty(next.getHeadValue())) {
+                if (properties.length() > 0 && properties.charAt(properties.length() - 1) != PROPERTY_SEPARATOR) {
+                    // adapt for RocketMQ 4.9.x or later
+                    properties.append(PROPERTY_SEPARATOR);
+                }
                 properties.append(next.getHeadKey());
                 properties.append(NAME_VALUE_SEPARATOR);
                 properties.append(next.getHeadValue());
-                properties.append(PROPERTY_SEPARATOR);
             }
         }
         requestHeader.setProperties(properties.toString());
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
index e9a9f3a..3f7f395 100644
--- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
@@ -19,9 +19,13 @@
 package org.apache.skywalking.apm.plugin.rocketMQ.v4;
 
 import java.util.List;
+import java.util.Map;
+
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.skywalking.apm.agent.core.context.SW8ExtensionCarrierItem;
 import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
 import org.junit.Before;
 import org.junit.Rule;
@@ -44,10 +48,17 @@ import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.when;
 
+import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
 @RunWith(PowerMockRunner.class)
 @PowerMockRunnerDelegate(TracingSegmentRunner.class)
 public class MessageSendInterceptorTest {
@@ -108,8 +119,8 @@ public class MessageSendInterceptorTest {
             CommunicationMode.ASYNC,
             null
         };
-        when(messageRequestHeader.getProperties()).thenReturn("");
         when(message.getTags()).thenReturn("TagA");
+        stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA" + PROPERTY_SEPARATOR);
     }
 
     @Test
@@ -117,6 +128,13 @@ public class MessageSendInterceptorTest {
         messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
         messageSendInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);
 
+        Map<String, String> tags = MessageDecoder.string2messageProperties(
+            ((SendMessageRequestHeader) arguments[3]).getProperties());
+        // check original header of TAGS
+        assertThat(tags.get("TAGS"), is("TagA"));
+        // check skywalking header
+        assertTrue(tags.containsKey(SW8ExtensionCarrierItem.HEADER_NAME));
+
         assertThat(segmentStorage.getTraceSegments().size(), is(1));
         TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
@@ -127,15 +145,27 @@ public class MessageSendInterceptorTest {
         SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
         SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ_PRODUCER);
         SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
-        verify(messageRequestHeader).setProperties(anyString());
         verify(callBack).setSkyWalkingDynamicField(Matchers.any());
     }
 
     @Test
+    public void testSendMessageNew() throws Throwable {
+        stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA");
+        testSendMessage();
+    }
+
+    @Test
     public void testSendMessageWithoutCallBack() throws Throwable {
         messageSendInterceptor.beforeMethod(enhancedInstance, null, argumentsWithoutCallback, null, null);
         messageSendInterceptor.afterMethod(enhancedInstance, null, argumentsWithoutCallback, null, null);
 
+        Map<String, String> tags = MessageDecoder.string2messageProperties(
+            ((SendMessageRequestHeader) argumentsWithoutCallback[3]).getProperties());
+        // check original header of TAGS
+        assertThat(tags.get("TAGS"), is("TagA"));
+        // check skywalking header
+        assertTrue(tags.containsKey(SW8ExtensionCarrierItem.HEADER_NAME));
+
         assertThat(segmentStorage.getTraceSegments().size(), is(1));
         TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
         List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
@@ -146,7 +176,25 @@ public class MessageSendInterceptorTest {
         SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
         SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ_PRODUCER);
         SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
-        verify(messageRequestHeader).setProperties(anyString());
     }
 
+    @Test
+    public void testSendMessageWithoutCallBackNew() throws Throwable {
+        stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA");
+        testSendMessageWithoutCallBack();
+    }
+
+    private void stubMessageRequestHeader(String properties) {
+        messageRequestHeader = mock(SendMessageRequestHeader.class, RETURNS_DEEP_STUBS);
+        doAnswer(invocation -> {
+            String val = (String) invocation.getArguments()[0];
+            when(messageRequestHeader.getProperties()).thenReturn(val);
+            return null;
+        }).when(messageRequestHeader).setProperties(anyString());
+        when(messageRequestHeader.getProperties()).thenCallRealMethod();
+        messageRequestHeader.setProperties(properties);
+
+        arguments[3] = messageRequestHeader;
+        argumentsWithoutCallback[3] = messageRequestHeader;
+    }
 }

Reply via email to