[ https://issues.apache.org/jira/browse/KAFKA-340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-340:
-----------------------------

    Attachment: KAFKA-340-v2.patch

Responses inlined. The other comments are addressed in the patch.

Neha's comments:

> 1. KafkaController
> 1.1 It seems like we might need to move the filter to the getter instead of 
> the setter for liveBrokerIds. This is
>     because if the shutting down broker list changes after setting the value 
> for live brokers and reading it, the list
>     of live brokers might either miss out alive brokers or include dead 
> brokers.

Good catch - fixed this. I left the filter on the setter as well, although it 
isn't strictly needed. Also added a liveOrShuttingDownBrokerIds method, as 
leaderAndIsr requests would otherwise only be sent to live brokers.
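
For illustration, a minimal sketch of what the getter-side filtering could look
like (class, field and method names here are assumptions, not the actual patch):

    // Sketch only: illustrative names, not the actual controller code.
    case class Broker(id: Int, host: String, port: Int)

    class ControllerContext {
      val shuttingDownBrokerIds = scala.collection.mutable.Set.empty[Int]
      private var allBrokers: Set[Broker] = Set.empty

      def setLiveBrokers(brokers: Set[Broker]): Unit = { allBrokers = brokers }

      // Filter at read time, so a broker that starts shutting down after the
      // setter ran is still excluded from the live set.
      def liveBrokerIds: Set[Int] = allBrokers.map(_.id) -- shuttingDownBrokerIds

      // leaderAndIsr requests must still reach the shutting-down broker.
      def liveOrShuttingDownBrokerIds: Set[Int] = allBrokers.map(_.id)
    }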

> 1.2 Since both alive and shutting down broker lists are set, you can use the 
> set difference notation instead of
>     individually reading and filtering out unwanted brokers. Not sure how 
> sets are implemented under the covers in
>     Scala, but my guess is that the set difference method is more efficient.

I would have, but the two sets have different underlying element types (Broker 
vs. broker id, an int).
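
For the record, the type mismatch looks roughly like this (a sketch; the types
are assumptions based on the discussion):

    // Sketch: the two sets have different element types, so a direct set
    // difference does not type-check; filtering on the id works instead.
    case class Broker(id: Int, host: String, port: Int)

    val allBrokers: Set[Broker] = Set(Broker(1, "host1", 9092), Broker(2, "host2", 9092))
    val shuttingDownBrokerIds: Set[Int] = Set(2)

    // Mapping to ids first allows the set difference:
    val liveBrokerIds: Set[Int] = allBrokers.map(_.id) -- shuttingDownBrokerIds

    // Keeping the Broker objects means filtering on the id:
    val liveBrokers: Set[Broker] = allBrokers.filterNot(b => shuttingDownBrokerIds.contains(b.id))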

> 1.3 Can we rename replicatedPartitionsBrokerLeads to 
> replicatePartitionsThisBrokerLeads ?

The main reason I didn't name it with "this" is that it could mean either the 
shutting-down broker or the controller where this code executes. Ideally it 
would be called something like replicatedPartitionsShuttingDownBrokerLeads, 
which seemed too verbose. Would partitionsToMove be better?

> 1.4 The leader movement sets the isr correctly for the partition by removing 
> the current leader as well as any broker
>     that is shutting down from the isr. Then, the replica is marked as 
> offline which tries to remove the broker from
>     the isr again. Let's ignore the fact that the OfflineReplica state change 
> reads the isr from zookeeper since we are
>     going to address that in a separate JIRA already and I don't think we 
> should optimize too early here. What we can
>     do is add a check to OfflineReplica state change that doesn't do the 
> zookeeper write and rpc request if the
>     replica is already not in the isr.

That sounds good - made that change (in a helper function since I needed 
similar logic in the shutdown procedure).
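
For reference, the shape of that helper is roughly the following (names and
signatures here are illustrative stand-ins, not the actual patch):

    // Sketch: skip the zookeeper write and the leaderAndIsr rpc when the
    // replica is already out of the ISR. The callbacks stand in for the real calls.
    def shrinkIsrIfPresent(replicaId: Int,
                           currentIsr: Set[Int],
                           writeIsrToZk: Set[Int] => Unit,
                           sendLeaderAndIsr: Set[Int] => Unit): Set[Int] =
      if (currentIsr.contains(replicaId)) {
        val newIsr = currentIsr - replicaId
        writeIsrToZk(newIsr)       // zookeeper write only when the ISR actually changes
        sendLeaderAndIsr(newIsr)   // leaderAndIsr rpc with the shrunk ISR
        newIsr
      } else {
        currentIsr                 // replica already out of the ISR: no write, no rpc
      }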

> 1.6 If the shutdownBroker API throws a runtime exception, what value is 
> returned to the admin command ?

Throws an exception - i.e. no return.

> 1.7 In shutdownBroker, the controllerLock is acquired and released atleast 
> four times. Probably you wanted to release
>     the lock between the move for each partition in order to not block on an 
> unimportant operation like broker
>     shutdown. It is usually wise to release a lock for blocking operations to 
> avoid deadlock/starvation. So, I didn't
>     get why the lock is released for sending the stop replica request. It 
> seems like the lock can be acquired up until
>     the list of partitions to be moved is computed ? This acquire/release 
> lock multiple times approach is always
>     tricky, but right now after reading the code once, I'm not sure if this 
> particular API would run into any race
>     condition or not. So far, my intuition is that the only bad thing that 
> can happen is we miss out on moving some
>     leaders from the broker, which is not a critical operation anyway.

I could refactor this a bit, but I'm not sure it saves much. It is definitely 
subtle - i.e., there is potential interference with other operations (e.g., 
partition reassignment) while the lock is released. I did a pass over all 
usages of liveBrokerIds and liveBrokers and I think it's fine. If a partition 
is reassigned before the leadership moves, that is also fine, as there is a 
check (if (currLeader == id)) before attempting to move leadership.
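
For clarity, the per-partition part of the flow is roughly of this shape (a
sketch; every name here is a stand-in for the real controller code):

    import java.util.concurrent.locks.ReentrantLock

    // Sketch: reacquire the lock per partition and re-check leadership, since
    // it may have changed (e.g., via reassignment) while the lock was released.
    def moveLeaders(shuttingDownBrokerId: Int,
                    partitionsToMove: Seq[(String, Int)],
                    currentLeaderOf: (String, Int) => Int,
                    moveLeadership: (String, Int) => Unit,
                    controllerLock: ReentrantLock): Unit =
      partitionsToMove.foreach { case (topic, partition) =>
        controllerLock.lock()
        try {
          if (currentLeaderOf(topic, partition) == shuttingDownBrokerId)
            moveLeadership(topic, partition)
        } finally {
          controllerLock.unlock()
        }
      }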

> 2. StopReplicaRequest
> 2.1 I think it is a little awkward to overload the stop replica request with 
> no partitions to mean that it is not
>     supposed to shutdown the replica fetcher thread. In the future, if we 
> find a use case where we need to do this
>     only for some partitions, we might need to change the stop replica 
> request format. What do people think about
>     adding a flag to the stop replica request that tells the broker if it 
> should just shutdown the replica fetcher
>     thread or delete the replica as well ?

I'm not sure whether this is the same as Jun's comment. Instead of explicitly 
specifying all the partitions, an empty partition set meant "all partitions" 
(although I removed this in v2). I agree that it is helpful to introduce a 
flag - also see my response to Jun's comment. Separately, I fixed a bug in 
addStopReplicaRequestForBrokers in the controller channel manager: the 
immutable set value wasn't getting updated.
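
The addStopReplicaRequestForBrokers bug was the usual immutable-collection
pitfall; schematically (not the actual code):

    // Schematic of the bug and the fix: an immutable Set stored as a map value
    // must be reassigned after "adding" to it, or the update is silently lost.
    val stopReplicaRequestMap = scala.collection.mutable.Map.empty[Int, Set[(String, Int)]]

    def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int): Unit =
      brokerIds.foreach { brokerId =>
        val current = stopReplicaRequestMap.getOrElse(brokerId, Set.empty[(String, Int)])
        // Buggy version: the new set was computed but never stored back.
        stopReplicaRequestMap(brokerId) = current + ((topic, partition))  // the fix
      }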

Jun's comments:

> 10. KafkaController.shutdownBroker:
> 10.1 Just stopping the follower in the broker to be shut down doesn't make it 
> faster to fall out of leader's isr.
>      This is because the leader will still need to wait for the timeout 
> before dropping the broker out of the isr.
>      The controller will need to shrink isr and send a leaderAndIsr request 
> to each of the leaders.
>      If we do this, there is probably no need for the wildcard stopReplica 
> request.

I think it helps to some degree - i.e., otherwise the replica on the 
shutting-down broker will remain in the ISR for the partitions it is following. 
Pre-emptively stopping the fetchers will encourage it to fall out (yes, after 
the timeout), but the shutdown operation itself could take some time.

I like your suggestion of actively shrinking the ISR - made that change. I 
think we still need to do a stopReplicaFetcher in order to stop the fetchers 
(or the broker will re-enter the ISR soon enough). We also need to send it a 
leaderAndIsr request for the partitions being moved (to tell it that it is no 
longer the leader), which causes it to start up replica fetchers to the new 
leaders. So what I ended up doing is: after all the partition movements are 
complete, send a stop replica request, and send an updated leaderAndIsr for 
all partitions present on the shutting-down broker that removes it from the ISR.
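
In other words, the v2 flow is roughly the following (sketch only; the helpers
are hypothetical parameters, not the controller's actual API):

    // Sketch of the v2 controlled-shutdown sequence for one broker.
    def controlledShutdown(brokerId: Int,
                           partitionsLedByBroker: Seq[(String, Int)],
                           partitionsHostedByBroker: Seq[(String, Int)],
                           moveLeaderAndShrinkIsr: (String, Int) => Unit,
                           sendStopReplica: (Int, Boolean) => Unit,
                           sendLeaderAndIsrWithoutBroker: (String, Int, Int) => Unit): Unit = {
      // 1. Move leadership off the shutting-down broker, shrinking the ISR as we go.
      partitionsLedByBroker.foreach { case (t, p) => moveLeaderAndShrinkIsr(t, p) }
      // 2. Stop the replica fetchers on the broker (deletePartitions = false).
      sendStopReplica(brokerId, false)
      // 3. Send leaderAndIsr for every partition hosted on the broker with the
      //    broker removed from the ISR, so it knows it is no longer the leader.
      partitionsHostedByBroker.foreach { case (t, p) => sendLeaderAndIsrWithoutBroker(t, p, brokerId) }
    }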

Somehow, I still think it is better to use StopReplica to force a "full 
shutdown" of the replica fetcher on the
shutting down broker. Right now it is possible for a fetch request to be sent 
from the broker to the (new) leader
at the same time the leaderAndIsr is shrunk to exclude the broker. The leader 
could then re-expand the ISR.

> 11.2 In the leader election logic, there is no need to make sure that the new 
> leader is not the current leader. The
>      customized controllerContext.liveBrokerIds should have filtered out the 
> current leader (which is shut down by the
>      jmx operation).

Actually, I needed to change this to include shutting-down brokers (since the 
leaderAndIsr request also needs to be sent to the shutting-down broker), so I 
still need to do the check.
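
Concretely, since the candidate broker set now includes the shutting-down
broker, the election still has to skip the current leader; a sketch (names are
illustrative):

    // Sketch: liveOrShuttingDownBrokerIds still contains the shutting-down
    // broker (it must receive the leaderAndIsr request), so the current leader
    // has to be excluded explicitly when picking the new leader.
    def electNewLeader(currentLeader: Int,
                       isr: Seq[Int],
                       liveOrShuttingDownBrokerIds: Set[Int]): Option[Int] =
      isr.find(r => r != currentLeader && liveOrShuttingDownBrokerIds.contains(r))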

> 12. StopReplicaRequest: Agree with Neha here. We need to add a flag to 
> distinguish between the case that we just want
>     to stop the replica and the case that we want to stop the replica and 
> delete its data. The latter will be used
>     in reassigning partitions (and delete topics in the future).

Done. I went with a "global" delete partitions flag (as opposed to 
per-partition) in the request format. We can
discuss whether we should make this a per-partition flag.
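
Schematically, the v2 format carries one flag for the whole request (field
names here are illustrative, not the exact wire format):

    // Illustrative shape of the request with a single, request-wide delete flag.
    case class StopReplicaRequest(versionId: Short,
                                  clientId: String,
                                  deletePartitions: Boolean,      // global flag
                                  partitions: Set[(String, Int)])

    // Controlled shutdown: stop the fetchers but keep the data.
    val stopOnly = StopReplicaRequest(0, "controller", deletePartitions = false,
                                      Set(("my-topic", 0)))

    // Partition reassignment (and, later, delete topic): also delete the data.
    val stopAndDelete = stopOnly.copy(deletePartitions = true)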

                
> Implement clean shutdown in 0.8
> -------------------------------
>
>                 Key: KAFKA-340
>                 URL: https://issues.apache.org/jira/browse/KAFKA-340
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>            Priority: Blocker
>              Labels: bugs
>         Attachments: KAFKA-340-v1.patch, KAFKA-340-v2.patch
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> If we are shutting down a broker when the ISR of a partition includes only 
> that broker, we could lose some messages that have been previously committed. 
> For clean shutdown, we need to guarantee that there is at least 1 other 
> broker in ISR after the broker is shut down.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
