[ 
https://issues.apache.org/jira/browse/KAFKA-6134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6134:
-----------------------------------
    Description: 
We've had a couple users reporting spikes in memory usage when the controller 
is performing partition reassignment in 0.11. After investigation, we found 
that the controller event queue was using most of the retained memory. In 
particular, we found several thousand {{PartitionReassignment}} objects, each 
one containing one fewer partition than the previous one (see the attached 
image).

From the code, it seems clear why this is happening. We have a watch on the 
partition reassignment path which adds the {{PartitionReassignment}} object to 
the event queue:

{code}
  override def handleDataChange(dataPath: String, data: Any): Unit = {
    val partitionReassignment = ZkUtils.parsePartitionReassignmentData(data.toString)
    eventManager.put(controller.PartitionReassignment(partitionReassignment))
  }
{code}

In the {{PartitionReassignment}} event handler, we iterate through all of the 
partitions in the reassignment. After we complete reassignment for each 
partition, we remove that partition from the list and write the updated list 
back to the reassignment node in ZooKeeper.

{code}
    // remove this partition from that list
    val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
    // write the new list to zookeeper
    zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
{code}

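Each such update triggers the handler above, which adds a new event to the 
queue. A reassignment of n partitions can therefore enqueue up to n events 
holding n-1, n-2, ..., 0 partitions, so the memory retained by the queue grows 
as O(n^2) in the number of partitions.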
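
To make the growth concrete, here is a minimal simulation sketch. It does not use the real controller classes: {{TopicAndPartition}}, {{PartitionReassignment}} and {{ReassignmentQueueGrowth}} below are hypothetical stand-ins. It also assumes that every write to the reassignment node fires the watch exactly once, which may not hold in a real cluster.

{code}
import scala.collection.mutable

// Hypothetical stand-ins for the controller types; NOT the actual Kafka classes.
final case class TopicAndPartition(topic: String, partition: Int)
final case class PartitionReassignment(partitions: Set[TopicAndPartition])

object ReassignmentQueueGrowth {
  /** Simulates one handler pass over a reassignment of n partitions.
    * Returns (events left in the queue, total partition entries they retain).
    * Assumption: every znode update enqueues exactly one new event. */
  def simulate(n: Int): (Int, Long) = {
    val queue = mutable.Queue.empty[PartitionReassignment]
    // In-memory substitute for the contents of the reassignment znode.
    var znode: Set[TopicAndPartition] = (0 until n).map(TopicAndPartition("t", _)).toSet

    // The handler completes one partition at a time.
    for (tp <- znode.toList) {
      znode = znode - tp                          // remove the completed partition
      queue.enqueue(PartitionReassignment(znode)) // the write fires the watch
    }
    (queue.size, queue.iterator.map(_.partitions.size.toLong).sum)
  }

  def main(args: Array[String]): Unit = {
    for (n <- Seq(10, 100, 1000)) {
      val (events, retained) = simulate(n)
      println(s"n=$n events=$events retainedEntries=$retained")
    }
  }
}
{code}

Under these assumptions the queue ends up holding n events that together retain n(n-1)/2 partition entries: 45 for n=10, 4950 for n=100, and 499500 for n=1000.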

  was:
We've had a couple users reporting spikes in memory usage when the controller 
is performing partition reassignment in 0.11. After investigation, we found 
that the controller event queue was using most of the retained memory. In 
particular, we found several thousand {{PartitionReassignment}} objects, each 
one containing one fewer partition than the previous one:

!Screen Shot 2017-10-26 at 3.05.40 PM.png|thumbnail!.

From the code, it seems clear why this is happening. We have a watch on the 
partition reassignment path which adds the {{PartitionReassignment}} object to 
the event queue:

{code}
  override def handleDataChange(dataPath: String, data: Any): Unit = {
    val partitionReassignment = ZkUtils.parsePartitionReassignmentData(data.toString)
    eventManager.put(controller.PartitionReassignment(partitionReassignment))
  }
{code}

In the {{PartitionReassignment}} event handler, we iterate through all of the 
partitions in the reassignment. After we complete reassignment for each 
partition, we remove that partition and update the node in zookeeper. 

{code}
    // remove this partition from that list
    val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
    // write the new list to zookeeper
    zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
{code}

This triggers the handler above which adds a new event in the queue. So what 
you get is an n^2 increase in memory where n is the number of partitions.


> High memory usage on controller during partition reassignment
> -------------------------------------------------------------
>
>                 Key: KAFKA-6134
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6134
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.11.0.0, 0.11.0.1
>            Reporter: Jason Gustafson
>         Attachments: Screen Shot 2017-10-26 at 3.05.40 PM.png



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
