Repository: kafka
Updated Branches:
  refs/heads/trunk c82118b19 -> 93fbda4c5
MINOR: Remove redundant CRC validation for non-compressed records in older message formats Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #2881 from hachikuji/fix-redundant-crc-check Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/93fbda4c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/93fbda4c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/93fbda4c Branch: refs/heads/trunk Commit: 93fbda4c577870c96c3ab86c29b60c04dc9183b9 Parents: c82118b Author: Jason Gustafson <ja...@confluent.io> Authored: Fri Apr 21 10:35:33 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Apr 21 10:35:33 2017 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogValidator.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/93fbda4c/core/src/main/scala/kafka/log/LogValidator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index ae3d846..0616d41 100644 --- a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -76,7 +76,14 @@ private[kafka] object LogValidator extends Logging { timestampDiffMaxMs: Long, compactedTopic: Boolean): Unit = { if (!record.hasMagic(batch.magic)) throw new InvalidRecordException(s"Log record magic does not match outer magic ${batch.magic}") - record.ensureValid() + + // verify the record-level CRC only if this is one of the deep entries of a compressed message + // set for magic v0 and v1. 
For non-compressed messages, there is no inner record for magic v0 and v1, + // so we depend on the batch-level CRC check in Log.analyzeAndValidateRecords(). For magic v2 and above, + // there is no record-level CRC to check. + if (batch.magic <= RecordBatch.MAGIC_VALUE_V1 && batch.isCompressed) + record.ensureValid() + ensureNotControlRecord(record) validateKey(record, compactedTopic) validateTimestamp(batch, record, now, timestampType, timestampDiffMaxMs)