Tom Crayford created KAFKA-4596:
-----------------------------------

             Summary: KIP-73 rebalance throttling breaks on plans for specific partitions
                 Key: KAFKA-4596
                 URL: https://issues.apache.org/jira/browse/KAFKA-4596
             Project: Kafka
          Issue Type: Bug
         Environment: Kafka 0.10.1.1
            Reporter: Tom Crayford


The kafka-reassign-partitions.sh command fails if you both *throttle* and pass it 
a reassignment plan for specific partitions. For example, when reassigning 
{{__consumer_offsets}} partition 19, you get the following error:

{code}
Save this to use as the --reassignment-json-file option during rollback
Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value.
The throttle limit was set to 1048576 B/s
Partitions reassignment failed due to key not found: [__consumer_offsets,30]
java.util.NoSuchElementException: key not found: [__consumer_offsets,30]
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:59)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:372)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions$1.apply(ReassignPartitionsCommand.scala:371)
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
        at kafka.admin.ReassignPartitionsCommand.kafka$admin$ReassignPartitionsCommand$$preRebalanceReplicaForMovingPartitions(ReassignPartitionsCommand.scala:371)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:347)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$assignThrottledReplicas$2.apply(ReassignPartitionsCommand.scala:343)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at kafka.admin.ReassignPartitionsCommand.assignThrottledReplicas(ReassignPartitionsCommand.scala:343)
        at kafka.admin.ReassignPartitionsCommand.maybeThrottle(ReassignPartitionsCommand.scala:317)
        at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:387)
        at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:165)
        at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:149)
        at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:46)
        at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
{code}

This effectively makes the throttling feature unusable unless you reassign many 
partitions at once.
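To illustrate, here is a minimal Scala model of the failing lookup, inferred from the stack trace rather than taken from the Kafka source. {{TopicPartition}}, {{movingPartitionsUnguarded}} and the broker IDs in the assumed current assignment are hypothetical. A plan that lists every partition of the topic passes every lookup, while a plan listing only partition 19 throws {{NoSuchElementException}}:

{code}
import scala.util.Try

// Hypothetical stand-in for Kafka's TopicAndPartition; not the real class.
final case class TopicPartition(topic: String, partition: Int)

// Simplified model of the pattern suggested by the stack trace (not the real
// Kafka source): filter every existing partition of the topic and look up its
// proposed replicas with Map.apply, which throws NoSuchElementException when
// the partition is missing from the plan.
def movingPartitionsUnguarded(
    existing: Map[TopicPartition, Seq[Int]],
    proposed: Map[TopicPartition, Seq[Int]]
): Map[TopicPartition, Seq[Int]] =
  existing.filter { case (tp, replicas) => replicas.toSet != proposed(tp).toSet }

// Hypothetical current assignment: 50 partitions, each on brokers 0, 1 and 3.
val existing: Map[TopicPartition, Seq[Int]] =
  (0 until 50).map(p => TopicPartition("__consumer_offsets", p) -> Seq(0, 1, 3)).toMap

// A plan listing every partition (only partition 19 changes): every lookup succeeds.
val fullPlan: Map[TopicPartition, Seq[Int]] =
  existing.updated(TopicPartition("__consumer_offsets", 19), Seq(2, 1, 0))

// A plan listing only partition 19, like the plan file in this report.
val singlePlan: Map[TopicPartition, Seq[Int]] =
  Map(TopicPartition("__consumer_offsets", 19) -> Seq(2, 1, 0))

// Success(Map(partition 19 -> its current replicas)).
val fromFullPlan = Try(movingPartitionsUnguarded(existing, fullPlan))
// Failure(NoSuchElementException: key not found: ...).
val fromSinglePlan = Try(movingPartitionsUnguarded(existing, singlePlan))
{code}

With {{singlePlan}}, the lookup fails on the first partition other than 19 that {{filter}} visits; which one that is depends on the map's iteration order, which is consistent with the error above naming partition 30 rather than 19.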

For reference, the command that was run is:

{code}
kafka-reassign-partitions.sh --reassignment-json-file 9b137f13_7e91_4757_a7b9_554ff458e7df.3d4269d5-e829-4e86-9d05-91e9e2fcb7e7.reassignment_plan.json --throttle 1048576
{code}

and the contents of the plan file are:

{code}
{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":19,"replicas":[2,1,0]}]}
{code}

This looks like a simple logic error to me: we look up a partition that is not 
in the proposed assignment, when we should not. The logic seems to assume that 
{{Map.apply}} does not fail if the key is not found, when in fact it throws 
{{NoSuchElementException}}.
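A minimal sketch of a guarded lookup, under stated assumptions: {{TopicPartition}} is a simplified stand-in for Kafka's {{TopicAndPartition}}, and {{movingPartitions}} is a hypothetical helper playing the role of {{preRebalanceReplicaForMovingPartitions}}. This is not the actual Kafka patch. Replacing {{proposed(tp)}} (that is, {{Map.apply}}) with {{proposed.get(tp)}}, which returns an {{Option}}, skips partitions the plan does not mention:

{code}
// Hypothetical stand-in for Kafka's TopicAndPartition; not the real class.
final case class TopicPartition(topic: String, partition: Int)

// Sketch of a guarded filter (hypothetical name, not the actual Kafka fix):
// keep an existing partition only if the plan lists it AND its replica set
// changes. Map.get returns an Option, so a partition absent from the plan
// yields None and is skipped instead of throwing NoSuchElementException.
def movingPartitions(
    existing: Map[TopicPartition, Seq[Int]],
    proposed: Map[TopicPartition, Seq[Int]]
): Map[TopicPartition, Seq[Int]] =
  existing.filter { case (tp, replicas) =>
    proposed.get(tp).exists(target => target.toSet != replicas.toSet)
  }

// Hypothetical current assignment: 50 partitions, each on brokers 0, 1 and 3.
val existing: Map[TopicPartition, Seq[Int]] =
  (0 until 50).map(p => TopicPartition("__consumer_offsets", p) -> Seq(0, 1, 3)).toMap

// The single-partition plan from this report.
val plan: Map[TopicPartition, Seq[Int]] =
  Map(TopicPartition("__consumer_offsets", 19) -> Seq(2, 1, 0))

// Only partition 19 is returned, with its pre-reassignment replicas (0, 1, 3).
val moving = movingPartitions(existing, plan)
{code}

Filtering {{existing}} rather than iterating over the plan keeps the pre-reassignment replicas, which is what the name {{preRebalanceReplicaForMovingPartitions}} suggests the leader throttle needs. Checking {{proposed.contains(tp)}} before calling {{proposed(tp)}} would be an equivalent guard.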

I checked that this cluster does indeed have the __consumer_offsets topic 
populated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)