[ 
https://issues.apache.org/jira/browse/KAFKA-13849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17526409#comment-17526409
 ] 

RivenSun commented on KAFKA-13849:
----------------------------------

Hi [~guozhang], [~hachikuji], [~showuon],
Could you take a look at this issue and share your suggestions?
Thanks.

> All topics whose cleanup policy  contains `compact` are not executing 
> deleteLogStartOffsetBreachedSegments as expected
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13849
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13849
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.0.1
>            Reporter: RivenSun
>            Priority: Major
>
> This appears to be a *regression* of the following JIRA:
> https://issues.apache.org/jira/browse/KAFKA-7400
> The current logic in LogManager.cleanupLogs() no longer follows the behavior 
> described in that JIRA.
> Looking at the source code of LogManager.cleanupLogs(): 
> 1. When deletableLogs is computed, logs with `compact` enabled are filtered 
> out in both branches of the if-else below.
> {code:java}
> // clean current logs.
> val deletableLogs = {
>   if (cleaner != null) {
>     // prevent cleaner from working on same partitions when changing cleanup policy
>     cleaner.pauseCleaningForNonCompactedPartitions()
>   } else {
>     currentLogs.filter {
>       case (_, log) => !log.config.compact
>     }
>   }
> }{code}
> 2. However, both the comments and the code of the subsequent 
> UnifiedLog.deleteOldSegments method show that topics with `compact` enabled 
> are also expected to execute deleteLogStartOffsetBreachedSegments.
> {code:java}
> /**
>  * If topic deletion is enabled, delete any local log segments that have either expired due to time based retention
>  * or because the log size is > retentionSize.
>  *
>  * Whether or not deletion is enabled, delete any local log segments that are before the log start offset
>  */
> def deleteOldSegments(): Int = {
>   if (config.delete) {
>     deleteLogStartOffsetBreachedSegments() +
>       deleteRetentionSizeBreachedSegments() +
>       deleteRetentionMsBreachedSegments()
>   } else {
>     deleteLogStartOffsetBreachedSegments()
>   }
> } {code}
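
The mismatch described in the two points above can be reproduced with a small self-contained model. This is only a sketch under stated assumptions: `Config`, `Log`, `CleanupSketch` and the counters are hypothetical stand-ins, not Kafka's real classes, and only the else-branch filter is modeled (the report states that the cleaner branch excludes compacted logs as well).

```scala
// Hypothetical, simplified model; none of these types are Kafka's real classes.
object CleanupSketch {
  // Stand-in for the log config: a cleanup policy may enable compact, delete, or both.
  final case class Config(compact: Boolean, delete: Boolean)

  // Stand-in for a log that records which deletion steps were executed.
  final class Log(val name: String, val config: Config) {
    var startOffsetCleanupRuns = 0
    var retentionCleanupRuns = 0

    // Mirrors the branch structure of deleteOldSegments() quoted above.
    def deleteOldSegments(): Unit = {
      startOffsetCleanupRuns += 1                   // runs whether or not delete is enabled
      if (config.delete) retentionCleanupRuns += 1  // retention checks only with delete
    }
  }

  // Mirrors the else-branch filter quoted above: compacted logs are dropped.
  def deletableLogs(logs: Seq[Log]): Seq[Log] =
    logs.filter(log => !log.config.compact)

  // Only the logs selected above ever reach deleteOldSegments().
  def cleanupLogs(logs: Seq[Log]): Unit =
    deletableLogs(logs).foreach(_.deleteOldSegments())

  // Runs one cleanup pass over three policies and reports, per log,
  // how often the log-start-offset cleanup was executed.
  def demo(): Map[String, Int] = {
    val logs = Seq(
      new Log("delete-only", Config(compact = false, delete = true)),
      new Log("compact-only", Config(compact = true, delete = false)),
      new Log("compact-and-delete", Config(compact = true, delete = true))
    )
    cleanupLogs(logs)
    logs.map(log => log.name -> log.startOffsetCleanupRuns).toMap
  }

  def main(args: Array[String]): Unit =
    demo().foreach { case (name, runs) => println(s"$name: $runs") }
}
```

In this model every log whose policy contains `compact` is filtered out before deleteOldSegments() is called, so its log-start-offset cleanup never runs, even though deleteOldSegments() itself would perform that step regardless of the delete setting.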



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
