> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > Some high level comments.
> > 
> > 1. While most of the replica states are now managed in ReplicaStateMachine, 
> > there are a few still managed in TopicDeletionManager through 
> > haltedTopicsForDeletion and topicDeletionInProgress. It probably would be 
> > clearer if those are managed in ReplicaStateMachine too. 
> > topicDeletionInProgress seems redundant since it is equivalent to at least 
> > one of the replicas being in ReplicaDeletionStarted state. We can just add 
> > a helper function in ReplicaStateMachine. We may need to add a new replica 
> > state in ReplicaStateMachine to represent haltedTopicsForDeletion, but 
> > perhaps we can just reuse ReplicaDeletionFailed (and give it a more 
> > general name).
> > 
> > 2. The actual deletion logic is split between TopicDeletionManager and 
> > DeleteTopicsThread, which makes it a bit hard to read. I was thinking that 
> > TopicDeletionManager only has methods for synchronization with other 
> > threads (through the condition) and all real work will be included in 
> > DeleteTopicsThread. Compared with partition reassignment, the logic in 
> > topic deletion is a bit harder to read. Part of the reason is that in 
> > partition reassignment, all the logic is linearly written in one method. In 
> > topic deletion, the logic is not linear since it's driven by various 
> > callbacks. Perhaps just putting all the logic in one place, with the 
> > pieces close to each other, will help. Also, a bunch of helper methods in 
> > TopicDeletionManager like the following should really be in 
> > ReplicaStateMachine.
> > isAtLeastOneReplicaInDeletionStartedState()
> > replicasInState()
> > AllReplicasForTopicDeleted()
> > 
> > 3. When a topic is in the process of being deleted, we prevent future 
> > operations like partition re-assignment and leader rebalancing on that 
> > topic. However, if one of those operations is already started, we allow 
> > topic deletion to start, which will then get blocked by those operations. 
> > Another way to do that is if a topic is to be deleted, we don't start the 
> > deletion until other ongoing operations like partition re-assignment finish 
> > (once finished, they can't be started again since they will see topic being 
> > deleted). This way, the logic in DeleteTopicsThread will be somewhat 
> > simpler since we don't have to check if it can interfere with other 
> > operations.
> > 
> > 4. In TopicDeletionManager, when doing wait/notify (and changing internal 
> > states), we expect the caller to hold the lock. All callers probably do 
> > hold the locks. However, I am wondering if it's better to get the lock 
> > anyway in TopicDeletionManager to make it more self contained. The locks 
> > are re-entrant. So locking it again won't hurt.
> > 
> > 5. TopicDeletionManager: It seems that replicas in ReplicaDeletionStarted 
> > state remain in that state until the topic is successfully deleted. So, it 
> > seems that when calling startReplicaDeletion(), we can pass in replicas 
> > already in ReplicaDeletionSuccessful state. However, transitioning from 
> > ReplicaDeletionSuccessful to ReplicaDeletionStarted is not allowed.
> > 
> >

1. Topic states have nothing to do with the replica state machine. 
ReplicaStateMachine manages the states of individual replicas, and as such no 
logic from topic deletion should leak there.

2. Moved DeleteTopicsThread to be an inner class of TopicDeletionManager. This 
reduces the public APIs of TopicDeletionManager. And the logic in partition 
reassignment is easier since we don't handle the hard problem of deleting data 
on dead replicas there. Moved some APIs to ReplicaStateMachine.

3. You have a point. That logic is redundant in the DeleteTopicsThread. Removed 
it.

4. I considered that and then decided against it simply because the 
TopicDeletionManager is not self-sufficient. All APIs are used in some context 
that only KafkaController knows about. If we acquire the lock inside 
TopicDeletionManager, then we are essentially allowing those APIs to be invoked 
all by themselves, which will not do the right thing today.
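
For reference, the re-entrancy Jun mentions can be sketched as below. This is 
an illustration only, not the code in the patch: ControllerSketch, 
DeletionManagerSketch and their methods are hypothetical names, and the only 
real API used is java.util.concurrent.locks.ReentrantLock. The inner method 
takes the controller lock a second time, which is harmless for a caller that 
already holds it, but it also makes the method callable with no controller 
context at all, which is the concern described above.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for TopicDeletionManager; not the code in the patch.
class DeletionManagerSketch {
    private final ReentrantLock controllerLock;
    private int resumeCount = 0;

    DeletionManagerSketch(ReentrantLock controllerLock) {
        this.controllerLock = controllerLock;
    }

    // Acquires the shared lock itself. If the calling thread already holds it,
    // the hold count just goes up by one and back down; nothing blocks.
    // Returns the hold count seen inside the critical section.
    int resumeDeletion() {
        controllerLock.lock();
        try {
            resumeCount++;
            return controllerLock.getHoldCount();
        } finally {
            controllerLock.unlock();
        }
    }

    int resumeCount() {
        return resumeCount;
    }
}

// Hypothetical stand-in for KafkaController.
class ControllerSketch {
    final ReentrantLock controllerLock = new ReentrantLock();
    final DeletionManagerSketch deletionManager =
        new DeletionManagerSketch(controllerLock);

    // The controller takes the lock first, then calls into the manager.
    int onBrokerStartup() {
        controllerLock.lock();
        try {
            return deletionManager.resumeDeletion();
        } finally {
            controllerLock.unlock();
        }
    }
}
```

Going through onBrokerStartup() the manager sees the lock held twice by the 
same thread; calling resumeDeletion() directly also succeeds, with the lock 
held once, and that second path is the one that has no controller context.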

5. I think what you are saying is that the transition from 
ReplicaDeletionSuccessful to OfflineReplica (on retry) doesn't make sense. 
Logically, replica deletion should be retried only if the deletion failed. 
Since the controller knows about the deletion status, it can retry only for 
replicas that are not deleted yet. However, that obviously complicates the 
retry logic to filter only on failed replicas. 
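
A minimal sketch of what filtering on failed replicas could look like, to make 
the extra step concrete. The state names come from this thread, but the enum, 
the class and the method are hypothetical and are not part of the patch; 
replicas are identified here simply by broker id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// State names are taken from this thread; the enum itself is illustrative.
enum ReplicaStateSketch {
    OFFLINE_REPLICA,
    REPLICA_DELETION_STARTED,
    REPLICA_DELETION_SUCCESSFUL,
    REPLICA_DELETION_FAILED
}

class RetryFilterSketch {
    // Returns the broker ids whose replica deletion failed. Replicas that are
    // already deleted, or whose deletion is still in flight, are left alone.
    static List<Integer> replicasToRetry(Map<Integer, ReplicaStateSketch> stateByBroker) {
        List<Integer> retry = new ArrayList<>();
        for (Map.Entry<Integer, ReplicaStateSketch> entry : stateByBroker.entrySet()) {
            if (entry.getValue() == ReplicaStateSketch.REPLICA_DELETION_FAILED) {
                retry.add(entry.getKey());
            }
        }
        return retry;
    }
}
```

With such a filter, a replica in ReplicaDeletionSuccessful would never be 
handed back to the retry path, at the cost of one more pass over the replica 
states on every retry.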


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/DeleteTopicsThread.scala, lines 42-43
> > <https://reviews.apache.org/r/17460/diff/8/?file=470129#file470129line42>
> >
> >     If the deletion of a replica is started and another failed broker is 
> > started immediately afterward, will we be missing the only chance of 
> > starting the deletion of the replica on the newly started broker (assuming 
> > there is a replica there not yet deleted)?

No, since deletion is halted when the broker goes down and resumed and retried 
when the broker comes up. Both broker startup and shutdown zk callbacks are 
serialized on the same lock. 


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/DeleteTopicsThread.scala, lines 66-67
> > <https://reviews.apache.org/r/17460/diff/8/?file=470129#file470129line66>
> >
> >     Can just use topicsToBeDeleted. Could we just merge this block and the 
> > previous block in the same foreach?

Nope. The previous block processes all topics for which deletion is in progress 
and can mark some for deletion retry. These should be retried in the 2nd block.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, lines 84-85
> > <https://reviews.apache.org/r/17460/diff/8/?file=470130#file470130line84>
> >
> >     Why do we need to read from ZK, instead of from the cache?

I think this is no longer required after my last refactor.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, lines 384-385
> > <https://reviews.apache.org/r/17460/diff/8/?file=470130#file470130line384>
> >
> >     For replicas that are being deleted, should we move them to 
> > OnlineReplica state?

This will not work. On broker startup, it expects the full replica list from 
the controller and uses that to write the various checkpoint files. If we miss 
a partition there before it is deleted, its entries from all those checkpoint 
files will disappear, causing unexpected behavior until deletion is complete.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/KafkaController.scala, lines 466-472
> > <https://reviews.apache.org/r/17460/diff/8/?file=470130#file470130line466>
> >
> >     Should we disallow adding partitions when a topic is being deleted?

This is already done. Look at AddPartitionsListener.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/ReplicaStateMachine.scala, lines 
> > 116-118
> > <https://reviews.apache.org/r/17460/diff/8/?file=470133#file470133line116>
> >
> >     Some of the state transitions are missing, e.g., ReplicaDeletionFailed 
> > -> ReplicaDeletionStarted.

This state change shouldn't exist. Removed it.


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/controller/TopicDeletionManager.scala, lines 
> > 169-170
> > <https://reviews.apache.org/r/17460/diff/8/?file=470134#file470134line169>
> >
> >     It's still not very clear to me why delete topic thread needs to be 
> > notified. failReplicaDeletion() is called during the processing of a delete 
> > replica response. Do we expect the delete topic thread to receive another 
> > response?

It has to be triggered to retry deletion. Otherwise topic deletion may never 
complete.
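
A minimal sketch of why the notification matters, assuming the delete topic 
thread blocks on a condition until there is work for it. All names below are 
hypothetical and this is not the code in the patch; only ReentrantLock and 
Condition are real APIs. If failReplicaDeletion() set the flag without 
signalling, a worker that is already waiting would never wake up to retry.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class DeleteThreadSignalSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition workAvailable = lock.newCondition();
    private boolean retryPending = false;
    private int attempts = 0;

    // Called while handling a failed delete-replica response (hypothetical).
    void failReplicaDeletion() {
        lock.lock();
        try {
            retryPending = true;
            workAvailable.signal(); // wakes the worker so it can retry
        } finally {
            lock.unlock();
        }
    }

    // One iteration of the worker loop: block until a retry is pending.
    void awaitAndRetryOnce() {
        lock.lock();
        try {
            while (!retryPending) {
                workAvailable.awaitUninterruptibly();
            }
            retryPending = false;
            attempts++; // stand-in for re-sending the delete requests
        } finally {
            lock.unlock();
        }
    }

    int attempts() {
        lock.lock();
        try {
            return attempts;
        } finally {
            lock.unlock();
        }
    }

    // Starts a worker, reports one failure, and returns the retry count.
    static int demo() {
        DeleteThreadSignalSketch sketch = new DeleteThreadSignalSketch();
        Thread worker = new Thread(sketch::awaitAndRetryOnce);
        worker.start();
        sketch.failReplicaDeletion();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.attempts();
    }
}
```

The flag is checked in a loop under the lock, so the sketch behaves the same 
whether the failure is reported before or after the worker starts waiting.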


> On Feb. 5, 2014, 10:34 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala, lines 65-66
> > <https://reviews.apache.org/r/17460/diff/8/?file=470144#file470144line65>
> >
> >     We are stopping the leader replica, which may not be the rightmost 
> > broker in the broker list.

We are not stopping the leader replica in 
testResumeDeleteTopicWithRecoveredFollower()


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33699
-----------------------------------------------------------


On Feb. 5, 2014, 5:31 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 5, 2014, 5:31 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock 
> instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to 
> TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, 
> unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup 
> of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests 
> pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the 
> new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that 
> are queued up for deletion, since topic deletion is halted until partition 
> reassignment can finish anyway. We need to let partition reassignment finish 
> (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking 
> around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
> a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
> fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
> ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
> 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 
> 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
> b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
> 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>
