This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-java.git
The following commit(s) were added to refs/heads/main by this push:
new 30fb81a fix rocketmq message header properties garbled characters issue (#54)
30fb81a is described below
commit 30fb81a6de65162ccbba9f689bad0d29496ce0e8
Author: cnScarb <[email protected]>
AuthorDate: Wed Oct 27 09:16:27 2021 +0800
fix rocketmq message header properties garbled characters issue (#54)
---
CHANGES.md | 1 +
.../plugin/rocketMQ/v4/MessageSendInterceptor.java | 5 +-
.../rocketMQ/v4/MessageSendInterceptorTest.java | 54 ++++++++++++++++++++--
3 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 46455b0..bf03ed0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -39,6 +39,7 @@ Release Notes.
* Fix instrumentation v2 API doesn't work for constructor instrumentation.
* Add plugin to support okhttp 2.x
* Optimize okhttp 3.x 4.x plugin to get span time cost precisely
+* Adapt message header properties of RocketMQ 4.9.x
#### Documentation
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
index 50b8d19..b27e438 100644
--- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
@@ -68,10 +68,13 @@ public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor
while (next.hasNext()) {
next = next.next();
if (!StringUtil.isEmpty(next.getHeadValue())) {
+ if (properties.length() > 0 &&
properties.charAt(properties.length() - 1) != PROPERTY_SEPARATOR) {
+ // adapt for RocketMQ 4.9.x or later
+ properties.append(PROPERTY_SEPARATOR);
+ }
properties.append(next.getHeadKey());
properties.append(NAME_VALUE_SEPARATOR);
properties.append(next.getHeadValue());
- properties.append(PROPERTY_SEPARATOR);
}
}
requestHeader.setProperties(properties.toString());
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
index e9a9f3a..3f7f395 100644
--- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
@@ -19,9 +19,13 @@
package org.apache.skywalking.apm.plugin.rocketMQ.v4;
import java.util.List;
+import java.util.Map;
+
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.skywalking.apm.agent.core.context.SW8ExtensionCarrierItem;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.junit.Before;
import org.junit.Rule;
@@ -44,10 +48,17 @@ import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.when;
+import static
org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static
org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class MessageSendInterceptorTest {
@@ -108,8 +119,8 @@ public class MessageSendInterceptorTest {
CommunicationMode.ASYNC,
null
};
- when(messageRequestHeader.getProperties()).thenReturn("");
when(message.getTags()).thenReturn("TagA");
+ stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA" +
PROPERTY_SEPARATOR);
}
@Test
@@ -117,6 +128,13 @@ public class MessageSendInterceptorTest {
messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments,
null, null);
messageSendInterceptor.afterMethod(enhancedInstance, null, arguments,
null, null);
+ Map<String, String> tags = MessageDecoder.string2messageProperties(
+ ((SendMessageRequestHeader) arguments[3]).getProperties());
+ // check original header of TAGS
+ assertThat(tags.get("TAGS"), is("TagA"));
+ // check skywalking header
+ assertTrue(tags.containsKey(SW8ExtensionCarrierItem.HEADER_NAME));
+
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
@@ -127,15 +145,27 @@ public class MessageSendInterceptorTest {
SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
SpanAssert.assertComponent(mqSpan,
ComponentsDefine.ROCKET_MQ_PRODUCER);
SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
- verify(messageRequestHeader).setProperties(anyString());
verify(callBack).setSkyWalkingDynamicField(Matchers.any());
}
@Test
+ public void testSendMessageNew() throws Throwable {
+ stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA");
+ testSendMessage();
+ }
+
+ @Test
public void testSendMessageWithoutCallBack() throws Throwable {
messageSendInterceptor.beforeMethod(enhancedInstance, null,
argumentsWithoutCallback, null, null);
messageSendInterceptor.afterMethod(enhancedInstance, null,
argumentsWithoutCallback, null, null);
+ Map<String, String> tags = MessageDecoder.string2messageProperties(
+ ((SendMessageRequestHeader)
argumentsWithoutCallback[3]).getProperties());
+ // check original header of TAGS
+ assertThat(tags.get("TAGS"), is("TagA"));
+ // check skywalking header
+ assertTrue(tags.containsKey(SW8ExtensionCarrierItem.HEADER_NAME));
+
assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
@@ -146,7 +176,25 @@ public class MessageSendInterceptorTest {
SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
SpanAssert.assertComponent(mqSpan,
ComponentsDefine.ROCKET_MQ_PRODUCER);
SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
- verify(messageRequestHeader).setProperties(anyString());
}
+ @Test
+ public void testSendMessageWithoutCallBackNew() throws Throwable {
+ stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA");
+ testSendMessageWithoutCallBack();
+ }
+
+ private void stubMessageRequestHeader(String properties) {
+ messageRequestHeader = mock(SendMessageRequestHeader.class,
RETURNS_DEEP_STUBS);
+ doAnswer(invocation -> {
+ String val = (String) invocation.getArguments()[0];
+ when(messageRequestHeader.getProperties()).thenReturn(val);
+ return null;
+ }).when(messageRequestHeader).setProperties(anyString());
+ when(messageRequestHeader.getProperties()).thenCallRealMethod();
+ messageRequestHeader.setProperties(properties);
+
+ arguments[3] = messageRequestHeader;
+ argumentsWithoutCallback[3] = messageRequestHeader;
+ }
}