Tom Crayford created KAFKA-4596:
-----------------------------------
Summary: KIP-73 rebalance throttling breaks on plans for specific partitions
Key: KAFKA-4596
URL: https://issues.apache.org/jira/browse/KAFKA-4596
Project: Kafka
Issue Type: Bug
Environment: Kafka 0.10.1.1
Reporter: Tom Crayford
The kafka-reassign-partitions.sh command fails if you both *throttle* and give
it a reassignment plan that covers only specific partitions. For example, upon
reassigning {code}__consumer_offsets{code} partition 19, you get the following
error:
{code}
Save this to use as the --reassignment-json-file option during rollback
Warning: You must run Verify periodically, until the reassignment completes, to
ensure the throttle is removed. You can also alter the throttle by rerunning
the Execute command passing a new value.
The throttle limit was set to 1048576 B/s
Partitions reassignment failed due to key not found: [__consumer_offsets,30]
java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:372)
at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:371)
at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
at kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
{code}
This effectively breaks the throttling feature unless you want to rebalance a
large number of partitions at once.
For reference, the command that was run is:
{code}
kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829-4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576
{code}
and the contents of the plan file are:
{code}
{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}]}
{code}
This seems like a simple logic error to me: we're looking up a partition that
has not been proposed (here partition 30, while the plan only covers partition
19), when we should not be. It looks like the logic assumes that
{code}Map.apply{code} doesn't error if the key isn't found, when in fact it
throws a NoSuchElementException.
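To illustrate the suspected failure mode, here is a minimal Scala sketch. It is not the Kafka source: {code}TopicPartition{code}, {code}movingUnsafe{code}, {code}movingSafe{code} and the replica lists for the existing assignment are all hypothetical, and an immutable Map stands in for whatever collection the command really uses. It only shows how filtering the existing assignment with {code}proposed(tp){code} throws for a partition that is missing from the plan, and how an Option-based lookup would avoid that.

```scala
import scala.util.Try

object MapLookupSketch {
  // Hypothetical stand-in for Kafka's topic/partition key type.
  final case class TopicPartition(topic: String, partition: Int)

  // True when the proposed replica list adds a broker that is not in the current list.
  def moving(before: Seq[Int], after: Seq[Int]): Boolean =
    (after.toSet -- before.toSet).nonEmpty

  // Unsafe: proposed(tp) is Map.apply, which throws NoSuchElementException
  // for every existing partition that is absent from the plan.
  def movingUnsafe(existing: Map[TopicPartition, Seq[Int]],
                   proposed: Map[TopicPartition, Seq[Int]]): Map[TopicPartition, Seq[Int]] =
    existing.filter { case (tp, before) => moving(before, proposed(tp)) }

  // Safe: Map.get returns an Option, so partitions outside the plan are skipped.
  def movingSafe(existing: Map[TopicPartition, Seq[Int]],
                 proposed: Map[TopicPartition, Seq[Int]]): Map[TopicPartition, Seq[Int]] =
    existing.filter { case (tp, before) =>
      proposed.get(tp).exists(after => moving(before, after))
    }

  def main(args: Array[String]): Unit = {
    // Assumed current assignment; the real replica lists are not in this report.
    val existing = Map(
      TopicPartition("__consumer_offsets", 19) -> Seq(0, 1),
      TopicPartition("__consumer_offsets", 30) -> Seq(0, 1, 2)
    )
    // The plan from this report: only partition 19 is proposed.
    val proposed = Map(TopicPartition("__consumer_offsets", 19) -> Seq(2, 1, 0))

    println(Try(movingUnsafe(existing, proposed)))
    println(movingSafe(existing, proposed).keySet)
  }
}
```

With these assumed inputs, the unsafe variant fails on partition 30, while the safe variant returns only partition 19. Guarding with {code}proposed.contains(tp){code} before calling {code}proposed(tp){code} would be an equivalent way to write the safe version.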
I checked that this cluster does indeed have the __consumer_offsets topic
populated.