[ https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen resolved KAFKA-14729.
-------------------------------
    Fix Version/s: 3.5.0
       Resolution: Fixed

> The KafkaConsumer pollForFetches(timer) method takes up a lot of CPU due to the abnormal exit of the heartbeat thread
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14729
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14729
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.3.0
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>             Fix For: 3.5.0
>
>         Attachments: image-2023-02-17-13-15-50-362.png, jstack_highCpu.txt
>
>
> h2. case situation:
> 1. The business program occupies a large amount of memory, causing the
> `run()` method of the KafkaConsumer's HeartbeatThread to exit abnormally.
> {code:java}
> 2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer clientId=consumer-5, groupId=*****_dev_VA] Heartbeat thread failed due to unexpected error
> java.lang.OutOfMemoryError: Java heap space {code}
> 2. The `finally` block of the heartbeat thread's `run()` method only prints
> a log message; it does not update the value of `AbstractCoordinator.state`.
> 3. For a KafkaConsumer with the group rebalance mechanism enabled, the
> pollTimeout in the `KafkaConsumer#pollForFetches(timer)` method may
> eventually take the value `timeToNextHeartbeat(now)`.
> 4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` will
> never be updated again, and the `AbstractCoordinator.state` field will
> always remain {*}STABLE{*}. Once the stale deadline has passed, the
> `timeToNextHeartbeat(long now)` method therefore returns
> {color:#ff0000}0{color}, and 0 is passed to the underlying
> `networkClient#poll` method.
>
> In the end, the user calls the `poll(duration)` method in an endless loop,
> and the `KafkaConsumer#pollForFetches(timer)` method always returns
> almost immediately, taking up a lot of CPU.
>
> h2. solution:
> 1. Refer to the comment on `MemberState.STABLE`:
> {code:java}
> the client has joined and is sending heartbeats.{code}
> When the heartbeat thread exits, we should add the following code to the
> `finally` block:
> {code:java}
> state = MemberState.UNJOINED;
> closed = true;{code}
> 2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new
> condition: `heartbeatThread.hasFailed()`
> {code:java}
> if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
>     return Long.MAX_VALUE;
> return heartbeat.timeToNextHeartbeat(now);{code}
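
The effect of the second proposed change can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `HeartbeatTimeoutSketch`, its fields, and the `pollTimeout` helper are hypothetical stand-ins written for this illustration, not the actual Kafka classes, and the state machine is reduced to two states. It shows why a dead heartbeat thread with a stale deadline yields a poll timeout of 0, and how the extra `hasFailed` check avoids that.

```java
// Simplified, hypothetical model of the behaviour described in the issue.
// None of these names are the real Kafka classes or methods.
final class HeartbeatTimeoutSketch {

    // Reduced to two states for illustration only.
    enum MemberState {
        UNJOINED, STABLE;

        boolean hasNotJoinedGroup() {
            return this == UNJOINED;
        }
    }

    // Stays STABLE after the thread dies, as the issue describes.
    private MemberState state = MemberState.STABLE;
    private boolean heartbeatThreadFailed = false;
    // Never advanced again once the heartbeat thread has exited.
    private final long heartbeatDeadlineMs;

    HeartbeatTimeoutSketch(long heartbeatDeadlineMs) {
        this.heartbeatDeadlineMs = heartbeatDeadlineMs;
    }

    // Models the heartbeat thread exiting abnormally without touching state.
    void markHeartbeatThreadFailed() {
        heartbeatThreadFailed = true;
    }

    // Behaviour reported in the issue: only the member state is checked.
    long timeToNextHeartbeatReported(long nowMs) {
        if (state.hasNotJoinedGroup())
            return Long.MAX_VALUE;
        return Math.max(0L, heartbeatDeadlineMs - nowMs);
    }

    // Behaviour with the proposed extra condition.
    long timeToNextHeartbeatProposed(long nowMs) {
        if (state.hasNotJoinedGroup() || heartbeatThreadFailed)
            return Long.MAX_VALUE;
        return Math.max(0L, heartbeatDeadlineMs - nowMs);
    }

    // Assumed shape of the poll timeout: the smaller of the caller's
    // remaining time and the time until the next heartbeat.
    static long pollTimeout(long remainingMs, long timeToNextHeartbeatMs) {
        return Math.min(remainingMs, timeToNextHeartbeatMs);
    }

    public static void main(String[] args) {
        HeartbeatTimeoutSketch coordinator = new HeartbeatTimeoutSketch(1_000L);
        coordinator.markHeartbeatThreadFailed();

        long nowMs = 5_000L;       // well past the stale deadline
        long remainingMs = 3_000L; // time left on the caller's poll timer

        System.out.println("reported: "
                + pollTimeout(remainingMs, coordinator.timeToNextHeartbeatReported(nowMs)));
        System.out.println("proposed: "
                + pollTimeout(remainingMs, coordinator.timeToNextHeartbeatProposed(nowMs)));
    }
}
```

In this model the reported behaviour gives a poll timeout of 0 ms on every call, which is the busy loop from the issue, while the proposed check falls back to the caller's remaining 3000 ms.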