Repository: kafka
Updated Branches:
  refs/heads/trunk 3db3d73de -> e41e78200


KAFKA-4776; Implement graceful handling for improperly formed compressed 
message sets

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Jiangjie Qin <becket....@gmail.com>

Closes #2572 from hachikuji/KAFKA-4776


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e41e7820
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e41e7820
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e41e7820

Branch: refs/heads/trunk
Commit: e41e782006d807a1ca8098dbfb95b8ab2295d6af
Parents: 3db3d73
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Feb 19 20:53:19 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Sun Feb 19 20:53:19 2017 -0800

----------------------------------------------------------------------
 .../common/record/MemoryRecordsBuilder.java     | 17 ++++++---
 .../org/apache/kafka/common/record/Record.java  |  1 -
 .../kafka/common/record/RecordsIterator.java    | 11 +++++-
 .../common/record/MemoryRecordsBuilderTest.java | 12 ++++++
 .../kafka/common/record/SimpleRecordTest.java   | 39 ++++++++++++++++++++
 5 files changed, 71 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 39f21a9..731d1dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -154,13 +154,18 @@ public class MemoryRecordsBuilder {
             throw new KafkaException(e);
         }
 
-        if (compressionType != CompressionType.NONE)
-            writerCompressedWrapperHeader();
+        if (numRecords == 0L) {
+            buffer().position(initPos);
+            builtRecords = MemoryRecords.EMPTY;
+        } else {
+            if (compressionType != CompressionType.NONE)
+                writerCompressedWrapperHeader();
 
-        ByteBuffer buffer = buffer().duplicate();
-        buffer.flip();
-        buffer.position(initPos);
-        builtRecords = MemoryRecords.readableRecords(buffer.slice());
+            ByteBuffer buffer = buffer().duplicate();
+            buffer.flip();
+            buffer.position(initPos);
+            builtRecords = MemoryRecords.readableRecords(buffer.slice());
+        }
     }
 
     private void writerCompressedWrapperHeader() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 51bbe35..e3d0b76 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -522,7 +522,6 @@ public final class Record {
         return crc;
     }
 
-
     /**
      * Write a record using raw fields (without validation). This should only be used in testing.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index 07c9197..430db30 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -146,8 +146,11 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
             this.wrapperMagic = wrapperRecord.magic();
 
             CompressionType compressionType = wrapperRecord.compressionType();
-            ByteBuffer buffer = wrapperRecord.value();
-            DataInputStream stream = new DataInputStream(compressionType.wrapForInput(new ByteBufferInputStream(buffer),
+            ByteBuffer wrapperValue = wrapperRecord.value();
+            if (wrapperValue == null)
+                throw new InvalidRecordException("Found invalid compressed record set with null value");
+
+            DataInputStream stream = new DataInputStream(compressionType.wrapForInput(new ByteBufferInputStream(wrapperValue),
                     wrapperRecord.magic()));
             LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize);
 
@@ -180,6 +183,10 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
                     }
                     logEntries.addLast(logEntry);
                 }
+
+                if (logEntries.isEmpty())
+                    throw new InvalidRecordException("Found invalid compressed record set with no inner records");
+
                 if (wrapperMagic > Record.MAGIC_VALUE_V0)
                     this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
                 else

http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 02ee75e..4c2a71e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -43,6 +43,18 @@ public class MemoryRecordsBuilderTest {
     }
 
     @Test
+    public void testWriteEmptyRecordSet() {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        buffer.position(bufferOffset);
+
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType,
+                TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+        MemoryRecords records = builder.build();
+        assertEquals(0, records.sizeInBytes());
+        assertEquals(bufferOffset, buffer.position());
+    }
+
+    @Test
     public void testCompressionRateV0() {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.position(bufferOffset);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e41e7820/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
index e4c4a67..ca78fc9 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -16,16 +16,55 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class SimpleRecordTest {
 
+    @Test(expected = InvalidRecordException.class)
+    public void testCompressedIterationWithNullValue() throws Exception {
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+        LogEntry.writeHeader(out, 0L, Record.RECORD_OVERHEAD_V1);
+        Record.write(out, Record.CURRENT_MAGIC_VALUE, 1L, null, null, CompressionType.GZIP, TimestampType.CREATE_TIME);
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        for (Record record : records.records())
+            fail("Iteration should have caused invalid record error");
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testCompressedIterationWithEmptyRecords() throws Exception {
+        ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64);
+        OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue),
+                Record.MAGIC_VALUE_V1, 64);
+        gzipOutput.close();
+        emptyCompressedValue.flip();
+
+        ByteBuffer buffer = ByteBuffer.allocate(128);
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+        LogEntry.writeHeader(out, 0L, Record.RECORD_OVERHEAD_V1 + emptyCompressedValue.remaining());
+        Record.write(out, Record.CURRENT_MAGIC_VALUE, 1L, null, Utils.toArray(emptyCompressedValue),
+                CompressionType.GZIP, TimestampType.CREATE_TIME);
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        for (Record record : records.records())
+            fail("Iteration should have caused invalid record error");
+    }
+
     /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
     @Test(expected = InvalidRecordException.class)
     public void testIsValidWithTooSmallBuffer() {

Reply via email to