chia7712 commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1769232359
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1291,6 +1294,10 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) {
log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
"complete it within {} ms. It will proceed to close.",
timer.timeoutMs());
} finally {
+ // Regardless of success or failure of the unsubscribe process, it's important to process any background
+ // events in the hope that our ConsumerRebalanceListenerCallbackNeededEvent is present and can be executed.
+ processBackgroundEvents();
Review Comment:
IIRC, #16974 ensures `pollOnClose` returns the `LEAVE_GROUP` request even
if the callback is NOT executed. So why do we need to process all background
events? Or is the purpose to make sure the callback is executed even when the
timeout is small?
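To make the question concrete, here is a minimal sketch of the pattern the added `finally` block follows: drain pending callbacks whether or not the unsubscribe step finished in time. Everything in it (`DrainOnCloseSketch`, the `Runnable` queue, the simulated timeout) is a hypothetical stand-in and not the actual `AsyncKafkaConsumer` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;

public class DrainOnCloseSketch {
    // Hypothetical queue of callbacks handed over by a background thread.
    private final Queue<Runnable> backgroundEvents = new ConcurrentLinkedQueue<>();
    // Records which callbacks actually ran, in order.
    final List<String> executed = new ArrayList<>();

    void enqueue(String name) {
        backgroundEvents.add(() -> executed.add(name));
    }

    // Runs every queued callback on the calling thread.
    void processBackgroundEvents() {
        Runnable event;
        while ((event = backgroundEvents.poll()) != null) {
            event.run();
        }
    }

    // Simulated close path: the unsubscribe step may time out, but the
    // finally block drains the queue in both cases.
    void releaseAndLeave(boolean unsubscribeTimesOut) {
        try {
            if (unsubscribeTimesOut) {
                throw new TimeoutException("unsubscribe did not complete in time");
            }
        } catch (TimeoutException e) {
            System.out.println("warn: " + e.getMessage());
        } finally {
            processBackgroundEvents();
        }
    }

    public static void main(String[] args) {
        DrainOnCloseSketch sketch = new DrainOnCloseSketch();
        sketch.enqueue("onPartitionsRevoked");
        sketch.releaseAndLeave(true);
        System.out.println(sketch.executed);
    }
}
```

In this sketch the callback runs even on the timeout path, which is the second reading suggested in the comment above.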
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1227,13 +1227,14 @@ public void close(Duration timeout) {
private void close(Duration timeout, boolean swallowException) {
log.trace("Closing the Kafka consumer");
+ boolean wasInterrupted = Thread.interrupted();
Review Comment:
The classic consumer does not reset the interrupted status, whereas
`Thread.interrupted()` clears it. Could this introduce a behavior change for
the consumer listener? For example, the consumer listener CAN'T see the
interrupt anymore?
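A small sketch of the concern, using only standard `java.lang.Thread` behavior: `Thread.interrupted()` returns the flag and clears it, so code that runs afterwards on the same thread no longer observes the interrupt until the flag is restored. The class and the `listenerSeesInterrupt` helper are hypothetical stand-ins for a listener callback and are not Kafka code.

```java
public class InterruptFlagSketch {

    // Hypothetical stand-in for a listener callback that checks the flag.
    static boolean listenerSeesInterrupt() {
        return Thread.currentThread().isInterrupted();
    }

    // Runs a simulated close() on a fresh thread that is already interrupted.
    // If clearFirst is true, the flag is read and cleared up front and only
    // restored after the "listener" has run.
    static boolean closeAndReport(boolean clearFirst) {
        boolean[] seen = new boolean[1];
        Thread t = new Thread(() -> {
            Thread.currentThread().interrupt();
            boolean wasInterrupted = clearFirst && Thread.interrupted();
            try {
                seen[0] = listenerSeesInterrupt();
            } finally {
                if (wasInterrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        t.start();
        try {
            t.join(); // join() makes the write to seen[0] visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("flag kept:          " + closeAndReport(false)); // true
        System.out.println("flag cleared first: " + closeAndReport(true));  // false
    }
}
```

If the listener is expected to observe the interrupt, as it apparently can with the classic consumer, the flag would have to be restored before the callback runs rather than after.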