This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 04683ec058 [ISSUE 7117] check message is in memory or not when init
consumer offset for pop (#7118)
04683ec058 is described below
commit 04683ec05808d63f742f8702a9bd3a2fb846c154
Author: lk <[email protected]>
AuthorDate: Wed Aug 9 19:08:33 2023 +0800
[ISSUE 7117] check message is in memory or not when init consumer offset
for pop (#7118)
---
.../broker/processor/AckMessageProcessor.java | 1 -
.../broker/processor/PopMessageProcessor.java | 40 ++++++++++++++--------
.../org/apache/rocketmq/common/BrokerConfig.java | 9 +++++
.../proxy/service/route/TopicRouteService.java | 2 +-
4 files changed, 36 insertions(+), 16 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 2140aa881c..687811409e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -308,7 +308,6 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
&& putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("put ack msg error:" + putMessageResult);
}
- System.out.printf("put ack to store %s", ackMsg);
PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
consumeGroup, popTime, qId, ackCount);
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 53e1725614..441f7de08a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -639,20 +639,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic,
queueId);
if (offset < 0) {
- if (ConsumeInitMode.MIN == initMode) {
- offset =
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
- } else {
- // pop last one,then commit offset.
- offset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;
- // max & no consumer offset
- if (offset < 0) {
- offset = 0;
- }
- if (init) {
-
this.brokerController.getConsumerOffsetManager().commitOffset(
- "getPopOffset", group, topic, queueId, offset);
- }
- }
+ offset = this.getInitOffset(topic, group, queueId, initMode, init);
}
if (checkResetOffset) {
@@ -670,6 +657,31 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
}
}
+ private long getInitOffset(String topic, String group, int queueId, int
initMode, boolean init) {
+ long offset;
+ if (ConsumeInitMode.MIN == initMode) {
+ return
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
+ } else {
+ if
(this.brokerController.getBrokerConfig().isInitPopOffsetByCheckMsgInMem() &&
+
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId) <=
0 &&
+
this.brokerController.getMessageStore().checkInMemByConsumeOffset(topic,
queueId, 0, 1)) {
+ offset = 0;
+ } else {
+ // pop last one,then commit offset.
+ offset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - 1;
+ // max & no consumer offset
+ if (offset < 0) {
+ offset = 0;
+ }
+ }
+ if (init) {
+ this.brokerController.getConsumerOffsetManager().commitOffset(
+ "getPopOffset", group, topic, queueId, offset);
+ }
+ }
+ return offset;
+ }
+
public final MessageExtBrokerInner buildCkMsg(final PopCheckPoint ck,
final int reviveQid) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 02c692e2b2..a815636b18 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -222,6 +222,7 @@ public class BrokerConfig extends BrokerIdentity {
private int popCkOffsetMaxQueueSize = 20000;
private boolean enablePopBatchAck = false;
private boolean enableNotifyAfterPopOrderLockRelease = true;
+ private boolean initPopOffsetByCheckMsgInMem = true;
private boolean realTimeNotifyConsumerChange = true;
@@ -1264,6 +1265,14 @@ public class BrokerConfig extends BrokerIdentity {
this.enableNotifyAfterPopOrderLockRelease =
enableNotifyAfterPopOrderLockRelease;
}
+ public boolean isInitPopOffsetByCheckMsgInMem() {
+ return initPopOffsetByCheckMsgInMem;
+ }
+
+ public void setInitPopOffsetByCheckMsgInMem(boolean
initPopOffsetByCheckMsgInMem) {
+ this.initPopOffsetByCheckMsgInMem = initPopOffsetByCheckMsgInMem;
+ }
+
public boolean isRealTimeNotifyConsumerChange() {
return realTimeNotifyConsumerChange;
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index b6b14faa49..e012a5465a 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -133,7 +133,7 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
protected MessageQueueView buildMessageQueueView(String topic,
TopicRouteData topicRouteData) {
if (isTopicRouteValid(topicRouteData)) {
MessageQueueView tmp = new MessageQueueView(topic, topicRouteData);
- log.info("load topic route from namesrv. topic: {}, queue: {}",
topic, tmp);
+ log.debug("load topic route from namesrv. topic: {}, queue: {}",
topic, tmp);
return tmp;
}
return MessageQueueView.WRAPPED_EMPTY_QUEUE;