This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0b687a9dd8 [ISSUE #8997] Ensure there is an opportunity to send a
retry message when broker no response (#9137)
0b687a9dd8 is described below
commit 0b687a9dd81075c8d811f1c49fdd1c59502db1c9
Author: gaoyf <[email protected]>
AuthorDate: Mon Mar 10 16:03:10 2025 +0800
[ISSUE #8997] Ensure there is an opportunity to send a retry message when
broker no response (#9137)
---
.../client/impl/producer/DefaultMQProducerImpl.java | 12 ++++++++++--
.../apache/rocketmq/client/producer/DefaultMQProducer.java | 13 +++++++++++++
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 15264f0e50..4aa605821f 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -777,8 +777,16 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
callTimeout = true;
break;
}
-
- sendResult = this.sendKernelImpl(msg, mq,
communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
+ long curTimeout = timeout - costTime;
+ // Get the maximum timeout allowed per request
+ long maxSendTimeoutPerRequest =
defaultMQProducer.getSendMsgMaxTimeoutPerRequest();
+ // Determine if retries are still possible
+ boolean canRetryAgain = times + 1 < timesTotal;
+ // If retries are possible, and the current timeout
exceeds the max allowed timeout, set the current timeout to the max allowed
+ if (maxSendTimeoutPerRequest > -1 && canRetryAgain &&
curTimeout > maxSendTimeoutPerRequest) {
+ curTimeout = maxSendTimeoutPerRequest;
+ }
+ sendResult = this.sendKernelImpl(msg, mq,
communicationMode, sendCallback, topicPublishInfo, curTimeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp
- beginTimestampPrev, false, true);
switch (communicationMode) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index e3f81ad968..11edcaa441 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -115,6 +115,11 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
*/
private int sendMsgTimeout = 3000;
+ /**
+ * Max timeout for a single send request while a retry is still possible; -1 (the default) disables the cap.
+ */
+ private int sendMsgMaxTimeoutPerRequest = -1;
+
/**
* Compress message body threshold, namely, message body larger than 4k
will be compressed on default.
*/
@@ -1259,6 +1264,14 @@ public class DefaultMQProducer extends ClientConfig
implements MQProducer {
this.sendMsgTimeout = sendMsgTimeout;
}
+ public int getSendMsgMaxTimeoutPerRequest() {
+ return sendMsgMaxTimeoutPerRequest;
+ }
+
+ public void setSendMsgMaxTimeoutPerRequest(int
sendMsgMaxTimeoutPerRequest) {
+ this.sendMsgMaxTimeoutPerRequest = sendMsgMaxTimeoutPerRequest;
+ }
+
public int getCompressMsgBodyOverHowmuch() {
return compressMsgBodyOverHowmuch;
}