[jira] [Commented] (KAFKA-14729) The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-17 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690231#comment-17690231
 ] 

RivenSun commented on KAFKA-14729:
--

In fact, this was caused by faulty code on the business side, which allocated 
a large amount of memory in a short period of time and reached the maximum 
heap size configured for the Java program.
This, in turn, may cause the heartbeat thread to throw an OutOfMemoryError 
when creating a HeartbeatRequest.

OOM is just one of the conditions under which heartbeatThread can exit 
abnormally.

In short, whenever the heartbeat thread exits, normally or abnormally, the 
`closed` field should be updated to true.
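
The point about the `closed` field can be illustrated with a small stand-alone model. This is only a sketch and not Kafka's actual HeartbeatThread: the class `ModelHeartbeatThread`, its `work` parameter, and the `failed` field are hypothetical names made up for illustration. It shows that a `finally` block runs on both normal and abnormal exits, so setting `closed` there also covers an OutOfMemoryError.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a heartbeat thread; not Kafka's real class.
class ModelHeartbeatThread extends Thread {
    private final Runnable work;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicReference<Throwable> failed = new AtomicReference<>();

    ModelHeartbeatThread(Runnable work) {
        this.work = work;
    }

    @Override
    public void run() {
        try {
            work.run();
        } catch (Throwable t) {
            // Catches Errors such as OutOfMemoryError as well as exceptions.
            failed.set(t);
        } finally {
            // Runs on normal and abnormal exit alike.
            closed.set(true);
        }
    }

    boolean isClosed() {
        return closed.get();
    }

    boolean hasFailed() {
        return failed.get() != null;
    }

    // Runs the given work on a model thread and waits for it to finish.
    static ModelHeartbeatThread runToCompletion(Runnable work) {
        ModelHeartbeatThread t = new ModelHeartbeatThread(work);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return t;
    }

    public static void main(String[] args) {
        ModelHeartbeatThread normal = runToCompletion(() -> { });
        ModelHeartbeatThread abnormal = runToCompletion(() -> {
            throw new OutOfMemoryError("simulated"); // simulated, no real allocation
        });
        System.out.println("normal exit:   closed=" + normal.isClosed() + " failed=" + normal.hasFailed());
        System.out.println("abnormal exit: closed=" + abnormal.isClosed() + " failed=" + abnormal.hasFailed());
    }
}
```

In this model, a caller can check `isClosed()` to learn that the thread is gone without needing to know why it exited.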


[~showuon] Thanks for your reply.

> The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to 
> the abnormal exit of the heartbeat thread
> -
>
>                 Key: KAFKA-14729
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14729
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.3.0
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>         Attachments: image-2023-02-17-13-15-50-362.png, jstack_highCpu.txt
>
>
> h2. case situation:
> 1. The business program occupies a large amount of memory, causing the 
> `run()` method of HeartbeatThread of kafkaConsumer to exit abnormally.
> {code:java}
> 2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer 
> clientId=consumer-5, groupId=*_dev_VA] Heartbeat thread failed due to 
> unexpected error java.lang.OutOfMemoryError: Java heap space {code}
> 2. The `finally` block of the heartbeat thread's `run()` method only writes a 
> log message; it does not update the value of `AbstractCoordinator.state`.
> 3. For a kafkaConsumer with the group rebalance mechanism enabled, the 
> pollTimeout in the `kafkaConsumer#pollForFetches(timer)` method may end up 
> taking the value `timeToNextHeartbeat(now)`.
> 4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` will 
> never be updated again, and the `AbstractCoordinator.state` field will always 
> remain {*}STABLE{*}.
> So once that deadline has passed, the `timeToNextHeartbeat(long now)` method 
> will always return 0, and 0 will be passed to the underlying 
> `networkClient#poll` method.
>  
> In the end, because the user calls the `poll(duration)` method in an endless 
> loop and the `kafkaConsumer#pollForFetches(timer)` method always returns 
> almost immediately, the loop spins and takes up a lot of CPU.
>  
> h2. solution:
> 1. Refer to the comment on `MemberState.STABLE`:
> {code:java}
> the client has joined and is sending heartbeats.{code}
> When the heartbeat thread exits, we should add the following code to the 
> `finally` block:
> {code:java}
> state = MemberState.UNJOINED;
> closed = true;{code}
> 2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new 
> condition: `heartbeatThread.hasFailed()`
> {code:java}
> if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
>     return Long.MAX_VALUE;
> return heartbeat.timeToNextHeartbeat(now);{code}
>  
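
The busy loop described in the case situation above can be reproduced with a rough simulation. This is a sketch under stated assumptions, not Kafka's code: `PollSpinModel`, the 3000 ms heartbeat interval, the 1000 ms user poll timeout, and the rule that a zero-timeout poll still costs 1 ms of simulated time are all made up for illustration. It only models the idea that the poll timeout is capped by the time remaining until the next heartbeat, and that only a live heartbeat thread moves that deadline forward.

```java
// Hypothetical model of the behaviour described above; none of this is Kafka's code.
class PollSpinModel {
    static final long HEARTBEAT_INTERVAL_MS = 3000L; // assumed value
    static final long USER_POLL_TIMEOUT_MS = 1000L;  // assumed value

    // Counts how many poll calls fit into durationMs of simulated time.
    static int countPolls(boolean heartbeatThreadAlive, long durationMs) {
        long nowMs = 0L;
        long deadlineMs = HEARTBEAT_INTERVAL_MS; // stand-in for heartbeatTimer.deadlineMs
        int polls = 0;
        while (nowMs < durationMs) {
            // Only a live heartbeat thread moves the deadline forward.
            if (heartbeatThreadAlive && nowMs >= deadlineMs) {
                deadlineMs = nowMs + HEARTBEAT_INTERVAL_MS;
            }
            long timeToNextHeartbeatMs = Math.max(0L, deadlineMs - nowMs);
            long pollTimeoutMs = Math.min(USER_POLL_TIMEOUT_MS, timeToNextHeartbeatMs);
            // Assume a poll with timeout 0 still costs 1 ms of simulated time.
            nowMs += Math.max(1L, pollTimeoutMs);
            polls++;
        }
        return polls;
    }

    public static void main(String[] args) {
        System.out.println("polls with live heartbeat thread: " + countPolls(true, 10_000L));
        System.out.println("polls with dead heartbeat thread: " + countPolls(false, 10_000L));
    }
}
```

With a live heartbeat thread every poll blocks for the full user timeout. Once the thread is gone and the deadline has passed, every poll gets a timeout of 0, so the same simulated period contains far more poll calls.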



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14729) The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690218#comment-17690218
 ] 

Luke Chen commented on KAFKA-14729:
---

The change makes sense to me. But I'm more interested in knowing why the 
heartbeat thread hit an OOM. Is there a resource leak? Do you have any clue?

 



[jira] [Commented] (KAFKA-14729) The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-16 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690166#comment-17690166
 ] 

RivenSun commented on KAFKA-14729:
--

1. After careful consideration, and to also cover the case where KafkaConsumer 
is closed normally, the `finally` block of the heartbeatThread `run()` method 
should become:
finally {
    log.debug("Heartbeat thread has closed");
    closed = true;
}
2. The `AbstractCoordinator#timeToNextHeartbeat(now)` method is modified as 
follows:
{code:java}
protected synchronized long timeToNextHeartbeat(long now) {
    // if we have not joined the group or we are preparing rebalance,
    // we don't need to send heartbeats
    if (state.hasNotJoinedGroup() ||
            (heartbeatThread != null && (heartbeatThread.hasFailed() || heartbeatThread.closed)))
        return Long.MAX_VALUE;
    return heartbeat.timeToNextHeartbeat(now);
}{code}
The `heartbeatThread == null` case needs no special handling, because 
`heartbeatThread` is set to *null* only after KafkaConsumer calls the 
`close()` method.
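
The effect of the guarded method on the poll timeout can be checked with a small stand-alone model. This is a sketch only: `HeartbeatGuardModel`, `ThreadStatus`, and `pollTimeout` are hypothetical names, and the boolean `hasNotJoinedGroup` parameter stands in for `state.hasNotJoinedGroup()`. It assumes, as in the issue description, that the poll timeout is capped by the value this method returns.

```java
// Hypothetical model of the guarded method; the names are illustrative, not Kafka's.
class HeartbeatGuardModel {
    // Minimal stand-in for the heartbeat thread's observable flags.
    static final class ThreadStatus {
        final boolean failed;
        final boolean closed;

        ThreadStatus(boolean failed, boolean closed) {
            this.failed = failed;
            this.closed = closed;
        }
    }

    // Returns Long.MAX_VALUE when no heartbeat is expected, else the timer's remaining time.
    static long timeToNextHeartbeat(boolean hasNotJoinedGroup, ThreadStatus thread, long timerRemainingMs) {
        // A null thread is treated as "not stopped"; the timer value is used as before.
        boolean threadStopped = thread != null && (thread.failed || thread.closed);
        if (hasNotJoinedGroup || threadStopped) {
            return Long.MAX_VALUE;
        }
        return timerRemainingMs;
    }

    // The poll timeout is capped by the time to the next heartbeat.
    static long pollTimeout(long userTimeoutMs, long timeToNextHeartbeatMs) {
        return Math.min(userTimeoutMs, timeToNextHeartbeatMs);
    }

    public static void main(String[] args) {
        ThreadStatus running = new ThreadStatus(false, false);
        ThreadStatus exited = new ThreadStatus(true, true);
        // Expired timer (0 ms remaining), member still marked as joined.
        System.out.println("running thread: " + pollTimeout(1000L, timeToNextHeartbeat(false, running, 0L)));
        System.out.println("exited thread:  " + pollTimeout(1000L, timeToNextHeartbeat(false, exited, 0L)));
    }
}
```

With the guard in place, an exited thread no longer forces the poll timeout to 0; the user-supplied timeout applies instead.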



[jira] [Commented] (KAFKA-14729) The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

2023-02-16 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17690151#comment-17690151
 ] 

RivenSun commented on KAFKA-14729:
--

Hi [~guozhang], [~showuon],
Could you give some suggestions for this issue?
Thanks.
