RongtongJin commented on code in PR #10519:
URL: https://github.com/apache/rocketmq/pull/10519#discussion_r3433571241


##########
proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java:
##########
@@ -404,6 +434,150 @@ protected CompletableFuture<List<BatchAckResult>> 
processBrokerHandle(ProxyConte
             });
     }
 
+    public CompletableFuture<List<BatchChangeInvisibleTimeResult>> 
batchChangeInvisibleTime(
+        ProxyContext ctx,
+        List<ReceiptHandleMessage> handleMessageList,
+        String consumerGroup,
+        String topic,
+        long invisibleTime,
+        long timeoutMillis,
+        boolean suspend
+    ) {
+        CompletableFuture<List<BatchChangeInvisibleTimeResult>> future = new 
CompletableFuture<>();
+        try {
+            List<BatchChangeInvisibleTimeResult> batchResultList = new 
ArrayList<>(handleMessageList.size());
+            Map<String, List<ReceiptHandleMessage>> brokerHandleListMap = new 
HashMap<>();
+
+            for (ReceiptHandleMessage handleMessage : handleMessageList) {
+                if (handleMessage.getReceiptHandle().isExpired()) {
+                    batchResultList.add(new 
BatchChangeInvisibleTimeResult(handleMessage, EXPIRED_HANDLE_PROXY_EXCEPTION));
+                    continue;
+                }
+                ReceiptHandle handle = handleMessage.getReceiptHandle();
+                String realTopic = handle.getRealTopic(topic, consumerGroup);
+                String batchKey = handle.getBrokerName() + "@" + realTopic;
+                brokerHandleListMap.computeIfAbsent(batchKey, key -> new 
ArrayList<>())
+                    .add(handleMessage);
+            }
+
+            if (brokerHandleListMap.isEmpty()) {
+                return 
FutureUtils.addExecutor(CompletableFuture.completedFuture(batchResultList), 
this.executor);
+            }
+
+            CompletableFuture<List<BatchChangeInvisibleTimeResult>>[] futures 
= new CompletableFuture[brokerHandleListMap.size()];
+            int futureIndex = 0;
+            for (List<ReceiptHandleMessage> brokerHandleList : 
brokerHandleListMap.values()) {
+                futures[futureIndex++] = processBrokerChangeInvisibleTime(
+                    ctx, consumerGroup, topic, brokerHandleList, 
invisibleTime, timeoutMillis, suspend);
+            }
+            CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+                for (CompletableFuture<List<BatchChangeInvisibleTimeResult>> 
resultFuture : futures) {
+                    batchResultList.addAll(resultFuture.join());

Review Comment:
   This changes the result ordering contract. `batchResultList` is built from 
expired handles first, then appends each broker/topic group using 
`HashMap.values()` iteration order. However, 
`ReceiptHandleProcessor.batchChangeInvisibleTime` consumes the returned list by 
index and completes `futureList.get(indexes.get(i))` with `results.get(i)`. If 
one renewal batch contains handles for multiple brokers, the result list can 
differ from the original `handleMessageList` order, causing a message to 
receive another message's renewed receipt handle. Please either preserve the 
original input order when building the result list, or have the caller map 
results back by `BatchChangeInvisibleTimeResult.getReceiptHandleMessage()` 
instead of by index.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to