dongeforever closed pull request #600: [ISSUE #525] Support the message
track,add the function which supports trace topic name value configurable by
users.
URL: https://github.com/apache/rocketmq/pull/600
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 163897b1c..bd5eaeeab 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -126,7 +126,7 @@ public TopicConfigManager(BrokerController
brokerController) {
}
{
if
(this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) {
- String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
+ String topic =
this.brokerController.getBrokerConfig().getMsgTrackTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(1);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 0b5ce0521..179a80daa 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -289,9 +289,10 @@ public DefaultMQPushConsumer(final String consumerGroup,
RPCHook rpcHook,
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
* @param msgTraceSwitch switch flag instance for message track trace.
+ * @param traceTopicName the name value of message track trace topic.If
you don't config,you can use the default trace topic name.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
- AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
msgTraceSwitch) {
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean
msgTraceSwitch, final String traceTopicName) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
@@ -303,6 +304,11 @@ public DefaultMQPushConsumer(final String consumerGroup,
RPCHook rpcHook,
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME,
"PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE,
TrackTraceDispatcherType.CONSUMER.name());
+ if (!UtilAll.isBlank(traceTopicName)) {
+ tempProperties.put(TrackTraceConstants.TRACE_TOPIC,
traceTopicName);
+ } else {
+ tempProperties.put(TrackTraceConstants.TRACE_TOPIC,
MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
+ }
AsyncArrayDispatcher dispatcher = new
AsyncArrayDispatcher(tempProperties);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
@@ -329,9 +335,10 @@ public DefaultMQPushConsumer(RPCHook rpcHook) {
*
* @param consumerGroup Consumer group.
* @param msgTraceSwitch switch flag instance for message track trace.
+ * @param traceTopicName the name value of message track trace topic.If
you don't config,you can use the default trace topic name.
*/
- public DefaultMQPushConsumer(final String consumerGroup, boolean
msgTraceSwitch) {
- this(consumerGroup, null, new AllocateMessageQueueAveragely(),
msgTraceSwitch);
+ public DefaultMQPushConsumer(final String consumerGroup, boolean
msgTraceSwitch, final String traceTopicName) {
+ this(consumerGroup, null, new AllocateMessageQueueAveragely(),
msgTraceSwitch, traceTopicName);
}
/**
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 8bee795d0..3c33d2eed 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -33,6 +33,7 @@
import
org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -138,7 +139,7 @@
* Default constructor.
*/
public DefaultMQProducer() {
- this(MixAll.DEFAULT_PRODUCER_GROUP, null,false);
+ this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}
/**
@@ -158,8 +159,9 @@ public DefaultMQProducer(final String producerGroup,
RPCHook rpcHook) {
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param msgTraceSwitch switch flag instance for message track trace.
+ * @param traceTopicName the name value of message track trace topic.If
you don't config,you can use the default trace topic name.
*/
- public DefaultMQProducer(final String producerGroup, RPCHook rpcHook,
boolean msgTraceSwitch) {
+ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook,
boolean msgTraceSwitch,final String traceTopicName) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message track trace feature
@@ -171,6 +173,11 @@ public DefaultMQProducer(final String producerGroup,
RPCHook rpcHook, boolean ms
tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100");
tempProperties.put(TrackTraceConstants.INSTANCE_NAME,
"PID_CLIENT_INNER_TRACE_PRODUCER");
tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE,
TrackTraceDispatcherType.PRODUCER.name());
+ if (!UtilAll.isBlank(traceTopicName)) {
+ tempProperties.put(TrackTraceConstants.TRACE_TOPIC,
traceTopicName);
+ } else {
+ tempProperties.put(TrackTraceConstants.TRACE_TOPIC,
MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
+ }
AsyncArrayDispatcher dispatcher = new
AsyncArrayDispatcher(tempProperties);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
@@ -197,9 +204,10 @@ public DefaultMQProducer(final String producerGroup) {
*
* @param producerGroup Producer group, see the name-sake field.
* @param msgTraceSwitch switch flag instance for message track trace.
+ * @param traceTopicName the name value of message track trace topic.If
you don't config,you can use the default trace topic name.
*/
- public DefaultMQProducer(final String producerGroup, boolean
msgTraceSwitch) {
- this(producerGroup, null, msgTraceSwitch);
+ public DefaultMQProducer(final String producerGroup, boolean
msgTraceSwitch, final String traceTopicName) {
+ this(producerGroup, null, msgTraceSwitch, traceTopicName);
}
/**
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
index aa49d1c5b..a8868614b 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.client.trace.core.common;
-import org.apache.rocketmq.common.MixAll;
-
public class TrackTraceConstants {
public static final String NAMESRV_ADDR = "NAMESRV_ADDR";
public static final String ADDRSRV_URL = "ADDRSRV_URL";
@@ -27,7 +25,7 @@
public static final String WAKE_UP_NUM = "WakeUpNum";
public static final String MAX_MSG_SIZE = "MaxMsgSize";
public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
- public static final String TRACE_TOPIC = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
+ public static final String TRACE_TOPIC = "TRACK_TRACE_TOPIC_NAME";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_DISPATCHER_TYPE = "DispatcherType";
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
index 1c1c4c381..90b00d414 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
@@ -72,6 +72,7 @@
private DefaultMQPushConsumerImpl hostConsumer;
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private String dispatcherId = UUID.randomUUID().toString();
+ private String traceTopicName;
public AsyncArrayDispatcher(Properties properties) throws
MQClientException {
dispatcherType =
properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
@@ -83,7 +84,7 @@ public AsyncArrayDispatcher(Properties properties) throws
MQClientException {
this.discardCount = new AtomicLong(0L);
traceContextQueue = new ArrayBlockingQueue<TrackTraceContext>(1024);
appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
-
+ traceTopicName =
properties.getProperty(TrackTraceConstants.TRACE_TOPIC);
this.traceExecuter = new ThreadPoolExecutor(//
10, //
20, //
@@ -94,6 +95,14 @@ public AsyncArrayDispatcher(Properties properties) throws
MQClientException {
traceProducer =
TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
}
+ public String getTraceTopicName() {
+ return traceTopicName;
+ }
+
+ public void setTraceTopicName(String traceTopicName) {
+ this.traceTopicName = traceTopicName;
+ }
+
public DefaultMQProducer getTraceProducer() {
return traceProducer;
}
@@ -115,7 +124,7 @@ public void setHostConsumer(DefaultMQPushConsumerImpl
hostConsumer) {
}
public void start(Properties properties) throws MQClientException {
-
TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId,properties.getProperty(TrackTraceConstants.NAMESRV_ADDR));
+ TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId,
properties.getProperty(TrackTraceConstants.NAMESRV_ADDR));
this.worker = new Thread(new AsyncRunnable(),
"MQ-AsyncArrayDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();
@@ -247,16 +256,14 @@ public void sendTraceData(List<TrackTraceContext>
contextList) {
transBeanList.add(traceData);
}
for (Map.Entry<String, List<TrackTraceTransferBean>> entry :
transBeanMap.entrySet()) {
- //key -> dataTopic(Not trace Topic)
- String dataTopic = entry.getKey();
- flushData(entry.getValue(), dataTopic);
+ flushData(entry.getValue());
}
}
/**
* batch sending data actually
*/
- private void flushData(List<TrackTraceTransferBean> transBeanList,
String topic) {
+ private void flushData(List<TrackTraceTransferBean> transBeanList) {
if (transBeanList.size() == 0) {
return;
}
@@ -292,7 +299,7 @@ private void flushData(List<TrackTraceTransferBean>
transBeanList, String topic)
* @param data the message track trace data in this batch
*/
private void sendTraceDataByMQ(Set<String> keySet, final String data) {
- String topic = TrackTraceConstants.TRACE_TOPIC;
+ String topic = traceTopicName;
final Message message = new Message(topic, data.getBytes());
//keyset of message track trace includes msgId of or original
message
diff --git
a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
index 0c38e2203..c174f462d 100644
---
a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java
@@ -20,11 +20,10 @@
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.core.common.TrackTraceBean;
-import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants;
import org.apache.rocketmq.client.trace.core.common.TrackTraceContext;
import org.apache.rocketmq.client.trace.core.common.TrackTraceType;
import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher;
-import org.apache.rocketmq.common.MixAll;
+import
org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher;
import java.util.ArrayList;
public class SendMessageTrackHookImpl implements SendMessageHook {
@@ -43,7 +42,7 @@ public String hookName() {
@Override
public void sendMessageBefore(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded
- if (context == null ||
context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
+ if (context == null ||
context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher)
localDispatcher).getTraceTopicName())) {
return;
}
//build the context content of TuxeTraceContext
@@ -67,7 +66,7 @@ public void sendMessageBefore(SendMessageContext context) {
@Override
public void sendMessageAfter(SendMessageContext context) {
//if it is message track trace data,then it doesn't recorded
- if (context == null ||
context.getMessage().getTopic().startsWith(TrackTraceConstants.TRACE_TOPIC)
+ if (context == null ||
context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher)
localDispatcher).getTraceTopicName())
|| context.getMqTraceContext() == null) {
return;
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
index 27c10dad5..08382dfe8 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java
@@ -100,21 +100,23 @@
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
private DefaultMQPushConsumer normalPushConsumer;
+ private DefaultMQPushConsumer customTraceTopicpushConsumer;
+
private AsyncArrayDispatcher asyncArrayDispatcher;
private MQClientInstance mQClientTraceFactory;
@Mock
private MQClientAPIImpl mQClientTraceAPIImpl;
private DefaultMQProducer traceProducer;
-
+ private String customerTraceTopic = "rmq_track_trace_topic_12345";
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
- pushConsumer = new DefaultMQPushConsumer(consumerGroup,true);
+ pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,"");
consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis();
- normalPushConsumer = new
DefaultMQPushConsumer(consumerGroupNormal,false);
-
+ normalPushConsumer = new
DefaultMQPushConsumer(consumerGroupNormal,false,"");
+ customTraceTopicpushConsumer = new
DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
diff --git
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
index c3757a8af..9460d6834 100644
---
a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java
@@ -76,6 +76,7 @@
private AsyncArrayDispatcher asyncArrayDispatcher;
private DefaultMQProducer producer;
+ private DefaultMQProducer customTraceTopicproducer;
private DefaultMQProducer traceProducer;
private DefaultMQProducer normalProducer;
@@ -84,16 +85,22 @@
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix +
System.currentTimeMillis();
private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC +
System.currentTimeMillis();
-
+ private String customerTraceTopic = "rmq_track_trace_topic_12345";
+
@Before
public void init() throws Exception {
- normalProducer = new DefaultMQProducer(producerGroupTemp,false);
- producer = new DefaultMQProducer(producerGroupTemp,true);
+ customTraceTopicproducer = new
DefaultMQProducer(producerGroupTemp,false, customerTraceTopic);
+ normalProducer = new DefaultMQProducer(producerGroupTemp,false,"");
+ producer = new DefaultMQProducer(producerGroupTemp,true,"");
producer.setNamesrvAddr("127.0.0.1:9876");
normalProducer.setNamesrvAddr("127.0.0.1:9877");
+ customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878");
message = new Message(topic, new byte[] {'a', 'b' ,'c'});
asyncArrayDispatcher =
(AsyncArrayDispatcher)producer.getTraceDispatcher();
+ asyncArrayDispatcher.setTraceTopicName(customerTraceTopic);
+ asyncArrayDispatcher.getHostProducer();
+ asyncArrayDispatcher.getHostConsumer();
traceProducer = asyncArrayDispatcher.getTraceProducer();
producer.start();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 77d492ecb..11c1fcb91 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -53,6 +53,8 @@
private String messageStorePlugIn = "";
@ImportantField
private boolean autoTraceBrokerEnable = false;
+ @ImportantField
+ private String msgTrackTopicName = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC;
/**
* thread numbers for send message thread pool, since spin lock will be
used by default since 4.0.x, the default
* value is 1.
@@ -741,4 +743,12 @@ public boolean isAutoTraceBrokerEnable() {
public void setAutoTraceBrokerEnable(boolean autoTraceBrokerEnable) {
this.autoTraceBrokerEnable = autoTraceBrokerEnable;
}
+
+ public String getMsgTrackTopicName() {
+ return msgTrackTopicName;
+ }
+
+ public void setMsgTrackTopicName(String msgTrackTopicName) {
+ this.msgTrackTopicName = msgTrackTopicName;
+ }
}
diff --git
a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
index 0fb9b3afb..8b10eef2a 100644
--- a/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/BrokerConfigTest.java
@@ -27,4 +27,24 @@ public void testConsumerFallBehindThresholdOverflow() {
long expect = 1024L * 1024 * 1024 * 16;
assertThat(new
BrokerConfig().getConsumerFallbehindThreshold()).isEqualTo(expect);
}
+
+ @Test
+ public void testBrokerConfigAttribute() {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setNamesrvAddr("127.0.0.1:9876");
+ brokerConfig.setAutoCreateTopicEnable(false);
+ brokerConfig.setAutoTraceBrokerEnable(true);
+ brokerConfig.setBrokerName("broker-a");
+ brokerConfig.setBrokerId(0);
+ brokerConfig.setBrokerClusterName("DefaultCluster");
+ brokerConfig.setMsgTrackTopicName("RMQ_SYS_TRACK_TRACE_TOPIC4");
+
assertThat(brokerConfig.getBrokerClusterName()).isEqualTo("DefaultCluster");
+ assertThat(brokerConfig.getNamesrvAddr()).isEqualTo("127.0.0.1:9876");
+
assertThat(brokerConfig.getMsgTrackTopicName()).isEqualTo("RMQ_SYS_TRACK_TRACE_TOPIC4");
+ assertThat(brokerConfig.getBrokerId()).isEqualTo(0);
+ assertThat(brokerConfig.getBrokerName()).isEqualTo("broker-a");
+ assertThat(brokerConfig.isAutoCreateTopicEnable()).isEqualTo(false);
+ assertThat(brokerConfig.isAutoTraceBrokerEnable()).isEqualTo(true);
+
+ }
}
\ No newline at end of file
diff --git
a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
index fb8e37fd2..5a513e414 100644
---
a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java
@@ -26,7 +26,7 @@
public class TraceProducer {
public static void main(String[] args) throws MQClientException,
InterruptedException {
- DefaultMQProducer producer = new
DefaultMQProducer("ProducerGroupName",true);
+ DefaultMQProducer producer = new
DefaultMQProducer("ProducerGroupName",true, "");
producer.start();
for (int i = 0; i < 128; i++)
diff --git
a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
index b9710d4a7..e0e05a8cf 100644
---
a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
+++
b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java
@@ -28,7 +28,8 @@
public class TracePushConsumer {
public static void main(String[] args) throws InterruptedException,
MQClientException {
- DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("CID_JODIE_1",true);
+ //here,we use the default message track trace topic name
+ DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("CID_JODIE_1",true, "");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//wrong time format 2017_0422_221800
diff --git
a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
index 7ba580df9..d2adac53d 100644
---
a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
+++
b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java
@@ -63,7 +63,7 @@ public MQProducer getInstance(String nameServerAddress,
String group) throws MQC
return p;
}
- DefaultMQProducer defaultMQProducer = new
DefaultMQProducer(group,false);
+ DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group);
defaultMQProducer.setNamesrvAddr(nameServerAddress);
MQProducer beforeProducer = null;
beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey,
defaultMQProducer);
diff --git
a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
index 128939385..38904c0bf 100644
---
a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
+++
b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java
@@ -39,7 +39,7 @@
@Before
public void mockLoggerAppender() throws Exception {
- DefaultMQProducer defaultMQProducer = spy(new
DefaultMQProducer("loggerAppender",false));
+ DefaultMQProducer defaultMQProducer = spy(new
DefaultMQProducer("loggerAppender"));
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws
Throwable {
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
index a828e8ddf..ce739be59 100644
---
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
+++
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -46,7 +46,7 @@ public void create() {
}
public void create(boolean useTLS) {
- consumer = new DefaultMQPushConsumer(consumerGroup,false);
+ consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr);
try {
diff --git
a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
index abd1e3b64..66767cc9f 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
@@ -24,7 +24,7 @@
public class ProducerFactory {
public static DefaultMQProducer getRMQProducer(String ns) {
- DefaultMQProducer producer = new
DefaultMQProducer(RandomUtil.getStringByUUID(),false);
+ DefaultMQProducer producer = new
DefaultMQProducer(RandomUtil.getStringByUUID());
producer.setNamesrvAddr(ns);
try {
producer.start();
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 675fc2a81..6bb8caad4 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -213,7 +213,7 @@ public Options buildCommandlineOptions(Options options) {
public void execute(CommandLine commandLine, Options options, RPCHook
rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
- DefaultMQProducer defaultMQProducer = new
DefaultMQProducer("ReSendMsgById",false);
+ DefaultMQProducer defaultMQProducer = new
DefaultMQProducer("ReSendMsgById");
defaultMQProducer.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
index 3debb3d15..9bf09ad41 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
@@ -65,7 +65,7 @@
private final DefaultMQPullConsumer defaultMQPullConsumer = new
DefaultMQPullConsumer(
MixAll.TOOLS_CONSUMER_GROUP);
private final DefaultMQPushConsumer defaultMQPushConsumer = new
DefaultMQPushConsumer(
- MixAll.MONITOR_CONSUMER_GROUP,false);
+ MixAll.MONITOR_CONSUMER_GROUP);
public MonitorService(MonitorConfig monitorConfig, MonitorListener
monitorListener, RPCHook rpcHook) {
this.monitorConfig = monitorConfig;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services