duhenglucky commented on a change in pull request #633: [ISSUE #598] Enhance
transaction by putting messages that exceed max check times to system topic
URL: https://github.com/apache/rocketmq/pull/633#discussion_r264072346
##########
File path:
broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
##########
@@ -17,20 +17,79 @@
package org.apache.rocketmq.broker.transaction.queue;
import
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+
+public class DefaultTransactionalMessageCheckListener extends
+ AbstractTransactionalMessageCheckListener {
+
+ private static final InternalLogger log = InternalLoggerFactory
+ .getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
-public class DefaultTransactionalMessageCheckListener extends
AbstractTransactionalMessageCheckListener {
- private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
public DefaultTransactionalMessageCheckListener() {
super();
}
@Override
public void resolveDiscardMsg(MessageExt msgExt) {
- log.error("MsgExt:{} has been checked too many times, so discard it",
msgExt);
+ log.error(
+ "MsgExt:{} has been checked too many times, so discard it by
moving it to system topic TRANS_CHECK_MAXTIME_TOPIC",
+ msgExt);
+
+ try {
+ MessageExtBrokerInner brokerInner =
toMessageExtBrokerInner(msgExt);
+ PutMessageResult putMessageResult =
this.getBrokerController().getMessageStore()
+ .putMessage(brokerInner);
+ if (putMessageResult != null
+ && putMessageResult.getPutMessageStatus() ==
PutMessageStatus.PUT_OK) {
+ log.info(
+ "Put checked-too-many-time half message to
TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, "
+ + "commitLogOffset={}, real topic={}",
+ msgExt.getQueueOffset(), msgExt.getCommitLogOffset(),
+ msgExt.getUserProperty(
+ MessageConst.PROPERTY_REAL_TOPIC));
+ } else {
+ log.error(
+ "Put checked-too-many-time half message to
TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}",
+ msgExt.getTopic(), msgExt.getMsgId());
+ }
+ } catch (Exception e) {
+ log.warn("Put checked-too-many-time message to
TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
+ }
+
+ }
+
+ private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
+ TopicConfig topicConfig =
this.getBrokerController().getTopicConfigManager()
+ .createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS,
+ PermName.PERM_READ | PermName.PERM_WRITE);
+ int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
Review comment:
Yes, as @zongtanghu commented, this differs from the sendBackMessage
method, so if there is only one queue in this inner topic, please use a constant instead.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services