-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33699
-----------------------------------------------------------

Some high level comments.

1. While most of the replica states are now managed in ReplicaStateMachine, a few are still managed in TopicDeletionManager through haltedTopicsForDeletion and topicDeletionInProgress. It would probably be clearer if those were managed in ReplicaStateMachine too. topicDeletionInProgress seems redundant, since it is equivalent to at least one of the topic's replicas being in the ReplicaDeletionStarted state, so we can just add a helper function in ReplicaStateMachine. We may need to add a new replica state in ReplicaStateMachine to represent haltedTopicsForDeletion, but perhaps we can just reuse ReplicaDeletionFailed (and give it a more general name).

2. The actual deletion logic is split between TopicDeletionManager and DeleteTopicsThread, which makes it a bit hard to read. I was thinking that TopicDeletionManager would only have methods for synchronizing with other threads (through the condition), and all the real work would be done in DeleteTopicsThread. Compared with partition reassignment, the topic deletion logic is harder to follow. Part of the reason is that in partition reassignment all the logic is written linearly in one method, whereas in topic deletion the logic is not linear since it is driven by various callbacks. Perhaps just putting all the logic in one place, with related pieces close to each other, will help. Also, a number of helper methods in TopicDeletionManager, such as the following, should really be in ReplicaStateMachine:
   isAtLeastOneReplicaInDeletionStartedState()
   replicasInState()
   AllReplicasForTopicDeleted()

3. When a topic is in the process of being deleted, we prevent future operations like partition re-assignment and leader rebalancing on that topic. However, if one of those operations has already started, we allow topic deletion to start, and the deletion then gets blocked by that operation.
Another way to handle this is not to start deleting a topic until ongoing operations on it, such as partition re-assignment, finish. Once they finish, they can't be started again, since they will see that the topic is being deleted. This way, the logic in DeleteTopicsThread will be somewhat simpler, because we don't have to check whether it can interfere with other operations.

4. In TopicDeletionManager, when doing wait/notify (and changing internal state), we expect the caller to hold the lock. All callers probably do hold the lock. However, I am wondering whether it's better to acquire the lock anyway in TopicDeletionManager to make it more self-contained. The locks are re-entrant, so locking again won't hurt.

5. TopicDeletionManager: It seems that replicas in ReplicaDeletionStarted state remain in that state until the topic is successfully deleted. So it seems that when calling startReplicaDeletion(), we can pass in replicas already in ReplicaDeletionSuccessful state. However, transitioning from ReplicaDeletionSuccessful to ReplicaDeletionStarted is not allowed.

core/src/main/scala/kafka/controller/DeleteTopicsThread.scala
<https://reviews.apache.org/r/17460/#comment63339>
If the deletion of a replica is started and another failed broker is restarted immediately afterward, will we miss the only chance to start deleting the replica on the newly restarted broker (assuming it hosts a replica that has not yet been deleted)?

core/src/main/scala/kafka/controller/DeleteTopicsThread.scala
<https://reviews.apache.org/r/17460/#comment63340>
Can just use topicsToBeDeleted. Could we just merge this block and the previous block into the same foreach?

core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63290>
Why do we need to read from ZK instead of from the cache?
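
To make point 4 concrete, here is a minimal sketch. It assumes the controller lock is a java.util.concurrent ReentrantLock with one Condition that wakes the delete-topics thread. DeletionSignal, resumeDeletion(), awaitWork(), hasPendingWork and the local inLock helper are hypothetical names chosen for illustration. They are not the actual TopicDeletionManager API. The manager acquires the lock itself. If a caller already holds it, the re-entrant lock only increments its hold count, so the extra acquisition is harmless.

```scala
import java.util.concurrent.locks.{Condition, ReentrantLock}

// Local helper in the spirit of the inLock idiom mentioned in the review
// request: run `body` while holding `lock`, always releasing it afterwards.
object LockUtils {
  def inLock[T](lock: ReentrantLock)(body: => T): T = {
    lock.lock()
    try body
    finally lock.unlock()
  }
}

// Hypothetical, trimmed-down stand-in for TopicDeletionManager: it takes the
// controller lock itself before touching shared state or signalling, instead
// of assuming that every caller already holds the lock.
class DeletionSignal(controllerLock: ReentrantLock) {
  import LockUtils.inLock

  private val deleteTopicsCond: Condition = controllerLock.newCondition()
  // Guarded by controllerLock.
  private var pendingWork = false

  // Called from callbacks, e.g. when a delete replica response arrives.
  // Re-acquiring the lock is safe even if the caller already holds it.
  def resumeDeletion(): Unit = inLock(controllerLock) {
    pendingWork = true
    deleteTopicsCond.signal()
  }

  // Called by the delete-topics thread. Blocks until there is work, then
  // consumes it. await() releases the lock while waiting.
  def awaitWork(): Unit = inLock(controllerLock) {
    while (!pendingWork) deleteTopicsCond.await()
    pendingWork = false
  }

  def hasPendingWork: Boolean = inLock(controllerLock)(pendingWork)
}
```

Keeping all lock handling inside the manager means a new call site cannot forget to take the lock before calling signal(). Without the lock, signal() would throw IllegalMonitorStateException.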

core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63292>
For replicas that are being deleted, should we move them to the OnlineReplica state?

core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63332>
Should we disallow adding partitions while a topic is being deleted?

core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63296>
Could we add the new replica states to the comment?

core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63297>
Some of the state transitions are missing, e.g., ReplicaDeletionFailed -> ReplicaDeletionStarted.

core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63337>
This method should be private, and it would be better to place it close to onPartitionDeletion().

core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63338>
It's still not very clear to me why the delete topic thread needs to be notified. failReplicaDeletion() is called while processing a delete replica response. Do we expect the delete topic thread to receive another response?

core/src/main/scala/kafka/controller/TopicDeletionManager.scala
<https://reviews.apache.org/r/17460/#comment63336>
This comment seems to be outdated?

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63356>
These seem to be common to most tests. Could we share them?

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63344>
Those lines are common to all tests. Could we make a util function and reuse it?

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63345>
Should we do those before the topic is deleted?
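
The ReplicaStateMachine comments above could be sketched roughly as follows, together with points 1, 2 and 5. Those comments include missing transitions such as ReplicaDeletionFailed -> ReplicaDeletionStarted. This is a simplified, hypothetical model, not the code in the patch. The state subset, the transition table, the SimpleReplicaStateMachine and PartitionReplica types, and the helper names are all assumptions, loosely based on the names used in this review. In this model, topicDeletionInProgress becomes a derived query rather than separately tracked state.

```scala
import scala.collection.mutable

// Assumed subset of replica states; the real state machine has more.
sealed trait ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState
case object ReplicaDeletionFailed extends ReplicaState

// Hypothetical replica identifier: topic, partition and hosting broker.
final case class PartitionReplica(topic: String, partition: Int, brokerId: Int)

class SimpleReplicaStateMachine {
  private val replicaState = mutable.Map.empty[PartitionReplica, ReplicaState]

  // Assumed table of valid previous states per target state. It includes
  // ReplicaDeletionFailed -> ReplicaDeletionStarted so a failed deletion can
  // be retried, and it rejects ReplicaDeletionSuccessful -> ReplicaDeletionStarted.
  private val validPreviousStates = Map[ReplicaState, Set[ReplicaState]](
    OfflineReplica -> Set[ReplicaState](OnlineReplica),
    ReplicaDeletionStarted -> Set[ReplicaState](OfflineReplica, ReplicaDeletionFailed),
    ReplicaDeletionSuccessful -> Set[ReplicaState](ReplicaDeletionStarted),
    ReplicaDeletionFailed -> Set[ReplicaState](ReplicaDeletionStarted)
  )

  def initialize(replica: PartitionReplica, state: ReplicaState): Unit =
    replicaState(replica) = state

  // Moves a replica to targetState, rejecting transitions not in the table.
  def handleStateChange(replica: PartitionReplica, targetState: ReplicaState): Unit = {
    val current = replicaState.getOrElse(replica,
      throw new IllegalStateException(s"Unknown replica $replica"))
    val allowed = validPreviousStates.getOrElse(targetState, Set.empty[ReplicaState])
    if (!allowed.contains(current))
      throw new IllegalStateException(s"Illegal transition $current -> $targetState for $replica")
    replicaState(replica) = targetState
  }

  // Query helpers that point 2 suggests should live in the state machine.
  def replicasInState(topic: String, state: ReplicaState): Set[PartitionReplica] =
    replicaState.iterator.collect { case (r, s) if r.topic == topic && s == state => r }.toSet

  // Replaces a separately maintained topicDeletionInProgress flag (point 1).
  def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean =
    replicaState.exists { case (r, s) => r.topic == topic && s == ReplicaDeletionStarted }

  def areAllReplicasForTopicDeleted(topic: String): Boolean = {
    val states = replicaState.iterator.collect { case (r, s) if r.topic == topic => s }.toList
    states.nonEmpty && states.forall(_ == ReplicaDeletionSuccessful)
  }
}
```

This table rejects ReplicaDeletionSuccessful -> ReplicaDeletionStarted. One option for the issue in point 5 is therefore for startReplicaDeletion() to skip replicas already in ReplicaDeletionSuccessful, for example by subtracting replicasInState(topic, ReplicaDeletionSuccessful) from its input.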

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63346>
We are stopping the leader replica, which may not be the rightmost broker in the broker list.

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63347>
Do we need this comment?

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63348>
Should we shut down the follower before topic deletion to make sure that the delete process is indeed paused?

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63349>
We are only testing the produce request.

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63351>
Could we just fold these two tests into the produceRequest test instead of testing from the beginning?

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63353>
Shutting down the controller may not change the leader. It seems that we should just shut down the preferred replica?

core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
<https://reviews.apache.org/r/17460/#comment63354>
Shutting down the controller may not change the leader. It seems that we should just shut down the preferred replica?


- Jun Rao


On Feb. 5, 2014, 5:31 p.m., Neha Narkhede wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
>
> (Updated Feb. 5, 2014, 5:31 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
>
>
> Repository: kafka
>
>
> Description
> -------
>
> Joel's review suggestions - Changed the controllerLock instances to inLock
> instead of synchronized, fixed some logging
>
>
> Removed init() API from TopicDeletionManager and added docs to
> TopicDeletionManager to describe the lifecycle of topic deletion
>
>
> Updated docs for the new states. Removed the changes to log4j.properties
>
>
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs,
> unit tests working
>
>
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup
> of some APIs pending
>
>
> Changed controller to reference APIs in TopicDeletionManager. All unit tests
> pass
>
>
> Introduced a TopicDeletionManager. KafkaController changes pending to use the
> new TopicDeletionManager
>
>
> Addressed Guozhang's review comments
>
>
> Fixed docs in a few places
>
>
> Fixed the resume logic for partition reassignment to also include topics that
> are queued up for deletion, since topic deletion is halted until partition
> reassignment can finish anyway. We need to let partition reassignment finish
> (since it started before topic deletion) so that topic deletion can resume
>
>
> Organized imports
>
>
> Moved offline replica handling to controller failover
>
>
> Reading replica assignment from zookeeper instead of local cache
>
>
> Deleting unused APIs
>
>
> Reverting the change to the stop replica request protocol.
> Instead hacking around with callbacks
>
>
> All functionality and all unit tests working
>
>
> Rebased with trunk after controller cleanup patch
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb
>   core/src/main/scala/kafka/controller/DeleteTopicsThread.scala PRE-CREATION
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7
>
> Diff: https://reviews.apache.org/r/17460/diff/
>
>
> Testing
> -------
>
> Several integration tests added to test -
>
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
>
>
> Thanks,
>
> Neha Narkhede
>
>