This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6f5b551b88 [ISSUE #9918] Fix the issue that a message may be renewed once more if the gRPC push consumer is unexpectedly disconnected (#9919)
6f5b551b88 is described below

commit 6f5b551b88dc2fb74f3baa53c0b490e9d19405f5
Author: qianye <[email protected]>
AuthorDate: Fri Dec 12 16:08:46 2025 +0800

    [ISSUE #9918] Fix the issue that a message may be renewed once more if the gRPC push consumer is unexpectedly disconnected (#9919)
    
    Change-Id: I00424612c5ee9dd30e5c155dab17262051e5f097
---
 .../grpc/v2/consumer/ReceiveMessageActivity.java   | 23 ++++++++++++----------
 .../ReceiveMessageResponseStreamWriter.java        |  7 +++++++
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 50b6d924fd..fc8f714816 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -135,6 +135,7 @@ public class ReceiveMessageActivity extends 
AbstractMessingActivity {
                     request.hasAttemptId() ? request.getAttemptId() : null,
                     timeRemaining
                 ).thenAccept(popResult -> {
+                    Runnable doAfterWrite = null;
                     if (proxyConfig.isEnableProxyAutoRenew() && 
request.getAutoRenew()) {
                         if (PopStatus.FOUND.equals(popResult.getPopStatus())) {
                             GrpcClientChannel clientChannel = 
grpcChannelManager.getChannel(ctx.getClientID());
@@ -145,19 +146,21 @@ public class ReceiveMessageActivity extends 
AbstractMessingActivity {
                                     writer.processThrowableWhenWriteMessage(e, 
ctx, request, messageExt));
                                 throw e;
                             }
-                            List<MessageExt> messageExtList = 
popResult.getMsgFoundList();
-                            for (MessageExt messageExt : messageExtList) {
-                                String receiptHandle = 
messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
-                                if (receiptHandle != null) {
-                                    MessageReceiptHandle messageReceiptHandle =
-                                        new MessageReceiptHandle(group, topic, 
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
-                                            messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes());
-                                    messagingProcessor.addReceiptHandle(ctx, 
clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
+                            doAfterWrite = () -> {
+                                List<MessageExt> messageExtList = 
popResult.getMsgFoundList();
+                                for (MessageExt messageExt : messageExtList) {
+                                    String receiptHandle = 
messageExt.getProperty(MessageConst.PROPERTY_POP_CK);
+                                    if (receiptHandle != null) {
+                                        MessageReceiptHandle 
messageReceiptHandle =
+                                            new MessageReceiptHandle(group, 
topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
+                                                messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes());
+                                        
messagingProcessor.addReceiptHandle(ctx, clientChannel, group, 
messageExt.getMsgId(), messageReceiptHandle);
+                                    }
                                 }
-                            }
+                            };
                         }
                     }
-                    writer.writeAndComplete(ctx, request, popResult);
+                    writer.writeAndComplete(ctx, request, popResult, 
doAfterWrite);
                 })
                 .exceptionally(t -> {
                     writer.writeAndComplete(ctx, request, t);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
index bdeffbbc8d..843c0edec1 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageResponseStreamWriter.java
@@ -54,6 +54,10 @@ public class ReceiveMessageResponseStreamWriter {
     }
 
     public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest 
request, PopResult popResult) {
+        writeAndComplete(ctx, request, popResult, null);
+    }
+
+    public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest 
request, PopResult popResult, Runnable doAfterWrite) {
         PopStatus status = popResult.getPopStatus();
         List<MessageExt> messageFoundList = popResult.getMsgFoundList();
         try {
@@ -103,6 +107,9 @@ public class ReceiveMessageResponseStreamWriter {
                         .build());
                     break;
             }
+            if (doAfterWrite != null) {
+                doAfterWrite.run();
+            }
         } catch (Throwable t) {
             writeResponseWithErrorIgnore(
                 
ReceiveMessageResponse.newBuilder().setStatus(ResponseBuilder.getInstance().buildStatus(t)).build());

Reply via email to