-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33699
-----------------------------------------------------------


Some high level comments.

1. While most of the replica states are now managed in ReplicaStateMachine, 
there are a few still managed in TopicDeletionManager through 
haltedTopicsForDeletion and topicDeletionInProgress. It probably would be 
clearer if those are managed in ReplicaStateMachine too. 
topicDeletionInProgress seems redundant since it is equivalent to at least one 
of the replicas being in ReplicaDeletionStarted state. We can just add a helper 
function in ReplicaStateMachine. We may need to add a new replica state in 
ReplicaStateMachine to represent haltedTopicsForDeletion, but perhaps we can 
just reuse ReplicaDeletionFailed (and give it a more general name).
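
To make the suggestion in comment 1 concrete, here is a rough sketch of such a 
helper. It assumes a simplified in-memory map of replica states; the Replica 
case class, the map, and the enclosing object are hypothetical and are not the 
actual ReplicaStateMachine internals.

```scala
// Hypothetical, simplified model; not the actual ReplicaStateMachine code.
object ReplicaStateSketch {
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionFailed extends ReplicaState

  // Assumed identifier for a replica: topic, partition and broker id.
  case class Replica(topic: String, partition: Int, brokerId: Int)

  // Stands in for a separate topicDeletionInProgress set: deletion is treated
  // as in progress for a topic if at least one of its replicas is currently
  // in ReplicaDeletionStarted.
  def isAtLeastOneReplicaInDeletionStartedState(
      replicaState: Map[Replica, ReplicaState],
      topic: String): Boolean =
    replicaState.exists { case (replica, state) =>
      replica.topic == topic && state == ReplicaDeletionStarted
    }
}
```

With something like this, the "in progress" information is derived from the 
replica states rather than tracked separately.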

2. The actual deletion logic is split between TopicDeletionManager and 
DeleteTopicsThread, which makes it a bit hard to read. I was thinking that 
TopicDeletionManager only has methods for synchronization with other threads 
(through the condition) and all real work will be included in 
DeleteTopicsThread. Compared with partition reassignment, the logic in topic 
deletion is a bit harder to read. Part of the reason is that in partition 
reassignment, all the logic is linearly written in one method. In topic 
deletion, the logic is not linear since it's driven by various callbacks. 
Perhaps just putting all the logic in one place, with the pieces close to each 
other, will help. Also, a bunch of helper methods in TopicDeletionManager like 
the following should really be in ReplicaStateMachine.
isAtLeastOneReplicaInDeletionStartedState()
replicasInState()
AllReplicasForTopicDeleted()

3. When a topic is in the process of being deleted, we prevent future 
operations like partition re-assignment and leader rebalancing on that topic. 
However, if one of those operations is already started, we allow topic deletion 
to start, which will then get blocked by those operations. Another way to do 
that is if a topic is to be deleted, we don't start the deletion until other 
ongoing operations like partition re-assignment finish (once finished, they 
can't be started again since they will see topic being deleted). This way, the 
logic in DeleteTopicsThread will be somewhat simpler since we don't have to 
check if it can interfere with other operations.
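
A minimal sketch of the alternative in comment 3, assuming the controller can 
see which topics have an ongoing reassignment or preferred replica election. 
ControllerSnapshot and both function names are made up for illustration.

```scala
// Hypothetical sketch; the names below are not from the patch.
object DeletionGateSketch {
  // Assumed snapshot of the relevant controller state.
  case class ControllerSnapshot(
      topicsQueuedForDeletion: Set[String],
      topicsBeingReassigned: Set[String],
      topicsInPreferredReplicaElection: Set[String])

  // Deletion starts only for queued topics with no ongoing operation.
  // Topics that are still busy simply stay queued until a later pass.
  def topicsReadyForDeletion(s: ControllerSnapshot): Set[String] =
    s.topicsQueuedForDeletion -- s.topicsBeingReassigned --
      s.topicsInPreferredReplicaElection

  // New operations are refused once a topic is queued for deletion, so an
  // operation that finishes cannot be started again on that topic.
  def canStartOperation(s: ControllerSnapshot, topic: String): Boolean =
    !s.topicsQueuedForDeletion.contains(topic)
}
```

The point of the sketch is that the deletion thread only needs one check up 
front, instead of checking for interference at each step.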

4. In TopicDeletionManager, when doing wait/notify (and changing internal 
states), we expect the caller to hold the lock. All callers probably do hold 
the locks. However, I am wondering if it's better to get the lock anyway in 
TopicDeletionManager to make it more self-contained. The locks are re-entrant, 
so locking it again won't hurt.
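
To illustrate the re-entrancy point in comment 4, here is a small standalone 
sketch using java.util.concurrent.locks.ReentrantLock. The inLock helper below 
is a local stand-in written for this example, and resumeDeletion() is a 
hypothetical method name.

```scala
import java.util.concurrent.locks.ReentrantLock

// Standalone sketch; not the actual TopicDeletionManager code.
object ReentrantLockSketch {
  // Local stand-in for a lock helper: acquire, run the body, always release.
  def inLock[T](lock: ReentrantLock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  val controllerLock = new ReentrantLock()

  // Hypothetical method that takes the lock itself, so it is safe to call
  // whether or not the caller already holds it. It returns the hold count of
  // the current thread just to make the re-entrancy visible.
  def resumeDeletion(): Int = inLock(controllerLock) {
    controllerLock.getHoldCount
  }
}
```

Called without the lock, resumeDeletion() sees a hold count of 1; called from 
inside inLock(controllerLock) it sees 2, and the lock is fully released once 
the outer block exits.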

5. TopicDeletionManager: It seems that replicas in ReplicaDeletionStarted state 
remain in that state until the topic is successfully deleted. So, it seems that 
when calling startReplicaDeletion(), we can pass in replicas already in 
ReplicaDeletionSuccessful state. However, transitioning from 
ReplicaDeletionSuccessful to ReplicaDeletionStarted is not allowed.
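
One possible way to avoid the problem in comment 5 is to filter the replicas 
before calling startReplicaDeletion(). The sketch below is only an 
illustration: the set of valid previous states is an assumption made for this 
example, not the transition table in the patch, and the function name is 
hypothetical.

```scala
// Hypothetical sketch; not the actual ReplicaStateMachine transition table.
object StartDeletionSketch {
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionFailed extends ReplicaState

  // Assumed for illustration: states from which a replica may move to
  // ReplicaDeletionStarted. ReplicaDeletionSuccessful is deliberately absent.
  val validPreviousForDeletionStarted: Set[ReplicaState] =
    Set(OfflineReplica, ReplicaDeletionFailed)

  // Keep only the replicas whose current state allows deletion to be started,
  // so already-deleted replicas are never passed on.
  def replicasEligibleForDeletionStart[R](states: Map[R, ReplicaState]): Set[R] =
    states.collect {
      case (replica, state) if validPreviousForDeletionStarted(state) => replica
    }.toSet
}
```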




core/src/main/scala/kafka/controller/DeleteTopicsThread.scala
<https://reviews.apache.org/r/17460/#comment63339>

    If the deletion of a replica is started and another failed broker is 
started immediately afterward, will we be missing the only chance of starting 
the deletion of the replica on the newly started broker (assuming there is a 
replica there not yet deleted)?



core/src/main/scala/kafka/controller/DeleteTopicsThread.scala
<https://reviews.apache.org/r/17460/#comment63340>

    Can just use topicsToBeDeleted. Could we just merge this block and the 
previous block in the same foreach?



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63290>

    Why do we need to read from ZK, instead of from the cache?



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63292>

    For replicas that are being deleted, should we move them to OnlineReplica 
state?



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63332>

    Should we disallow adding partitions when a topic is being deleted?



core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63296>

    Could we add the new replica states in the comment?



core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63297>

    Some of the state transitions are missing, e.g., ReplicaDeletionFailed -> 
ReplicaDeletionStarted.



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63337>

    This method should be private and it would be better if it's placed close 
to onPartitionDeletion().



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63338>

    It's still not very clear to me why delete topic thread needs to be 
notified. failReplicaDeletion() is called during the processing of a delete 
replica response. Do we expect the delete topic thread to receive another 
response?



core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63336>

    This comment seems to be outdated?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63356>

    These seem to be common for most tests. Could we share them?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63344>

    Those lines are common among all tests. Could we make a util function and 
reuse?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63345>

    Should we do those before the topic is deleted?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63346>

    We are stopping the leader replica, which may not be the rightmost broker 
in the broker list.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63347>

    Do we need this comment?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63348>

    Should we shut down the follower before topic deletion to make sure that 
the delete process is indeed paused?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63349>

    We are only testing produce request.



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63351>

    Could we just fold these two tests into the produceRequest test instead of 
testing from the beginning?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63353>

    Shutting down the controller may not change the leader. It seems that we 
should just shut down the preferred replica?



core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63354>

    Shutting down the controller may not change the leader. It seems that we 
should just shut down the preferred replica?


- Jun Rao


On Feb. 5, 2014, 5:31 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 5, 2014, 5:31 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock 
> instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to 
> TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, 
> unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup 
> of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests 
> pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the 
> new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that 
> are queued up for deletion, since topic deletion is halted until partition 
> reassignment can finish anyway. We need to let partition reassignment finish 
> (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking 
> around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
> a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
> fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
> ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
> 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 
> 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
> b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
> 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>
