[ https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neha Narkhede updated KAFKA-42:
-------------------------------

    Attachment: kafka-42-v1.patch

This is a pretty tricky feature. Since it involves multiple state changes before reassignment can be marked complete, there are many failure conditions to think about, and recovery has to be handled correctly for each of them.

1. Admin tool changes
1.1 Added a new admin command, reassign-partition. Right now it handles one partition, since I thought the failure/exit conditions and error messages would be simpler to handle that way. But if people think we should support multiple partitions in the same command invocation, that is fine too.
1.2 Added a new reassignPartition(topic, partition, RAR) API that registers a data change listener on the /admin/reassign_partitions path and then creates that path in zookeeper with the requested reassignment as its value. It waits until the path is deleted from zookeeper. Once it is deleted, it checks if AR == RAR. If yes, it reports success, otherwise failure. (A rough sketch of this flow is included at the end of this comment.)
1.3 Added a shutdown hook to handle command cancellation by the admin. In this case, it checks whether reassignment completed or not and logs the outcome accordingly.

2. Controller changes
Reassigning the replicas for a partition goes through a few stages -
RAR = reassigned replicas
AR = original list of replicas for the partition
1. Register a listener for ISR changes to detect when RAR is a subset of the ISR
2. Start the new replicas RAR - AR
3. Wait until the new replicas are in sync with the leader
4. If the leader is not in RAR, elect a new leader from RAR
5. Stop the old replicas AR - RAR
6. Write the new AR
7. Remove the partition from the /admin/reassign_partitions path
The above state change steps are inside the onPartitionReassignment() callback in KafkaController.scala.

3. Partition reassignment failure cases
Broadly, there are 2 types of failures we need to worry about -
1. Controller failover
2. Runtime error at the broker hosting the replica
Let's go through the failure cases and recovery -
1. If the controller fails over between steps 1 and 2, the new controller on startup will read the non-empty admin path and just restart the partition reassignment process from scratch.
2a. If the controller fails over between steps 2 and 3 above, the new controller will check whether the new replicas are in sync with the leader or not. In either case, it will resume partition reassignment for the partitions listed in the admin path.
2b. If, for some reason, the broker is not able to start the replicas, the ISR listener for reassigned partitions will not trigger, so the controller will not resume the partition reassignment process for that partition. After some time, the admin command can be killed; it will report failure and delete the admin path so the reassignment can be retried.
3. If the controller fails over between steps 4 and 5, the new controller will realize that the new replicas are already in sync. If the current leader is part of the new replicas and is alive, it will not trigger leader re-election. Else, it will re-elect the leader from amongst the live reassigned replicas.
4a. If the controller fails over between steps 5 and 6, the new controller resumes partition reassignment and repeats steps 4 onwards.
4b. If, for some reason, the broker does not complete the leader state change, the partition will be offline after reassignment. This is a problem we have today even for leader election of newly created partitions; the controller doesn't wait for an acknowledgement from the broker for the make-leader state change.
Nevertheless, the broker can fail even after sending a successful ack, so there isn't much value in waiting for an ack. However, I think the leader broker should expose an mbean to signify the availability of a partition. If people think this is a good idea, I can file a bug to fix this.
5. If the controller fails over between steps 6 and 7, the new controller deletes the partition from the admin path, marking the completion of this partition's reassignment. The partition reassignment zookeeper listener should record a partition to be reassigned only if RAR is not equal to AR.

4. PartitionReassignedListener
Starts the partition reassignment process unless -
1. The partition does not exist
2. The new replicas are the same as the existing replicas
3. Any replica in the new set of replicas is dead
If any of the above conditions is satisfied, it logs an error and removes the partition from the list of reassigned partitions, notifying the admin command about the failure/completion. (These checks are sketched at the end of this comment.)

5. PartitionLeaderSelector
Added a self transition on the OnlinePartition state change. This is because, with the cluster expansion and preferred replica leader election features, we need to be able to move the leader for online partitions as well. Added a partition leader selector module, since we now have 3 different ways of selecting the leader for a partition (sketched at the end of this comment) -
1. Offline leader selector - pick an alive in-sync replica as the leader; otherwise, pick an alive assigned replica
2. Reassigned partition leader selector - pick one of the alive in-sync reassigned replicas as the new leader
3. Preferred replica leader selector - pick the preferred replica as the new leader

6. Replica state machine changes
Added 2 new states (NewReplica and NonExistentReplica) to the replica state machine. The full set of states is now (also sketched at the end of this comment) -
1. NewReplica: The controller can create new replicas during partition reassignment. In this state, a replica can only receive the become-follower state change request. Valid previous state is NonExistentReplica
2. OnlineReplica: Once a replica is started and is part of the assigned replicas for its partition, it is in this state. In this state, it can receive either the become-leader or the become-follower state change request. Valid previous states are NewReplica, OnlineReplica or OfflineReplica
3. OfflineReplica: If a replica dies, it moves to this state. This happens when the broker hosting the replica is down. Valid previous states are NewReplica, OnlineReplica
4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica

7. Testing
Added 6 unit test cases to test -
1. Partition reassignment with the leader of the partition in the new list of replicas
2. Partition reassignment with the leader of the partition NOT in the new list of replicas
3. Partition reassignment with the existing assigned replicas NOT overlapping with the new list of replicas
4. Partition reassignment for a non-existing partition. This is a negative test case
5. Partition reassignment for a partition that was completed up to step 6 by the previous controller. This tests that, after controller failover, the new controller marks that partition's reassignment as completed
6. Partition reassignment for a partition that was completed up to step 3 by the previous controller. This tests that, after controller failover, the new controller handles leader re-election correctly and completes the rest of the partition reassignment process
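
To make the admin-side flow in 1.2 a bit more concrete, here is a minimal sketch of the create-then-wait-then-verify sequence. This is not the patch code: the ZkOps trait, the method names and the payload format below are hypothetical stand-ins, and it polls for deletion of the admin path instead of using the data change listener that the tool actually registers.

{code}
// Minimal sketch (not the patch code) of the admin-side reassignment flow from 1.2.
// ZkOps and all names below are hypothetical stand-ins for the tool's zookeeper utilities.
object ReassignPartitionCommandSketch {

  // Hypothetical zookeeper operations assumed by this sketch
  trait ZkOps {
    def pathExists(path: String): Boolean
    def createPersistent(path: String, data: String): Unit
    def readReplicaAssignment(topic: String, partition: Int): Seq[Int]
  }

  val ReassignPartitionsPath = "/admin/reassign_partitions"

  // Returns true if, once the controller is done, the assigned replicas match the requested ones (AR == RAR)
  def reassignPartition(zk: ZkOps, topic: String, partition: Int, rar: Seq[Int],
                        pollMs: Long = 1000): Boolean = {
    // 1. Publish the requested reassignment for the controller to pick up
    //    (the payload format here is purely illustrative)
    val payload = s"""{"$topic,$partition": [${rar.mkString(",")}]}"""
    zk.createPersistent(ReassignPartitionsPath, payload)

    // 2. Block until the controller deletes the admin path, which marks completion;
    //    cancellation is handled separately by the shutdown hook described in 1.3
    while (zk.pathExists(ReassignPartitionsPath))
      Thread.sleep(pollMs)

    // 3. Verify that AR == RAR for the partition
    zk.readReplicaAssignment(topic, partition) == rar
  }
}
{code}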
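
The sanity checks in section 4 boil down to a small validation step. The sketch below only illustrates the three conditions listed there (with the first condition read as "the partition does not exist"); the names are made up and it is not the listener code from the patch.

{code}
// Illustrative validation of the conditions in section 4; names are hypothetical.
object PartitionReassignedListenerSketch {

  // Returns None if reassignment should proceed, otherwise the reason the partition
  // is removed from the list of partitions to be reassigned.
  def validateReassignment(existingAssignment: Option[Seq[Int]],
                           reassignedReplicas: Seq[Int],
                           liveBrokers: Set[Int]): Option[String] =
    existingAssignment match {
      case None =>
        Some("partition does not exist")
      case Some(ar) if ar == reassignedReplicas =>
        Some("new replicas are the same as the existing replicas")
      case Some(_) if reassignedReplicas.exists(r => !liveBrokers.contains(r)) =>
        Some("some of the new replicas are dead")
      case _ =>
        None
    }
}
{code}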
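
Here is a rough sketch of the three leader selection strategies from section 5 as one trait with three implementations. The signatures are assumptions on my part, not the patch's PartitionLeaderSelector API, and the preferred replica is assumed to be the first replica in the assigned list.

{code}
// Illustrative sketch of the three leader selection strategies from section 5; not the patch code.
object PartitionLeaderSelectorSketch {

  trait PartitionLeaderSelector {
    // ar = assigned replicas, isr = current in-sync replicas, live = live broker ids;
    // returns None if no suitable leader can be found
    def selectLeader(ar: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int]
  }

  // 1. Offline leader selector: prefer an alive in-sync replica, otherwise any alive assigned replica
  class OfflinePartitionLeaderSelector extends PartitionLeaderSelector {
    def selectLeader(ar: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int] =
      isr.find(r => live.contains(r)).orElse(ar.find(r => live.contains(r)))
  }

  // 2. Reassigned partition leader selector: pick one of the alive, in-sync reassigned replicas (RAR)
  class ReassignedPartitionLeaderSelector(rar: Seq[Int]) extends PartitionLeaderSelector {
    def selectLeader(ar: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int] =
      rar.find(r => live.contains(r) && isr.contains(r))
  }

  // 3. Preferred replica leader selector: pick the preferred replica, assumed here to be the
  //    first assigned replica, provided it is alive and in sync
  class PreferredReplicaLeaderSelector extends PartitionLeaderSelector {
    def selectLeader(ar: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int] =
      ar.headOption.filter(r => live.contains(r) && isr.contains(r))
  }
}
{code}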
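
Finally, the replica states and valid transitions from section 6 can be written down as a small transition table. Again, this is just a sketch of the rules listed above; the object and method names are made up for illustration.

{code}
// Illustrative encoding of the replica states and valid transitions from section 6; not the patch code.
object ReplicaStateMachineSketch {

  sealed trait ReplicaState
  case object NewReplica         extends ReplicaState
  case object OnlineReplica      extends ReplicaState
  case object OfflineReplica     extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // Valid previous states for each target state, as listed in section 6
  val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
    NewReplica         -> Set(NonExistentReplica),
    OnlineReplica      -> Set(NewReplica, OnlineReplica, OfflineReplica),
    OfflineReplica     -> Set(NewReplica, OnlineReplica),
    NonExistentReplica -> Set(OfflineReplica)
  )

  def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
    validPreviousStates(to).contains(from)

  def main(args: Array[String]): Unit = {
    // a replica created during reassignment: NonExistentReplica -> NewReplica
    println(isValidTransition(NonExistentReplica, NewReplica)) // true
    // a dead replica cannot go straight back to NewReplica
    println(isValidTransition(OfflineReplica, NewReplica))     // false
  }
}
{code}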
> Support rebalancing the partitions with replication
> ---------------------------------------------------
>
>                 Key: KAFKA-42
>                 URL: https://issues.apache.org/jira/browse/KAFKA-42
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Jun Rao
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: features
>             Fix For: 0.8
>
>         Attachments: kafka-42-v1.patch
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from one set of brokers to another, online.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira