This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6f5b551b88 [ISSUE #9918] Fix the message may be renewed once more if
the gRPC push consumer is unexpectedly disconnected (#9919)
6f5b551b88 is described below
commit 6f5b551b88dc2fb74f3baa53c0b490e9d19405f5
Author: qianye <[email protected]>
AuthorDate: Fri Dec 12 16:08:46 2025 +0800
[ISSUE #9918] Fix the message may be renewed once more if the gRPC push
consumer is unexpectedly disconnected (#9919)
Change-Id: I00424612c5ee9dd30e5c155dab17262051e5f097
---
.../grpc/v2/consumer/ReceiveMessageActivity.java | 23 ++++++++++++----------
.../ReceiveMessageResponseStreamWriter.java | 7 +++++++
2 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 50b6d924fd..fc8f714816 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -135,6 +135,7 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
request.hasAttemptId() ? request.getAttemptId() : null,
timeRemaining
).thenAccept(popResult -> {
+ Runnable doAfterWrite = null;
if (proxyConfig.isEnableProxyAutoRenew() &&
request.getAutoRenew()) {
if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
GrpcClientChannel clientChannel =
grpcChannelManager.getChannel(ctx.getClientID());
@@ -145,19 +146,21 @@ public class ReceiveMessageActivity extends
AbstractMessingActivity {
writer.processThrowableWhenWriteMessage(e,
ctx, request, messageExt));
throw e;
}
- List<MessageExt> messageExtList =
popResult.getMsgFoundList();
- for (MessageExt messageExt : messageExtList) {
- String receiptHandle =
messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
- if (receiptHandle != null) {
- MessageReceiptHandle messageReceiptHandle =
- new MessageReceiptHandle(group, topic,
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
- messageExt.getQueueOffset(),
messageExt.getReconsumeTimes());
- messagingProcessor.addReceiptHandle(ctx,
clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
+ doAfterWrite = () -> {
+ List<MessageExt> messageExtList =
popResult.getMsgFoundList();
+ for (MessageExt messageExt : messageExtList) {
+ String receiptHandle =
messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
+ if (receiptHandle != null) {
+ MessageReceiptHandle
messageReceiptHandle =
+ new MessageReceiptHandle(group,
topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
+ messageExt.getQueueOffset(),
messageExt.getReconsumeTimes());
+ messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
+ }
}
- }
+ };
}
}
- writer.writeAndComplete(ctx, request, popResult);
+ writer.writeAndComplete(ctx, request, popResult,
doAfterWrite);
})
.exceptionally(t -> {
writer.writeAndComplete(ctx, request, t);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
index bdeffbbc8d..843c0edec1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
@@ -54,6 +54,10 @@ public class ReceiveMessageResponseStreamWriter {
}
public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest
request, PopResult popResult) {
+ writeAndComplete(ctx, request, popResult, null);
+ }
+
+ public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest
request, PopResult popResult, Runnable doAfterWrite) {
PopStatus status = popResult.getPopStatus();
List<MessageExt> messageFoundList = popResult.getMsgFoundList();
try {
@@ -103,6 +107,9 @@ public class ReceiveMessageResponseStreamWriter {
.build());
break;
}
+ if (doAfterWrite != null) {
+ doAfterWrite.run();
+ }
} catch (Throwable t) {
writeResponseWithErrorIgnore(
ReceiveMessageResponse.newBuilder().setStatus(ResponseBuilder.getInstance().buildStatus(t)).build());