[ https://issues.apache.org/jira/browse/KAFKA-19458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gaurav Narula updated KAFKA-19458:
----------------------------------
    Description:
Successive {{AlterReplicaLogDirsRequest}}s that change the log directory of a given topic partition may leak log segments. Consider the following scenario:

1. A request tries to change the logdir for topic partition {{tp}} from {{d1}} to {{d2}}.
2. The handler invokes {{replicaManager#alterReplicaLogDirs}}.
3. A future replica is created as a result of the above method invoking {{partition#maybeCreateFutureReplica}}, and cleaning for {{tp}} is disabled because {{logManager#abortAndPauseCleaning}} is invoked.
4. Now, *before* the previous request has completed, assume another request arrives to change the logdir from {{d2}} to {{d3}}.
5. This time, the call from {{replicaManager#alterReplicaLogDirs}} to {{partition#futureReplicaDirChanged}} returns {{true}}, so we remove the fetcher and unset the reference to {{futureLog}} in {{Partition}}.
6. We then re-create a future replica by invoking {{partition#maybeCreateFutureReplica}} with {{d3}} and pause log cleaning for {{tp}} *again*.
7. {{partition#maybeReplaceCurrentWithFutureReplica}} is invoked when the future replica has caught up; its callback swaps the future log for the local log and resumes cleaning by invoking {{LogManager#resumeCleaning}}.
8. The above decrements the count in {{LogCleaningState.logCleaningPaused}} from {{2}} to {{1}}, so one pause remains outstanding. Cleanup for {{tp}} therefore stays paused until the broker restarts.

> Successive AlterReplicaLogDirsRequest on a topic partition may leak log segments
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-19458
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19458
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.9.1, 4.0.0, 4.1.0
>            Reporter: Gaurav Narula
>            Assignee: Gaurav Narula
>            Priority: Major
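
The pause/resume imbalance in steps 3 to 8 can be illustrated with a small counting model. This is only a sketch, not Kafka code: the class {{PausedCleaningSketch}} and its methods are hypothetical stand-ins that assume, as the description implies, that each pause increments a per-partition counter and each resume decrements it. The real {{LogManager}} and cleaner state are considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of per-partition "cleaning paused" bookkeeping.
 * It only counts pause and resume calls; it is not the Kafka implementation.
 */
class PausedCleaningSketch {
    // Partition name -> number of outstanding pause requests (assumed semantics).
    private final Map<String, Integer> pausedCount = new HashMap<>();

    /** Stand-in for the effect of logManager#abortAndPauseCleaning: adds one pause. */
    void abortAndPauseCleaning(String tp) {
        pausedCount.merge(tp, 1, Integer::sum);
    }

    /** Stand-in for the effect of LogManager#resumeCleaning: removes one pause. */
    void resumeCleaning(String tp) {
        Integer count = pausedCount.get(tp);
        if (count == null) {
            throw new IllegalStateException("cleaning is not paused for " + tp);
        }
        if (count == 1) {
            pausedCount.remove(tp);          // last pause released, cleaning can run
        } else {
            pausedCount.put(tp, count - 1);  // other pauses are still outstanding
        }
    }

    int pauseCount(String tp) {
        return pausedCount.getOrDefault(tp, 0);
    }

    boolean isCleaningPaused(String tp) {
        return pauseCount(tp) > 0;
    }

    /** Replays the scenario from the description and returns the leftover pause count. */
    static int replayScenario() {
        PausedCleaningSketch cleaner = new PausedCleaningSketch();
        String tp = "tp";
        cleaner.abortAndPauseCleaning(tp); // step 3: future replica created in d2
        cleaner.abortAndPauseCleaning(tp); // step 6: future replica re-created in d3
        cleaner.resumeCleaning(tp);        // step 7: future log swapped in, one resume
        return cleaner.pauseCount(tp);     // step 8: one pause is never released
    }

    public static void main(String[] args) {
        System.out.println("remaining pauses: " + replayScenario()); // prints "remaining pauses: 1"
    }
}
```

In this model two pauses are matched by a single resume, so {{isCleaningPaused("tp")}} stays {{true}} after the move completes, which mirrors the stuck cleanup described in step 8.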