Chia-Ping Tsai created KAFKA-10438:
--------------------------------------
Summary: Lazy initialization of record header to reduce memory
usage in validating records
Key: KAFKA-10438
URL: https://issues.apache.org/jira/browse/KAFKA-10438
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai
{code}
private def validateRecord(batch: RecordBatch, topicPartition: TopicPartition, record: Record,
                           batchIndex: Int, now: Long, timestampType: TimestampType,
                           timestampDiffMaxMs: Long, compactedTopic: Boolean,
                           brokerTopicStats: BrokerTopicStats): Option[ApiRecordError] = {
  if (!record.hasMagic(batch.magic)) {
    brokerTopicStats.allTopicsStats.invalidMagicNumberRecordsPerSec.mark()
    return Some(ApiRecordError(Errors.INVALID_RECORD, new RecordError(batchIndex,
      s"Record $record's magic does not match outer magic ${batch.magic} in topic partition $topicPartition.")))
  }

  // verify the record-level CRC only if this is one of the deep entries of a compressed message
  // set for magic v0 and v1. For non-compressed messages, there is no inner record for magic v0 and v1,
  // so we depend on the batch-level CRC check in Log.analyzeAndValidateRecords(). For magic v2 and above,
  // there is no record-level CRC to check.
  if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) {
    try {
      record.ensureValid()
    } catch {
      case e: InvalidRecordException =>
        brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
        throw new CorruptRecordException(e.getMessage + s" in topic partition $topicPartition.")
    }
  }

  validateKey(record, batchIndex, topicPartition, compactedTopic, brokerTopicStats).orElse {
    validateTimestamp(batch, record, batchIndex, now, timestampType, timestampDiffMaxMs)
  }
}
{code}
None of these validation steps checks the header key, so instantiating the key
(decoding its bytes into a string) for every record header is unnecessary.
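
A minimal sketch of the idea, assuming the header keeps the raw key bytes and decodes them only on first access. The class name LazyHeader and the helper isKeyDecoded() are hypothetical and used for illustration only; this is not Kafka's actual header class, and the sketch is not thread-safe.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical illustration of a lazily decoded header key (not Kafka's real class).
final class LazyHeader {
    private ByteBuffer keyBuffer; // raw UTF-8 bytes of the key; dropped after the first decode
    private String key;           // decoded key, null until key() is first called

    LazyHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = Objects.requireNonNull(keyBuffer, "keyBuffer");
    }

    // Decodes the key on first access and caches the result.
    String key() {
        if (key == null) {
            // duplicate() leaves the caller's buffer position untouched
            key = StandardCharsets.UTF_8.decode(keyBuffer.duplicate()).toString();
            keyBuffer = null; // release the reference to the raw bytes
        }
        return key;
    }

    // Hypothetical helper: reports whether the String has been materialized yet.
    boolean isKeyDecoded() {
        return key != null;
    }
}
```

With this shape, a validation path that never calls key() never allocates the String; only code that actually reads the header key pays for the decoding.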