This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch feature/bname-5
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 12bb80ddae55ec3f30fe9636c47596e2d4c8f993
Author: zhouxiang <[email protected]>
AuthorDate: Mon Oct 17 15:12:34 2022 +0800

    Support bname in protocol for 5.0 client
    
    * add bname for `CheckTransactionStateRequestHeader`, 
`ConsumerSendMsgBackRequestHeader`, `EndTransactionRequestHeader`,
    `SendMessageRequestHeader`
---
 .../AbstractTransactionalMessageCheckListener.java            |  1 +
 .../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java |  2 ++
 .../client/impl/consumer/DefaultMQPullConsumerImpl.java       |  4 ++--
 .../client/impl/consumer/DefaultMQPushConsumerImpl.java       |  2 +-
 .../rocketmq/client/impl/producer/DefaultMQProducerImpl.java  |  3 +++
 .../protocol/header/CheckTransactionStateRequestHeader.java   |  4 ++--
 .../protocol/header/ConsumerSendMsgBackRequestHeader.java     |  4 ++--
 .../common/protocol/header/EndTransactionRequestHeader.java   |  4 ++--
 .../common/protocol/header/SendMessageRequestHeaderV2.java    | 11 +++++++++++
 9 files changed, 26 insertions(+), 9 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index beda6504c..6ed015b99 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -56,6 +56,7 @@ public abstract class 
AbstractTransactionalMessageCheckListener {
         
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
         
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
         
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
+        
checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName());
         
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
         
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
         msgExt.setStoreSize(0);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b327ee28b..5f393cb57 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1451,6 +1451,7 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
 
     public void consumerSendMessageBack(
         final String addr,
+        final String brokerName,
         final MessageExt msg,
         final String consumerGroup,
         final int delayLevel,
@@ -1466,6 +1467,7 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
         requestHeader.setDelayLevel(delayLevel);
         requestHeader.setOriginMsgId(msg.getMsgId());
         requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
+        requestHeader.setBname(brokerName);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
             request, timeoutMillis);
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 66f3578fe..96f31724e 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -622,8 +622,8 @@ public class DefaultMQPullConsumerImpl implements 
MQConsumerInner {
                 consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
             }
 
-            
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, 
msg, consumerGroup, delayLevel, 3000,
-                this.defaultMQPullConsumer.getMaxReconsumeTimes());
+            
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, 
brokerName, msg, consumerGroup,
+                delayLevel, 3000, 
this.defaultMQPullConsumer.getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + 
this.defaultMQPullConsumer.getConsumerGroup(), e);
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 7dc212dd1..e5121dc73 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -732,7 +732,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
             } else {
                 String brokerAddr = (null != brokerName) ? 
this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                         : 
RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
-                
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, 
msg,
+                
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, 
brokerName, msg,
                         this.defaultMQPushConsumer.getConsumerGroup(), 
delayLevel, 5000, getMaxReconsumeTimes());
             }
         } catch (Exception e) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index d97266319..b40f536fd 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -375,6 +375,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                 thisHeader.setProducerGroup(producerGroup);
                 
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                 thisHeader.setFromTransactionCheck(true);
+                thisHeader.setBname(checkRequestHeader.getBname());
 
                 String uniqueKey = 
message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                 if (uniqueKey == null) {
@@ -835,6 +836,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                 requestHeader.setReconsumeTimes(0);
                 requestHeader.setUnitMode(this.isUnitMode());
                 requestHeader.setBatch(msg instanceof MessageBatch);
+                requestHeader.setBname(brokerName);
                 if 
(requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     String reconsumeTimes = 
MessageAccessor.getReconsumeTime(msg);
                     if (reconsumeTimes != null) {
@@ -1365,6 +1367,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
         EndTransactionRequestHeader requestHeader = new 
EndTransactionRequestHeader();
         requestHeader.setTransactionId(transactionId);
         requestHeader.setCommitLogOffset(id.getOffset());
+        requestHeader.setBname(destBrokerName);
         switch (localTransactionState) {
             case COMMIT_MESSAGE:
                 
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
index d62802c06..6ef4099b0 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
@@ -21,11 +21,11 @@
 package org.apache.rocketmq.common.protocol.header;
 
 import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class CheckTransactionStateRequestHeader implements CommandCustomHeader 
{
+public class CheckTransactionStateRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private Long tranStateTableOffset;
     @CFNotNull
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
index 3d65f2392..ee0416f52 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -18,12 +18,12 @@
 package org.apache.rocketmq.common.protocol.header;
 
 import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
+public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private Long offset;
     @CFNotNull
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
index 80fdc3d4a..eabc4bed6 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
@@ -18,13 +18,13 @@
 package org.apache.rocketmq.common.protocol.header;
 
 import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class EndTransactionRequestHeader implements CommandCustomHeader {
+public class EndTransactionRequestHeader extends RpcRequestHeader {
     @CFNotNull
     private String producerGroup;
     @CFNotNull
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index f4771252e..1985f65f4 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -59,6 +59,8 @@ public class SendMessageRequestHeaderV2 implements 
CommandCustomHeader, FastCode
 
     @CFNullable
     private boolean m; //batch
+    @CFNullable
+    private String n; // brokerName
 
     public static SendMessageRequestHeader 
createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
         SendMessageRequestHeader v1 = new SendMessageRequestHeader();
@@ -75,6 +77,7 @@ public class SendMessageRequestHeaderV2 implements 
CommandCustomHeader, FastCode
         v1.setUnitMode(v2.k);
         v1.setMaxReconsumeTimes(v2.l);
         v1.setBatch(v2.m);
+        v1.setBname(v2.n);
         return v1;
     }
 
@@ -93,6 +96,7 @@ public class SendMessageRequestHeaderV2 implements 
CommandCustomHeader, FastCode
         v2.k = v1.isUnitMode();
         v2.l = v1.getMaxReconsumeTimes();
         v2.m = v1.isBatch();
+        v2.n = v1.getBname();
         return v2;
     }
 
@@ -115,6 +119,7 @@ public class SendMessageRequestHeaderV2 implements 
CommandCustomHeader, FastCode
         writeIfNotNull(out, "k", k);
         writeIfNotNull(out, "l", l);
         writeIfNotNull(out, "m", m);
+        writeIfNotNull(out, "n", n);
     }
 
     @Override
@@ -184,6 +189,11 @@ public class SendMessageRequestHeaderV2 implements 
CommandCustomHeader, FastCode
         if (str != null) {
             m = Boolean.parseBoolean(str);
         }
+
+        str = fields.get("n");
+        if (str != null) {
+            n = str;
+        }
     }
 
     public String getA() {
@@ -306,6 +316,7 @@ public class SendMessageRequestHeaderV2 implements 
CommandCustomHeader, FastCode
             .add("k", k)
             .add("l", l)
             .add("m", m)
+            .add("n", n)
             .toString();
     }
 }
\ No newline at end of file

Reply via email to