[
https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
José Armando García Sancio updated KAFKA-18723:
-----------------------------------------------
Fix Version/s: 4.0.1
> KRaft must handle corrupted records in the fetch response
> ---------------------------------------------------------
>
> Key: KAFKA-18723
> URL: https://issues.apache.org/jira/browse/KAFKA-18723
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Reporter: José Armando García Sancio
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 3.9.1, 3.8.2, 3.7.3, 4.0.1
>
>
> It is possible for a KRaft replica to send corrupted records to the fetching
> replicas in the FETCH response. This is because there is a race between when
> the FETCH response gets generated by the KRaft IO thread and when the network
> thread, or the Linux kernel, reads the referenced byte range from the log
> segment.
> This race generates corrupted records if the KRaft replica truncates the log
> after the FETCH response was created but before the network thread has read
> the bytes from the log segment.
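> As a minimal illustration of the mechanism, here is a standalone sketch (not
> Kafka code; every name in it is hypothetical). The response is built as a
> (position, length) reference into the segment file rather than a copy of the
> bytes, so a truncation between building the response and reading the bytes
> leaves the reader with a torn batch:
> {code:java}
> import java.nio.ByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.StandardOpenOption;
>
> public class TruncationRaceSketch {
>     public static void main(String[] args) throws Exception {
>         Path segment = Files.createTempFile("segment", ".log");
>         try (FileChannel log = FileChannel.open(segment,
>                 StandardOpenOption.READ, StandardOpenOption.WRITE)) {
>             // The log starts with 100 bytes of record data.
>             log.write(ByteBuffer.wrap(new byte[100]));
>
>             // IO thread: the response references bytes [0, 100) of the
>             // segment instead of copying them.
>             long responsePosition = 0;
>             int responseLength = 100;
>
>             // Race: the replica truncates the log after the response was
>             // created but before the referenced byte range is read.
>             log.truncate(50);
>
>             // Network thread: only 50 of the promised 100 bytes still
>             // exist, so the fetcher receives a torn record batch.
>             ByteBuffer out = ByteBuffer.allocate(responseLength);
>             int read = log.read(out, responsePosition);
>             System.out.println("promised " + responseLength + " bytes, read " + read);
>         }
>     }
> }{code}
> In the real code path the read can even happen inside the kernel (zero-copy
> transfer), which is why the window involves the network thread or the Linux
> kernel rather than the KRaft IO thread itself.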
> I have seen the following errors:
> {code:java}
> [ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread]
> org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault -
> Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.KafkaException: Append failed unexpectedly
>     at kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
>     at kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
>     at org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
>     at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
>     at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
>     at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
>     at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
>     at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
>     at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
> and
> {code:java}
> [ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread]
> org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault -
> Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less
> than the minimum record overhead (14){code}
> This race also exists with Kafka's ISR-based topic partitions. In that case
> the replica fetcher catches CorruptRecordException and InvalidRecordException,
> logs the error, and relies on a subsequent fetch to recover:
> {code:java}
>         } catch {
>           case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
>             // we log the error and continue. This ensures two things
>             // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
>             //    down and cause other topic partition to also lag
>             // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
>             //    can cause this), we simply continue and should get fixed in the subsequent fetches
>             error(s"Found invalid messages during fetch for partition $topicPartition " +
>               s"offset ${currentFetchState.fetchOffset}", ime)
>             partitionsWithError += topicPartition
> {code}
> The KRaft implementation doesn't handle this case:
> {code:java}
>             } else {
>                 Records records = FetchResponse.recordsOrFail(partitionResponse);
>                 if (records.sizeInBytes() > 0) {
>                     appendAsFollower(records);
>                 }
>
>                 OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
>                     OptionalLong.empty() :
>                     OptionalLong.of(partitionResponse.highWatermark());
>                 updateFollowerHighWatermark(state, highWatermark);
>             }{code}
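> A possible direction, mirroring the ISR fetcher's handling above, is to treat
> CorruptRecordException and InvalidRecordException from the append as
> transient and let a subsequent FETCH re-read a consistent byte range. This is
> only a sketch of that idea against the snippet above (the logger handle is
> assumed from context), not a committed fix:
> {code:java}
>             } else {
>                 Records records = FetchResponse.recordsOrFail(partitionResponse);
>                 if (records.sizeInBytes() > 0) {
>                     try {
>                         appendAsFollower(records);
>                     } catch (CorruptRecordException | InvalidRecordException e) {
>                         // A torn batch caused by the truncation race is
>                         // transient: skip the append and let the next fetch
>                         // return a consistent byte range from the leader.
>                         logger.info("Ignoring corrupted records in the FETCH response; " +
>                             "will retry on the next fetch", e);
>                     }
>                 }
>
>                 OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
>                     OptionalLong.empty() :
>                     OptionalLong.of(partitionResponse.highWatermark());
>                 updateFollowerHighWatermark(state, highWatermark);
>             }{code}
> Whether the follower should still advance its high watermark after a skipped
> append is a separate design question.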
--
This message was sent by Atlassian Jira
(v8.20.10#820010)