[ https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564575#comment-17564575 ]
Doguscan Namal commented on KAFKA-13953:
----------------------------------------

I have included the part of the data that I was able to read. Changing the leadership of the partition did not help, so I assume that all replicas are corrupted. (The data retention has since expired, so unfortunately I could not verify this.)

* The corruption is not at the RecordBatch level but at the record level. I could read all of the data in that batch up to the problematic offset.
* It fails at the following line: [https://github.com/apache/kafka/blob/2.5.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L296-L297]. sizeOfBodyInBytes is read as -155493822, when it should have been 1156.
* I overrode that value for this offset, and the record data could then be read up to the 3rd of its 5 headers, i.e. up to here: [https://github.com/apache/kafka/blob/2.5.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L545]
* There it failed because headerValueSize was read as 1991988702, when it should have been 13. I realized that `ByteUtils.readVarint` was reading 5 bytes of data instead of 1.
* Even overriding this headerValueSize did not help: on the next read, headerKeySize was read as -58 and processing failed.

This makes me think that Kafka only has a partial record at this offset. And although there are more records in this batch after this point, none of them are accessible because this particular offset is corrupted.

Q1) Could the producer have sent a corrupted batch? Due to zero-copy, maybe the broker just copied the received content into the data log file?

Q2) I also see the `ProduceMessageConversionsPerSec` metric for this topic. Could this be related, i.e. message conversion?
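The readVarint symptom described above can be reproduced with a simplified zig-zag varint decoder (an illustrative sketch, not Kafka's actual `ByteUtils` code): flipping a single continuation bit makes the decoder pull the following bytes into the value, which is consistent with a 1-byte length being read as 5 bytes of garbage. Note that 13 zig-zag encodes to 26 = 0x1A, a single byte.

```java
public class VarintDemo {
    // Simplified sketch of zig-zag varint decoding, mirroring the shape of
    // org.apache.kafka.common.utils.ByteUtils.readVarint (illustration only,
    // not the actual Kafka implementation).
    public static int readVarint(byte[] buf, int pos) {
        int raw = 0;
        int shift = 0;
        while (true) {
            byte b = buf[pos++];
            raw |= (b & 0x7F) << shift;   // low 7 bits carry the payload
            if ((b & 0x80) == 0) break;   // high bit clear -> last byte
            shift += 7;
        }
        // zig-zag decode maps the unsigned value back to a signed int
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        // Healthy case: the value 13 zig-zag encodes to 26 = 0x1A, one byte.
        System.out.println(readVarint(new byte[]{0x1A}, 0));  // 13
        // Corrupted case: the same byte with its continuation (high) bit
        // flipped to 0x9A drags four more bytes into the decode, so a
        // one-byte length turns into a huge bogus value.
        System.out.println(readVarint(
                new byte[]{(byte) 0x9A, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0x0F}, 0));
    }
}
```

So a single corrupted bit in the length field is enough to explain both the 5-byte read and the nonsensical headerValueSize, and it also desynchronizes every subsequent read in the record, matching the cascade of bad values seen after overriding each field.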
* Topic is configured to use zstd compression
* Kafka version 2.5.1

Here is the output from my println when this record is being read:

{code}
recordStart: 0
attributes: 0
timestampDelta: 391
timestamp: 1656027641475
offset: 88062375700
sequence: 40017233
key: java.nio.HeapByteBuffer[pos=0 lim=25 cap=1199]
value: java.nio.HeapByteBuffer[pos=0 lim=961 cap=1149]
numHeaders: 5
headerValueSize: 12 capacity: 147
headerValueSize: 8 capacity: 122
headerValueSize: 1991988702 capacity: 90
{code}

> kafka Console consumer fails with CorruptRecordException
> ---------------------------------------------------------
>
>                 Key: KAFKA-13953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13953
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, controller, core
>    Affects Versions: 2.7.0
>            Reporter: Aldan Brito
>            Priority: Blocker
>
> Kafka consumer fails with a corrupt record exception.
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*:<port> --topic BQR-PULL-DEFAULT --from-beginning > /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [2022-05-15 18:34:15,146] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record to continue consumption.
>       at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>       at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>       at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>       at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>       at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>       at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>       at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>       at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>       at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
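As a closing note on the quoted trace: the "minimum record overhead (14)" in the CorruptRecordException is, as far as I can tell, the smallest possible legacy (v0) record header, i.e. crc(4) + magic(1) + attributes(1) + key length(4) + value length(4). A minimal sketch of that sanity check, assuming the constant mirrors LegacyRecord.RECORD_OVERHEAD_V0 (the guard below is an illustration, not Kafka's exact code):

```java
public class RecordSizeCheck {
    // Hypothetical breakdown of the 14-byte minimum, assumed to mirror
    // LegacyRecord.RECORD_OVERHEAD_V0 in the Kafka source:
    static final int CRC_LENGTH = 4;
    static final int MAGIC_LENGTH = 1;
    static final int ATTRIBUTES_LENGTH = 1;
    static final int KEY_SIZE_LENGTH = 4;
    static final int VALUE_SIZE_LENGTH = 4;
    static final int RECORD_OVERHEAD_V0 =
            CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;

    // Sketch of the size sanity check; Kafka throws CorruptRecordException,
    // a plain RuntimeException stands in for it here.
    static void checkRecordSize(int recordSize) {
        if (recordSize < RECORD_OVERHEAD_V0)
            throw new RuntimeException("Record size " + recordSize
                    + " is less than the minimum record overhead (" + RECORD_OVERHEAD_V0 + ")");
    }

    public static void main(String[] args) {
        checkRecordSize(1156);  // a plausible size from this ticket: passes
        try {
            checkRecordSize(0); // a corrupted length field read as 0
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This would explain why a length field corrupted down to 0 fails fast with this exact message instead of attempting a garbage read.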