[ https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13472121#comment-13472121 ]
Jun Rao commented on KAFKA-42:
------------------------------

Thanks for patch v4. AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved seems to fail.

[2012-10-08 21:30:06,005] ERROR [PartitionsReassignedListener on 0]: Error completing reassignment of partition [test, 0] (kafka.controller.PartitionsReassignedListener:102)
kafka.common.KafkaException: Only replicas out of the new set of replicas 2,3 for partition [test, 0] to be reassigned are alive. Failing partition reassignment
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:512)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
    at kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,280] ERROR [PartitionsReassignedListener on 0]: Error completing reassignment of partition [test, 0] (kafka.controller.PartitionsReassignedListener:102)
org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
    at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:363)
    at kafka.utils.ZkUtils$.getLeaderAndIsrForPartition(ZkUtils.scala:78)
    at kafka.controller.KafkaController.areReplicasInIsr(KafkaController.scala:323)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:183)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:509)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
    at kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:925)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
    at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    ... 13 more
[2012-10-08 21:30:06,377] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 2 for partition [test, 0] to OnlineReplica (kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 2 for partition [test, 0] should be in the NewReplica,OnlineReplica,OfflineReplica states before moving to OnlineReplica state. Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,379] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 3 for partition [test, 0] to OnlineReplica (kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 3 for partition [test, 0] should be in the NewReplica,OnlineReplica,OfflineReplica states before moving to OnlineReplica state. Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,381] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 0 for partition [test, 0] to OfflineReplica (kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 0 for partition [test, 0] should be in the NewReplica,OnlineReplica states before moving to OfflineReplica state. Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:156)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply$mcVI$sp(KafkaController.scala:363)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
    at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
    at kafka.controller.KafkaController.stopOldReplicasOfReassignedPartition(KafkaController.scala:362)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:193)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
junit.framework.AssertionFailedError: Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
    at junit.framework.Assert.fail(Assert.java:47)
    at junit.framework.Assert.failNotEquals(Assert.java:277)
    at junit.framework.Assert.assertEquals(Assert.java:64)
    at kafka.admin.AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved(AdminTest.scala:361)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at junit.framework.TestCase.runTest(TestCase.java:164)

> Support rebalancing the partitions with replication
> ---------------------------------------------------
>
>                 Key: KAFKA-42
>                 URL: https://issues.apache.org/jira/browse/KAFKA-42
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: features
>             Fix For: 0.8
>
>         Attachments: kafka-42-v1.patch, kafka-42-v2.patch, kafka-42-v3.patch, kafka-42-v4.patch
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from
> one set of brokers to another, online.