[ https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13472121#comment-13472121 ]
Jun Rao commented on KAFKA-42:
------------------------------
Thanks for patch v4.
AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved seems to fail.
[2012-10-08 21:30:06,005] ERROR [PartitionsReassignedListener on 0]: Error completing reassignment of partition [test, 0] (kafka.controller.PartitionsReassignedListener:102)
kafka.common.KafkaException: Only replicas out of the new set of replicas 2,3 for partition [test, 0] to be reassigned are alive. Failing partition reassignment
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:512)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
    at kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,280] ERROR [PartitionsReassignedListener on 0]: Error completing reassignment of partition [test, 0] (kafka.controller.PartitionsReassignedListener:102)
org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
    at kafka.utils.ZkUtils$.readDataMaybeNull(ZkUtils.scala:363)
    at kafka.utils.ZkUtils$.getLeaderAndIsrForPartition(ZkUtils.scala:78)
    at kafka.controller.KafkaController.areReplicasInIsr(KafkaController.scala:323)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:183)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.liftedTree1$1(KafkaController.scala:509)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:495)
    at kafka.controller.PartitionsReassignedListener$$anonfun$handleDataChange$2.apply(KafkaController.scala:489)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
    at kafka.controller.PartitionsReassignedListener.handleDataChange(KafkaController.scala:489)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:925)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
    at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    ... 13 more
[2012-10-08 21:30:06,377] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 2 for partition [test, 0] to OnlineReplica (kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 2 for partition [test, 0] should be in the NewReplica,OnlineReplica,OfflineReplica states before moving to OnlineReplica state. Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,379] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 3 for partition [test, 0] to OnlineReplica (kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 3 for partition [test, 0] should be in the NewReplica,OnlineReplica,OfflineReplica states before moving to OnlineReplica state. Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:130)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply$mcVI$sp(KafkaController.scala:187)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$onPartitionReassignment$1.apply(KafkaController.scala:186)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:186)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2012-10-08 21:30:06,381] ERROR [Replica state machine on Controller 3]: Error while changing state of replica 0 for partition [test, 0] to OfflineReplica (kafka.controller.ReplicaStateMachine:102)
java.lang.AssertionError: assertion failed: Replica 0 for partition [test, 0] should be in the NewReplica,OnlineReplica states before moving to OfflineReplica state. Instead it is in NonExistentReplica state
    at scala.Predef$.assert(Predef.scala:91)
    at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:194)
    at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:156)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:86)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:86)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply$mcVI$sp(KafkaController.scala:363)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
    at kafka.controller.KafkaController$$anonfun$stopOldReplicasOfReassignedPartition$1.apply(KafkaController.scala:362)
    at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
    at kafka.controller.KafkaController.stopOldReplicasOfReassignedPartition(KafkaController.scala:362)
    at kafka.controller.KafkaController.onPartitionReassignment(KafkaController.scala:193)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:300)
    at kafka.controller.KafkaController$$anonfun$initializeReassignedPartitionsContext$5.apply(KafkaController.scala:299)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.controller.KafkaController.initializeReassignedPartitionsContext(KafkaController.scala:299)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:284)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:79)
    at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:52)
    at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:55)
    at kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:94)
    at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
junit.framework.AssertionFailedError: Partition should have been reassigned to 0, 1 expected:<List(2, 3)> but was:<List(0, 1)>
    at junit.framework.Assert.fail(Assert.java:47)
    at junit.framework.Assert.failNotEquals(Assert.java:277)
    at junit.framework.Assert.assertEquals(Assert.java:64)
    at kafka.admin.AdminTest.testResumePartitionReassignmentAfterLeaderWasMoved(AdminTest.scala:361)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at junit.framework.TestCase.runTest(TestCase.java:164)
> Support rebalancing the partitions with replication
> ---------------------------------------------------
>
> Key: KAFKA-42
> URL: https://issues.apache.org/jira/browse/KAFKA-42
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Jun Rao
> Assignee: Neha Narkhede
> Priority: Blocker
> Labels: features
> Fix For: 0.8
>
> Attachments: kafka-42-v1.patch, kafka-42-v2.patch, kafka-42-v3.patch,
> kafka-42-v4.patch
>
> Original Estimate: 240h
> Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from
> one set of brokers to another, online.