redlsz opened a new issue, #9253:
URL: https://github.com/apache/rocketmq/issues/9253
### Before Creating the Enhancement Request
- [x] I have confirmed that this should be classified as an enhancement
rather than a bug/feature.
### Summary
In the proxy's receiveMessage flow, adding the message renew task can fail
because the client has disconnected. When that happens, the proxy can call
changeInvisibleTime to make the message visible sooner, which shortens the
delay before the next consumption retry.
### Motivation
Reproduction code:
```
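import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class Reproduce {
    // The original snippet used dateFormat without declaring it; this pattern is an assumption.
    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        String topic = "topic";
        String consumerGroup = "group";
        String accessKey = "xxx";
        String secretKey = "xxx";
        String endpoints = "127.0.0.1:8080";

        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(consumerGroup)
            .setSubscriptionExpressions(Collections.singletonMap(topic,
                new FilterExpression("*", FilterExpressionType.TAG)))
            .setMessageListener(messageView -> {
                // Log every delivery together with its attempt count
                System.out.printf("%s [consumer] receive %s attempts=%d%n", dateFormat.format(new Date()),
                    messageView.getMessageId(), messageView.getDeliveryAttempt());
                return ConsumeResult.SUCCESS;
            });

        // Start consumer
        AtomicReference<PushConsumer> consumerRef = new AtomicReference<>(pushConsumerBuilder.build());
        System.out.printf("%s [consumer] started %n", dateFormat.format(new Date()));

        // Wait for a while
        TimeUnit.SECONDS.sleep(3);

        new Thread(() -> {
            try {
                // Restart consumer: the close races with the in-flight receive on the proxy
                consumerRef.get().close();
                System.out.printf("%s [consumer] closed %n", dateFormat.format(new Date()));
                consumerRef.set(pushConsumerBuilder.build());
                System.out.printf("%s [consumer] restarted %n", dateFormat.format(new Date()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        // Wait for a while
        TimeUnit.SECONDS.sleep(1);

        // Then send one message while the consumer is restarting
        Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setTopics(topic)
            .build();
        System.out.printf("%s [producer] started %n", dateFormat.format(new Date()));
        MessageBuilder messageBuilder = provider.newMessageBuilder()
            .setTopic(topic)
            .setBody("BODY".getBytes(StandardCharsets.UTF_8));
        SendReceipt receipt = producer.send(messageBuilder.build());
        System.out.printf("%s [producer] send %s %n", dateFormat.format(new Date()), receipt.getMessageId());

        // Keep the process alive long enough to observe the delayed redelivery
        TimeUnit.MINUTES.sleep(10);
        producer.close();
        consumerRef.get().close();
    }
}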
```
Output:
<img width="799" alt="Image"
src="https://github.com/user-attachments/assets/aaac9e0d-45e1-4d52-aa51-155c20557fe3"
/>
Proxy NullPointerException logs:
> 2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - clear handle of
this client when client unregister. group:group,
clientChannelInfo:ClientChannelInfo
[channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn,
remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081},
clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413,
lastUpdateTimestamp=1742178611780]
2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove grpc channel
when client unregister. group:group, clientChannelInfo:ClientChannelInfo
[channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn,
remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081},
clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413,
lastUpdateTimestamp=1742178611780], removed:false
2025-03-17 10:30:11 INFO GrpcClientManagerThreadPool-1 - remove remoting
channel when client unregister. clientChannelInfo:ClientChannelInfo
[channel=GrpcClientChannel{clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn,
remoteAddress=127.0.0.1:52424, localAddress=127.0.0.1:8081},
clientId=ANONYMOUS-MB0@28703@0@f21f9v1emn, language=JAVA, version=413,
lastUpdateTimestamp=1742178611780]
2025-03-17 10:30:12 ERROR ConsumerProcessorExecutor-9 - internal server error
java.lang.NullPointerException: null
at org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.getChannelId(ReceiptHandleGroupKey.java:34)
at org.apache.rocketmq.proxy.common.ReceiptHandleGroupKey.hashCode(ReceiptHandleGroupKey.java:59)
at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
at org.apache.rocketmq.common.utils.ConcurrentHashMapUtils.computeIfAbsent(ConcurrentHashMapUtils.java:48)
at org.apache.rocketmq.proxy.service.receipt.DefaultReceiptHandleManager.addReceiptHandle(DefaultReceiptHandleManager.java:128)
at org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor.addReceiptHandle(ReceiptHandleProcessor.java:61)
at org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor.addReceiptHandle(DefaultMessagingProcessor.java:362)
at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.lambda$receiveMessage$0(ReceiveMessageActivity.java:145)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.rocketmq.common.utils.FutureUtils.lambda$appendNextFuture$0(FutureUtils.java:31)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
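
The trace above shows the NullPointerException surfacing from `ReceiptHandleGroupKey.hashCode` while `ConcurrentHashMap.get` hashes the key. The following is a minimal sketch of that failure mode, assuming the key dereferences a channel reference that is already null once the client has unregistered. `Channel` and `GroupKey` are hypothetical stand-ins for illustration, not the proxy's actual classes.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class NullChannelKeySketch {

    /** Hypothetical stand-in for a client channel that carries an id. */
    static final class Channel {
        final String id;

        Channel(String id) {
            this.id = id;
        }
    }

    /** Hypothetical stand-in for a map key built from a channel and a group. */
    static final class GroupKey {
        final Channel channel;
        final String group;

        GroupKey(Channel channel, String group) {
            this.channel = channel;
            this.group = group;
        }

        String getChannelId() {
            // Throws NullPointerException when the channel is null
            return channel.id;
        }

        @Override
        public int hashCode() {
            return Objects.hash(getChannelId(), group);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof GroupKey)) {
                return false;
            }
            GroupKey other = (GroupKey) o;
            return Objects.equals(getChannelId(), other.getChannelId())
                && Objects.equals(group, other.group);
        }
    }

    /** Returns true if looking up a key built from this channel throws an NPE. */
    static boolean lookupThrowsNpe(Channel channel) {
        ConcurrentHashMap<GroupKey, String> handles = new ConcurrentHashMap<>();
        try {
            // ConcurrentHashMap.get calls key.hashCode() before anything else
            handles.get(new GroupKey(channel, "group"));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("live channel throws: " + lookupThrowsNpe(new Channel("c1")));
        System.out.println("null channel throws: " + lookupThrowsNpe(null));
    }
}
```

Under this assumption the lookup fails before the renew task is ever registered, so nothing keeps extending or releasing the message's invisible time.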
### Describe the Solution You'd Like
When adding the renew task fails in this way, call changeInvisibleTime so that
the message becomes visible earlier.
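A rough sketch of the proposed fallback, not the actual proxy implementation: `RenewTaskRegistry`, `InvisibleTimeChanger`, `registerOrRelease`, and the one-second `EARLY_VISIBLE` value are all hypothetical names and assumptions chosen for illustration. The real change would hook into the proxy's receiveMessage path and its existing changeInvisibleTime call.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class EarlyVisibilitySketch {

    /** Hypothetical stand-in for the proxy component that registers renew tasks. */
    interface RenewTaskRegistry {
        void addReceiptHandle(String receiptHandle) throws Exception;
    }

    /** Hypothetical stand-in for the changeInvisibleTime call. */
    interface InvisibleTimeChanger {
        void changeInvisibleTime(String receiptHandle, Duration invisibleTime);
    }

    /** Assumed short invisible time used when the renew task cannot be added. */
    static final Duration EARLY_VISIBLE = Duration.ofSeconds(1);

    /**
     * Tries to register the renew task. If that fails (for example because the
     * client has disconnected), shortens the invisible time so the message can
     * be redelivered sooner. Returns true only when the renew task was added.
     */
    static boolean registerOrRelease(String receiptHandle,
                                     RenewTaskRegistry registry,
                                     InvisibleTimeChanger changer) {
        try {
            registry.addReceiptHandle(receiptHandle);
            return true;
        } catch (Exception e) {
            changer.changeInvisibleTime(receiptHandle, EARLY_VISIBLE);
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> released = new ArrayList<>();
        InvisibleTimeChanger changer = (handle, time) -> released.add(handle);

        RenewTaskRegistry healthy = handle -> { };
        RenewTaskRegistry disconnected = handle -> {
            throw new NullPointerException("client channel is gone");
        };

        System.out.println("healthy registered: " + registerOrRelease("handle-1", healthy, changer));
        System.out.println("disconnected registered: " + registerOrRelease("handle-2", disconnected, changer));
        System.out.println("released early: " + released);
    }
}
```

Catching the failure at the point where the renew task is added keeps the fallback local to the receive path, so a successful registration behaves exactly as before.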
### Describe Alternatives You've Considered
/
### Additional Context
_No response_