duhenglucky commented on a change in pull request #633: [ISSUE #598] Enhance 
transaction by putting messages that exceed max check times to system topic
URL: https://github.com/apache/rocketmq/pull/633#discussion_r264072346
 
 

 ##########
 File path: 
broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java
 ##########
 @@ -17,20 +17,79 @@
 package org.apache.rocketmq.broker.transaction.queue;
 
 import 
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+
+public class DefaultTransactionalMessageCheckListener extends
+        AbstractTransactionalMessageCheckListener {
+
+    private static final InternalLogger log = InternalLoggerFactory
+            .getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
 
-public class DefaultTransactionalMessageCheckListener extends 
AbstractTransactionalMessageCheckListener {
-    private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
 
     public DefaultTransactionalMessageCheckListener() {
         super();
     }
 
     @Override
     public void resolveDiscardMsg(MessageExt msgExt) {
-        log.error("MsgExt:{} has been checked too many times, so discard it", 
msgExt);
+        log.error(
+                "MsgExt:{} has been checked too many times, so discard it by 
moving it to system topic TRANS_CHECK_MAXTIME_TOPIC",
+                msgExt);
+
+        try {
+            MessageExtBrokerInner brokerInner = 
toMessageExtBrokerInner(msgExt);
+            PutMessageResult putMessageResult = 
this.getBrokerController().getMessageStore()
+                    .putMessage(brokerInner);
+            if (putMessageResult != null
+                    && putMessageResult.getPutMessageStatus() == 
PutMessageStatus.PUT_OK) {
+                log.info(
+                        "Put checked-too-many-time half message to 
TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, "
+                                + "commitLogOffset={}, real topic={}",
+                        msgExt.getQueueOffset(), msgExt.getCommitLogOffset(),
+                        msgExt.getUserProperty(
+                                MessageConst.PROPERTY_REAL_TOPIC));
+            } else {
+                log.error(
+                        "Put checked-too-many-time half message to 
TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}",
+                        msgExt.getTopic(), msgExt.getMsgId());
+            }
+        } catch (Exception e) {
+            log.warn("Put checked-too-many-time message to 
TRANS_CHECK_MAXTIME_TOPIC error. {}", e);
+        }
+
+    }
+
+    private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
+        TopicConfig topicConfig = 
this.getBrokerController().getTopicConfigManager()
+                .createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS,
+                        PermName.PERM_READ | PermName.PERM_WRITE);
+        int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS;
 
 Review comment:
   Yes, as @zongtanghu commented, this differs from the sendBackMessage 
method, so if there is only one queue in this inner topic, please use a constant instead.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to