[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17156214#comment-17156214 ]
Jerry Wei edited comment on KAFKA-10134 at 7/12/20, 6:29 AM:
-------------------------------------------------------------

[~guozhang], I have three brokers and 10 consumers. When I restart one of the consumers, some of the other consumers show high CPU usage.

{code:java}
// from KafkaConsumer.java (a fine fix)
// private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);
if (includeMetadataInTimeout) {
    // try to update assignment metadata BUT do not need to block on the timer if we still have
    // some assigned partitions, since even if we are 1) in the middle of a rebalance
    // or 2) have partitions with unknown starting positions we may still want to return some data
    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
    // to never block on completing the rebalance procedure if there's any
    if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {
        updateAssignmentMetadataIfNeeded(timer);
    } else {
        final Timer updateMetadataTimer = time.timer(0L);
        updateAssignmentMetadataIfNeeded(updateMetadataTimer);
        timer.update(updateMetadataTimer.currentTimeMs());
    }
} else {
    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
        log.warn("Still waiting for metadata");
    }
}
{code}

{code:java}
// from KafkaConsumer.java (last commit)
// private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);
if (includeMetadataInTimeout) {
    // try to update assignment metadata BUT do not need to block on the timer for join group
    updateAssignmentMetadataIfNeeded(timer, false);
} else {
    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
        log.warn("Still waiting for metadata");
    }
}
{code}

Per the above two commits, I have one question about `updateAssignmentMetadataIfNeeded(timer, false);`: why is the second parameter false?
Per my understanding, when it's false it is effectively the same as the earlier `updateAssignmentMetadataIfNeeded(time.timer(0L));`. I've tested with true and it looks fine for us. I think that with true, the behavior is similar to [the commit|https://github.com/apache/kafka/pull/8934/commits/333a967ec22ea22babf32b18349b76b6552a2fac].


> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-10134
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10134
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0
>            Reporter: Sean Guo
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world
> effect during the rebalance, as our tasks are long-running tasks.
> But after the upgrade, when we kill an instance to trigger a rebalance while
> there is some load (some are long-running tasks >30s), the CPU goes
> sky-high. It reads ~700% in our metrics, so there should be several threads
> in a tight loop. We have several consumer threads consuming from
> different partitions during the rebalance. This is reproducible with both the
> new CooperativeStickyAssignor and the old eager rebalance protocol. The
> difference is that with the old eager rebalance protocol the high
> CPU usage drops after the rebalance is done. But when using the cooperative
> one, it seems the consumer threads are stuck on something and can't
> finish the rebalance, so the high CPU usage won't drop until we stop our
> load. Also, a small load without long-running tasks won't cause continuous
> high CPU usage, as the rebalance can finish in that case.
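
To illustrate the "tight loop" symptom described above, here is a minimal sketch, not Kafka code, of how a caller that repeatedly waits with a 0 ms timeout can spin while a slow operation is still in progress. Everything in it (the class `ZeroTimeoutSpin`, the method `countIterations`, and the simulated "rebalance" thread) is hypothetical and only stands in for the real consumer internals. Whether this is the actual mechanism behind KAFKA-10134 is not established here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ZeroTimeoutSpin {

    /**
     * Simulates a caller looping until a slow background operation finishes.
     * rebalanceMs   - how long the simulated "rebalance" takes (hypothetical)
     * waitPerCallMs - how long each loop iteration is allowed to block
     * Returns the number of loop iterations that found the operation unfinished.
     */
    static long countIterations(long rebalanceMs, long waitPerCallMs) {
        CountDownLatch rebalanceDone = new CountDownLatch(1);
        Thread rebalance = new Thread(() -> {
            try {
                Thread.sleep(rebalanceMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                rebalanceDone.countDown();
            }
        });
        rebalance.setDaemon(true);
        rebalance.start();

        long iterations = 0;
        try {
            // With a timeout of 0, await() returns false immediately when the
            // latch is still closed, so this loop never yields the CPU.
            while (!rebalanceDone.await(waitPerCallMs, TimeUnit.MILLISECONDS)) {
                iterations++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return iterations;
    }

    public static void main(String[] args) {
        long spinning = countIterations(200, 0);   // never blocks per call
        long blocking = countIterations(200, 50);  // blocks up to 50 ms per call
        System.out.println("0 ms wait per call:  " + spinning + " iterations");
        System.out.println("50 ms wait per call: " + blocking + " iterations");
    }
}
```

The zero-timeout variant should report far more iterations than the blocking one for the same simulated duration, which is the kind of behavior that shows up as a RUNNABLE thread with high accumulated CPU time in a thread dump.
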
>
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x00007fe11f044000 nid=0x1f4 runnable [0x00007fe119aab000]
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>         at
>
> By debugging into the code, we found that the clients appear to be in a loop
> on finding the coordinator.
> I also tried the old rebalance protocol on the new version; the issue still
> exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue. So it
> seems related to something that changed between 2.4.1 and 2.5.0.
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)