zhouxinyu commented on a change in pull request #255: [ISSUE #254] Add message
statistical log on DLQ for security audit
URL: https://github.com/apache/rocketmq/pull/255#discussion_r177698473
##########
File path:
broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
##########
@@ -226,4 +238,75 @@ public Object answer(InvocationOnMock invocation) throws
Throwable {
assertThat(response[0].getCode()).isEqualTo(responseCode);
assertThat(response[0].getOpaque()).isEqualTo(request.getOpaque());
}
+
+ @Test
+ public void testDlqStatLog() throws RemotingCommandException,
NoSuchFieldException, IllegalAccessException {
+ String topic = "TopicA";
+ String groupName = "TestDLQGroup";
+ String msgId = "msgid4r3nufeu4gtbfy3gf3fy43g4";
+ SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
+ subscriptionGroupConfig.setRetryQueueNums(1);
+ subscriptionGroupConfig.setConsumeEnable(true);
+ subscriptionGroupConfig.setGroupName(groupName);
+ subscriptionGroupConfig.setRetryQueueNums(5);
+
+ final SubscriptionGroupManager subscriptionGroupManager = new
SubscriptionGroupManager();
+
subscriptionGroupManager.getSubscriptionGroupTable().put(subscriptionGroupConfig.getGroupName(),subscriptionGroupConfig);
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
+ return subscriptionGroupManager;
+ }
+ }).when(brokerController).getSubscriptionGroupManager();
+ final StringBuilder resultCollector = new StringBuilder();
+
+ MessageExt messageExt = new MessageExt();
+ messageExt.setTopic(topic);
+ messageExt.setQueueId(0);
+ messageExt.setBody("simple message".getBytes());
+ messageExt.setCommitLogOffset(876867867L);
+ messageExt.setReconsumeTimes(16);
+ messageExt.setMsgId(msgId);
+ messageExt.setStoreTimestamp(System.currentTimeMillis());
+ messageExt.putUserProperty("test222","gggg");
+
messageExt.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,msgId);
+
messageExt.getProperties().put(MessageConst.PROPERTY_RETRY_TOPIC,topic);
+
when(messageStore.lookMessageByOffset(anyLong())).thenReturn(messageExt);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
+ Object[] arguments = invocationOnMock.getArguments();
+ resultCollector.append(arguments[0]);
+ return null;
+ }
+ }).when(dlqLogger).info(anyString());
+
+ Field dlqLoggerField =
SendMessageProcessor.class.getDeclaredField("dlqLogger");
+ dlqLoggerField.setAccessible(true);
+ dlqLoggerField.set(sendMessageProcessor,dlqLogger);
+
+
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+
+ ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader =
new ConsumerSendMsgBackRequestHeader();
+ consumerSendMsgBackRequestHeader.setGroup(groupName);
+ consumerSendMsgBackRequestHeader.setMaxReconsumeTimes(3);
+ consumerSendMsgBackRequestHeader.setOffset(876867867L);
+ consumerSendMsgBackRequestHeader.setDelayLevel(10);
+ consumerSendMsgBackRequestHeader.setOriginTopic("%RETRY%"+groupName);
+ consumerSendMsgBackRequestHeader.setOriginMsgId(msgId);
+
+ RemotingCommand requestCommand =
RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK,
consumerSendMsgBackRequestHeader);
+ requestCommand.addExtField("group",groupName);
+ requestCommand.addExtField("maxReconsumeTimes","3");
+ requestCommand.addExtField("offset","876867867");
+ requestCommand.addExtField("delayLevel","10");
+ requestCommand.addExtField("originTopic","%RETRY%"+groupName);
+ requestCommand.addExtField("originMsgId",msgId);
+ sendMessageProcessor.processRequest(null,requestCommand);
+ String result = resultCollector.toString();
+ Assert.assertTrue(result.contains(msgId));
+ Assert.assertTrue(result.contains(topic));
Review comment:
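Could we use AssertJ's `assertThat` here instead of JUnit's `Assert.assertTrue`, so these checks are consistent with the other assertions in this test class (e.g. `assertThat(result).contains(msgId)`)?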
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services