RivenSun created KAFKA-14729:
--------------------------------

             Summary: The KafkaConsumer pollForFetches(timer) method takes up a lot of CPU due to the abnormal exit of the heartbeat thread
                 Key: KAFKA-14729
                 URL: https://issues.apache.org/jira/browse/KAFKA-14729
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 3.3.2, 3.3.0
            Reporter: RivenSun
            Assignee: RivenSun
         Attachments: image-2023-02-17-13-15-50-362.png, jstack_highCpu.txt

h2. Case situation:

1. The business program occupies a large amount of memory, which causes the `run()` method of the KafkaConsumer's HeartbeatThread to exit abnormally.
{code:java}
2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator:1105][kafka-coor][Consumer clientId=consumer-5, groupId=asyncmq_local_us_dev-webcachesync_fcd02f8e-4f7e-4829-93ed-e8fa9cdc81f2_dev_VA] Heartbeat thread failed due to unexpected error
java.lang.OutOfMemoryError: Java heap space
{code}
2. The `finally` block of the heartbeat thread's `run()` method only prints a log message; it does not update the value of `AbstractCoordinator.state`.
3. For a KafkaConsumer with the group rebalance mechanism enabled, `pollTimeout` in the `kafkaConsumer#pollForFetches(timer)` method may end up taking the value `timeToNextHeartbeat(now)`.
4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` will never be updated again, and the `AbstractCoordinator.state` field will remain {*}STABLE{*} forever. Once the last deadline has passed, the `timeToNextHeartbeat(long now)` method therefore always returns {color:#FF0000}0{color}, and 0 is passed to the underlying `networkClient#poll` method.

In the end, because the user calls the `poll(duration)` method in an endless loop, the `kafkaConsumer#pollForFetches(timer)` method always returns almost immediately, and the loop takes up a lot of CPU.

h2. Solution:

1. Refer to the comment on `MemberState.STABLE`:
{code:java}
the client has joined and is sending heartbeats.{code}
When the heartbeat thread exits, we should add the following code to its `finally` block:
{code:java}
state = MemberState.UNJOINED;
closed = true;{code}
2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new condition, `heartbeatThread.hasFailed()`:
{code:java}
if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
    return Long.MAX_VALUE;
return heartbeat.timeToNextHeartbeat(now);{code}
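
The reasoning above can be illustrated with a small standalone model. This is only a sketch and not Kafka's code: the class `HeartbeatTimeoutSketch`, its fields, and the `applyFix` flag are hypothetical names introduced for illustration, and the poll timeout is assumed to be simply the smaller of the user's remaining time and `timeToNextHeartbeat(now)`.

```java
// Hypothetical, simplified model of the heartbeat timing described in this issue.
// It does not use or reproduce Kafka's real classes.
class HeartbeatTimeoutSketch {

    enum MemberState { UNJOINED, STABLE }

    // Deadline that only a live heartbeat thread would keep pushing forward.
    private final long heartbeatDeadlineMs;
    private final boolean applyFix;
    private MemberState state = MemberState.STABLE;
    private boolean heartbeatThreadFailed = false;

    public HeartbeatTimeoutSketch(long heartbeatDeadlineMs, boolean applyFix) {
        this.heartbeatDeadlineMs = heartbeatDeadlineMs;
        this.applyFix = applyFix;
    }

    // Models the heartbeat thread dying, e.g. after an OutOfMemoryError.
    public void failHeartbeatThread() {
        heartbeatThreadFailed = true;
        if (applyFix) {
            // Proposed fix, part 1: leave STABLE when heartbeats stop.
            state = MemberState.UNJOINED;
        }
    }

    public long timeToNextHeartbeat(long nowMs) {
        // Proposed fix, part 2: also check whether the heartbeat thread has failed.
        if (state == MemberState.UNJOINED || (applyFix && heartbeatThreadFailed)) {
            return Long.MAX_VALUE;
        }
        // Without the fix, a stale deadline yields 0 for every later call.
        return Math.max(0L, heartbeatDeadlineMs - nowMs);
    }

    // Assumed simplification: poll timeout = min(heartbeat wait, user timeout).
    public long pollTimeout(long nowMs, long userTimeoutMs) {
        return Math.min(timeToNextHeartbeat(nowMs), userTimeoutMs);
    }

    public static void main(String[] args) {
        long nowMs = 10_000L;        // current time, well past the stale deadline
        long userTimeoutMs = 5_000L; // the duration the user passed to poll

        HeartbeatTimeoutSketch broken = new HeartbeatTimeoutSketch(3_000L, false);
        broken.failHeartbeatThread();
        System.out.println("without fix: " + broken.pollTimeout(nowMs, userTimeoutMs)); // prints 0

        HeartbeatTimeoutSketch fixed = new HeartbeatTimeoutSketch(3_000L, true);
        fixed.failHeartbeatThread();
        System.out.println("with fix: " + fixed.pollTimeout(nowMs, userTimeoutMs)); // prints 5000
    }
}
```

In this model, a poll timeout of 0 means each `poll` call returns immediately, which is the busy loop reported above. With either proposed change applied, the timeout falls back to the user's own duration.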