[ https://issues.apache.org/jira/browse/KAFKA-14238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605926#comment-17605926 ]
Jose Armando Garcia Sancio commented on KAFKA-14238:
----------------------------------------------------

It looks like the cleanup policy has to be set to either {{delete}} or {{compact}}:
{code:java}
.define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy,
  ValidList.in(LogConfig.Compact, LogConfig.Delete),
  MEDIUM, CompactDoc, KafkaConfig.LogCleanupPolicyProp)
{code}
Neither is correct for KRaft topics. KIP-630 talks about adding a third policy called {{snapshot}}:
{code:java}
The __cluster_metadata topic will have snapshot as the cleanup.policy.
{code}
https://cwiki.apache.org/confluence/display/KAFKA/KIP-630%3A+Kafka+Raft+Snapshot#KIP630:KafkaRaftSnapshot-ProposedChanges
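For illustration, here is a minimal, hypothetical sketch of what accepting that third policy could look like, written against the public {{ConfigDef}} API. The {{Snapshot}} constant and the {{CleanupPolicySketch}} object are assumptions made for this sketch; today {{LogConfig}} only defines {{Compact}} and {{Delete}}:
{code:java}
// Hypothetical sketch, not the actual patch: extend the cleanup.policy
// validator so that "snapshot" (the policy proposed in KIP-630) is also
// accepted. Only "compact" and "delete" exist in LogConfig today.
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.{Importance, Type, ValidList}

object CleanupPolicySketch {
  val Compact = "compact"
  val Delete = "delete"
  val Snapshot = "snapshot" // assumed constant for the KIP-630 policy

  val CleanupPolicyProp = "cleanup.policy"

  // Mirrors the define(...) call quoted above, with "snapshot" added to
  // the list of values the validator accepts.
  val configDef: ConfigDef = new ConfigDef()
    .define(
      CleanupPolicyProp,
      Type.LIST,
      Delete, // the default stays "delete" for regular topics
      ValidList.in(Compact, Delete, Snapshot),
      Importance.MEDIUM,
      "How old segments are cleaned up: delete, compact, or snapshot."
    )
}
{code}
With something like this in place, the metadata log's {{LogConfig}} could declare {{cleanup.policy=snapshot}} instead of silently inheriting {{delete}}.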
> KRaft replicas can delete segments not included in a snapshot
> -------------------------------------------------------------
>
>                 Key: KAFKA-14238
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14238
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, kraft
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: Jose Armando Garcia Sancio
>            Priority: Blocker
>             Fix For: 3.3.0
>
>
> We see this in the log:
> {code:java}
> Deleting segment LogSegment(baseOffset=243864, size=9269150, lastModifiedTime=1662486784182, largestRecordTimestamp=Some(1662486784160)) due to retention time 604800000ms breach based on the largest record timestamp in the segment
> {code}
> This then causes {{KafkaRaftClient}} to throw an exception when sending batches to the listener:
> {code:java}
> java.lang.IllegalStateException: Snapshot expected since next offset of org.apache.kafka.controller.QuorumController$QuorumMetaLogListener@195461949 is 0, log start offset is 369668 and high-watermark is 547379
>         at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$4(KafkaRaftClient.java:312)
>         at java.base/java.util.Optional.orElseThrow(Optional.java:403)
>         at org.apache.kafka.raft.KafkaRaftClient.lambda$updateListenersProgress$5(KafkaRaftClient.java:311)
>         at java.base/java.util.OptionalLong.ifPresent(OptionalLong.java:165)
>         at org.apache.kafka.raft.KafkaRaftClient.updateListenersProgress(KafkaRaftClient.java:309)
> {code}
> The on-disk state for the cluster metadata partition confirms this:
> {code:java}
> ls __cluster_metadata-0/
> 00000000000000369668.index
> 00000000000000369668.log
> 00000000000000369668.timeindex
> 00000000000000503411.index
> 00000000000000503411.log
> 00000000000000503411.snapshot
> 00000000000000503411.timeindex
> 00000000000000548746.snapshot
> leader-epoch-checkpoint
> partition.metadata
> quorum-state
> {code}
> Notice that there are no {{checkpoint}} files and the log doesn't have a segment at base offset 0.
> This is happening because the {{LogConfig}} used for KRaft sets the retention policy to {{delete}}, which causes the method {{deleteOldSegments}} to delete old segments even when there is no snapshot covering them. For KRaft, Kafka should only delete segments that breach the log start offset.
> Log configuration for KRaft:
> {code:java}
> val props = new Properties()
> props.put(LogConfig.MaxMessageBytesProp, config.maxBatchSizeInBytes.toString)
> props.put(LogConfig.SegmentBytesProp, Int.box(config.logSegmentBytes))
> props.put(LogConfig.SegmentMsProp, Long.box(config.logSegmentMillis))
> props.put(LogConfig.FileDeleteDelayMsProp, Int.box(Defaults.FileDeleteDelayMs))
> LogConfig.validateValues(props)
> val defaultLogConfig = LogConfig(props)
> {code}
> Segment deletion code:
> {code:java}
> def deleteOldSegments(): Int = {
>   if (config.delete) {
>     deleteLogStartOffsetBreachedSegments() +
>       deleteRetentionSizeBreachedSegments() +
>       deleteRetentionMsBreachedSegments()
>   } else {
>     deleteLogStartOffsetBreachedSegments()
>   }
> }
> {code}
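Note that the quoted KRaft {{LogConfig}} never sets {{CleanupPolicyProp}}, so the partition falls back to the default {{delete}} policy and takes the retention-based branch of {{deleteOldSegments}}. A minimal sketch of the direction the description suggests (not the actual 3.3.0 patch) would gate the retention-based deletions so that a metadata log only ever deletes segments below the log start offset; the {{isMetadataLog}} flag below is an assumption made for this sketch:
{code:java}
// Hypothetical sketch, not the actual fix: skip time- and size-based
// retention for a KRaft metadata log so that only segments entirely
// below the log start offset are ever deleted. `isMetadataLog` is an
// assumed flag; the real code may gate this differently.
def deleteOldSegments(): Int = {
  if (config.delete && !isMetadataLog) {
    deleteLogStartOffsetBreachedSegments() +
      deleteRetentionSizeBreachedSegments() +
      deleteRetentionMsBreachedSegments()
  } else {
    // Compacted logs and (under this sketch) metadata logs end up here:
    // only log-start-offset breaches trigger deletion, and for KRaft the
    // log start offset should only advance once a snapshot covers the data.
    deleteLogStartOffsetBreachedSegments()
  }
}
{code}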