Repository: kafka Updated Branches: refs/heads/trunk 3fdbba1c7 -> 51fc50ed0
KAFKA-5031; Follow-up with small cleanups/improvements Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #3363 from hachikuji/KAFKA-5031-FOLLOWUP Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51fc50ed Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51fc50ed Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51fc50ed Branch: refs/heads/trunk Commit: 51fc50ed0ba7aa4d0e618f9103a1531680489139 Parents: 3fdbba1 Author: Jason Gustafson <[email protected]> Authored: Sat Jun 17 12:48:34 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat Jun 17 12:48:38 2017 +0100 ---------------------------------------------------------------------- .../kafka/common/record/DefaultRecord.java | 25 +++--- .../kafka/common/record/DefaultRecordBatch.java | 2 +- .../common/record/InvalidRecordException.java | 4 + .../kafka/common/record/DefaultRecordTest.java | 84 ++++++++++++++++++++ 4 files changed, 104 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index 9b7f327..109528f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -293,7 +293,8 @@ public class DefaultRecord implements Record { ByteBuffer recordBuffer = ByteBuffer.allocate(sizeOfBodyInBytes); input.readFully(recordBuffer.array(), 0, sizeOfBodyInBytes); int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; - return readFrom(recordBuffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime); + return readFrom(recordBuffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp, + baseSequence, logAppendTime); } public static DefaultRecord readFrom(ByteBuffer buffer, @@ -306,11 +307,13 @@ public class DefaultRecord implements Record { return null; int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + sizeOfBodyInBytes; - return readFrom(buffer, totalSizeInBytes, baseOffset, baseTimestamp, baseSequence, logAppendTime); + return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, baseOffset, baseTimestamp, + baseSequence, logAppendTime); } private static DefaultRecord readFrom(ByteBuffer buffer, int sizeInBytes, + int sizeOfBodyInBytes, long baseOffset, long baseTimestamp, int baseSequence, @@ -349,19 +352,24 @@ public class DefaultRecord implements Record { if (numHeaders < 0) throw new InvalidRecordException("Found invalid number of record headers " + numHeaders); + final Header[] headers; if (numHeaders == 0) - return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, Record.EMPTY_HEADERS); + headers = Record.EMPTY_HEADERS; + else + headers = readHeaders(buffer, numHeaders); - Header[] headers = readHeaders(buffer, numHeaders, recordStart, sizeInBytes); + // validate whether we have read all header bytes in the current record + if (buffer.position() - recordStart != sizeOfBodyInBytes) + throw new InvalidRecordException("Invalid record size: expected to read " + sizeOfBodyInBytes + + " bytes in record payload, but instead read " + (buffer.position() - recordStart)); return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers); } catch (BufferUnderflowException | IllegalArgumentException e) { - throw new InvalidRecordException("Invalid header data or number of headers declared for the record, reason for failure was " - + e.getMessage()); + throw new InvalidRecordException("Found invalid record structure", e); } } - private static Header[] readHeaders(ByteBuffer buffer, int numHeaders, int recordStart, int sizeInBytes) { + private static Header[] readHeaders(ByteBuffer buffer, int numHeaders) { Header[] headers = new Header[numHeaders]; for (int i = 0; i < numHeaders; i++) { int headerKeySize = ByteUtils.readVarint(buffer); @@ -382,9 +390,6 @@ public class DefaultRecord implements Record { headers[i] = new RecordHeader(headerKey, headerValue); } - // validate whether we have read all header bytes in the current record - if (buffer.position() - recordStart != sizeInBytes - ByteUtils.sizeOfVarint(sizeInBytes)) - throw new InvalidRecordException("Invalid header data or number of headers declared for the record"); return headers; } http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 5a7e27a..f933f41 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -259,7 +259,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe try { return inputStream.read() == -1; } catch (IOException e) { - return false; + throw new KafkaException("Error checking for remaining bytes after reading batch", e); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java index ffd09a4..49f6166 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java +++ b/clients/src/main/java/org/apache/kafka/common/record/InvalidRecordException.java @@ -26,4 +26,8 @@ public class InvalidRecordException extends CorruptRecordException { super(s); } + public InvalidRecordException(String message, Throwable cause) { + super(message, cause); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/51fc50ed/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java index b9c923d..3ff73c9 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.common.record; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.ByteUtils; import org.junit.Test; import java.io.DataOutputStream; @@ -131,6 +132,89 @@ public class DefaultRecordTest { record.headers()), logRecord.sizeInBytes()); } + @Test(expected = InvalidRecordException.class) + public void testInvalidKeySize() { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + int keySize = 105; // use a key size larger than the full message + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(keySize, buf); + buf.position(buf.limit()); + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testInvalidValueSize() throws IOException { + byte attributes = 0; + long timestampDelta = 2; + int offsetDelta = 1; + int sizeOfBodyInBytes = 100; + int valueSize = 105; // use a value size larger than the full message + + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(timestampDelta, buf); + ByteUtils.writeVarint(offsetDelta, buf); + ByteUtils.writeVarint(-1, buf); // null key + ByteUtils.writeVarint(valueSize, buf); + buf.position(buf.limit()); + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testUnderflowReadingTimestamp() { + byte attributes = 0; + int sizeOfBodyInBytes = 1; + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes)); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testUnderflowReadingVarlong() { + byte attributes = 0; + int sizeOfBodyInBytes = 2; // one byte for attributes, one byte for partial timestamp + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 1); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + buf.put(attributes); + ByteUtils.writeVarlong(156, buf); // needs 2 bytes to represent + buf.position(buf.limit() - 1); + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + + @Test(expected = InvalidRecordException.class) + public void testInvalidVarlong() { + byte attributes = 0; + int sizeOfBodyInBytes = 11; // one byte for attributes, 10 bytes for max timestamp + ByteBuffer buf = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 1); + ByteUtils.writeVarint(sizeOfBodyInBytes, buf); + int recordStartPosition = buf.position(); + + buf.put(attributes); + ByteUtils.writeVarlong(Long.MAX_VALUE, buf); // takes 10 bytes + buf.put(recordStartPosition + 10, Byte.MIN_VALUE); // use an invalid final byte + + buf.flip(); + DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null); + } + @Test public void testSerdeNoSequence() throws IOException { ByteBuffer key = ByteBuffer.wrap("hi".getBytes());
