[ https://issues.apache.org/jira/browse/KAFKA-4596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15801394#comment-15801394 ]
Ismael Juma commented on KAFKA-4596:
------------------------------------

Thanks for the report. It is true that the existing code assumes that all existing partitions will be in the proposed map. The workaround seems to be to include all the partitions in the JSON (even those whose replica assignment stays the same), which still lets you do the reassignment incrementally. This should work, although the Controller will log every time it processes the reassignment for a partition where there was no change.

Would you like to submit a fix?

> KIP-73 rebalance throttling breaks on plans for specific partitions
> -------------------------------------------------------------------
>
>                 Key: KAFKA-4596
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4596
>             Project: Kafka
>          Issue Type: Bug
>         Environment: Kafka 0.10.1.1
>            Reporter: Tom Crayford
>
> The reassign-partitions.sh command fails if you both *throttle* and give it a
> specific partition reassignment. For example, upon reassigning
> {code}__consumer_offsets{code} partition 19, you get the following error:
> {code}
> Save this to use as the --reassignment-json-file option during rollback
> Warning: You must run Verify periodically, until the reassignment completes,
> to ensure the throttle is removed. You can also alter the throttle by
> rerunning the Execute command passing a new value.
> The throttle limit was set to 1048576 B/s
> Partitions reassignment failed due to key not found: [__consumer_offsets,30]
> java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:59)
>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>     at scala.collection.AbstractMap.apply(Map.scala:59)
>     at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:372)
>     at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:371)
>     at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>     at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>     at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>     at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>     at kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
>     at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
>     at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
>     at kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
>     at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
>     at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
>     at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
>     at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
>     at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
> {code}
> This effectively breaks the throttling feature unless you want to rebalance
> many, many partitions at once.
> For reference, the command that was run is:
> {code}
> kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829-4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576
> {code}
> and the contents of the plan file are:
> {code}
> {"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}]}
> {code}
> This seems like a simple logic error to me, where we're trying to look up a
> partition that's not been proposed, when we should not be. It looks like the
> logic assumes that {code}Map.apply{code} doesn't error if the lookup value
> isn't found, when in fact it does.
> I checked that this cluster does indeed have the __consumer_offsets topic
> populated.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
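
For anyone who needs the workaround from the comment above before a fix lands, here is a minimal sketch of building a plan that lists every partition and changes only the ones you care about. It is not part of the Kafka tooling: `build_full_plan` is a hypothetical helper, and it assumes you already have the topic's current assignment as a document in the same shape as the plan file (`{"version": 1, "partitions": [...]}`). The sample replica lists in the usage block are made up for illustration.

```python
import json


def build_full_plan(current, changes):
    """Return a plan covering every partition in `current`, with `changes` applied.

    current: dict in the reassignment JSON shape, listing every partition
             of the topic with its present replica list (assumed input).
    changes: dict mapping (topic, partition) -> desired replica list.
    """
    remaining = dict(changes)
    partitions = []
    for entry in current["partitions"]:
        key = (entry["topic"], entry["partition"])
        # Use the new replica list if one was requested, else keep the current one.
        replicas = remaining.pop(key, entry["replicas"])
        partitions.append(
            {
                "topic": entry["topic"],
                "partition": entry["partition"],
                "replicas": list(replicas),
            }
        )
    if remaining:
        # A requested change refers to a partition that is not in `current`.
        raise ValueError(f"unknown partitions in changes: {sorted(remaining)}")
    return {"version": 1, "partitions": partitions}


if __name__ == "__main__":
    # Illustrative data only; a real topic would list all of its partitions.
    current = {
        "version": 1,
        "partitions": [
            {"topic": "__consumer_offsets", "partition": 19, "replicas": [0, 1, 2]},
            {"topic": "__consumer_offsets", "partition": 30, "replicas": [1, 2, 0]},
        ],
    }
    plan = build_full_plan(current, {("__consumer_offsets", 19): [2, 1, 0]})
    print(json.dumps(plan))
```

The output can be saved and passed via --reassignment-json-file together with --throttle. As noted in the comment, the Controller will still log each unchanged partition as it processes it.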