-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33961
-----------------------------------------------------------
Already checked in, so this is really a follow-up review. My overall take on the implementation is that it is (perhaps - I'm not 100 percent sure myself) complex mainly to handle corner cases that are rare but, I think, recoverable. That is, if we assume (and it may not be a valid assumption) that topics will not/should never be deleted while there is live traffic to the topic, then the callbacks plus user-issued re-attempts on failed deletion would be sufficient. We can talk more about that, but what you have implemented is definitely more convenient and complete for the user.

Also, I encountered this problem while trying it out:
- bring up a broker
- ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
  < send a few messages >
- ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic abc
- I looked at state-change.log and made sure deletion completed
- ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
  < can never produce >

I see these on the server:

[2014-02-07 18:34:21,229] WARN [KafkaApi-0] Produce request with correlation id 2 from client on partition [abc,0] failed due to Partition [abc,0] doesn't exist on 0 (kafka.server.KafkaApis)
[2014-02-07 18:34:21,229] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [abc,0]] with Ack=0 (kafka.server.KafkaApis)
[2014-02-07 18:34:26,755] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-02-07 18:34:26,756] WARN [KafkaApi-0] Produce request with correlation id 9 from client on partition [abc,1] failed due to Partition [abc,1] doesn't exist on 0 (kafka.server.KafkaApis)
[2014-02-07 18:34:26,756] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 9, topicAndPartition = [abc,1]] with Ack=0 (kafka.server.KafkaApis)

I had to bounce the broker to be able to produce again. What did I do wrong? I can debug this later, but I'm going home soon :)


core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63806>

    if -> until


core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63896>

    I think foldLefts of this form can be simplified and made clearer by using
    exists, e.g., here:
    .filter(r => r._2.exists(b => !controllerContext.liveBrokerIds.contains(b)))
    (see the first sketch after these file comments)


core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63817>

    You mean topics halted (due to replicas on dead brokers) or ineligible (due
    to reassignment/preferred leader election), correct? Can you update the
    message?


core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63866>

    The logging can be updated - i.e., the transition is not necessarily
    online -> offline. It should probably use "%s to %s".format(currState,
    targetState) here and elsewhere in handleStateChange (see the second sketch
    after these file comments).


core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63868>

    yay..


core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63871>

    Non-existent topics, if any, should be removed from the delete topics zk
    path. However, can this ever happen?
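A minimal, self-contained sketch of the foldLeft -> exists simplification
mentioned above. The topic-partition keys, the replica assignment map and the
liveBrokerIds value below are made up for illustration; in KafkaController they
would come from controllerContext.

    object ExistsVsFoldLeft extends App {
      // Stand-ins for controllerContext.liveBrokerIds and the replica assignment
      val liveBrokerIds = Set(0, 1)
      val replicaAssignment = Map("t1-0" -> Seq(0, 1), "t1-1" -> Seq(1, 2))

      // foldLeft form: threads a Boolean accumulator through the replica list
      val withDeadReplicasFold = replicaAssignment.filter { case (_, replicas) =>
        replicas.foldLeft(false)((res, r) => res || !liveBrokerIds.contains(r))
      }

      // exists form: same result, clearer intent
      val withDeadReplicasExists = replicaAssignment.filter { case (_, replicas) =>
        replicas.exists(r => !liveBrokerIds.contains(r))
      }

      println(withDeadReplicasFold == withDeadReplicasExists) // true
    }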
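And a tiny, hypothetical illustration of the "%s to %s".format(currState,
targetState) style suggested for the handleStateChange logging; the state names
and the println (standing in for the state-change logger) are illustrative, not
the actual PartitionStateMachine code.

    object StateChangeLogFormat extends App {
      val currState = "OnlinePartition"     // hypothetical current state
      val targetState = "OfflinePartition"  // hypothetical target state
      println("Partition [abc,0] state change: %s to %s".format(currState, targetState))
    }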
core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63812>

    Would prefer ReplicaDeletionIneligible:
    - since it isn't really a failure... i.e., we should eventually resume.
    - and I prefer "Ineligible" to "Halted" because I think it is weird for
      replicas on dead brokers that come back up to _have_ to go through a
      state called ReplicaDeletion_Failed_ if there was in fact no attempt at
      deletion.
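A rough sketch (not the actual patch) of how the proposed name could fit into
the ReplicaState hierarchy in ReplicaStateMachine.scala; the trait shape and the
state byte values here are illustrative assumptions, not the real definitions.

    sealed trait ReplicaState { def state: Byte }
    case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4 }
    case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5 }
    // "Ineligible" rather than "Failed"/"Halted": replicas on dead brokers are
    // simply skipped, so deletion was never actually attempted for them.
    case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6 }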
- Joel Koshy


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow-up comments
> 
> Simplified the logic in DeleteTopicThread and removed the topicDeletionInProgress set. Also removed the ReplicaDeletionStarted -> OfflineReplica state change, which means that successfully deleted replicas will not be retried unless there is a controller failover
> 
> Refactored tests per Jun's and Guozhang's feedback
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor pending
> 
> Joel's review suggestions - changed the controllerLock instances to inLock instead of synchronized, fixed some logging
> 
> Removed the init() API from TopicDeletionManager and added docs to TopicDeletionManager describing the lifecycle of topic deletion
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> Cleaned up unused APIs, consolidated the APIs of TopicDeletionManager, added docs; unit tests working
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup of some APIs pending
> 
> Changed the controller to reference the APIs in TopicDeletionManager. All unit tests pass
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the new TopicDeletionManager
> 
> Addressed Guozhang's review comments
> 
> Fixed docs in a few places
> 
> Fixed the resume logic for partition reassignment to also include topics that are queued up for deletion, since topic deletion is halted until partition reassignment can finish anyway. We need to let partition reassignment finish (since it started before topic deletion) so that topic deletion can resume
> 
> Organized imports
> 
> Moved offline replica handling to controller failover
> 
> Reading replica assignment from zookeeper instead of the local cache
> 
> Deleting unused APIs
> 
> Reverting the change to the stop replica request protocol. Instead hacking around with callbacks
> 
> All functionality and all unit tests working
> 
> Rebased with trunk after the controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per-topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede