[ 
https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13472121#comment-13472121
 ] 

Jun Rao commented on KAFKA-42:
------------------------------

Thanks for patch v4. 
AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved seems to fail.

[2012-10-08 21:30:06,005] ERROR [PartitionsReassignedListener on 0]: Error 
completing reassignment of partition [test, 0] 
(kafka.controller.PartitionsReassignedListener:102)
kafka.common.KafkaException: Only  replicas out of the new set of replicas 2,3 
for partition [test, 0] to be reassigned are alive. Failing partition 
reassignment
        at 
kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:512)
        at 
kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495)
        at 
kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at 
kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489)
        at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,280] ERROR [PartitionsReassignedListener on 0]: Error 
completing reassignment of partition [test, 0] 
(kafka.controller.PartitionsReassignedListener:102)
org.I0Itec.zkclient.exception.ZkInterruptedException: 
java.lang.InterruptedException
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:363)
        at kafka.utils.ZkUtils$.getLeaderAndIsrForPartition(ZkUtils.scala:78)
        at 
kafka.controller.KafkaController.areReplicasInIsr(KafkaController.scala:323)
        at 
kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:183)
        at 
kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:509)
        at 
kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495)
        at 
kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at 
kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489)
        at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:925)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 13 more
[2012-10-08 21:30:06,377] ERROR [Replica state machine on Controller 3]: Error 
while changing state of replica 2 for partition [test, 0] to OnlineReplica 
(kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 2 for partition [test, 0] 
should be in the NewReplica,OnlineReplica,OfflineReplica states before moving 
to OnlineReplica state. Instead it is in NonExistentReplica state
        at scala.Predef$.assert(Predef.scala:91)
        at 
kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
        at 
kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
        at 
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
        at 
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
        at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at 
kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
        at 
kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
        at 
kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
        at 
kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
        at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at 
kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
        at 
kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
        at 
kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at 
kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
        at 
kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
        at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
        at 
kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
        at 
kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
        at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
        at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,379] ERROR [Replica state machine on Controller 3]: Error 
while changing state of replica 3 for partition [test, 0] to OnlineReplica 
(kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 3 for partition [test, 0] 
should be in the NewReplica,OnlineReplica,OfflineReplica states before moving 
to OnlineReplica state. Instead it is in NonExistentReplica state
        at scala.Predef$.assert(Predef.scala:91)
        at 
kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
        at 
kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
        at 
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
        at 
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
        at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at 
kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
        at 
kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
        at 
kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
        at 
kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
        at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at 
kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
        at 
kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
        at 
kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at 
kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
        at 
kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
        at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
        at 
kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
        at 
kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
        at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
        at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,381] ERROR [Replica state machine on Controller 3]: Error 
while changing state of replica 0 for partition [test, 0] to OfflineReplica 
(kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 0 for partition [test, 0] 
should be in the NewReplica,OnlineReplica states before moving to 
OfflineReplica state. Instead it is in NonExistentReplica state
        at scala.Predef$.assert(Predef.scala:91)
        at 
kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
        at 
kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:156)
        at 
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
        at 
kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
        at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at 
kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
        at 
kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply$mcVI$sp(KafkaController.scala:363)
        at 
kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
        at 
kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
        at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
        at 
kafka.controller.KafkaController.stopOldReplicasOfReassignedPartition(KafkaController.scala:362)
        at 
kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:193)
        at 
kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
        at 
kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at 
kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
        at 
kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
        at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
        at 
kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
        at 
kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
        at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
        at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but 
was:<List(0, 1)>
junit.framework.AssertionFailedError: Partition should have been reassigned to 
0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
        at junit.framework.Assert.fail(Assert.java:47)
        at junit.framework.Assert.failNotEquals(Assert.java:277)
        at junit.framework.Assert.assertEquals(Assert.java:64)
        at 
kafka.admin.AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved(AdminTest.scala:361)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at junit.framework.TestCase.runTest(TestCase.java:164)

                
> Support rebalancing the partitions with replication
> ---------------------------------------------------
>
>                 Key: KAFKA-42
>                 URL: https://issues.apache.org/jira/browse/KAFKA-42
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: features
>             Fix For: 0.8
>
>         Attachments: kafka-42-v1.patch, kafka-42-v2.patch, kafka-42-v3.patch, 
> kafka-42-v4.patch
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from 
> one set of brokers to another, online.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to