[ https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690231#comment-17690231 ]

RivenSun commented on KAFKA-14729:
----------------------------------

In fact, this was caused by faulty application code on the business side, which 
allocated a large amount of memory in a short period of time and exhausted the 
maximum heap size configured for the Java process.
This in turn can cause the heartbeat thread to throw an OutOfMemoryError when 
creating a HeartbeatRequest.

OOM is only one of the ways in which the heartbeat thread can exit abnormally.

In short, whenever the heartbeat thread exits, whether normally or abnormally, 
its `closed` field should be set to true.
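To illustrate the point about the `closed` field, here is a minimal, self-contained sketch. It is not Kafka's code: `SketchHeartbeatThread`, its `sendHeartbeat` hook and the `maxBeats` bound are hypothetical stand-ins, loosely modeled on the `closed`/`failed` fields of `AbstractCoordinator.HeartbeatThread`. It assumes the fix is simply to set `closed` in `finally`, so that a normal exit and an error such as OutOfMemoryError both leave the thread marked closed.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class HeartbeatExitSketch {

    // Hypothetical stand-in for the consumer's heartbeat thread; not Kafka's real class.
    static class SketchHeartbeatThread extends Thread {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final AtomicReference<RuntimeException> failed = new AtomicReference<>(null);
        private final Runnable sendHeartbeat; // hypothetical hook standing in for building/sending a HeartbeatRequest
        private final int maxBeats;           // bounds the loop so the sketch terminates

        SketchHeartbeatThread(Runnable sendHeartbeat, int maxBeats) {
            super("sketch-heartbeat-thread");
            this.sendHeartbeat = sendHeartbeat;
            this.maxBeats = maxBeats;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < maxBeats && !closed.get(); i++) {
                    sendHeartbeat.run();
                }
            } catch (Throwable t) {
                // Record any unexpected error (including OutOfMemoryError) as the failure cause.
                failed.set(t instanceof RuntimeException ? (RuntimeException) t : new RuntimeException(t));
            } finally {
                // The proposed change: mark the thread closed on every exit path, normal or abnormal.
                closed.set(true);
            }
        }

        boolean isClosed() { return closed.get(); }

        boolean hasFailed() { return failed.get() != null; }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate the heap being exhausted while the next heartbeat request is built.
        SketchHeartbeatThread failing = new SketchHeartbeatThread(() -> {
            throw new OutOfMemoryError("simulated: Java heap space");
        }, 10);
        failing.start();
        failing.join();
        System.out.println("abnormal exit: closed=" + failing.isClosed() + ", failed=" + failing.hasFailed());

        // A thread that finishes normally is marked closed too, but not failed.
        SketchHeartbeatThread normal = new SketchHeartbeatThread(() -> { }, 3);
        normal.start();
        normal.join();
        System.out.println("normal exit: closed=" + normal.isClosed() + ", failed=" + normal.hasFailed());
    }
}
```

Running `main` prints `closed=true` for both threads and `failed=true` only for the one that hit the simulated error.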


[~showuon] Thanks for your reply.

> The KafkaConsumer pollForFetches(timer) method takes up a lot of CPU due to 
> the abnormal exit of the heartbeat thread
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14729
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14729
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.3.0
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>         Attachments: image-2023-02-17-13-15-50-362.png, jstack_highCpu.txt
>
>
> h2. Scenario:
> 1. The business application occupies a large amount of memory, causing the 
> `run()` method of the KafkaConsumer's HeartbeatThread to exit abnormally.
> {code:java}
> 2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer clientId=consumer-5, groupId=*****_dev_VA] Heartbeat thread failed due to unexpected error java.lang.OutOfMemoryError: Java heap space
> {code}
> 2. The `finally` block of the heartbeat thread's `run()` method only writes a 
> log message; it does not update the value of `AbstractCoordinator.state`.
> 3. For a KafkaConsumer that relies on group management (group rebalancing), 
> the pollTimeout computed in `KafkaConsumer#pollForFetches(timer)` may 
> ultimately take the value of `timeToNextHeartbeat(now)`.
> 4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` is 
> never updated again, and the `AbstractCoordinator.state` field stays 
> {*}STABLE{*}. Once that stale deadline has passed, the 
> `timeToNextHeartbeat(long now)` method therefore always returns 
> {color:#ff0000}0{color}, and this 0 is passed as the timeout to the 
> underlying `networkClient#poll` method.
>  
> In the end, when the user calls `poll(duration)` in an endless loop, 
> `KafkaConsumer#pollForFetches(timer)` always returns almost immediately, so 
> the loop spins and takes up a lot of CPU.
>  
> h2. Solution:
> 1. According to the comment on `MemberState.STABLE`:
> {code:java}
> the client has joined and is sending heartbeats.{code}
> this no longer holds once the heartbeat thread has exited, so its `finally` 
> block should add:
> {code:java}
> state = MemberState.UNJOINED;
> closed = true;{code}
> 2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new 
> check on `heartbeatThread.hasFailed()`:
> {code:java}
> if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
>     return Long.MAX_VALUE;
> return heartbeat.timeToNextHeartbeat(now);{code}
>  
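The busy loop and both proposed fixes can be reproduced with a simplified, hypothetical `CoordinatorModel`; it is not Kafka's `AbstractCoordinator`. The sketch assumes that the poll timeout is the smaller of the caller's remaining time and `timeToNextHeartbeat(now)`, and that the heartbeat time is `max(0, deadlineMs - now)`, which drops to 0 for good once the stale deadline has passed. The `MemberState` enum only mirrors what `hasNotJoinedGroup()` needs, and the timestamps are made-up values.

```java
public class BusyPollSketch {

    // Mirrors the member states; hasNotJoinedGroup() is assumed to match UNJOINED and PREPARING_REBALANCE.
    enum MemberState {
        UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE;

        boolean hasNotJoinedGroup() {
            return this == UNJOINED || this == PREPARING_REBALANCE;
        }
    }

    // Hypothetical, heavily simplified stand-in for the coordinator's heartbeat bookkeeping.
    static class CoordinatorModel {
        MemberState state = MemberState.STABLE;
        final long heartbeatDeadlineMs;  // plays the role of heartbeatTimer.deadlineMs (never refreshed once the thread dies)
        boolean heartbeatThreadFailed;   // plays the role of heartbeatThread.hasFailed()
        final boolean withFailedCheck;   // whether solution 2 is applied

        CoordinatorModel(long heartbeatDeadlineMs, boolean withFailedCheck) {
            this.heartbeatDeadlineMs = heartbeatDeadlineMs;
            this.withFailedCheck = withFailedCheck;
        }

        long timeToNextHeartbeat(long nowMs) {
            if (state.hasNotJoinedGroup() || (withFailedCheck && heartbeatThreadFailed))
                return Long.MAX_VALUE;
            // Remaining time never goes below 0, so a stale deadline yields 0 on every call.
            return Math.max(0L, heartbeatDeadlineMs - nowMs);
        }
    }

    // Simplified poll timeout: the smaller of the caller's remaining time and the time to the next heartbeat.
    static long pollTimeoutMs(long userRemainingMs, CoordinatorModel coordinator, long nowMs) {
        return Math.min(userRemainingMs, coordinator.timeToNextHeartbeat(nowMs));
    }

    public static void main(String[] args) {
        long now = 10_000L;           // current time; the last heartbeat deadline is long past
        long staleDeadline = 3_000L;
        long remaining = 1_000L;      // e.g. the user called poll(Duration.ofSeconds(1))

        // Current behavior: thread is dead, state still STABLE, so the timeout is 0 and poll spins.
        CoordinatorModel current = new CoordinatorModel(staleDeadline, false);
        current.heartbeatThreadFailed = true;
        System.out.println("current:    " + pollTimeoutMs(remaining, current, now));

        // Solution 1: the finally block resets the state to UNJOINED.
        CoordinatorModel solution1 = new CoordinatorModel(staleDeadline, false);
        solution1.heartbeatThreadFailed = true;
        solution1.state = MemberState.UNJOINED;
        System.out.println("solution 1: " + pollTimeoutMs(remaining, solution1, now));

        // Solution 2: timeToNextHeartbeat also checks whether the heartbeat thread has failed.
        CoordinatorModel solution2 = new CoordinatorModel(staleDeadline, true);
        solution2.heartbeatThreadFailed = true;
        System.out.println("solution 2: " + pollTimeoutMs(remaining, solution2, now));
    }
}
```

With these assumed values the current path yields a poll timeout of 0, while either fix lets `poll` block for the caller's full 1000 ms. The model deliberately ignores everything else that can shorten the real poll timeout.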



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
