This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fcb15e3 KAFKA-6292; Improve FileLogInputStream batch position checks to avoid type overflow (#4928) fcb15e3 is described below commit fcb15e357c1b818d2d543dc9db3e011ddc1fbf5e Author: Roman Khlebnov <suppie...@gmail.com> AuthorDate: Wed May 9 03:07:50 2018 +0300 KAFKA-6292; Improve FileLogInputStream batch position checks to avoid type overflow (#4928) Switch from sum operations to subtraction to avoid type casting in checks and type overflow during `FileLogInputStream` work, especially in cases where the property `log.segment.bytes` was set close to `Integer.MAX_VALUE` and used as a `position` inside the `nextBatch()` function. Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> --- .../kafka/common/record/FileLogInputStream.java | 4 ++-- .../kafka/common/record/FileLogInputStreamTest.java | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java index a1e3a2f..92e8864 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -60,7 +60,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil @Override public FileChannelRecordBatch nextBatch() throws IOException { FileChannel channel = fileRecords.channel(); - if (position + HEADER_SIZE_UP_TO_MAGIC >= end) + if (position >= end - HEADER_SIZE_UP_TO_MAGIC) return null; logHeaderBuffer.rewind(); @@ -75,7 +75,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " + "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file())); - if (position 
+ LOG_OVERHEAD + size > end) + if (position > end - LOG_OVERHEAD - size) return null; byte magic = logHeaderBuffer.get(MAGIC_OFFSET); diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java index 95b2a0c..77aaae8 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java @@ -112,8 +112,8 @@ public class FileLogInputStreamTest { SimpleRecord[] firstBatchRecords = new SimpleRecord[]{ new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()), new SimpleRecord(234280L, "b".getBytes(), "2".getBytes()) - }; + SimpleRecord[] secondBatchRecords = new SimpleRecord[]{ new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()), new SimpleRecord(897839L, null, "4".getBytes()), @@ -152,8 +152,8 @@ public class FileLogInputStreamTest { SimpleRecord[] firstBatchRecords = new SimpleRecord[]{ new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()), new SimpleRecord(234280L, "b".getBytes(), "2".getBytes()) - }; + SimpleRecord[] secondBatchRecords = new SimpleRecord[]{ new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()), new SimpleRecord(897839L, null, "4".getBytes()), @@ -204,6 +204,22 @@ public class FileLogInputStreamTest { } } + @Test + public void testNextBatchSelectionWithMaxedParams() throws IOException { + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, Integer.MAX_VALUE, Integer.MAX_VALUE); + assertNull(logInputStream.nextBatch()); + } + } + + @Test + public void testNextBatchSelectionWithZeroedParams() throws IOException { + try (FileRecords fileRecords = FileRecords.open(tempFile())) { + FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, 0); + assertNull(logInputStream.nextBatch()); + } + } + private void 
assertProducerData(RecordBatch batch, long producerId, short producerEpoch, int baseSequence, boolean isTransactional, SimpleRecord ... records) { assertEquals(producerId, batch.producerId()); -- To stop receiving notification emails like this one, please contact j...@apache.org.