This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 88e644756b [ISSUE #7988] Refector client trace (#7989)
88e644756b is described below
commit 88e644756bab0ebc01feba53483d831d477f0627
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Tue Apr 2 11:31:30 2024 +0800
[ISSUE #7988] Refector client trace (#7989)
* [ISSUE #7988] Refector client trace
* build trace dispatcher in start method
* setNamespaceV2 for dispatcher
* disable trace for inner traceProducer
* fix tls
---
.../org/apache/rocketmq/client/ClientConfig.java | 32 ++++++++++++
.../client/consumer/DefaultLitePullConsumer.java | 31 +++++------
.../client/consumer/DefaultMQPushConsumer.java | 48 ++++++++---------
.../client/producer/DefaultMQProducer.java | 60 +++++++++-------------
.../client/trace/AsyncTraceDispatcher.java | 11 ++++
.../DefaultMQConsumerWithOpenTracingTest.java | 2 +
.../trace/DefaultMQConsumerWithTraceTest.java | 10 ++--
.../DefaultMQProducerWithOpenTracingTest.java | 2 +
.../trace/DefaultMQProducerWithTraceTest.java | 13 ++---
.../TransactionMQProducerWithOpenTracingTest.java | 2 +
.../trace/TransactionMQProducerWithTraceTest.java | 5 +-
11 files changed, 124 insertions(+), 92 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 8a7beffc70..48c995301a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -98,6 +98,16 @@ public class ClientConfig {
private boolean enableHeartbeatChannelEventListener = true;
+ /**
+ * The switch for message trace
+ */
+ protected boolean enableTrace = true;
+
+ /**
+ * The name value of message trace topic. If not set, the default trace topic name will be used.
+ */
+ protected String traceTopic;
+
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
@@ -215,6 +225,8 @@ public class ClientConfig {
this.detectInterval = cc.detectInterval;
this.detectTimeout = cc.detectTimeout;
this.namespaceV2 = cc.namespaceV2;
+ this.enableTrace = cc.enableTrace;
+ this.traceTopic = cc.traceTopic;
}
public ClientConfig cloneClientConfig() {
@@ -245,6 +257,8 @@ public class ClientConfig {
cc.detectInterval = detectInterval;
cc.detectTimeout = detectTimeout;
cc.namespaceV2 = namespaceV2;
+ cc.enableTrace = enableTrace;
+ cc.traceTopic = traceTopic;
return cc;
}
@@ -474,6 +488,22 @@ public class ClientConfig {
this.useHeartbeatV2 = useHeartbeatV2;
}
+ public boolean isEnableTrace() {
+ return enableTrace;
+ }
+
+ public void setEnableTrace(boolean enableTrace) {
+ this.enableTrace = enableTrace;
+ }
+
+ public String getTraceTopic() {
+ return traceTopic;
+ }
+
+ public void setTraceTopic(String traceTopic) {
+ this.traceTopic = traceTopic;
+ }
+
@Override
public String toString() {
return "ClientConfig{" +
@@ -505,6 +535,8 @@ public class ClientConfig {
", sendLatencyEnable=" + sendLatencyEnable +
", startDetectorEnable=" + startDetectorEnable +
", enableHeartbeatChannelEventListener=" +
enableHeartbeatChannelEventListener +
+ ", enableTrace=" + enableTrace +
+ ", traceTopic='" + traceTopic + '\'' +
'}';
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index c193c6a42e..3364df48f8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -169,15 +169,7 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
*/
private TraceDispatcher traceDispatcher = null;
- /**
- * The flag for message trace
- */
- private boolean enableMsgTrace = false;
-
- /**
- * The name value of message trace topic.If you don't config,you can use the default trace topic name.
- */
- private String customizedTraceTopic;
+ private RPCHook rpcHook;
/**
* Default constructor.
@@ -212,6 +204,7 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
*/
public DefaultLitePullConsumer(final String consumerGroup, RPCHook
rpcHook) {
this.consumerGroup = consumerGroup;
+ this.rpcHook = rpcHook;
this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this,
rpcHook);
}
@@ -226,6 +219,7 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
public DefaultLitePullConsumer(final String namespace, final String
consumerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.consumerGroup = consumerGroup;
+ this.rpcHook = rpcHook;
this.enableStreamRequestType = true;
defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this,
rpcHook);
}
@@ -592,15 +586,12 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
return traceDispatcher;
}
- public void setCustomizedTraceTopic(String customizedTraceTopic) {
- this.customizedTraceTopic = customizedTraceTopic;
- }
-
private void setTraceDispatcher() {
- if (isEnableMsgTrace()) {
+ if (enableTrace) {
try {
- AsyncTraceDispatcher traceDispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME,
customizedTraceTopic, null);
+ AsyncTraceDispatcher traceDispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic,
rpcHook);
traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
+ traceDispatcher.setNamespaceV2(namespaceV2);
this.traceDispatcher = traceDispatcher;
this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher));
@@ -611,14 +602,18 @@ public class DefaultLitePullConsumer extends ClientConfig
implements LitePullCon
}
public String getCustomizedTraceTopic() {
- return customizedTraceTopic;
+ return traceTopic;
+ }
+
+ public void setCustomizedTraceTopic(String customizedTraceTopic) {
+ this.traceTopic = customizedTraceTopic;
}
public boolean isEnableMsgTrace() {
- return enableMsgTrace;
+ return enableTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
- this.enableMsgTrace = enableMsgTrace;
+ this.enableTrace = enableMsgTrace;
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 502c5ef184..312f4632ca 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -293,6 +293,8 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
// force to use client rebalance
private boolean clientRebalance = true;
+ private RPCHook rpcHook = null;
+
/**
* Default constructor.
*/
@@ -327,6 +329,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
+ this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = new
AllocateMessageQueueAveragely();
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
}
@@ -353,6 +356,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
+ this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
}
@@ -369,18 +373,11 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
+ this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
- if (enableMsgTrace) {
- try {
- AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME,
customizedTraceTopic, rpcHook);
- dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
- traceDispatcher = dispatcher;
- this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new
ConsumeMessageTraceHookImpl(traceDispatcher));
- } catch (Throwable e) {
- log.error("system mqtrace hook init failed ,maybe can't send
msg trace data");
- }
- }
+ this.enableTrace = enableMsgTrace;
+ this.traceTopic = customizedTraceTopic;
}
/**
@@ -419,6 +416,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
+ this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
}
@@ -438,18 +436,11 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
enableMsgTrace, final String customizedTraceTopic) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
+ this.rpcHook = rpcHook;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
- if (enableMsgTrace) {
- try {
- AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME,
customizedTraceTopic, rpcHook);
- dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
- traceDispatcher = dispatcher;
- this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new
ConsumeMessageTraceHookImpl(traceDispatcher));
- } catch (Throwable e) {
- log.error("system mqtrace hook init failed ,maybe can't send
msg trace data");
- }
- }
+ this.enableTrace = enableMsgTrace;
+ this.traceTopic = customizedTraceTopic;
}
/**
@@ -464,9 +455,6 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
- if (traceDispatcher instanceof AsyncTraceDispatcher) {
- ((AsyncTraceDispatcher)
traceDispatcher).getTraceProducer().setUseTLS(useTLS);
- }
}
/**
@@ -750,7 +738,21 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(),
this.consumerGroup));
this.defaultMQPushConsumerImpl.start();
+ if (enableTrace) {
+ try {
+ AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, traceTopic,
rpcHook);
+ dispatcher.setHostConsumer(this.defaultMQPushConsumerImpl);
+ dispatcher.setNamespaceV2(namespaceV2);
+ traceDispatcher = dispatcher;
+ this.defaultMQPushConsumerImpl.registerConsumeMessageHook(new
ConsumeMessageTraceHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ log.error("system mqtrace hook init failed ,maybe can't send
msg trace data");
+ }
+ }
if (null != traceDispatcher) {
+ if (traceDispatcher instanceof AsyncTraceDispatcher) {
+ ((AsyncTraceDispatcher)
traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
+ }
try {
traceDispatcher.start(this.getNamesrvAddr(),
this.getAccessChannel());
} catch (MQClientException e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index cabe96ca7b..0abf925a82 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -167,6 +167,8 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
*/
private int backPressureForAsyncSendSize = 100 * 1024 * 1024;
+ private RPCHook rpcHook = null;
+
/**
* Default constructor.
*/
@@ -202,6 +204,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
*/
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
+ this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
@@ -243,20 +246,8 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook,
boolean enableMsgTrace,
final String customizedTraceTopic) {
this(producerGroup, rpcHook);
- //if client open the message trace feature
- if (enableMsgTrace) {
- try {
- AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE,
customizedTraceTopic, rpcHook);
- dispatcher.setHostProducer(this.defaultMQProducerImpl);
- traceDispatcher = dispatcher;
- this.defaultMQProducerImpl.registerSendMessageHook(
- new SendMessageTraceHookImpl(traceDispatcher));
- this.defaultMQProducerImpl.registerEndTransactionHook(
- new EndTransactionTraceHookImpl(traceDispatcher));
- } catch (Throwable e) {
- logger.error("system mqtrace hook init failed ,maybe can't
send msg trace data");
- }
- }
+ this.enableTrace = enableMsgTrace;
+ this.traceTopic = customizedTraceTopic;
}
/**
@@ -298,6 +289,7 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
public DefaultMQProducer(final String namespace, final String
producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
+ this.rpcHook = rpcHook;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
produceAccumulator =
MQClientManager.getInstance().getOrCreateProduceAccumulator(this);
}
@@ -318,27 +310,8 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
boolean enableMsgTrace, final String customizedTraceTopic) {
this(namespace, producerGroup, rpcHook);
//if client open the message trace feature
- if (enableMsgTrace) {
- try {
- AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE,
customizedTraceTopic, rpcHook);
- dispatcher.setHostProducer(this.defaultMQProducerImpl);
- traceDispatcher = dispatcher;
- this.defaultMQProducerImpl.registerSendMessageHook(
- new SendMessageTraceHookImpl(traceDispatcher));
- this.defaultMQProducerImpl.registerEndTransactionHook(
- new EndTransactionTraceHookImpl(traceDispatcher));
- } catch (Throwable e) {
- logger.error("system mqtrace hook init failed ,maybe can't
send msg trace data");
- }
- }
- }
-
- @Override
- public void setUseTLS(boolean useTLS) {
- super.setUseTLS(useTLS);
- if (traceDispatcher instanceof AsyncTraceDispatcher) {
- ((AsyncTraceDispatcher)
traceDispatcher).getTraceProducer().setUseTLS(useTLS);
- }
+ this.enableTrace = enableMsgTrace;
+ this.traceTopic = customizedTraceTopic;
}
/**
@@ -356,7 +329,24 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
if (this.produceAccumulator != null) {
this.produceAccumulator.start();
}
+ if (enableTrace) {
+ try {
+ AsyncTraceDispatcher dispatcher = new
AsyncTraceDispatcher(producerGroup, TraceDispatcher.Type.PRODUCE, traceTopic,
rpcHook);
+ dispatcher.setHostProducer(this.defaultMQProducerImpl);
+ dispatcher.setNamespaceV2(this.namespaceV2);
+ traceDispatcher = dispatcher;
+ this.defaultMQProducerImpl.registerSendMessageHook(
+ new SendMessageTraceHookImpl(traceDispatcher));
+ this.defaultMQProducerImpl.registerEndTransactionHook(
+ new EndTransactionTraceHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ logger.error("system mqtrace hook init failed ,maybe can't
send msg trace data");
+ }
+ }
if (null != traceDispatcher) {
+ if (traceDispatcher instanceof AsyncTraceDispatcher) {
+ ((AsyncTraceDispatcher)
traceDispatcher).getTraceProducer().setUseTLS(isUseTLS());
+ }
try {
traceDispatcher.start(this.getNamesrvAddr(),
this.getAccessChannel());
} catch (MQClientException e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index ea423b7176..d44f22616f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -78,6 +78,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private volatile AccessChannel accessChannel = AccessChannel.LOCAL;
private String group;
private Type type;
+ private String namespaceV2;
public AsyncTraceDispatcher(String group, Type type, String
traceTopicName, RPCHook rpcHook) {
// queueSize is greater than or equal to the n power of 2 of value
@@ -144,10 +145,20 @@ public class AsyncTraceDispatcher implements
TraceDispatcher {
this.hostConsumer = hostConsumer;
}
+ public String getNamespaceV2() {
+ return namespaceV2;
+ }
+
+ public void setNamespaceV2(String namespaceV2) {
+ this.namespaceV2 = namespaceV2;
+ }
+
public void start(String nameSrvAddr, AccessChannel accessChannel) throws
MQClientException {
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" +
nameSrvAddr);
+ traceProducer.setNamespaceV2(namespaceV2);
+ traceProducer.setEnableTrace(false);
traceProducer.start();
}
this.accessChannel = accessChannel;
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
index a39ae4a4de..028445ef2d 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
@@ -135,6 +135,8 @@ public class DefaultMQConsumerWithOpenTracingTest {
new ConsumeMessageOpenTracingHookImpl(tracer));
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
+ // disable trace to let mock trace work
+ pushConsumer.setEnableTrace(false);
OffsetStore offsetStore = Mockito.mock(OffsetStore.class);
Mockito.when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(0L);
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index 60aa446bbe..fc63cce1ce 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -128,11 +128,9 @@ public class DefaultMQConsumerWithTraceTest {
normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,
false, "");
customTraceTopicPushConsumer = new
DefaultMQPushConsumer(consumerGroup, true, customerTraceTopic);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+ pushConsumer.setUseTLS(true);
pushConsumer.setPullInterval(60 * 1000);
- asyncTraceDispatcher = (AsyncTraceDispatcher)
pushConsumer.getTraceDispatcher();
- traceProducer = asyncTraceDispatcher.getTraceProducer();
-
pushConsumer.registerMessageListener(new MessageListenerConcurrently()
{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs,
@@ -157,6 +155,9 @@ public class DefaultMQConsumerWithTraceTest {
pushConsumer.start();
+ asyncTraceDispatcher = (AsyncTraceDispatcher)
pushConsumer.getTraceDispatcher();
+ traceProducer = asyncTraceDispatcher.getTraceProducer();
+
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory());
@@ -242,9 +243,6 @@ public class DefaultMQConsumerWithTraceTest {
@Test
public void testPushConsumerWithTraceTLS() {
- DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumerGroup", true, null);
- consumer.setUseTLS(true);
- AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher)
consumer.getTraceDispatcher();
Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
index 8fbc70ea44..9ce9d6b494 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
@@ -88,6 +88,8 @@ public class DefaultMQProducerWithOpenTracingTest {
new SendMessageOpenTracingHookImpl(tracer));
producer.setNamesrvAddr("127.0.0.1:9876");
message = new Message(topic, new byte[] {'a', 'b', 'c'});
+ // disable trace to let mock trace work
+ producer.setEnableTrace(false);
producer.start();
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index ee17335185..ed680d8e6c 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -92,14 +92,14 @@ public class DefaultMQProducerWithTraceTest {
normalProducer.setNamesrvAddr("127.0.0.1:9877");
customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[] {'a', 'b', 'c'});
- asyncTraceDispatcher = (AsyncTraceDispatcher)
producer.getTraceDispatcher();
- asyncTraceDispatcher.setTraceTopicName(customerTraceTopic);
- asyncTraceDispatcher.getHostProducer();
- asyncTraceDispatcher.getHostConsumer();
- traceProducer = asyncTraceDispatcher.getTraceProducer();
+ producer.setTraceTopic(customerTraceTopic);
+ producer.setUseTLS(true);
producer.start();
+ asyncTraceDispatcher = (AsyncTraceDispatcher)
producer.getTraceDispatcher();
+ traceProducer = asyncTraceDispatcher.getTraceProducer();
+
Field field =
DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
@@ -150,9 +150,6 @@ public class DefaultMQProducerWithTraceTest {
@Test
public void testProducerWithTraceTLS() {
- DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp,
true, null);
- producer.setUseTLS(true);
- AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher)
producer.getTraceDispatcher();
Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
index 5646a17dbe..5d4b81d16d 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
@@ -103,6 +103,8 @@ public class TransactionMQProducerWithOpenTracingTest {
producer.getDefaultMQProducerImpl().registerSendMessageHook(new
SendMessageOpenTracingHookImpl(tracer));
producer.getDefaultMQProducerImpl().registerEndTransactionHook(new
EndTransactionOpenTracingHookImpl(tracer));
producer.setTransactionListener(transactionListener);
+ // disable trace to let mock trace work
+ producer.setEnableTrace(false);
producer.setNamesrvAddr("127.0.0.1:9876");
message = new Message(topic, new byte[] {'a', 'b', 'c'});
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
index 8cf87444c0..9f6036153b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java
@@ -111,11 +111,12 @@ public class TransactionMQProducerWithTraceTest {
producer.setNamesrvAddr("127.0.0.1:9876");
message = new Message(topic, new byte[] {'a', 'b', 'c'});
- asyncTraceDispatcher = (AsyncTraceDispatcher)
producer.getTraceDispatcher();
- traceProducer = asyncTraceDispatcher.getTraceProducer();
producer.start();
+ asyncTraceDispatcher = (AsyncTraceDispatcher)
producer.getTraceDispatcher();
+ traceProducer = asyncTraceDispatcher.getTraceProducer();
+
Field field =
DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);