[ https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850505#comment-17850505 ]
Chia-Ping Tsai edited comment on KAFKA-15305 at 5/30/24 1:17 PM:
-----------------------------------------------------------------

KAFKA-16639 needs this ticket to fix the root cause. As this ticket describes, `ConsumerNetworkThread` does not honor the close timeout. Even though we enqueue a heartbeat request to leave the group, `ConsumerNetworkThread` will move it to `NetworkClient` and then exit the waiting ...

It seems to me the simple solution is to add a method "hasPendingRequests" to "networkClientDelegate", and then change the while condition [0] from "timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to "timer.notExpired() && networkClientDelegate.hasPendingRequests()".

{code:java}
// True while a request is either still queued or sent but not yet answered.
boolean hasPendingRequests() {
    return client.hasInFlightRequests() || !networkClientDelegate.unsentRequests().isEmpty();
}
{code}

{code:java}
// Keep polling until the close timer expires or nothing is pending.
do {
    networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
    timer.update();
} while (timer.notExpired() && networkClientDelegate.hasPendingRequests());
{code}

[~kirktrue] WDYT?

[0] https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L301

was (Author: chia7712):
KAFKA-16639 needs this ticket to fix the root cause. As this ticket describes, `ConsumerNetworkThread` does not honor the close timeout. Even though we enqueue a heartbeat request to leave the group, `ConsumerNetworkThread` will move it to `NetworkClient` and then exit the waiting ...

It seems to me the simple solution is to add a method "hasInFlightRequests" to "networkClientDelegate", and then change the while condition [0] from "timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to "timer.notExpired() && networkClientDelegate.hasInFlightRequests()".
{code:java}
boolean hasInFlightRequests() {
    return client.hasInFlightRequests();
}
{code}

{code:java}
do {
    networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
    timer.update();
} while (timer.notExpired() && networkClientDelegate.hasInFlightRequests());
{code}

[~kirktrue] WDYT?

[0] https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L301

> The background thread should try to process the remaining task until the shutdown timer is expired
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15305
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>              Labels: consumer-threading-refactor, timeout
>             Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> The close() API supplies a timeout parameter so that the consumer can have a
> grace period to process things before shutting down. The background thread
> currently doesn't do that: when close() is initiated, it immediately
> closes all of its dependencies.
>
> This might not be desirable because there could be remaining tasks to be
> processed before closing. Maybe the correct thing to do is to first stop
> accepting API requests, second, let runOnce() continue to run until the
> shutdown timer expires, and then force-close all of its dependencies.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
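
For readers following the thread, here is a minimal, self-contained sketch of why the loop condition in the comment matters. It does not use any Kafka classes: `FakeDelegate`, `closeWith`, and the poll-count limit (a stand-in for the close timer) are all hypothetical names invented for illustration, and the toy `poll()` simply assumes one poll sends a queued request and the next poll receives its response.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of the close loop; none of these types are Kafka's. */
public class CloseLoopSketch {

    /** Hypothetical stand-in for the delegate plus the underlying client. */
    static final class FakeDelegate {
        private final Queue<String> unsent = new ArrayDeque<>();
        private final Queue<String> inFlight = new ArrayDeque<>();
        final List<String> completed = new ArrayList<>();

        void add(String request) {
            unsent.add(request);
        }

        /** One poll: complete one in-flight request, then send one unsent request. */
        void poll() {
            if (!inFlight.isEmpty()) {
                completed.add(inFlight.poll());
            }
            if (!unsent.isEmpty()) {
                inFlight.add(unsent.poll());
            }
        }

        boolean hasUnsentRequests() {
            return !unsent.isEmpty();
        }

        /** Pending means queued OR sent-but-unanswered. */
        boolean hasPendingRequests() {
            return !inFlight.isEmpty() || !unsent.isEmpty();
        }
    }

    /**
     * Runs the close loop for at most maxPolls iterations (stand-in for the timer)
     * and returns how many requests completed before the loop exited.
     */
    static int closeWith(boolean waitForInFlight, int maxPolls) {
        FakeDelegate delegate = new FakeDelegate();
        delegate.add("leave-group-heartbeat");
        int polls = 0;
        boolean keepGoing;
        do {
            delegate.poll();
            polls++;
            keepGoing = waitForInFlight
                    ? delegate.hasPendingRequests()
                    : delegate.hasUnsentRequests();
        } while (polls < maxPolls && keepGoing);
        return delegate.completed.size();
    }

    public static void main(String[] args) {
        // Checking only the unsent queue exits as soon as the request is handed off.
        System.out.println("unsent-only check completed: " + closeWith(false, 10));
        // Checking pending requests waits for the response (or the poll limit).
        System.out.println("pending check completed: " + closeWith(true, 10));
    }
}
```

In this toy model the unsent-only condition returns with 0 completed requests, because the loop stops once the request has been moved out of the unsent queue, while the pending condition returns with 1. The real client's timing and request lifecycle are more involved, so treat this only as an illustration of the condition change under discussion.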