This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch feature/bname-5 in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 12bb80ddae55ec3f30fe9636c47596e2d4c8f993 Author: zhouxiang <[email protected]> AuthorDate: Mon Oct 17 15:12:34 2022 +0800 Support bname in protocol for 5.0 client * add bname for `CheckTransactionStateRequestHeader`, `ConsumerSendMsgBackRequestHeader`, `EndTransactionRequestHeader`, `SendMessageRequestHeader` --- .../AbstractTransactionalMessageCheckListener.java | 1 + .../java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java | 2 ++ .../client/impl/consumer/DefaultMQPullConsumerImpl.java | 4 ++-- .../client/impl/consumer/DefaultMQPushConsumerImpl.java | 2 +- .../rocketmq/client/impl/producer/DefaultMQProducerImpl.java | 3 +++ .../protocol/header/CheckTransactionStateRequestHeader.java | 4 ++-- .../protocol/header/ConsumerSendMsgBackRequestHeader.java | 4 ++-- .../common/protocol/header/EndTransactionRequestHeader.java | 4 ++-- .../common/protocol/header/SendMessageRequestHeaderV2.java | 11 +++++++++++ 9 files changed, 26 insertions(+), 9 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index beda6504c..6ed015b99 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -56,6 +56,7 @@ public abstract class AbstractTransactionalMessageCheckListener { checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId()); checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); + checkTransactionStateRequestHeader.setBname(brokerController.getBrokerConfig().getBrokerName()); msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID))); msgExt.setStoreSize(0); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index b327ee28b..5f393cb57 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -1451,6 +1451,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { public void consumerSendMessageBack( final String addr, + final String brokerName, final MessageExt msg, final String consumerGroup, final int delayLevel, @@ -1466,6 +1467,7 @@ public class MQClientAPIImpl implements NameServerUpdateCallback { requestHeader.setDelayLevel(delayLevel); requestHeader.setOriginMsgId(msg.getMsgId()); requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); + requestHeader.setBname(brokerName); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 66f3578fe..96f31724e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -622,8 +622,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { consumerGroup = this.defaultMQPullConsumer.getConsumerGroup(); } - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000, - this.defaultMQPullConsumer.getMaxReconsumeTimes()); + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, consumerGroup, + delayLevel, 3000, this.defaultMQPullConsumer.getMaxReconsumeTimes()); } catch (Exception e) { log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 7dc212dd1..e5121dc73 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -732,7 +732,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } else { String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); - this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, + this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, brokerName, msg, this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes()); } } catch (Exception e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index d97266319..b40f536fd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -375,6 +375,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { thisHeader.setProducerGroup(producerGroup); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.setFromTransactionCheck(true); + thisHeader.setBname(checkRequestHeader.getBname()); String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (uniqueKey == null) { @@ -835,6 +836,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); + requestHeader.setBname(brokerName); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { @@ -1365,6 +1367,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); + requestHeader.setBname(destBrokerName); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java index d62802c06..6ef4099b0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java @@ -21,11 +21,11 @@ package org.apache.rocketmq.common.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class CheckTransactionStateRequestHeader implements CommandCustomHeader { +public class CheckTransactionStateRequestHeader extends RpcRequestHeader { @CFNotNull private Long tranStateTableOffset; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java index 3d65f2392..ee0416f52 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java @@ -18,12 +18,12 @@ package org.apache.rocketmq.common.protocol.header; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.remoting.CommandCustomHeader; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader { +public class ConsumerSendMsgBackRequestHeader extends RpcRequestHeader { @CFNotNull private Long offset; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java index 80fdc3d4a..eabc4bed6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java @@ -18,13 +18,13 @@ package org.apache.rocketmq.common.protocol.header; import com.google.common.base.MoreObjects; +import org.apache.rocketmq.common.rpc.RpcRequestHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; -import org.apache.rocketmq.remoting.CommandCustomHeader; import org.apache.rocketmq.remoting.annotation.CFNotNull; import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -public class EndTransactionRequestHeader implements CommandCustomHeader { +public class EndTransactionRequestHeader extends RpcRequestHeader { @CFNotNull private String producerGroup; @CFNotNull diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index f4771252e..1985f65f4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -59,6 +59,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode @CFNullable private boolean m; //batch + @CFNullable + private String n; // brokerName public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { SendMessageRequestHeader v1 = new SendMessageRequestHeader(); @@ -75,6 +77,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode v1.setUnitMode(v2.k); v1.setMaxReconsumeTimes(v2.l); v1.setBatch(v2.m); + v1.setBname(v2.n); return v1; } @@ -93,6 +96,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); v2.m = v1.isBatch(); + v2.n = v1.getBname(); return v2; } @@ -115,6 +119,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode writeIfNotNull(out, "k", k); writeIfNotNull(out, "l", l); writeIfNotNull(out, "m", m); + writeIfNotNull(out, "n", n); } @Override @@ -184,6 +189,11 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode if (str != null) { m = Boolean.parseBoolean(str); } + + str = fields.get("n"); + if (str != null) { + n = str; + } } public String getA() { @@ -306,6 +316,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode .add("k", k) .add("l", l) .add("m", m) + .add("n", n) .toString(); } } \ No newline at end of file
