[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17156214#comment-17156214
 ] 

Jerry Wei commented on KAFKA-10134:
-----------------------------------

[~guozhang], I have three brokers and 10 consumers. When I restart one of 
the consumers, some of the other consumers show high CPU usage. 


{code:java}
// from KafkaConsumer.java (a fine fix)
// private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);
                if (includeMetadataInTimeout) {
                    // try to update assignment metadata BUT do not need to block on the timer if we still have
                    // some assigned partitions, since even if we are 1) in the middle of a rebalance
                    // or 2) have partitions with unknown starting positions we may still want to return some data
                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
                    // to never block on completing the rebalance procedure if there's any
                    if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {
                        updateAssignmentMetadataIfNeeded(timer);
                    } else {
                        final Timer updateMetadataTimer = time.timer(0L);
                        updateAssignmentMetadataIfNeeded(updateMetadataTimer);
                        timer.update(updateMetadataTimer.currentTimeMs());
                    }
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                        log.warn("Still waiting for metadata");
                    }
                }
{code}


{code:java}
// from KafkaConsumer.java (last commit)
// private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);

                if (includeMetadataInTimeout) {
                    // try to update assignment metadata BUT do not need to block on the timer for join group
                    updateAssignmentMetadataIfNeeded(timer, false);
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                        log.warn("Still waiting for metadata");
                    }
                }
{code}

Comparing the two commits above, I have one question about 
`updateAssignmentMetadataIfNeeded(timer, false);`: why is the second parameter 
false? As I understand it, passing false behaves the same as the earlier 
`updateAssignmentMetadataIfNeeded(time.timer(0L));`. 
I've tested with true and it looks fine for us. I think that with true the 
behavior is similar to [the 
commit|https://github.com/apache/kafka/pull/8934/commits/333a967ec22ea22babf32b18349b76b6552a2fac].
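
To make the question concrete, here is a minimal sketch of the timeout choice in each snippet. It is not Kafka code: the class and method names are hypothetical, and the second method encodes only my reading of the flag (false means the join-group step gets a 0 ms timer), which is exactly the assumption I am asking about.

```java
// Hypothetical illustration only; none of these names exist in Kafka.
final class MetadataTimeoutSketch {

    // First snippet: with fetchable partitions, the metadata update gets a
    // 0 ms timer; otherwise it may block for the caller's remaining time.
    static long firstSnippetTimeoutMs(boolean hasFetchablePartitions, long remainingMs) {
        return hasFetchablePartitions ? 0L : remainingMs;
    }

    // Second snippet, ASSUMING waitForJoinGroup == false means the join-group
    // step does not block (0 ms) and true means it may use the remaining time.
    static long joinGroupTimeoutMs(boolean waitForJoinGroup, long remainingMs) {
        return waitForJoinGroup ? remainingMs : 0L;
    }

    public static void main(String[] args) {
        long remainingMs = 500L;
        // No fetchable partitions: the first snippet blocks, the second
        // (flag = false, under the assumption above) does not.
        System.out.println("first snippet:  " + firstSnippetTimeoutMs(false, remainingMs) + " ms");
        System.out.println("second, false:  " + joinGroupTimeoutMs(false, remainingMs) + " ms");
        System.out.println("second, true:   " + joinGroupTimeoutMs(true, remainingMs) + " ms");
    }
}
```

If that reading is right, the two snippets differ only when no partitions are fetchable: the first can block for the remaining time, while the second with false never blocks on the join.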

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-10134
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10134
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0
>            Reporter: Sean Guo
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.6.0, 2.5.1
>
>
> We want to use the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while 
> there is some load (some tasks run longer than 30 s), the CPU goes 
> sky-high. It reads ~700% in our metrics, so several threads must be in a 
> tight loop. We have several consumer threads consuming from different 
> partitions during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol, the high CPU usage 
> drops after the rebalance is done. With the cooperative one, the consumer 
> threads seem to be stuck on something and cannot finish the rebalance, so 
> the high CPU usage does not drop until we stop our load. A small load 
> without long-running tasks also does not cause continuous high CPU usage, as 
> the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x00007fe11f044000 nid=0x1f4 runnable [0x00007fe119aab000]
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at
>  
> By debugging into the code, we found that the clients appear to be looping 
> on finding the coordinator.
> I also tried the old rebalance protocol on the new version; the issue still 
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which does not seem to have this issue. So 
> it seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
