[ 
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850505#comment-17850505
 ] 

Chia-Ping Tsai edited comment on KAFKA-15305 at 5/29/24 9:37 PM:
-----------------------------------------------------------------

KAFKA-16639 needs this ticket to be resolved to fix its root cause. As this 
ticket describes, `ConsumerNetworkThread` does not honor the close timeout. 
Even though we enqueue a heartbeat request to leave the group, 
`ConsumerNetworkThread` hands it to `NetworkClient` and then stops waiting 
without checking for the response ...

It seems to me that a simple solution is to add a "hasInFlightRequests" method 
to "networkClientDelegate", and then change the while condition [0] from 
"timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to 
"timer.notExpired() && networkClientDelegate.hasInFlightRequests()".

{code:java}
    boolean hasInFlightRequests() {
        return client.hasInFlightRequests();
    }
{code}

{code:java}
        do {
            networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
            timer.update();
        } while (timer.notExpired() && networkClientDelegate.hasInFlightRequests());
{code}
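
To illustrate why the condition matters, here is a minimal, self-contained 
sketch. Everything in it is hypothetical and is not Kafka code: `FakeDelegate` 
stands in for the delegate plus `NetworkClient`, `hasUnsentRequests` stands in 
for the `unsentRequests().isEmpty()` check, and a `maxPolls` budget stands in 
for the close timer. It only assumes that a poll hands unsent requests to the 
network layer and that a response arrives some polls later.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class CloseWaitSketch {

    /** Hypothetical stand-in for the delegate and the network client behind it. */
    static final class FakeDelegate {
        private final Deque<String> unsent = new ArrayDeque<>();
        private final Deque<String> inFlight = new ArrayDeque<>();
        private final int pollsUntilResponse;
        private int pollsSinceSend = 0;

        FakeDelegate(int pollsUntilResponse) {
            this.pollsUntilResponse = pollsUntilResponse;
        }

        void add(String request) {
            unsent.add(request);
        }

        void poll() {
            // Pretend the response arrives after a fixed number of polls.
            if (!inFlight.isEmpty() && ++pollsSinceSend >= pollsUntilResponse) {
                inFlight.clear();
            }
            // Hand every unsent request to the (fake) network layer.
            while (!unsent.isEmpty()) {
                inFlight.add(unsent.poll());
                pollsSinceSend = 0;
            }
        }

        boolean hasUnsentRequests() {
            return !unsent.isEmpty();
        }

        boolean hasInFlightRequests() {
            return !inFlight.isEmpty();
        }
    }

    /** Old condition: loop only while requests are unsent. Returns true if a request was abandoned in flight. */
    static boolean abandonedWithOldCondition(FakeDelegate delegate, int maxPolls) {
        int polls = 0;
        do {
            delegate.poll();
            polls++;
        } while (polls < maxPolls && delegate.hasUnsentRequests());
        return delegate.hasInFlightRequests();
    }

    /** Proposed condition: loop while requests are in flight. Returns true if a request was abandoned in flight. */
    static boolean abandonedWithProposedCondition(FakeDelegate delegate, int maxPolls) {
        int polls = 0;
        do {
            delegate.poll();
            polls++;
        } while (polls < maxPolls && delegate.hasInFlightRequests());
        return delegate.hasInFlightRequests();
    }

    public static void main(String[] args) {
        FakeDelegate oldRun = new FakeDelegate(3);
        oldRun.add("leave-group");
        System.out.println("old condition abandoned request: "
                + abandonedWithOldCondition(oldRun, 10));

        FakeDelegate newRun = new FakeDelegate(3);
        newRun.add("leave-group");
        System.out.println("proposed condition abandoned request: "
                + abandonedWithProposedCondition(newRun, 10));
    }
}
```

In this sketch the old condition exits right after the first poll, while the 
request is still in flight. The proposed condition keeps polling until the 
fake response arrives or the poll budget runs out.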

[~kirktrue] WDYT? 

[0] 
https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L301




> The background thread should try to process the remaining task until the 
> shutdown timer is expired
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15305
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Philip Nee
>            Assignee: Kirk True
>            Priority: Major
>              Labels: consumer-threading-refactor, timeout
>             Fix For: 3.8.0
>
>
> Found while working on https://issues.apache.org/jira/browse/KAFKA-15304.
> The close() API supplies a timeout parameter so that the consumer has a 
> grace period to process things before shutting down.  The background thread 
> currently doesn't honor it: when close() is initiated, it immediately 
> closes all of its dependencies.
>  
> This might not be desirable because there could be remaining tasks to be 
> processed before closing.  Perhaps the correct thing to do is to first stop 
> accepting API requests, then let runOnce() continue to run until the 
> shutdown timer expires, and only then force-close all of its dependencies.
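
The three steps suggested in the description above can be sketched roughly as 
follows. This is a single-threaded illustration under stated assumptions, not 
the consumer's actual implementation: `BackgroundWorker`, `submit`, and the 
return value of `close` are hypothetical names, and only `runOnce()` and the 
close timeout come from the ticket.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

final class GracefulCloseSketch {

    /** Hypothetical background worker; not the real ConsumerNetworkThread. */
    static final class BackgroundWorker {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private boolean accepting = true;

        /** Returns false once close() has started and new requests are refused. */
        boolean submit(Runnable task) {
            if (!accepting) {
                return false;
            }
            tasks.add(task);
            return true;
        }

        /** Processes at most one pending task. */
        void runOnce() {
            Runnable task = tasks.poll();
            if (task != null) {
                task.run();
            }
        }

        /** Drains pending tasks within the timeout and returns how many were dropped. */
        int close(long timeoutMs) {
            // Step 1: stop accepting new API requests.
            accepting = false;

            // Step 2: keep running runOnce() until the work is done or the timer expires.
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (!tasks.isEmpty() && System.nanoTime() - deadline < 0) {
                runOnce();
            }

            // Step 3: force-close, dropping whatever is left.
            int dropped = tasks.size();
            tasks.clear();
            return dropped;
        }
    }

    public static void main(String[] args) {
        BackgroundWorker worker = new BackgroundWorker();
        worker.submit(() -> System.out.println("commit offsets"));
        worker.submit(() -> System.out.println("leave group"));
        System.out.println("dropped tasks: " + worker.close(1000));
    }
}
```

With a zero timeout the loop body never runs in this sketch, so every pending 
task is dropped, which mirrors the immediate-close behavior the ticket 
describes.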



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
