lizhimins commented on code in PR #992:
URL: https://github.com/apache/rocketmq-clients/pull/992#discussion_r2097772707
##########
java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java:
##########
@@ -188,18 +195,52 @@ protected void startUp() throws Exception {
}
}
+ /**
+ * PushConsumerImpl shutdown order
+ * 1. when begin shutdown, do not send any new receive request
+ * 2. cancel scanAssignmentsFuture, do not create new processQueue
+ * 3. waiting all inflight receive request finished or timeout
+ * 4. shutdown consumptionExecutor and waiting all message consumption
finished
+ * 5. sleep 1s to ack message async
+ * 6. shutdown clientImpl
+ */
@Override
protected void shutDown() throws InterruptedException {
log.info("Begin to shutdown the rocketmq push consumer, clientId={}",
clientId);
if (null != scanAssignmentsFuture) {
scanAssignmentsFuture.cancel(false);
}
- super.shutDown();
+ log.info("Waiting for the inflight receive requests to be finished,
clientId={}", clientId);
+ waitingReceiveRequestFinished();
+ log.info("Begin to Shutdown consumption executor, clientId={}",
clientId);
this.consumptionExecutor.shutdown();
ExecutorServices.awaitTerminated(consumptionExecutor);
+ TimeUnit.SECONDS.sleep(1);
+ super.shutDown();
log.info("Shutdown the rocketmq push consumer successfully,
clientId={}", clientId);
}
+ private void waitingReceiveRequestFinished() {
+ Duration maxWaitingTime = clientConfiguration.getRequestTimeout()
+ .plus(pushSubscriptionSettings.getLongPollingTimeout());
+ try {
+ CompletableFuture.runAsync(() -> {
+ while
(inflightRequestCountInterceptor.getInflightReceiveRequestCount() > 0) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).get(maxWaitingTime.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
Review Comment:
try {
while
(inflightRequestCountInterceptor.getInflightReceiveRequestCount() > 0) {
if (System.currentTimeMillis() > endTime) {
log.warn("Timeout waiting for the inflight receive requests
to be finished, clientId={}", clientId);
return;
}
Thread.sleep(100); // 休眠100毫秒
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted while waiting for the inflight receive
requests to be finished, clientId={}", clientId, e);
} catch (Exception e) {
log.error("Unexpected exception while waiting for the inflight
receive requests to be finished, clientId={}", clientId, e);
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]