RongtongJin commented on code in PR #10519:
URL: https://github.com/apache/rocketmq/pull/10519#discussion_r3433571241
##########
proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java:
##########
@@ -404,6 +434,150 @@ protected CompletableFuture<List<BatchAckResult>> processBrokerHandle(ProxyConte
});
}
+    public CompletableFuture<List<BatchChangeInvisibleTimeResult>> batchChangeInvisibleTime(
+        ProxyContext ctx,
+        List<ReceiptHandleMessage> handleMessageList,
+        String consumerGroup,
+        String topic,
+        long invisibleTime,
+        long timeoutMillis,
+        boolean suspend
+    ) {
+        CompletableFuture<List<BatchChangeInvisibleTimeResult>> future = new CompletableFuture<>();
+        try {
+            List<BatchChangeInvisibleTimeResult> batchResultList = new ArrayList<>(handleMessageList.size());
+            Map<String, List<ReceiptHandleMessage>> brokerHandleListMap = new HashMap<>();
+
+            for (ReceiptHandleMessage handleMessage : handleMessageList) {
+                if (handleMessage.getReceiptHandle().isExpired()) {
+                    batchResultList.add(new BatchChangeInvisibleTimeResult(handleMessage, EXPIRED_HANDLE_PROXY_EXCEPTION));
+                    continue;
+                }
+                ReceiptHandle handle = handleMessage.getReceiptHandle();
+                String realTopic = handle.getRealTopic(topic, consumerGroup);
+                String batchKey = handle.getBrokerName() + "@" + realTopic;
+                brokerHandleListMap.computeIfAbsent(batchKey, key -> new ArrayList<>())
+                    .add(handleMessage);
+            }
+
+            if (brokerHandleListMap.isEmpty()) {
+                return FutureUtils.addExecutor(CompletableFuture.completedFuture(batchResultList), this.executor);
+            }
+
+            CompletableFuture<List<BatchChangeInvisibleTimeResult>>[] futures = new CompletableFuture[brokerHandleListMap.size()];
+            int futureIndex = 0;
+            for (List<ReceiptHandleMessage> brokerHandleList : brokerHandleListMap.values()) {
+                futures[futureIndex++] = processBrokerChangeInvisibleTime(
+                    ctx, consumerGroup, topic, brokerHandleList, invisibleTime, timeoutMillis, suspend);
+            }
+            CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    return;
+                }
+                for (CompletableFuture<List<BatchChangeInvisibleTimeResult>> resultFuture : futures) {
+                    batchResultList.addAll(resultFuture.join());
Review Comment:
This changes the result ordering contract. `batchResultList` holds the results
for expired handles first, then appends each broker/topic group's results in
`HashMap.values()` iteration order, which is not guaranteed. However,
`ReceiptHandleProcessor.batchChangeInvisibleTime` consumes the returned list by
index and completes `futureList.get(indexes.get(i))` with `results.get(i)`. If
one renewal batch contains handles for multiple brokers, the result order can
differ from the original `handleMessageList` order, so a message can receive
another message's renewed receipt handle. Please either preserve the original
input order when building the result list, or have the caller map results back
by `BatchChangeInvisibleTimeResult.getReceiptHandleMessage()` instead of by
index.
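
As a rough illustration of the first option (preserving input order), here is a minimal standalone sketch. It is not the PR's code: `Handle`, `Result`, `process`, and the `"renewed@..."` strings are hypothetical stand-ins for `ReceiptHandleMessage`, `BatchChangeInvisibleTimeResult`, and the broker call. The idea is to group input indexes per broker rather than the handles themselves, then write each result into the slot at its original index, so the `HashMap` iteration order no longer matters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OrderPreservingBatch {
    // Hypothetical stand-in for ReceiptHandleMessage.
    static final class Handle {
        final String id;
        final String broker;
        final boolean expired;

        Handle(String id, String broker, boolean expired) {
            this.id = id;
            this.broker = broker;
            this.expired = expired;
        }
    }

    // Hypothetical stand-in for BatchChangeInvisibleTimeResult.
    static final class Result {
        final Handle handle;
        final String outcome;

        Result(Handle handle, String outcome) {
            this.handle = handle;
            this.outcome = outcome;
        }
    }

    // Returns one result per input handle, at the same index as the input.
    static List<Result> process(List<Handle> input) {
        Result[] slots = new Result[input.size()];
        Map<String, List<Integer>> indexesByBroker = new HashMap<>();

        for (int i = 0; i < input.size(); i++) {
            Handle h = input.get(i);
            if (h.expired) {
                // Expired handles are answered locally, in their own slot.
                slots[i] = new Result(h, "expired");
                continue;
            }
            indexesByBroker.computeIfAbsent(h.broker, k -> new ArrayList<>()).add(i);
        }

        // Group iteration order is arbitrary, but each result goes back to
        // the slot of the handle it belongs to.
        for (Map.Entry<String, List<Integer>> group : indexesByBroker.entrySet()) {
            for (int i : group.getValue()) {
                // Assumption: a real implementation would call the broker here.
                slots[i] = new Result(input.get(i), "renewed@" + group.getKey());
            }
        }
        return Arrays.asList(slots);
    }

    public static void main(String[] args) {
        List<Handle> input = Arrays.asList(
            new Handle("m0", "broker-b", false),
            new Handle("m1", "broker-a", true),
            new Handle("m2", "broker-a", false),
            new Handle("m3", "broker-b", false));
        for (Result r : process(input)) {
            System.out.println(r.handle.id + " -> " + r.outcome);
        }
    }
}
```

With this shape the caller can keep completing futures by index. The alternative, keying on `getReceiptHandleMessage()` in the caller, would avoid the extra index bookkeeping here but changes `ReceiptHandleProcessor` instead.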