This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 524ca4e862 [ISSUE #8460] Improve the pop revive process when reading
biz messages from a remote broker - part2 (#8494)
524ca4e862 is described below
commit 524ca4e862b3bb4b5f713ba09dac106beff0ce98
Author: imzs <[email protected]>
AuthorDate: Wed Aug 14 13:48:44 2024 +0800
[ISSUE #8460] Improve the pop revive process when reading biz messages from
a remote broker - part2 (#8494)
---
.../rocketmq/broker/failover/EscapeBridge.java | 39 ++++++--
.../broker/processor/PopReviveService.java | 22 ++---
.../rocketmq/broker/failover/EscapeBridgeTest.java | 106 +++++++++++++++++++--
.../broker/processor/PopReviveServiceTest.java | 45 ++++++++-
.../org/apache/rocketmq/common/BrokerConfig.java | 11 +++
5 files changed, 196 insertions(+), 27 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
index 7df49f8c47..762d917d64 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java
@@ -48,9 +48,11 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.tieredstore.TieredMessageStore;
public class EscapeBridge {
protected static final Logger LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -99,7 +101,7 @@ public class EscapeBridge {
try {
messageExt.setWaitStoreMsgOK(false);
- final SendResult sendResult =
putMessageToRemoteBroker(messageExt);
+ final SendResult sendResult =
putMessageToRemoteBroker(messageExt, null);
return transformSendResult2PutResult(sendResult);
} catch (Exception e) {
LOG.error("sendMessageInFailover to remote failed", e);
@@ -112,7 +114,10 @@ public class EscapeBridge {
}
}
- private SendResult putMessageToRemoteBroker(MessageExtBrokerInner
messageExt) {
+ public SendResult putMessageToRemoteBroker(MessageExtBrokerInner
messageExt, String brokerNameToSend) {
+ if
(this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend))
{ // not remote broker
+ return null;
+ }
final boolean isTransHalfMessage =
TransactionalMessageUtil.buildHalfTopic().equals(messageExt.getTopic());
MessageExtBrokerInner messageToPut = messageExt;
if (isTransHalfMessage) {
@@ -125,12 +130,26 @@ public class EscapeBridge {
return null;
}
- final MessageQueue mqSelected =
topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName());
-
- messageToPut.setQueueId(mqSelected.getQueueId());
+ final MessageQueue mqSelected;
+ if (StringUtils.isEmpty(brokerNameToSend)) {
+ mqSelected =
topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName());
+ messageToPut.setQueueId(mqSelected.getQueueId());
+ brokerNameToSend = mqSelected.getBrokerName();
+ if
(this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend))
{
+ LOG.warn("putMessageToRemoteBroker failed, remote broker not
found. Topic: {}, MsgId: {}, Broker: {}",
+ messageExt.getTopic(), messageExt.getMsgId(),
brokerNameToSend);
+ return null;
+ }
+ } else {
+ mqSelected = new MessageQueue(messageExt.getTopic(),
brokerNameToSend, messageExt.getQueueId());
+ }
- final String brokerNameToSend = mqSelected.getBrokerName();
final String brokerAddrToSend =
this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
+ if (null == brokerAddrToSend) {
+ LOG.warn("putMessageToRemoteBroker failed, remote broker address
not found. Topic: {}, MsgId: {}, Broker: {}",
+ messageExt.getTopic(), messageExt.getMsgId(),
brokerNameToSend);
+ return null;
+ }
final long beginTimestamp = System.currentTimeMillis();
try {
@@ -279,8 +298,12 @@ public class EscapeBridge {
}
List<MessageExt> list = decodeMsgList(result,
deCompressBody);
if (list == null || list.isEmpty()) {
- LOG.warn("Can not get msg , topic {}, offset {},
queueId {}, result is {}", topic, offset, queueId, result);
- return Triple.of(null, "Can not get msg", false); //
local store, so no retry
+ // OFFSET_FOUND_NULL returned by TieredMessageStore
indicates exception occurred
+ boolean needRetry =
GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
+ && messageStore instanceof TieredMessageStore;
+ LOG.warn("Can not get msg , topic {}, offset {},
queueId {}, needRetry {}, result is {}",
+ topic, offset, queueId, needRetry, result);
+ return Triple.of(null, "Can not get msg", needRetry);
}
return Triple.of(list.get(0), "", false);
});
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 114d094600..4b141d2910 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -199,9 +199,8 @@ public class PopReviveService extends ServiceThread {
}
// Triple<MessageExt, info, needRetry>
- private CompletableFuture<Triple<MessageExt, String, Boolean>>
getBizMessage(String topic, long offset, int queueId,
- String brokerName) {
- return this.brokerController.getEscapeBridge().getMessageAsync(topic,
offset, queueId, brokerName, false);
+ public CompletableFuture<Triple<MessageExt, String, Boolean>>
getBizMessage(PopCheckPoint popCheckPoint, long offset) {
+ return
this.brokerController.getEscapeBridge().getMessageAsync(popCheckPoint.getTopic(),
offset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName(), false);
}
public PullResult getMessage(String group, String topic, int queueId, long
offset, int nums,
@@ -358,7 +357,7 @@ public class PopReviveService extends ServiceThread {
if (point.getTopic() == null || point.getCId() == null) {
continue;
}
- map.put(point.getTopic() + point.getCId() +
point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
+ map.put(point.getTopic() + point.getCId() +
point.getQueueId() + point.getStartOffset() + point.getPopTime() +
point.getBrokerName(), point);
PopMetricsManager.incPopReviveCkGetCount(point, queueId);
point.setReviveOffset(messageExt.getQueueOffset());
if (firstRt == 0) {
@@ -371,7 +370,7 @@ public class PopReviveService extends ServiceThread {
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
- String mergeKey = ackMsg.getTopic() +
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() +
ackMsg.getPopTime();
+ String mergeKey = ackMsg.getTopic() +
ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() +
ackMsg.getPopTime() + ackMsg.getBrokerName();
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if
(!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
@@ -396,7 +395,7 @@ public class PopReviveService extends ServiceThread {
BatchAckMsg bAckMsg = JSON.parseObject(raw,
BatchAckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(bAckMsg,
queueId);
- String mergeKey = bAckMsg.getTopic() +
bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() +
bAckMsg.getPopTime();
+ String mergeKey = bAckMsg.getTopic() +
bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() +
bAckMsg.getPopTime() + bAckMsg.getBrokerName();
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if
(!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
@@ -528,7 +527,7 @@ public class PopReviveService extends ServiceThread {
// retry msg
long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
- CompletableFuture<Pair<Long, Boolean>> future =
getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(),
popCheckPoint.getBrokerName())
+ CompletableFuture<Pair<Long, Boolean>> future =
getBizMessage(popCheckPoint, msgOffset)
.thenApply(rst -> {
MessageExt message = rst.getLeft();
if (message == null) {
@@ -568,9 +567,9 @@ public class PopReviveService extends ServiceThread {
private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
int rePutTimes = oldCK.parseRePutTimes();
- if (rePutTimes >= ckRewriteIntervalsInSeconds.length) {
- POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {},
{}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
- oldCK.getBrokerName(), oldCK.getQueueId(),
pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime());
+ if (rePutTimes >= ckRewriteIntervalsInSeconds.length &&
brokerController.getBrokerConfig().isSkipWhenCKRePutReachMaxTimes()) {
+ POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {},
{}-{}, {}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
+ oldCK.getBrokerName(), oldCK.getQueueId(),
pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime(), rePutTimes);
return;
}
@@ -588,7 +587,8 @@ public class PopReviveService extends ServiceThread {
newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always
increment even if removed from reviveRequestMap
if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
// never expect an ACK matched in the future, we just use it to
rewrite CK and try to revive retry message next time
- newCk.setInvisibleTime(oldCK.getInvisibleTime() +
ckRewriteIntervalsInSeconds[rePutTimes] * 1000);
+ int intervalIndex = rePutTimes >=
ckRewriteIntervalsInSeconds.length ? ckRewriteIntervalsInSeconds.length - 1 :
rePutTimes;
+ newCk.setInvisibleTime(oldCK.getInvisibleTime() +
ckRewriteIntervalsInSeconds[intervalIndex] * 1000);
}
MessageExtBrokerInner ckMsg =
brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId);
brokerController.getMessageStore().putMessage(ckMsg);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
index 7ea06665c3..27fc37dbec 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java
@@ -30,19 +30,23 @@ import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
@@ -58,6 +62,9 @@ import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class EscapeBridgeTest {
@@ -75,6 +82,9 @@ public class EscapeBridgeTest {
@Mock
private DefaultMessageStore defaultMessageStore;
+ @Mock
+ private TieredMessageStore tieredMessageStore;
+
private GetMessageResult getMessageResult;
@Mock
@@ -200,14 +210,37 @@ public class EscapeBridgeTest {
}
@Test
- public void getMessageAsyncTest_localStore_decodeNothing() throws
Exception {
+ public void
getMessageAsyncTest_localStore_decodeNothing_DefaultMessageStore() throws
Exception {
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
- when(defaultMessageStore.getMessageAsync(anyString(), anyString(),
anyInt(), anyLong(), anyInt(), any()))
-
.thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(0,
TEST_TOPIC, null)));
- Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME,
false).join();
- Assert.assertNull(rst.getLeft());
- Assert.assertEquals("Can not get msg", rst.getMiddle());
- Assert.assertFalse(rst.getRight()); // no retry
+ for (GetMessageStatus status : GetMessageStatus.values()) {
+ GetMessageResult getMessageResult = mockGetMessageResult(0,
TEST_TOPIC, null);
+ getMessageResult.setStatus(status);
+ when(defaultMessageStore.getMessageAsync(anyString(), anyString(),
anyInt(), anyLong(), anyInt(), any()))
+
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME,
false).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("Can not get msg", rst.getMiddle());
+ Assert.assertFalse(rst.getRight()); // DefaultMessageStore, no
retry
+ }
+ }
+
+ @Test
+ public void
getMessageAsyncTest_localStore_decodeNothing_TieredMessageStore() throws
Exception {
+
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(tieredMessageStore);
+ for (GetMessageStatus status : GetMessageStatus.values()) {
+ GetMessageResult getMessageResult = new GetMessageResult();
+ getMessageResult.setStatus(status);
+ when(tieredMessageStore.getMessageAsync(anyString(), anyString(),
anyInt(), anyLong(), anyInt(), any()))
+
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
+ Triple<MessageExt, String, Boolean> rst =
escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME,
false).join();
+ Assert.assertNull(rst.getLeft());
+ Assert.assertEquals("Can not get msg", rst.getMiddle());
+ if (GetMessageStatus.OFFSET_FOUND_NULL.equals(status)) {
+ Assert.assertTrue(rst.getRight()); // TieredMessageStore
returns OFFSET_FOUND_NULL, need retry
+ } else {
+ Assert.assertFalse(rst.getRight()); // other status, like
DefaultMessageStore, no retry
+ }
+ }
}
@Test
@@ -320,6 +353,57 @@ public class EscapeBridgeTest {
Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody()));
}
+ @Test
+ public void
testPutMessageToRemoteBroker_noSpecificBrokerName_hasRemoteBroker() throws
Exception {
+ MessageExtBrokerInner message = new MessageExtBrokerInner();
+ message.setTopic(TEST_TOPIC);
+ String anotherBrokerName = "broker_b";
+ TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME,
anotherBrokerName);
+
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
+
when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1");
+ escapeBridge.putMessageToRemoteBroker(message, null);
+ verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"),
eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(),
anyLong());
+ }
+
+ @Test
+ public void
testPutMessageToRemoteBroker_noSpecificBrokerName_noRemoteBroker() throws
Exception {
+ MessageExtBrokerInner message = new MessageExtBrokerInner();
+ message.setTopic(TEST_TOPIC);
+ TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME);
+
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
+ escapeBridge.putMessageToRemoteBroker(message, null);
+ verify(topicRouteInfoManager,
times(0)).findBrokerAddressInPublish(anyString());
+ }
+
+ @Test
+ public void testPutMessageToRemoteBroker_specificBrokerName_equals()
throws Exception {
+ escapeBridge.putMessageToRemoteBroker(new MessageExtBrokerInner(),
BROKER_NAME);
+ verify(topicRouteInfoManager,
times(0)).tryToFindTopicPublishInfo(anyString());
+ }
+
+ @Test
+ public void
testPutMessageToRemoteBroker_specificBrokerName_addressNotFound() throws
Exception {
+ MessageExtBrokerInner message = new MessageExtBrokerInner();
+ message.setTopic(TEST_TOPIC);
+ TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME);
+
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
+ escapeBridge.putMessageToRemoteBroker(message, "whatever");
+
verify(topicRouteInfoManager).findBrokerAddressInPublish(eq("whatever"));
+ verify(brokerOuterAPI,
times(0)).sendMessageToSpecificBroker(anyString(), anyString(),
any(MessageExtBrokerInner.class), anyString(), anyLong());
+ }
+
+ @Test
+ public void testPutMessageToRemoteBroker_specificBrokerName_addressFound()
throws Exception {
+ MessageExtBrokerInner message = new MessageExtBrokerInner();
+ message.setTopic(TEST_TOPIC);
+ String anotherBrokerName = "broker_b";
+ TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME,
anotherBrokerName);
+
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
+
when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1");
+ escapeBridge.putMessageToRemoteBroker(message, anotherBrokerName);
+ verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"),
eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(),
anyLong());
+ }
+
private GetMessageResult mockGetMessageResult(int count, String topic,
byte[] body) throws Exception {
GetMessageResult result = new GetMessageResult();
for (int i = 0; i < count; i++) {
@@ -337,4 +421,12 @@ public class EscapeBridgeTest {
return result;
}
+ private TopicPublishInfo mockTopicPublishInfo(String... brokerNames) {
+ TopicPublishInfo topicPublishInfo = new TopicPublishInfo();
+ for (String brokerName : brokerNames) {
+ topicPublishInfo.getMessageQueueList().add(new
MessageQueue(TEST_TOPIC, brokerName, 0));
+ }
+ return topicPublishInfo;
+ }
+
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index d7ea97c550..3010e83610 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -104,7 +104,6 @@ public class PopReviveServiceTest {
brokerConfig = new BrokerConfig();
brokerConfig.setBrokerClusterName(CLUSTER_NAME);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
- when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
@@ -285,6 +284,7 @@ public class PopReviveServiceTest {
@Test
public void
testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() throws
Throwable {
+ brokerConfig.setSkipWhenCKRePutReachMaxTimes(true);
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
ck.setRePutTimes("17");
PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
@@ -306,6 +306,30 @@ public class PopReviveServiceTest {
verify(messageStore,
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}
+ @Test
+ public void
testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_noEnd() throws
Throwable {
+ brokerConfig.setSkipWhenCKRePutReachMaxTimes(false);
+ PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+ ck.setRePutTimes(Byte.MAX_VALUE + "");
+ PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
+ reviveObj.map.put("", ck);
+ reviveObj.endTime = System.currentTimeMillis();
+ StringBuilder actualRetryTopic = new StringBuilder();
+
+ when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(),
anyString(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(new
MessageExt(), "", false)));
+
when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation
-> {
+ MessageExtBrokerInner msg = invocation.getArgument(0);
+ actualRetryTopic.append(msg.getTopic());
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED));
+ });
+
+ popReviveService.mergeAndRevive(reviveObj);
+ Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP,
false), actualRetryTopic.toString());
+ verify(escapeBridge,
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write
retry
+ verify(messageStore,
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+ }
+
@Test
public void testReviveMsgFromCk_messageNotFound_noRetry() throws Throwable
{
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
@@ -349,6 +373,7 @@ public class PopReviveServiceTest {
@Test
public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws
Throwable {
+ brokerConfig.setSkipWhenCKRePutReachMaxTimes(true);
PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
ck.setRePutTimes("17");
PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
@@ -363,6 +388,23 @@ public class PopReviveServiceTest {
verify(messageStore,
times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}
+ @Test
+ public void testReviveMsgFromCk_messageNotFound_needRetry_noEnd() throws
Throwable {
+ brokerConfig.setSkipWhenCKRePutReachMaxTimes(false);
+ PopCheckPoint ck = buildPopCheckPoint(0, 0, 0);
+ ck.setRePutTimes(Byte.MAX_VALUE + "");
+ PopReviveService.ConsumeReviveObj reviveObj = new
PopReviveService.ConsumeReviveObj();
+ reviveObj.map.put("", ck);
+ reviveObj.endTime = System.currentTimeMillis();
+
+ when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(),
anyString(), anyBoolean()))
+ .thenReturn(CompletableFuture.completedFuture(Triple.of(null,
"", true)));
+
+ popReviveService.mergeAndRevive(reviveObj);
+ verify(escapeBridge,
times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write
retry
+ verify(messageStore,
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
+ }
+
public static PopCheckPoint buildPopCheckPoint(long startOffset, long
popTime, long reviveOffset) {
PopCheckPoint ck = new PopCheckPoint();
ck.setStartOffset(startOffset);
@@ -386,6 +428,7 @@ public class PopReviveServiceTest {
ackMsg.setTopic(TOPIC);
ackMsg.setQueueId(0);
ackMsg.setPopTime(popTime);
+ ackMsg.setBrokerName("broker-a");
return ackMsg;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 8982e59d03..10bf7f76e8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -419,6 +419,9 @@ public class BrokerConfig extends BrokerIdentity {
*/
private String configBlackList = "configBlackList;brokerConfigPath";
+ // if false, will still rewrite ck after max times 17
+ private boolean skipWhenCKRePutReachMaxTimes = false;
+
public String getConfigBlackList() {
return configBlackList;
}
@@ -1826,4 +1829,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setEnablePopMessageThreshold(boolean
enablePopMessageThreshold) {
this.enablePopMessageThreshold = enablePopMessageThreshold;
}
+
+ public boolean isSkipWhenCKRePutReachMaxTimes() {
+ return skipWhenCKRePutReachMaxTimes;
+ }
+
+ public void setSkipWhenCKRePutReachMaxTimes(boolean
skipWhenCKRePutReachMaxTimes) {
+ this.skipWhenCKRePutReachMaxTimes = skipWhenCKRePutReachMaxTimes;
+ }
}