This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new f794bd2  MINOR: Fix chunked down-conversion behavior when no valid 
batch exists after conversion (#5173)
f794bd2 is described below

commit f794bd276f183fe9c5d5287b7c7cdb6dc86713ba
Author: Dhruvil Shah <[email protected]>
AuthorDate: Thu Jun 14 23:00:33 2018 -0700

    MINOR: Fix chunked down-conversion behavior when no valid batch exists 
after conversion (#5173)
    
    We might decide to drop certain message batches during down-conversion 
because older clients might not be able to interpret them. One such example is 
control batches which are typically removed by the broker if down-conversion to 
V0 or V1 is required. This patch makes sure the chunked down-conversion 
implementation is able to handle such cases.
---
 .../common/record/LazyDownConversionRecords.java   |  44 +++--
 .../record/LazyDownConversionRecordsSend.java      |  32 ++-
 .../java/org/apache/kafka/common/utils/Utils.java  |  11 ++
 .../kafka/common/record/FileRecordsTest.java       |  23 +--
 .../record/LazyDownConversionRecordsTest.java      | 214 +++++++++++----------
 .../common/record/MemoryRecordsBuilderTest.java    |   5 +-
 docs/upgrade.html                                  |   8 +
 7 files changed, 174 insertions(+), 163 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index da14b5b..d58689d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Time;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -57,13 +56,15 @@ public class LazyDownConversionRecords implements 
BaseRecords {
         // need to make sure that we are able to accommodate one full batch of 
down-converted messages. The way we achieve
         // this is by having sizeInBytes method factor in the size of the 
first down-converted batch and return at least
         // its size.
-        AbstractIterator<? extends RecordBatch> it = records.batchIterator();
+        java.util.Iterator<ConvertedRecords> it = iterator(0);
         if (it.hasNext()) {
-            firstConvertedBatch = 
RecordsUtil.downConvert(Collections.singletonList(it.peek()), toMagic, 
firstOffset, time);
+            firstConvertedBatch = it.next();
             sizeInBytes = Math.max(records.sizeInBytes(), 
firstConvertedBatch.records().sizeInBytes());
         } else {
+            // If there are no messages we got after down-conversion, make 
sure we are able to send at least an overflow
+            // message to the consumer. Typically, the consumer would need to 
increase the fetch size in such cases.
             firstConvertedBatch = null;
-            sizeInBytes = 0;
+            sizeInBytes = 
LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH;
         }
     }
 
@@ -148,21 +149,28 @@ public class LazyDownConversionRecords implements 
BaseRecords {
                 return convertedBatch;
             }
 
-            if (!batchIterator.hasNext())
-                return allDone();
-
-            // Figure out batches we should down-convert based on the size 
constraints
-            List<RecordBatch> batches = new ArrayList<>();
-            boolean isFirstBatch = true;
-            long sizeSoFar = 0;
-            while (batchIterator.hasNext() &&
-                    (isFirstBatch || (batchIterator.peek().sizeInBytes() + 
sizeSoFar) <= maximumReadSize)) {
-                RecordBatch currentBatch = batchIterator.next();
-                batches.add(currentBatch);
-                sizeSoFar += currentBatch.sizeInBytes();
-                isFirstBatch = false;
+            while (batchIterator.hasNext()) {
+                List<RecordBatch> batches = new ArrayList<>();
+                boolean isFirstBatch = true;
+                long sizeSoFar = 0;
+
+                // Figure out batches we should down-convert based on the size 
constraints
+                while (batchIterator.hasNext() &&
+                        (isFirstBatch || (batchIterator.peek().sizeInBytes() + 
sizeSoFar) <= maximumReadSize)) {
+                    RecordBatch currentBatch = batchIterator.next();
+                    batches.add(currentBatch);
+                    sizeSoFar += currentBatch.sizeInBytes();
+                    isFirstBatch = false;
+                }
+                ConvertedRecords convertedRecords = 
RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+                // During conversion, it is possible that we drop certain 
batches because they do not have an equivalent
+                // representation in the message format we want to convert to. 
For example, V0 and V1 message formats
+                // have no notion of transaction markers which were introduced 
in V2 so they get dropped during conversion.
+                // We return converted records only when we have at least one 
valid batch of messages after conversion.
+                if (convertedRecords.records().sizeInBytes() > 0)
+                    return convertedRecords;
             }
-            return RecordsUtil.downConvert(batches, toMagic, firstOffset, 
time);
+            return allDone();
         }
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
index e60e1ed..f0fab7d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
@@ -33,6 +32,7 @@ import java.util.Iterator;
 public final class LazyDownConversionRecordsSend extends 
RecordsSend<LazyDownConversionRecords> {
     private static final Logger log = 
LoggerFactory.getLogger(LazyDownConversionRecordsSend.class);
     private static final int MAX_READ_SIZE = 128 * 1024;
+    static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD;
 
     private RecordConversionStats recordConversionStats;
     private RecordsSend convertedRecordsWriter;
@@ -49,39 +49,31 @@ public final class LazyDownConversionRecordsSend extends 
RecordsSend<LazyDownCon
     public long writeTo(GatheringByteChannel channel, long previouslyWritten, 
int remaining) throws IOException {
         if (convertedRecordsWriter == null || 
convertedRecordsWriter.completed()) {
             MemoryRecords convertedRecords;
-
             // Check if we have more chunks left to down-convert
             if (convertedRecordsIterator.hasNext()) {
                 // Get next chunk of down-converted messages
                 ConvertedRecords<MemoryRecords> recordsAndStats = 
convertedRecordsIterator.next();
                 convertedRecords = recordsAndStats.records();
-
-                int sizeOfFirstConvertedBatch = 
convertedRecords.batchIterator().next().sizeInBytes();
-                if (previouslyWritten == 0 && sizeOfFirstConvertedBatch > 
size())
-                    throw new EOFException("Unable to send first batch 
completely." +
-                            " maximum_size: " + size() +
-                            " converted_records_size: " + 
sizeOfFirstConvertedBatch);
-
                 
recordConversionStats.add(recordsAndStats.recordConversionStats());
-                log.debug("Got lazy converted records for partition {} with 
length={}", topicPartition(), convertedRecords.sizeInBytes());
+                log.debug("Down-converted records for partition {} with 
length={}", topicPartition(), convertedRecords.sizeInBytes());
             } else {
-                if (previouslyWritten == 0)
-                    throw new EOFException("Unable to get the first batch of 
down-converted records");
-
-                // We do not have any records left to down-convert. Construct 
a "fake" message for the length remaining.
+                // We do not have any records left to down-convert. Construct 
an overflow message for the length remaining.
                 // This message will be ignored by the consumer because its 
length will be past the length of maximum
                 // possible response size.
                 // DefaultRecordBatch =>
                 //      BaseOffset => Int64
                 //      Length => Int32
                 //      ...
-                log.debug("Constructing fake message batch for partition {} 
for remaining length={}", topicPartition(), remaining);
-                ByteBuffer fakeMessageBatch = 
ByteBuffer.allocate(Math.max(Records.LOG_OVERHEAD, Math.min(remaining + 1, 
MAX_READ_SIZE)));
-                fakeMessageBatch.putLong(-1L);
-                fakeMessageBatch.putInt(remaining + 1);
-                convertedRecords = 
MemoryRecords.readableRecords(fakeMessageBatch);
-            }
+                ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
+                        Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, 
Math.min(remaining + 1, MAX_READ_SIZE)));
+                overflowMessageBatch.putLong(-1L);
 
+                // Fill in the length of the overflow batch. A valid batch 
must be at least as long as the minimum batch
+                // overhead.
+                overflowMessageBatch.putInt(Math.max(remaining + 1, 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
+                convertedRecords = 
MemoryRecords.readableRecords(overflowMessageBatch);
+                log.debug("Constructed overflow message batch for partition {} 
with length={}", topicPartition(), remaining);
+            }
             convertedRecordsWriter = new DefaultRecordsSend(destination(), 
convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
         }
         return convertedRecordsWriter.writeTo(channel);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index ebe87ba..31fa01c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -112,6 +112,17 @@ public final class Utils {
     }
 
     /**
+     * Read a UTF8 string from the current position till the end of a byte 
buffer. The position of the byte buffer is
+     * not affected by this method.
+     *
+     * @param buffer The buffer to read from
+     * @return The UTF8 string
+     */
+    public static String utf8(ByteBuffer buffer) {
+        return utf8(buffer, buffer.remaining());
+    }
+
+    /**
      * Read a UTF8 string from a byte buffer at a given offset. Note that the 
position of the byte buffer
      * is not affected by this method.
      *
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index bbe84b2..f08652e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -38,6 +37,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -430,10 +430,6 @@ public class FileRecordsTest {
         }
     }
 
-    private String utf8(ByteBuffer buffer) {
-        return Utils.utf8(buffer, buffer.remaining());
-    }
-
     private void downConvertAndVerifyRecords(List<SimpleRecord> initialRecords,
                                              List<Long> initialOffsets,
                                              FileRecords fileRecords,
@@ -441,13 +437,11 @@ public class FileRecordsTest {
                                              byte toMagic,
                                              long firstOffset,
                                              Time time) {
-        long numBatches = 0;
         long minBatchSize = Long.MAX_VALUE;
         long maxBatchSize = Long.MIN_VALUE;
         for (RecordBatch batch : fileRecords.batches()) {
             minBatchSize = Math.min(minBatchSize, batch.sizeInBytes());
             maxBatchSize = Math.max(maxBatchSize, batch.sizeInBytes());
-            numBatches++;
         }
 
         // Test the normal down-conversion path
@@ -469,21 +463,6 @@ public class FileRecordsTest {
             Iterator<ConvertedRecords> it = lazyRecords.iterator(readSize);
             while (it.hasNext())
                 convertedRecords.add(it.next().records());
-
-            // Check if chunking works as expected. The only way to 
predictably test for this is by testing the edge cases.
-            // 1. If maximum read size is greater than the size of all batches 
combined, we must get all down-conversion
-            //    records in exactly two batches; the first chunk is pre 
down-converted and returned, and the second chunk
-            //    contains the remaining batches.
-            // 2. If maximum read size is just smaller than the size of all 
batches combined, we must get results in two
-            //    chunks.
-            // 3. If maximum read size is less than the size of a single 
record, we get one batch in each chunk.
-            if (readSize >= fileRecords.sizeInBytes())
-                assertEquals(2, convertedRecords.size());
-            else if (readSize == fileRecords.sizeInBytes() - 1)
-                assertEquals(2, convertedRecords.size());
-            else if (readSize <= minBatchSize)
-                assertEquals(numBatches, convertedRecords.size());
-
             verifyConvertedRecords(initialRecords, initialOffsets, 
convertedRecords, compressionType, toMagic);
             convertedRecords.clear();
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
 
b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
index 8765603..89c1aea 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -35,86 +34,126 @@ import java.util.Collection;
 import java.util.List;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@RunWith(value = Parameterized.class)
 public class LazyDownConversionRecordsTest {
-    private final CompressionType compressionType;
-    private final byte toMagic;
-    private final DownConversionTest test;
-
-    public LazyDownConversionRecordsTest(CompressionType compressionType, byte 
toMagic, DownConversionTest test) {
-        this.compressionType = compressionType;
-        this.toMagic = toMagic;
-        this.test = test;
+    /**
+     * Test the lazy down-conversion path in the presence of commit markers. 
When converting to V0 or V1, these batches
+     * are dropped. If there happen to be no more batches left to convert, we 
must get an overflow message batch after
+     * conversion.
+     */
+    @Test
+    public void testConversionOfCommitMarker() throws IOException {
+        MemoryRecords recordsToConvert = 
MemoryRecords.withEndTransactionMarker(0, Time.SYSTEM.milliseconds(), 
RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                1, (short) 1, new 
EndTransactionMarker(ControlRecordType.COMMIT, 0));
+        MemoryRecords convertedRecords = convertRecords(recordsToConvert, 
(byte) 1, recordsToConvert.sizeInBytes());
+        ByteBuffer buffer = convertedRecords.buffer();
+
+        // read the offset and the batch length
+        buffer.getLong();
+        int sizeOfConvertedRecords = buffer.getInt();
+
+        // assert we got an overflow message batch
+        assertTrue(sizeOfConvertedRecords > buffer.limit());
+        assertFalse(convertedRecords.batchIterator().hasNext());
     }
 
-    enum DownConversionTest {
-        DEFAULT,
-        OVERFLOW,
-    }
+    @RunWith(value = Parameterized.class)
+    public static class ParameterizedConversionTest {
+        private final CompressionType compressionType;
+        private final byte toMagic;
 
-    @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}, 
test={2}")
-    public static Collection<Object[]> data() {
-        List<Object[]> values = new ArrayList<>();
-        for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= 
RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
-            for (DownConversionTest test : DownConversionTest.values()) {
-                values.add(new Object[]{CompressionType.NONE, toMagic, test});
-                values.add(new Object[]{CompressionType.GZIP, toMagic, test});
+        public ParameterizedConversionTest(CompressionType compressionType, 
byte toMagic) {
+            this.compressionType = compressionType;
+            this.toMagic = toMagic;
+        }
+
+        @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}")
+        public static Collection<Object[]> data() {
+            List<Object[]> values = new ArrayList<>();
+            for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= 
RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
+                values.add(new Object[]{CompressionType.NONE, toMagic});
+                values.add(new Object[]{CompressionType.GZIP, toMagic});
             }
+            return values;
         }
-        return values;
-    }
 
-    @Test
-    public void doTestConversion() throws IOException {
-        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 
24L);
-
-        Header[] headers = {new RecordHeader("headerKey1", 
"headerValue1".getBytes()),
-                            new RecordHeader("headerKey2", 
"headerValue2".getBytes()),
-                            new RecordHeader("headerKey3", 
"headerValue3".getBytes())};
-
-        List<SimpleRecord> records = asList(
-                new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
-                new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
-                new SimpleRecord(3L, "k3".getBytes(), "hello 
again".getBytes()),
-                new SimpleRecord(4L, "k4".getBytes(), "goodbye for 
now".getBytes()),
-                new SimpleRecord(5L, "k5".getBytes(), "hello 
again".getBytes()),
-                new SimpleRecord(6L, "k6".getBytes(), "I sense 
indecision".getBytes()),
-                new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
-                new SimpleRecord(8L, "k8".getBytes(), "running 
out".getBytes(), headers),
-                new SimpleRecord(9L, "k9".getBytes(), "ok, almost 
done".getBytes()),
-                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), 
headers));
-        assertEquals("incorrect test setup", offsets.size(), records.size());
-
-        ByteBuffer buffer = ByteBuffer.allocate(1024);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 0; i < 3; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-
-        builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L);
-        for (int i = 3; i < 6; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-
-        builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
-                0L);
-        for (int i = 6; i < 10; i++)
-            builder.appendWithOffset(offsets.get(i), records.get(i));
-        builder.close();
-
-        buffer.flip();
+        /**
+         * Test the lazy down-conversion path.
+         */
+        @Test
+        public void testConversion() throws IOException {
+            doTestConversion(false);
+        }
+
+        /**
+         * Test the lazy down-conversion path where the number of bytes we 
want to convert is much larger than the
+         * number of bytes we get after conversion. This causes overflow 
message batch(es) to be appended towards the
+         * end of the converted output.
+         */
+        @Test
+        public void testConversionWithOverflow() throws IOException {
+            doTestConversion(true);
+        }
 
+        private void doTestConversion(boolean testConversionOverflow) throws 
IOException {
+            List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 
22L, 24L);
+
+            Header[] headers = {new RecordHeader("headerKey1", 
"headerValue1".getBytes()),
+                                new RecordHeader("headerKey2", 
"headerValue2".getBytes()),
+                                new RecordHeader("headerKey3", 
"headerValue3".getBytes())};
+
+            List<SimpleRecord> records = asList(
+                    new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
+                    new SimpleRecord(2L, "k2".getBytes(), 
"goodbye".getBytes()),
+                    new SimpleRecord(3L, "k3".getBytes(), "hello 
again".getBytes()),
+                    new SimpleRecord(4L, "k4".getBytes(), "goodbye for 
now".getBytes()),
+                    new SimpleRecord(5L, "k5".getBytes(), "hello 
again".getBytes()),
+                    new SimpleRecord(6L, "k6".getBytes(), "I sense 
indecision".getBytes()),
+                    new SimpleRecord(7L, "k7".getBytes(), "what 
now".getBytes()),
+                    new SimpleRecord(8L, "k8".getBytes(), "running 
out".getBytes(), headers),
+                    new SimpleRecord(9L, "k9".getBytes(), "ok, almost 
done".getBytes()),
+                    new SimpleRecord(10L, "k10".getBytes(), 
"finally".getBytes(), headers));
+            assertEquals("incorrect test setup", offsets.size(), 
records.size());
+
+            ByteBuffer buffer = ByteBuffer.allocate(1024);
+            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+                    TimestampType.CREATE_TIME, 0L);
+            for (int i = 0; i < 3; i++)
+                builder.appendWithOffset(offsets.get(i), records.get(i));
+            builder.close();
+
+            builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                    0L);
+            for (int i = 3; i < 6; i++)
+                builder.appendWithOffset(offsets.get(i), records.get(i));
+            builder.close();
+
+            builder = MemoryRecords.builder(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+                    0L);
+            for (int i = 6; i < 10; i++)
+                builder.appendWithOffset(offsets.get(i), records.get(i));
+            builder.close();
+            buffer.flip();
+
+            MemoryRecords recordsToConvert = 
MemoryRecords.readableRecords(buffer);
+            int numBytesToConvert = recordsToConvert.sizeInBytes();
+            if (testConversionOverflow)
+                numBytesToConvert *= 2;
+
+            MemoryRecords convertedRecords = convertRecords(recordsToConvert, 
toMagic, numBytesToConvert);
+            verifyDownConvertedRecords(records, offsets, convertedRecords, 
compressionType, toMagic);
+        }
+    }
+
+    private static MemoryRecords convertRecords(MemoryRecords 
recordsToConvert, byte toMagic, int bytesToConvert) throws IOException {
         try (FileRecords inputRecords = FileRecords.open(tempFile())) {
-            MemoryRecords memoryRecords = 
MemoryRecords.readableRecords(buffer);
-            inputRecords.append(memoryRecords);
+            inputRecords.append(recordsToConvert);
             inputRecords.flush();
 
             LazyDownConversionRecords lazyRecords = new 
LazyDownConversionRecords(new TopicPartition("test", 1),
@@ -123,50 +162,27 @@ public class LazyDownConversionRecordsTest {
             File outputFile = tempFile();
             FileChannel channel = new RandomAccessFile(outputFile, 
"rw").getChannel();
 
-            // Size of lazy records is at least as much as the size of 
underlying records
-            assertTrue(lazyRecords.sizeInBytes() >= 
inputRecords.sizeInBytes());
-
-            int toWrite;
             int written = 0;
-            List<SimpleRecord> recordsBeingConverted;
-            List<Long> offsetsOfRecords;
-            switch (test) {
-                case DEFAULT:
-                    toWrite = inputRecords.sizeInBytes();
-                    recordsBeingConverted = records;
-                    offsetsOfRecords = offsets;
-                    break;
-                case OVERFLOW:
-                    toWrite = inputRecords.sizeInBytes() * 2;
-                    recordsBeingConverted = records;
-                    offsetsOfRecords = offsets;
-                    break;
-                default:
-                    throw new IllegalArgumentException();
-            }
-            while (written < toWrite)
-                written += lazySend.writeTo(channel, written, toWrite - 
written);
+            while (written < bytesToConvert)
+                written += lazySend.writeTo(channel, written, bytesToConvert - 
written);
 
             FileRecords convertedRecords = FileRecords.open(outputFile, true, 
(int) channel.size(), false);
             ByteBuffer convertedRecordsBuffer = 
ByteBuffer.allocate(convertedRecords.sizeInBytes());
             convertedRecords.readInto(convertedRecordsBuffer, 0);
-            MemoryRecords convertedMemoryRecords = 
MemoryRecords.readableRecords(convertedRecordsBuffer);
-            verifyDownConvertedRecords(recordsBeingConverted, 
offsetsOfRecords, convertedMemoryRecords, compressionType, toMagic);
 
+            // cleanup
             convertedRecords.close();
             channel.close();
-        }
-    }
 
-    private String utf8(ByteBuffer buffer) {
-        return Utils.utf8(buffer, buffer.remaining());
+            return MemoryRecords.readableRecords(convertedRecordsBuffer);
+        }
     }
 
-    private void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
-                                            List<Long> initialOffsets,
-                                            MemoryRecords downConvertedRecords,
-                                            CompressionType compressionType,
-                                            byte toMagic) {
+    private static void verifyDownConvertedRecords(List<SimpleRecord> 
initialRecords,
+                                                   List<Long> initialOffsets,
+                                                   MemoryRecords 
downConvertedRecords,
+                                                   CompressionType 
compressionType,
+                                                   byte toMagic) {
         int i = 0;
         for (RecordBatch batch : downConvertedRecords.batches()) {
             assertTrue("Magic byte should be lower than or equal to " + 
toMagic, batch.magic() <= toMagic);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 36b14a2..5d5221e 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -30,6 +30,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 
+import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -568,10 +569,6 @@ public class MemoryRecordsBuilderTest {
         }
     }
 
-    private String utf8(ByteBuffer buffer) {
-        return Utils.utf8(buffer, buffer.remaining());
-    }
-
     @Test
     public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws 
Exception {
         ByteBuffer buffer = ByteBuffer.allocate(128);
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 6119536..89c90d1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -119,6 +119,14 @@
     </li>
     <li><a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a>
 adds the ability
         to define ACLs on prefixed resources, e.g. any topic starting with 
'foo'.</li>
+    <li><a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion">KIP-283</a>
 improves message down-conversion
+        handling on Kafka broker, which has typically been a memory-intensive 
operation. The KIP adds a mechanism by which the operation becomes less memory 
intensive
+        by down-converting chunks of partition data at a time which helps put 
an upper bound on memory consumption. With this improvement, there is a change 
in
+        <code>FetchResponse</code> protocol behavior where the broker could 
send an oversized message batch towards the end of the response with an invalid 
offset.
+        Such oversized messages must be ignored by consumer clients, as is 
done by <code>KafkaConsumer</code>.
+    <p>KIP-283 also adds new topic and broker configurations 
<code>message.downconversion.enable</code> and 
<code>log.message.downconversion.enable</code> respectively
+       to control whether down-conversion is enabled. When disabled, broker 
does not perform any down-conversion and instead sends an 
<code>UNSUPPORTED_VERSION</code>
+       error to the client.</p></li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New 
Protocol Versions</a></h5>

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to