-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17460/#review33961
-----------------------------------------------------------


Already checked-in so this is really a follow-up review.

My overall take on the implementation is that it is complex mainly (perhaps -
I'm not 100 percent sure myself) in order to handle corner cases that are rare
but, I think, recoverable. I.e., if we assume (and it may not be a valid
assumption) that topics will not/should never be deleted while there is live
traffic to them, then just the callbacks and user-issued retries of failed
deletions would be sufficient. We can talk more about that, but what you have
implemented is definitely more convenient and complete for the user.

Also, I encountered this problem while trying it out:
- bring up a broker
- ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
  < send a few messages >
- ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic abc
- I looked at state-change.log and made sure deletion completed
- ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic abc --sync
  < can never produce >

I see these on the server:

[2014-02-07 18:34:21,229] WARN [KafkaApi-0] Produce request with correlation id 2 from client  on partition [abc,0] failed due to Partition [abc,0] doesn't exist on 0 (kafka.server.KafkaApis)
[2014-02-07 18:34:21,229] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [abc,0]] with Ack=0 (kafka.server.KafkaApis)
[2014-02-07 18:34:26,755] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-02-07 18:34:26,756] WARN [KafkaApi-0] Produce request with correlation id 9 from client  on partition [abc,1] failed due to Partition [abc,1] doesn't exist on 0 (kafka.server.KafkaApis)
[2014-02-07 18:34:26,756] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 9, topicAndPartition = [abc,1]] with Ack=0 (kafka.server.KafkaApis)

I had to bounce the broker to be able to produce again.

What did I do wrong? I can debug this later, but I'm going home soon :)



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63806>

    if -> until



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63896>

    I think foldLefts of this form can be simplified and made clearer by using
    exists.
    
    e.g., here:
    .filter(r => r._2.exists(replica => !controllerContext.liveBrokerIds.contains(replica)))
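    
    To illustrate (a minimal, self-contained sketch - liveBrokerIds and
    assignments below are made-up stand-ins for the controller context fields):
    
    object ExistsVsFoldLeft extends App {
      // Hypothetical stand-ins for controllerContext state.
      val liveBrokerIds: Set[Int] = Set(0, 1)
      // topic-partition -> assigned replica broker ids
      val assignments: Map[String, Seq[Int]] =
        Map("abc-0" -> Seq(0, 1), "abc-1" -> Seq(1, 2))
    
      // foldLeft form: threads a boolean accumulator through the replica list.
      val foldForm = assignments.filter { case (_, replicas) =>
        replicas.foldLeft(false)((dead, r) => dead || !liveBrokerIds.contains(r))
      }
    
      // exists form: same result, clearer intent.
      val existsForm = assignments.filter { case (_, replicas) =>
        replicas.exists(r => !liveBrokerIds.contains(r))
      }
    
      assert(foldForm == existsForm)
      println(existsForm) // Map(abc-1 -> List(1, 2))
    }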



core/src/main/scala/kafka/controller/KafkaController.scala
<https://reviews.apache.org/r/17460/#comment63817>

    You mean topics halted (due to replicas on dead brokers) or ineligible (due
    to reassignment/preferred leader election), correct? Can you update the message?



core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63866>

    Logging can be updated - i.e., the transition is not necessarily online -> offline.
    
    Should probably use "%s to %s".format(currState, targetState) here and
    elsewhere in handleStateChange.
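    
    For example (a sketch only - the partition and state values below are
    placeholders; only the "%s to %s".format(...) pattern is the point):
    
    object StateChangeLogging extends App {
      // Hypothetical stand-ins for whatever handleStateChange actually has in scope.
      val partition = "[abc,0]"
      val currState = "OnlinePartition"
      val targetState = "NonExistentPartition"
    
      // Parameterize the log line on the real transition instead of
      // hard-coding "online -> offline".
      println("Partition %s state change: %s to %s"
        .format(partition, currState, targetState))
      // prints: Partition [abc,0] state change: OnlinePartition to NonExistentPartition
    }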



core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63868>

    yay..
    



core/src/main/scala/kafka/controller/PartitionStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63871>

    Non-existent topics, if any, should be removed from the delete topics zk
    path. However, can this ever happen?
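    
    Something along these lines is what I had in mind (a sketch only - it
    assumes the usual /admin/delete_topics and /brokers/topics zk layout and a
    raw ZkClient handle; the actual ZkUtils helpers may differ):
    
    import org.I0Itec.zkclient.ZkClient
    import scala.collection.JavaConverters._
    
    // Drop queued-for-deletion topics that no longer exist under /brokers/topics.
    def pruneNonExistentTopics(zkClient: ZkClient): Unit = {
      val deleteTopicsPath = "/admin/delete_topics"
      val brokerTopicsPath = "/brokers/topics"
      val queued = zkClient.getChildren(deleteTopicsPath).asScala
      val existing = zkClient.getChildren(brokerTopicsPath).asScala.toSet
      for (topic <- queued if !existing.contains(topic))
        zkClient.delete("%s/%s".format(deleteTopicsPath, topic))
    }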
    



core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/17460/#comment63812>

    Would prefer ReplicaDeletionIneligible:
    - since it isn't really a failure - i.e., we should eventually resume.
    - and I prefer "Ineligible" to "Halted" because I think it is weird for
    replicas on dead brokers that come back up to _have_ to go through a state
    called ReplicaDeletion_Failed_ if there was in fact no attempt at deletion.
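    
    For concreteness, the renaming I'm suggesting would look roughly like this
    (a sketch only - the real ReplicaState hierarchy in ReplicaStateMachine.scala
    carries more states and per-state information than shown):
    
    sealed trait ReplicaState
    case object OnlineReplica extends ReplicaState
    case object OfflineReplica extends ReplicaState
    case object ReplicaDeletionStarted extends ReplicaState
    case object ReplicaDeletionSuccessful extends ReplicaState
    // Proposed in place of ReplicaDeletionFailed: replicas on dead brokers are
    // merely ineligible for deletion until the broker comes back, not failed.
    case object ReplicaDeletionIneligible extends ReplicaState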


- Joel Koshy


On Feb. 6, 2014, 7:37 p.m., Neha Narkhede wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/17460/
> -----------------------------------------------------------
> 
> (Updated Feb. 6, 2014, 7:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-330
>     https://issues.apache.org/jira/browse/KAFKA-330
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Guozhang's follow up comments
> 
> 
> Simplified the logic in DeleteTopicThread and removed topicDeletionInProgress 
> set. Also removed the ReplicaDeletionStarted->OfflineReplica state change, 
> which means that successfully deleted replicas will not be retried unless 
> there is a controller failover
> 
> 
> Refactored tests per Jun and Guozhang's feedback
> 
> 
> Code refactor to address Jun's and Guozhang's review comments. Tests refactor 
> pending
> 
> 
> Joel's review suggestions - Changed the controllerLock instances to inLock 
> instead of synchronized, fixed some logging
> 
> 
> Removed init() API from TopicDeletionManager and added docs to 
> TopicDeletionManager to describe the lifecycle of topic deletion
> 
> 
> Updated docs for the new states. Removed the changes to log4j.properties
> 
> 
> Cleanup unused APIs, consolidated APIs of TopicDeletionManager, added docs, 
> unit tests working
> 
> 
> Moved deletion states into ReplicaStateMachine. All unit tests pass. Cleanup 
> of some APIs pending
> 
> 
> Changed controller to reference APIs in TopicDeletionManager. All unit tests 
> pass
> 
> 
> Introduced a TopicDeletionManager. KafkaController changes pending to use the 
> new TopicDeletionManager
> 
> 
> Addressed Guozhang's review comments
> 
> 
> Fixed docs in a few places
> 
> 
> Fixed the resume logic for partition reassignment to also include topics that 
> are queued up for deletion, since topic deletion is halted until partition 
> reassignment can finish anyway. We need to let partition reassignment finish 
> (since it started before topic deletion) so that topic deletion can resume
> 
> 
> Organized imports
> 
> 
> Moved offline replica handling to controller failover
> 
> 
> Reading replica assignment from zookeeper instead of local cache
> 
> 
> Deleting unused APIs
> 
> 
> Reverting the change to the stop replica request protocol. Instead hacking 
> around with callbacks
> 
> 
> All functionality and all unit tests working
> 
> 
> Rebased with trunk after controller cleanup patch
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/AdminUtils.scala 
> a167756f0fd358574c8ccb42c5c96aaf13def4f5 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 842c11047cca0531fbc572fdb25523244ba2b626 
>   core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
> a80aa4924cfe9a4670591d03258dd82c428bc3af 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> a984878fbd8147b21211829a49de511fd1335421 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 820f0f57b00849a588a840358d07f3a4a31772d4 
>   core/src/main/scala/kafka/api/StopReplicaResponse.scala 
> d7e36308263aec2298e8adff8f22e18212e33fca 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 54dd7bd4e195cc2ff4637ac93e2f9b681e316024 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> ea8485b479155b479c575ebc89a4f73086c872cb 
>   core/src/main/scala/kafka/controller/KafkaController.scala 
> a0267ae2670e8d5f365e49ec0fa5db1f62b815bf 
>   core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
> fd9200f3bf941aab54df798bb5899eeb552ea3a3 
>   core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
> ac4262a403fc73edaecbddf55858703c640b11c0 
>   core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
> 483559aa64726c51320d18b64a1b48f8fb2905a0 
>   core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> d22dabdf4fc2346c5487b9fd94cadfbcab70040d 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> bd7940b80ca1f1fa4a671c49cf6be1aeec2bbd7e 
>   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
> 9dca55c9254948f1196ba17e1d3ebacdcd66be0c 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> b5719f89f79b9f2df4b6cb0f1c869b6eae9f8a7b 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> f9d10d385cee49a1e3be8c82e3ffa22ef87a8fd6 
>   core/src/main/scala/kafka/server/TopicConfigManager.scala 
> 42e98dd66f3269e6e3a8210934dabfd65df2dba9 
>   core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala 
> b189619bdc1b0d2bba8e8f88467fce014be96ccd 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> b42e52b8e5668383b287b2a86385df65e51b5108 
>   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
> 59de1b469fece0b28e1d04dcd7b7015c12576abb 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> 8df0982a1e71e3f50a073c4ae181096d32914f3e 
>   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
> 9aea67b140e50c6f9f1868ce2e2aac9e7530fa77 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> c0475d07a778ff957ad266c08a7a81ea500debd2 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> 03e6266ffdad5891ec81df786bd094066b78b4c0 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 426b1a7bea1d83a64081f2c6b672c88c928713b7 
> 
> Diff: https://reviews.apache.org/r/17460/diff/
> 
> 
> Testing
> -------
> 
> Several integration tests added to test -
> 
> 1. Topic deletion when all replica brokers are alive
> 2. Halt and resume topic deletion after a follower replica is restarted
> 3. Halt and resume topic deletion after a controller failover
> 4. Request handling during topic deletion
> 5. Topic deletion and partition reassignment in parallel
> 6. Topic deletion and preferred replica election in parallel
> 7. Topic deletion and per topic config changes in parallel
> 
> 
> Thanks,
> 
> Neha Narkhede
> 
>
