This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 720c87e85d [ISSUE #8584] fix missing brokerName in sendMessageBack
request (#8585)
720c87e85d is described below
commit 720c87e85dd95be8e56166a7f00c652ce0f1d458
Author: yuz10 <[email protected]>
AuthorDate: Fri Aug 30 14:53:51 2024 +0800
[ISSUE #8584] fix missing brokerName in sendMessageBack request (#8585)
* fix missing brokerName in sendMessageBack request
* fix
---
.../org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java | 2 +-
.../org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java | 2 +-
.../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java | 2 +-
.../rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java | 3 ++-
4 files changed, 5 insertions(+), 4 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 089fd39b3e..7c9a65ecdb 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -262,7 +262,7 @@ public class DefaultMQPullConsumer extends ClientConfig
implements MQPullConsume
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
+ this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel,
msg.getBrokerName());
}
/**
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 94785c6970..5df5cc8fa1 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -688,7 +688,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
msg.setTopic(withNamespace(msg.getTopic()));
- this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel,
(String) null);
+ this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel,
msg.getBrokerName());
}
/**
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 0fef8666cb..c92cadf505 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -752,7 +752,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
public void sendMessageBack(MessageExt msg, int delayLevel, final
MessageQueue mq)
throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
- sendMessageBack(msg, delayLevel, null, mq);
+ sendMessageBack(msg, delayLevel, msg.getBrokerName(), mq);
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
index 68563c0256..2bc9c5a18d 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
@@ -651,10 +651,11 @@ public class DefaultMQPushConsumerImplTest {
@Test
public void testSendMessageBack() throws InterruptedException,
MQClientException, MQBrokerException, RemotingException {
+
when(mQClientFactory.findBrokerAddressInPublish(anyString())).thenReturn(defaultBrokerAddr);
defaultMQPushConsumerImpl.sendMessageBack(createMessageExt(), 1,
createMessageQueue());
verify(mqClientAPIImpl).consumerSendMessageBack(
eq(defaultBrokerAddr),
- any(),
+ eq(defaultBroker),
any(MessageExt.class),
any(),
eq(1),