Repository: kafka Updated Branches: refs/heads/trunk 6118ecb59 -> 8e4b3dca7
KAFKA-2903; FileRecords.read doesn't handle size > sizeInBytes when start is not zero Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #4158 from ijuma/kafka-2903-file-records-read-slice-size-greater Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e4b3dca Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e4b3dca Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e4b3dca Branch: refs/heads/trunk Commit: 8e4b3dca7b1bcef0e4875fea4fa79377998616c3 Parents: 6118ecb Author: Ismael Juma <ism...@juma.me.uk> Authored: Mon Oct 30 11:06:11 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Oct 30 11:06:11 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/common/record/FileRecords.java | 10 ++--- .../kafka/common/record/FileRecordsTest.java | 41 ++++++++++++++++---- 2 files changed, 38 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8e4b3dca/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index ae99db3..e907abc 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -136,12 +136,10 @@ public class FileRecords extends AbstractRecords implements Closeable { if (size < 0) throw new IllegalArgumentException("Invalid size: " + size); - final int end; - // handle integer overflow - if (this.start + position + size < 0) - end = sizeInBytes(); - else - end = Math.min(this.start + position + size, sizeInBytes()); + int end = this.start + position + size; + // handle integer overflow or if end is beyond the end of the file + if (end < 0 || end >= start + sizeInBytes()) + end = start + sizeInBytes(); return new FileRecords(file, channel, this.start + position, end, true); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8e4b3dca/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 6df7c2d..53ac200 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -118,17 +118,44 @@ public class FileRecordsTest { @Test public void testRead() throws IOException { FileRecords read = fileRecords.read(0, fileRecords.sizeInBytes()); + assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes()); TestUtils.checkEquals(fileRecords.batches(), read.batches()); List<RecordBatch> items = batches(read); + RecordBatch first = items.get(0); + + // read from second message until the end + read = fileRecords.read(first.sizeInBytes(), fileRecords.sizeInBytes() - first.sizeInBytes()); + assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes()); + assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read)); + + // read from second message and size is past the end of the file + read = fileRecords.read(first.sizeInBytes(), fileRecords.sizeInBytes()); + assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes()); + assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read)); + + // read from second message and position + size overflows + read = fileRecords.read(first.sizeInBytes(), Integer.MAX_VALUE); + assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes()); + assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read)); + + // read from second message and size is past the end of the file on a view/slice + read = fileRecords.read(1, fileRecords.sizeInBytes() - 1) + .read(first.sizeInBytes() - 1, fileRecords.sizeInBytes()); + assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes()); + assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read)); + + // read from second message and position + size overflows on a view/slice + read = fileRecords.read(1, fileRecords.sizeInBytes() - 1) + .read(first.sizeInBytes() - 1, Integer.MAX_VALUE); + assertEquals(fileRecords.sizeInBytes() - first.sizeInBytes(), read.sizeInBytes()); + assertEquals("Read starting from the second message", items.subList(1, items.size()), batches(read)); + + // read a single message starting from second message RecordBatch second = items.get(1); - - read = fileRecords.read(second.sizeInBytes(), fileRecords.sizeInBytes()); - assertEquals("Try a read starting from the second message", - items.subList(1, 3), batches(read)); - - read = fileRecords.read(second.sizeInBytes(), second.sizeInBytes()); - assertEquals("Try a read of a single message starting from the second message", + read = fileRecords.read(first.sizeInBytes(), second.sizeInBytes()); + assertEquals(second.sizeInBytes(), read.sizeInBytes()); + assertEquals("Read a single message starting from the second message", Collections.singletonList(second), batches(read)); }