Repository: kafka Updated Branches: refs/heads/trunk 286411cbb -> d2acd676c
MINOR: More graceful handling of buffers that are too small in Record's `isValid` and `ensureValid` Also add tests and make `Crc32.update` perform the same argument checks as `java.util.zip.CRC32`. Author: Ismael Juma <[email protected]> Reviewers: Gwen Shapira Closes #1672 from ijuma/record-is-valid-should-be-more-robust Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2acd676 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2acd676 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2acd676 Branch: refs/heads/trunk Commit: d2acd676c3eb0c11d0042bc3b9ae314165c68443 Parents: 286411c Author: Ismael Juma <[email protected]> Authored: Tue Sep 6 17:34:46 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Tue Sep 6 17:34:46 2016 -0700 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 15 +++-- .../org/apache/kafka/common/record/Record.java | 15 +++-- .../org/apache/kafka/common/utils/Crc32.java | 3 + .../kafka/common/record/SimpleRecordTest.java | 66 ++++++++++++++++++++ 4 files changed, 87 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d2acd676/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index aa5cdbe..eb876a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.LogEntry; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; @@ -617,12 +618,14 @@ public class Fetcher<K, V> { private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) { Record record = logEntry.record(); - if (this.checkCrcs && !record.isValid()) - throw new KafkaException("Record for partition " + partition + " at offset " - + logEntry.offset() + " is corrupt (stored crc = " + record.checksum() - + ", computed crc = " - + record.computeChecksum() - + ")"); + if (this.checkCrcs) { + try { + record.ensureValid(); + } catch (InvalidRecordException e) { + throw new KafkaException("Record for partition " + partition + " at offset " + logEntry.offset() + + " is invalid, cause: " + e.getMessage()); + } + } try { long offset = logEntry.offset(); http://git-wip-us.apache.org/repos/asf/kafka/blob/d2acd676/clients/src/main/java/org/apache/kafka/common/record/Record.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 77e4f68..09cb80d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -256,18 +256,21 @@ public final class Record { * Returns true if the crc stored with the record matches the crc computed off the record contents */ public boolean isValid() { - return checksum() == computeChecksum(); + return size() >= CRC_LENGTH && checksum() == computeChecksum(); } /** * Throw an InvalidRecordException if isValid is false for this record */ public void ensureValid() { - if (!isValid()) - throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum() - + ", computed crc = " - + computeChecksum() - + ")"); + if (!isValid()) { + if (size() < CRC_LENGTH) + throw new InvalidRecordException("Record is corrupt (crc could not be retrieved as the record is too " + + "small, size = " + size() + ")"); + else + throw new InvalidRecordException("Record is corrupt (stored crc = " + checksum() + + ", computed crc = " + computeChecksum() + ")"); + } } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/d2acd676/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java index caa0058..48af070 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32.java @@ -72,6 +72,9 @@ public class Crc32 implements Checksum { @Override public void update(byte[] b, int off, int len) { + if (off < 0 || len < 0 || off > b.length - len) + throw new ArrayIndexOutOfBoundsException(); + int localCrc = crc; while (len > 7) { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2acd676/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java new file mode 100644 index 0000000..aabadfe --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class SimpleRecordTest { + + /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */ + @Test + public void testIsValidWithTooSmallBuffer() { + ByteBuffer buffer = ByteBuffer.allocate(2); + Record record = new Record(buffer); + assertFalse(record.isValid()); + try { + record.ensureValid(); + fail("InvalidRecordException should have been thrown"); + } catch (InvalidRecordException e) { } + } + + @Test + public void testIsValidWithChecksumMismatch() { + ByteBuffer buffer = ByteBuffer.allocate(4); + // set checksum + buffer.putInt(2); + Record record = new Record(buffer); + assertFalse(record.isValid()); + try { + record.ensureValid(); + fail("InvalidRecordException should have been thrown"); + } catch (InvalidRecordException e) { } + } + + @Test + public void testIsValidWithFourBytesBuffer() { + ByteBuffer buffer = ByteBuffer.allocate(4); + Record record = new Record(buffer); + // it is a bit weird that we return `true` in this case, we could extend the definition of `isValid` to + // something like the following to detect a clearly corrupt record: + // return size() >= recordSize(0, 0) && checksum() == computeChecksum(); + assertTrue(record.isValid()); + // no exception should be thrown + record.ensureValid(); + } + +}
