[ 
https://issues.apache.org/jira/browse/KAFKA-10438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10438.
------------------------------------
    Fix Version/s: 2.8.0
       Resolution: Fixed

> Lazy initialization of record header to reduce memory usage in validating 
> records
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-10438
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10438
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Minor
>             Fix For: 2.8.0
>
>
> {code}
>   private def validateRecord(batch: RecordBatch, topicPartition: 
> TopicPartition, record: Record, batchIndex: Int, now: Long,
>                              timestampType: TimestampType, 
> timestampDiffMaxMs: Long, compactedTopic: Boolean,
>                              brokerTopicStats: BrokerTopicStats): 
> Option[ApiRecordError] = {
>     if (!record.hasMagic(batch.magic)) {
>       brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
>       return Some(ApiRecordError(Errors.INVALID_RECORD, new 
> RecordError(batchIndex,
>         s"Record $record's magic does not match outer magic ${batch.magic} in 
> topic partition $topicPartition.")))
>     }
>     // verify the record-level CRC only if this is one of the deep entries of 
> a compressed message
>     // set for magic v0 and v1. For non-compressed messages, there is no 
> inner record for magic v0 and v1,
>     // so we depend on the batch-level CRC check in 
> Log.analyzeAndValidateRecords(). For magic v2 and above,
>     // there is no record-level CRC to check.
>     if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) {
>       try {
>         record.ensureValid()
>       } catch {
>         case e: InvalidRecordException =>
>           
> brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
>           throw new CorruptRecordException(e.getMessage + s" in topic 
> partition $topicPartition.")
>       }
>     }
>     validateKey(record, batchIndex, topicPartition, compactedTopic, 
> brokerTopicStats).orElse {
>       validateTimestamp(batch, record, batchIndex, now, timestampType, 
> timestampDiffMaxMs)
>     }
>   }
> {code}
> There is no checks for header key so instantiating key (bytes to string) is 
> unnecessary.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to