[ 
https://issues.apache.org/jira/browse/KAFKA-14238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605926#comment-17605926
 ] 

Jose Armando Garcia Sancio commented on KAFKA-14238:
----------------------------------------------------

It looks like the cleanup policy must be set to either delete or compact:
{code:java}
        .define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy,
          ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
          KafkaConfig.LogCleanupPolicyProp)
{code}
Neither is correct for KRaft topics. KIP-630 talks about adding a third policy 
called snapshot:
{code:java}
The __cluster_metadata topic will have snapshot as the cleanup.policy. {code}
https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot#KIP630:KafkaRaftSnapshot-ProposedChanges
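
To illustrate why neither value fits, here is a minimal sketch in plain Java. It does not use Kafka's ConfigDef: the class CleanupPolicySketch and its rejected method are hypothetical stand-ins that only model a validator restricted to compact and delete, so a snapshot policy as described in KIP-630 would be refused by it.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Toy model of a list validator; this is not Kafka's ConfigDef. */
final class CleanupPolicySketch {

    /** The two values the validator quoted above accepts. */
    static final Set<String> ACCEPTED = Set.of("compact", "delete");

    /** Returns the requested policies that the accepted set would reject. */
    static List<String> rejected(List<String> requested) {
        return requested.stream()
            .filter(policy -> !ACCEPTED.contains(policy))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(rejected(List.of("delete")));   // prints []
        System.out.println(rejected(List.of("snapshot"))); // prints [snapshot]
    }
}
```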

> KRaft replicas can delete segments not included in a snapshot
> -------------------------------------------------------------
>
>                 Key: KAFKA-14238
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14238
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, kraft
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: Jose Armando Garcia Sancio
>            Priority: Blocker
>             Fix For: 3.3.0
>
>
> We see this in the log
> {code:java}
> Deleting segment LogSegment(baseOffset=243864, size=9269150, 
> lastModifiedTime=1662486784182, largestRecordTimestamp=Some(1662486784160)) 
> due to retention time 604800000ms breach based on the largest record 
> timestamp in the segment {code}
> This then causes {{KafkaRaftClient}} to throw an exception when sending 
> batches to the listener:
> {code:java}
>  java.lang.IllegalStateException: Snapshot expected since next offset of 
> org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@195461949 
> is 0, log start offset is 369668 and high-watermark is 547379
>       at 
> org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$4(KafkaRaftClient.java:312)
>       at java.base/java.util.Optional.orElseThrow(Optional.java:403)
>       at 
> org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$5(KafkaRaftClient.java:311)
>       at java.base/java.util.OptionalLong.ifPresent(OptionalLong.java:165)
>       at 
> org.apache.kafka.raft.KafkaRaftClient.updateListenersProgress(KafkaRaftClient.java:309){code}
> The on-disk state for the cluster metadata partition confirms this:
> {code:java}
>  ls __cluster_metadata-0/
> 00000000000000369668.index
> 00000000000000369668.log
> 00000000000000369668.timeindex
> 00000000000000503411.index
> 00000000000000503411.log
> 00000000000000503411.snapshot
> 00000000000000503411.timeindex
> 00000000000000548746.snapshot
> leader-epoch-checkpoint
> partition.metadata
> quorum-state{code}
> Notice that there are no {{checkpoint}} files and the log doesn't have a 
> segment at base offset 0.
> This is happening because the {{LogConfig}} used for KRaft sets the cleanup 
> policy to {{delete}}, which causes the method {{deleteOldSegments}} to delete 
> old segments even if no snapshot covers them. For KRaft, Kafka should 
> only delete segments that breach the log start offset.
> Log configuration for KRaft:
> {code:java}
>       val props = new Properties()
>       props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
>       props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
>       props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
>       props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
>       LogConfig.validateValues(props)
>       val defaultLogConfig = LogConfig(props){code}
> Segment deletion code:
> {code:java}
>      def deleteOldSegments(): Int = {
>       if (config.delete) {
>         deleteLogStartOffsetBreachedSegments() +
>           deleteRetentionSizeBreachedSegments() +
>           deleteRetentionMsBreachedSegments()
>       } else {
>         deleteLogStartOffsetBreachedSegments()
>       }
>     }{code}
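
The two branches of deleteOldSegments quoted above can be modeled with a toy sketch. This is plain Java, not Kafka code: SegmentRetentionSketch, Segment, and deletable are hypothetical names, the offsets and timestamps are made up, and size-based retention is left out. Only the 604800000 ms retention value comes from the log message. Under those assumptions, it shows that with the delete policy old segments become deletable even when the log start offset is still 0, that is, before any snapshot has advanced it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of segment deletion; these are not Kafka's classes. */
final class SegmentRetentionSketch {

    /** Hypothetical stand-in for a log segment. */
    static final class Segment {
        final long baseOffset;
        final long nextOffset;         // base offset of the following segment
        final long largestTimestampMs; // newest record timestamp in the segment

        Segment(long baseOffset, long nextOffset, long largestTimestampMs) {
            this.baseOffset = baseOffset;
            this.nextOffset = nextOffset;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    /**
     * Segments eligible for deletion. A segment entirely below the log start
     * offset is always eligible; time-based retention applies only when the
     * delete policy is in effect, mirroring the if/else quoted above.
     */
    static List<Segment> deletable(List<Segment> segments, boolean deletePolicy,
                                   long logStartOffset, long nowMs, long retentionMs) {
        List<Segment> result = new ArrayList<>();
        for (Segment s : segments) {
            boolean belowLogStart = s.nextOffset <= logStartOffset;
            boolean tooOld = nowMs - s.largestTimestampMs > retentionMs;
            if (belowLogStart || (deletePolicy && tooOld)) {
                result.add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long retentionMs = 604_800_000L;   // retention time from the log message
        long nowMs = retentionMs + 2_000L; // made-up current time
        List<Segment> segments = List.of(
            new Segment(0L, 100L, 0L),
            new Segment(100L, 200L, 1_000L),
            new Segment(200L, 300L, nowMs));

        // No snapshot yet, so the log start offset is still 0.
        System.out.println(deletable(segments, true, 0L, nowMs, retentionMs).size());  // prints 2
        System.out.println(deletable(segments, false, 0L, nowMs, retentionMs).size()); // prints 0
    }
}
```

With the else branch, a segment becomes eligible only once the log start offset moves past it, which in this model is the only rule that respects the snapshot.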



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
