Repository: kafka Updated Branches: refs/heads/trunk 3db3d73de -> e41e78200
KAFKA-4776; Implement graceful handling for improperly formed compressed message sets Author: Jason Gustafson <ja...@confluent.io> Reviewers: Jiangjie Qin <becket....@gmail.com> Closes #2572 from hachikuji/KAFKA-4776 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e41e7820 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e41e7820 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e41e7820 Branch: refs/heads/trunk Commit: e41e782006d807a1ca8098dbfb95b8ab2295d6af Parents: 3db3d73 Author: Jason Gustafson <ja...@confluent.io> Authored: Sun Feb 19 20:53:19 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Sun Feb 19 20:53:19 2017 -0800 ---------------------------------------------------------------------- .../common/record/MemoryRecordsBuilder.java | 17 ++++++--- .../org/apache/kafka/common/record/Record.java | 1 - .../kafka/common/record/RecordsIterator.java | 11 +++++- .../common/record/MemoryRecordsBuilderTest.java | 12 ++++++ .../kafka/common/record/SimpleRecordTest.java | 39 ++++++++++++++++++++ 5 files changed, 71 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 39f21a9..731d1dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -154,13 +154,18 @@ public class MemoryRecordsBuilder { throw new KafkaException(e); } - if (compressionType != CompressionType.NONE) - writerCompressedWrapperHeader(); + if (numRecords == 0L) { + buffer().position(initPos); + builtRecords = MemoryRecords.EMPTY; + } else { + if (compressionType != CompressionType.NONE) + writerCompressedWrapperHeader(); - ByteBuffer buffer = buffer().duplicate(); - buffer.flip(); - buffer.position(initPos); - builtRecords = MemoryRecords.readableRecords(buffer.slice()); + ByteBuffer buffer = buffer().duplicate(); + buffer.flip(); + buffer.position(initPos); + builtRecords = MemoryRecords.readableRecords(buffer.slice()); + } } private void writerCompressedWrapperHeader() { http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/main/java/org/apache/kafka/common/record/Record.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index 51bbe35..e3d0b76 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -522,7 +522,6 @@ public final class Record { return crc; } - /** * Write a record using raw fields (without validation). This should only be used in testing. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java index 07c9197..430db30 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java @@ -146,8 +146,11 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { this.wrapperMagic = wrapperRecord.magic(); CompressionType compressionType = wrapperRecord.compressionType(); - ByteBuffer buffer = wrapperRecord.value(); - DataInputStream stream = new DataInputStream(compressionType.wrapForInput(new ByteBufferInputStream(buffer), + ByteBuffer wrapperValue = wrapperRecord.value(); + if (wrapperValue == null) + throw new InvalidRecordException("Found invalid compressed record set with null value"); + + DataInputStream stream = new DataInputStream(compressionType.wrapForInput(new ByteBufferInputStream(wrapperValue), wrapperRecord.magic())); LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize); @@ -180,6 +183,10 @@ public class RecordsIterator extends AbstractIterator<LogEntry> { } logEntries.addLast(logEntry); } + + if (logEntries.isEmpty()) + throw new InvalidRecordException("Found invalid compressed record set with no inner records"); + if (wrapperMagic > Record.MAGIC_VALUE_V0) this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset(); else http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 02ee75e..4c2a71e 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -43,6 +43,18 @@ public class MemoryRecordsBuilderTest { } @Test + public void testWriteEmptyRecordSet() { + ByteBuffer buffer = ByteBuffer.allocate(128); + buffer.position(bufferOffset); + + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType, + TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity()); + MemoryRecords records = builder.build(); + assertEquals(0, records.sizeInBytes()); + assertEquals(bufferOffset, buffer.position()); + } + + @Test public void testCompressionRateV0() { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.position(bufferOffset); http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java index e4c4a67..ca78fc9 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java @@ -16,16 +16,55 @@ */ package org.apache.kafka.common.record; +import org.apache.kafka.common.utils.Utils; import org.junit.Test; +import java.io.DataOutputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class SimpleRecordTest { + @Test(expected = InvalidRecordException.class) + public void testCompressedIterationWithNullValue() throws Exception { + ByteBuffer buffer = ByteBuffer.allocate(128); + DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer)); + LogEntry.writeHeader(out, 0L, Record.RECORD_OVERHEAD_V1); + Record.write(out, Record.CURRENT_MAGIC_VALUE, 1L, null, null, CompressionType.GZIP, TimestampType.CREATE_TIME); + + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + for (Record record : records.records()) + fail("Iteration should have caused invalid record error"); + } + + @Test(expected = InvalidRecordException.class) + public void testCompressedIterationWithEmptyRecords() throws Exception { + ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64); + OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue), + Record.MAGIC_VALUE_V1, 64); + gzipOutput.close(); + emptyCompressedValue.flip(); + + ByteBuffer buffer = ByteBuffer.allocate(128); + DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer)); + LogEntry.writeHeader(out, 0L, Record.RECORD_OVERHEAD_V1 + emptyCompressedValue.remaining()); + Record.write(out, Record.CURRENT_MAGIC_VALUE, 1L, null, Utils.toArray(emptyCompressedValue), + CompressionType.GZIP, TimestampType.CREATE_TIME); + + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + for (Record record : records.records()) + fail("Iteration should have caused invalid record error"); + } + /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */ @Test(expected = InvalidRecordException.class) public void testIsValidWithTooSmallBuffer() {