[ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joel Koshy updated KAFKA-340:
-----------------------------

    Attachment: KAFKA-340-v3.patch

Thanks for the reviews. I did a lot of stand-alone testing, which I will continue while you review this. There are probably several corner cases and bounce sequences that have yet to be tested. For example, an (albeit non-critical) caveat of not using zk to record the shutting-down brokers is that on controller failover, the new controller will be unaware of which brokers are shutting down. E.g., say there are two brokers (0, 1) and one topic with replication factor 2, with broker 0 as both leader and controller. Issue the shutdown command on broker 1, then on broker 0 (which will report an incomplete status). At this point only broker 0 is in leaderAndIsr. If you now send a SIGTERM to broker 0, broker 1 will become the leader again.

> 3.2 If removing the replica from isr is unnecessary, why do we still have to
> send the leader and isr request to the leader?

Fixed this. Also, in v2 I had an additional requirement on removing a replica from the ISR: if it is the only replica in the ISR, then it cannot be removed - i.e., you would otherwise end up with a leaderAndIsr that has a leader but an empty ISR, which I felt does not really make sense. Anyway, that requirement turned out to be unnecessary in v3: when shutting down a broker, I only remove it from the ISR if the partition's leadership is not on the broker that is shutting down. Working through this comment led me to wonder: when the last replica of a partition goes offline, the leaderAndIsr path isn't updated - i.e., the zk path still says the leader is "x" although "x" is offline. It is non-critical, since clients won't be able to communicate with it anyway, but maybe leader election should enter an invalid broker id in such cases.

> 3.4 What is the point of sending leader and isr request at the end of
> shutdownBroker, since the OfflineReplica state change would've taken care of
> that anyway. It seems like you just need to send the stop replica request
> with the delete partitions flag turned off, no?

I still need (as an optimization) to send the leader and isr request to the leaders of all partitions that are present on the shutting-down broker, so that each leader can remove the shutting-down broker from its inSyncReplicas cache (in Partition.scala) and no longer wait for acks from it when a producer request's num-acks is set to -1. Otherwise, we have to wait for the leader to "organically" shrink the ISR. This also applies to partitions which are moved (i.e., partitions for which the shutting-down broker was the leader): the ControlledShutdownLeaderSelector needs to send the updated leaderAndIsr request to the shutting-down broker as well (to tell it that it is no longer the leader), at which point it will start up a replica fetcher and re-enter the ISR. So, in fact, there is not much point in removing the "current leader" from the ISR in ControlledShutdownLeaderSelector.selectLeader.
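To illustrate the num-acks = -1 point above, here is a minimal, self-contained sketch of the idea (hypothetical names - this is not the actual Partition.scala code): with acks set to -1, the leader considers a message committed only once every replica still in its cached ISR has fetched it, so the shutting-down broker has to be dropped from that cache before such produce requests can complete.

{code}
// Minimal sketch, not the real Partition implementation: shows why the leader
// must drop a shutting-down broker from its in-memory ISR before it can
// satisfy produce requests with acks = -1.
case class Replica(brokerId: Int, logEndOffset: Long)

class PartitionSketch(var leaderId: Int, var inSyncReplicas: Set[Replica]) {

  // With acks = -1, a message at `requiredOffset` is only considered
  // committed once every replica still in the ISR has fetched past it.
  def isCommitted(requiredOffset: Long): Boolean =
    inSyncReplicas.forall(_.logEndOffset >= requiredOffset)

  // Applying the leaderAndIsr request sent at the end of shutdownBroker():
  // shrink the cached ISR now instead of waiting for the leader to
  // "organically" shrink it.
  def applyShrunkenIsr(newIsr: Set[Int]): Unit =
    inSyncReplicas = inSyncReplicas.filter(r => newIsr.contains(r.brokerId))
}

object ControlledShutdownExample extends App {
  // Broker 1 is shutting down and has stopped fetching at offset 90.
  val partition = new PartitionSketch(
    leaderId = 0,
    inSyncReplicas = Set(Replica(0, 100L), Replica(1, 90L)))

  println(partition.isCommitted(95L)) // false: still waiting on broker 1
  partition.applyShrunkenIsr(Set(0))  // leaderAndIsr request removes broker 1
  println(partition.isCommitted(95L)) // true: acks = -1 can now be satisfied
}
{code}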
Jun's comments:

> Thanks for patch v2. Some more comments and clarification:
>
> 20. ControllerContext.liveBrokers: We already filter out shutdown brokers in
> the getter. Do we still need to do the filtering at the setter?

It is not required, but it doesn't hurt, so I had left it there so that the filter in the getter has fewer elements to deal with. That said, I don't think removing it from the setter matters at all - it's all in memory and we're dealing with very small sets. Anyway, I removed it from the setter in v3.

> 21. KafkaController.shutdownBroker():
> 21.1 We probably should synchronize this method to allow only 1 outstanding
> shutdown operation at a time. Various maps in ControllerContext are updated
> in this method and they are not concurrent.

I think it should be fine as is, since those map modifications are guarded by the controller context lock. That said, adding a shutdown lock should make it safer, so I added a brokerShutdownLock. I don't think anything in there can block indefinitely (i.e., if there were, then we would need a way to cancel the operation).

> 21.2 My understanding is that this method returns the number of partitions
> whose leader is still on this broker. If so, shouldn't partitionsRemaining be
> updated after leaders are moved? Also, could you add a comment to this method
> to describe what it does and what the return value is?

Yes - it is updated, i.e., invoking the method gives the most current value. That is why I needed to call it again at the end.

> 21.3 I think we need to hold the controller lock when calling
> maybeRemoveReplicaFromIsr.

The controller context is not being accessed here - or am I missing something?

> 21.4 We should send stopReplica requests to all partitions on this broker,
> not just partitions who are followers, right?

Correct - sorry, I don't fully recall the reason I had that filter in there. I think it might have been to avoid the issue described in 3.2 of Neha's comments: it was possible to end up with a leaderAndIsr that had leader=0, isr=1, which does not make sense. In any event, it is fixed now.

Other comments are addressed.

> Implement clean shutdown in 0.8
> -------------------------------
>
>                 Key: KAFKA-340
>                 URL: https://issues.apache.org/jira/browse/KAFKA-340
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>            Priority: Blocker
>              Labels: bugs
>         Attachments: KAFKA-340-v1.patch, KAFKA-340-v2.patch, KAFKA-340-v3.patch
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> If we are shutting down a broker when the ISR of a partition includes only
> that broker, we could lose some messages that have been previously committed.
> For clean shutdown, we need to guarantee that there is at least 1 other
> broker in ISR after the broker is shut down.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira