This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a8c17e3 MINOR: Fix chunked down-conversion behavior when no valid batch exists after conversion (#5173)
a8c17e3 is described below
commit a8c17e36c3239f33925b061d99a5d1d1074bbc67
Author: Dhruvil Shah <[email protected]>
AuthorDate: Thu Jun 14 23:00:33 2018 -0700
MINOR: Fix chunked down-conversion behavior when no valid batch exists after conversion (#5173)
We might decide to drop certain message batches during down-conversion because older
clients may not be able to interpret them. One such example is control batches, which are
typically removed by the broker if down-conversion to V0 or V1 is required. This patch
ensures that the chunked down-conversion implementation handles such cases.
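
For readers skimming the patch, the behavior being fixed can be summarized as: when a size-bounded
chunk of batches is down-converted and every batch in it is dropped (e.g. control batches going to
V0/V1), the iterator must move on to the next chunk instead of returning an empty result. Below is a
minimal, self-contained sketch of that idea; SourceBatch, convert(), and the size accounting are
simplified stand-ins for Kafka's RecordBatch and RecordsUtil.downConvert, not the actual implementation.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkedDownConversionSketch {
    // Stand-in for a source record batch (not the real Kafka API).
    static class SourceBatch {
        final int sizeInBytes;
        final boolean isControlBatch; // dropped when converting to V0/V1
        SourceBatch(int sizeInBytes, boolean isControlBatch) {
            this.sizeInBytes = sizeInBytes;
            this.isControlBatch = isControlBatch;
        }
    }

    // Convert one chunk; control batches have no V0/V1 representation, so they
    // contribute nothing to the converted output.
    static int convert(List<SourceBatch> chunk) {
        int convertedBytes = 0;
        for (SourceBatch batch : chunk)
            if (!batch.isControlBatch)
                convertedBytes += batch.sizeInBytes;
        return convertedBytes;
    }

    // Keep pulling size-bounded chunks until one survives conversion; a chunk made up
    // entirely of dropped batches must not terminate the iteration early.
    static int nextNonEmptyChunk(Iterator<SourceBatch> batches, int maxReadSize) {
        while (batches.hasNext()) {
            List<SourceBatch> chunk = new ArrayList<>();
            int sizeSoFar = 0;
            while (batches.hasNext() && (chunk.isEmpty() || sizeSoFar < maxReadSize)) {
                SourceBatch batch = batches.next();
                chunk.add(batch);
                sizeSoFar += batch.sizeInBytes;
            }
            int convertedBytes = convert(chunk);
            if (convertedBytes > 0)
                return convertedBytes; // found a chunk with at least one valid batch
        }
        return -1; // nothing left that survives conversion
    }

    public static void main(String[] args) {
        List<SourceBatch> input = List.of(
                new SourceBatch(64, true),    // control batch: dropped for V0/V1
                new SourceBatch(128, false)); // regular data batch
        System.out.println(nextNonEmptyChunk(input.iterator(), 64)); // prints 128
    }
}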
---
.../common/record/LazyDownConversionRecords.java | 44 +++--
.../record/LazyDownConversionRecordsSend.java | 32 ++-
.../java/org/apache/kafka/common/utils/Utils.java | 11 ++
.../kafka/common/record/FileRecordsTest.java | 23 +--
.../record/LazyDownConversionRecordsTest.java | 214 +++++++++++----------
.../common/record/MemoryRecordsBuilderTest.java | 5 +-
docs/upgrade.html | 8 +
7 files changed, 174 insertions(+), 163 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index da14b5b..d58689d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Time;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -57,13 +56,15 @@ public class LazyDownConversionRecords implements BaseRecords {
// need to make sure that we are able to accommodate one full batch of down-converted messages. The way we achieve
// this is by having sizeInBytes method factor in the size of the first down-converted batch and return at least
// its size.
- AbstractIterator<? extends RecordBatch> it = records.batchIterator();
+ java.util.Iterator<ConvertedRecords> it = iterator(0);
if (it.hasNext()) {
- firstConvertedBatch = RecordsUtil.downConvert(Collections.singletonList(it.peek()), toMagic, firstOffset, time);
+ firstConvertedBatch = it.next();
sizeInBytes = Math.max(records.sizeInBytes(), firstConvertedBatch.records().sizeInBytes());
} else {
+ // If there are no messages we got after down-conversion, make sure we are able to send at least an overflow
+ // message to the consumer. Typically, the consumer would need to increase the fetch size in such cases.
firstConvertedBatch = null;
- sizeInBytes = 0;
+ sizeInBytes = LazyDownConversionRecordsSend.MIN_OVERFLOW_MESSAGE_LENGTH;
}
}
@@ -148,21 +149,28 @@ public class LazyDownConversionRecords implements BaseRecords {
return convertedBatch;
}
- if (!batchIterator.hasNext())
- return allDone();
-
- // Figure out batches we should down-convert based on the size constraints
- List<RecordBatch> batches = new ArrayList<>();
- boolean isFirstBatch = true;
- long sizeSoFar = 0;
- while (batchIterator.hasNext() &&
- (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
- RecordBatch currentBatch = batchIterator.next();
- batches.add(currentBatch);
- sizeSoFar += currentBatch.sizeInBytes();
- isFirstBatch = false;
+ while (batchIterator.hasNext()) {
+ List<RecordBatch> batches = new ArrayList<>();
+ boolean isFirstBatch = true;
+ long sizeSoFar = 0;
+
+ // Figure out batches we should down-convert based on the size constraints
+ while (batchIterator.hasNext() &&
+ (isFirstBatch || (batchIterator.peek().sizeInBytes() + sizeSoFar) <= maximumReadSize)) {
+ RecordBatch currentBatch = batchIterator.next();
+ batches.add(currentBatch);
+ sizeSoFar += currentBatch.sizeInBytes();
+ isFirstBatch = false;
+ }
+ ConvertedRecords convertedRecords = RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+ // During conversion, it is possible that we drop certain batches because they do not have an equivalent
+ // representation in the message format we want to convert to. For example, V0 and V1 message formats
+ // have no notion of transaction markers which were introduced in V2 so they get dropped during conversion.
+ // We return converted records only when we have at least one valid batch of messages after conversion.
+ if (convertedRecords.records().sizeInBytes() > 0)
+ return convertedRecords;
}
- return RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
+ return allDone();
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
index e60e1ed..f0fab7d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
@@ -33,6 +32,7 @@ import java.util.Iterator;
public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownConversionRecords> {
private static final Logger log = LoggerFactory.getLogger(LazyDownConversionRecordsSend.class);
private static final int MAX_READ_SIZE = 128 * 1024;
+ static final int MIN_OVERFLOW_MESSAGE_LENGTH = Records.LOG_OVERHEAD;
private RecordConversionStats recordConversionStats;
private RecordsSend convertedRecordsWriter;
@@ -49,39 +49,31 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
MemoryRecords convertedRecords;
-
// Check if we have more chunks left to down-convert
if (convertedRecordsIterator.hasNext()) {
// Get next chunk of down-converted messages
ConvertedRecords<MemoryRecords> recordsAndStats = convertedRecordsIterator.next();
convertedRecords = recordsAndStats.records();
-
- int sizeOfFirstConvertedBatch = convertedRecords.batchIterator().next().sizeInBytes();
- if (previouslyWritten == 0 && sizeOfFirstConvertedBatch > size())
- throw new EOFException("Unable to send first batch completely." +
- " maximum_size: " + size() +
- " converted_records_size: " + sizeOfFirstConvertedBatch);
-
recordConversionStats.add(recordsAndStats.recordConversionStats());
- log.debug("Got lazy converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
+ log.debug("Down-converted records for partition {} with length={}", topicPartition(), convertedRecords.sizeInBytes());
} else {
- if (previouslyWritten == 0)
- throw new EOFException("Unable to get the first batch of down-converted records");
-
- // We do not have any records left to down-convert. Construct a "fake" message for the length remaining.
+ // We do not have any records left to down-convert. Construct an overflow message for the length remaining.
// This message will be ignored by the consumer because its length will be past the length of maximum
// possible response size.
// DefaultRecordBatch =>
// BaseOffset => Int64
// Length => Int32
// ...
- log.debug("Constructing fake message batch for partition {} for remaining length={}", topicPartition(), remaining);
- ByteBuffer fakeMessageBatch = ByteBuffer.allocate(Math.max(Records.LOG_OVERHEAD, Math.min(remaining + 1, MAX_READ_SIZE)));
- fakeMessageBatch.putLong(-1L);
- fakeMessageBatch.putInt(remaining + 1);
- convertedRecords = MemoryRecords.readableRecords(fakeMessageBatch);
- }
+ ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
+ Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
+ overflowMessageBatch.putLong(-1L);
+ // Fill in the length of the overflow batch. A valid batch must be at least as long as the minimum batch
+ // overhead.
+ overflowMessageBatch.putInt(Math.max(remaining + 1, DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
+ convertedRecords = MemoryRecords.readableRecords(overflowMessageBatch);
+ log.debug("Constructed overflow message batch for partition {} with length={}", topicPartition(), remaining);
+ }
convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
}
return convertedRecordsWriter.writeTo(channel);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index ebe87ba..31fa01c 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -112,6 +112,17 @@ public final class Utils {
}
/**
+ * Read a UTF8 string from the current position till the end of a byte buffer. The position of the byte buffer is
+ * not affected by this method.
+ *
+ * @param buffer The buffer to read from
+ * @return The UTF8 string
+ */
+ public static String utf8(ByteBuffer buffer) {
+ return utf8(buffer, buffer.remaining());
+ }
+
+ /**
* Read a UTF8 string from a byte buffer at a given offset. Note that the position of the byte buffer
* is not affected by this method.
*
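
As a side note, the new Utils.utf8(ByteBuffer) overload added above simply decodes everything between
the buffer's position and limit without moving the position. A hedged usage sketch follows; the example
class name is made up purely for illustration.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class Utf8OverloadExample {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        // Decodes from the current position to the limit; the position is left untouched.
        String decoded = Utils.utf8(buffer);
        System.out.println(decoded + ", remaining=" + buffer.remaining()); // hello, remaining=5
    }
}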
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index bbe84b2..f08652e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Before;
@@ -38,6 +37,7 @@ import java.util.Iterator;
import java.util.List;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.utf8;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -430,10 +430,6 @@ public class FileRecordsTest {
}
}
- private String utf8(ByteBuffer buffer) {
- return Utils.utf8(buffer, buffer.remaining());
- }
-
private void downConvertAndVerifyRecords(List<SimpleRecord> initialRecords,
List<Long> initialOffsets,
FileRecords fileRecords,
@@ -441,13 +437,11 @@ public class FileRecordsTest {
byte toMagic,
long firstOffset,
Time time) {
- long numBatches = 0;
long minBatchSize = Long.MAX_VALUE;
long maxBatchSize = Long.MIN_VALUE;
for (RecordBatch batch : fileRecords.batches()) {
minBatchSize = Math.min(minBatchSize, batch.sizeInBytes());
maxBatchSize = Math.max(maxBatchSize, batch.sizeInBytes());
- numBatches++;
}
// Test the normal down-conversion path
@@ -469,21 +463,6 @@ public class FileRecordsTest {
Iterator<ConvertedRecords> it = lazyRecords.iterator(readSize);
while (it.hasNext())
convertedRecords.add(it.next().records());
-
- // Check if chunking works as expected. The only way to predictably test for this is by testing the edge cases.
- // 1. If maximum read size is greater than the size of all batches combined, we must get all down-conversion
- // records in exactly two batches; the first chunk is pre down-converted and returned, and the second chunk
- // contains the remaining batches.
- // 2. If maximum read size is just smaller than the size of all batches combined, we must get results in two
- // chunks.
- // 3. If maximum read size is less than the size of a single record, we get one batch in each chunk.
- if (readSize >= fileRecords.sizeInBytes())
- assertEquals(2, convertedRecords.size());
- else if (readSize == fileRecords.sizeInBytes() - 1)
- assertEquals(2, convertedRecords.size());
- else if (readSize <= minBatchSize)
- assertEquals(numBatches, convertedRecords.size());
-
verifyConvertedRecords(initialRecords, initialOffsets, convertedRecords, compressionType, toMagic);
convertedRecords.clear();
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
index 8765603..89c1aea 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -35,86 +34,126 @@ import java.util.Collection;
import java.util.List;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.utf8;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-@RunWith(value = Parameterized.class)
public class LazyDownConversionRecordsTest {
- private final CompressionType compressionType;
- private final byte toMagic;
- private final DownConversionTest test;
-
- public LazyDownConversionRecordsTest(CompressionType compressionType, byte toMagic, DownConversionTest test) {
- this.compressionType = compressionType;
- this.toMagic = toMagic;
- this.test = test;
+ /**
+ * Test the lazy down-conversion path in the presence of commit markers. When converting to V0 or V1, these batches
+ * are dropped. If there happen to be no more batches left to convert, we must get an overflow message batch after
+ * conversion.
+ */
+ @Test
+ public void testConversionOfCommitMarker() throws IOException {
+ MemoryRecords recordsToConvert = MemoryRecords.withEndTransactionMarker(0, Time.SYSTEM.milliseconds(), RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ 1, (short) 1, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+ MemoryRecords convertedRecords = convertRecords(recordsToConvert, (byte) 1, recordsToConvert.sizeInBytes());
+ ByteBuffer buffer = convertedRecords.buffer();
+
+ // read the offset and the batch length
+ buffer.getLong();
+ int sizeOfConvertedRecords = buffer.getInt();
+
+ // assert we got an overflow message batch
+ assertTrue(sizeOfConvertedRecords > buffer.limit());
+ assertFalse(convertedRecords.batchIterator().hasNext());
}
- enum DownConversionTest {
- DEFAULT,
- OVERFLOW,
- }
+ @RunWith(value = Parameterized.class)
+ public static class ParameterizedConversionTest {
+ private final CompressionType compressionType;
+ private final byte toMagic;
- @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}, test={2}")
- public static Collection<Object[]> data() {
- List<Object[]> values = new ArrayList<>();
- for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
- for (DownConversionTest test : DownConversionTest.values()) {
- values.add(new Object[]{CompressionType.NONE, toMagic, test});
- values.add(new Object[]{CompressionType.GZIP, toMagic, test});
+ public ParameterizedConversionTest(CompressionType compressionType, byte toMagic) {
+ this.compressionType = compressionType;
+ this.toMagic = toMagic;
+ }
+
+ @Parameterized.Parameters(name = "compressionType={0}, toMagic={1}")
+ public static Collection<Object[]> data() {
+ List<Object[]> values = new ArrayList<>();
+ for (byte toMagic = RecordBatch.MAGIC_VALUE_V0; toMagic <= RecordBatch.CURRENT_MAGIC_VALUE; toMagic++) {
+ values.add(new Object[]{CompressionType.NONE, toMagic});
+ values.add(new Object[]{CompressionType.GZIP, toMagic});
}
+ return values;
}
- return values;
- }
- @Test
- public void doTestConversion() throws IOException {
- List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
-
- Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
- new RecordHeader("headerKey2", "headerValue2".getBytes()),
- new RecordHeader("headerKey3", "headerValue3".getBytes())};
-
- List<SimpleRecord> records = asList(
- new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
- new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
- new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
- new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
- new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
- new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
- new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
- new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
- new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
- new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
- assertEquals("incorrect test setup", offsets.size(), records.size());
-
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
- TimestampType.CREATE_TIME, 0L);
- for (int i = 0; i < 3; i++)
- builder.appendWithOffset(offsets.get(i), records.get(i));
- builder.close();
-
- builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
- 0L);
- for (int i = 3; i < 6; i++)
- builder.appendWithOffset(offsets.get(i), records.get(i));
- builder.close();
-
- builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
- 0L);
- for (int i = 6; i < 10; i++)
- builder.appendWithOffset(offsets.get(i), records.get(i));
- builder.close();
-
- buffer.flip();
+ /**
+ * Test the lazy down-conversion path.
+ */
+ @Test
+ public void testConversion() throws IOException {
+ doTestConversion(false);
+ }
+
+ /**
+ * Test the lazy down-conversion path where the number of bytes we want to convert is much larger than the
+ * number of bytes we get after conversion. This causes overflow message batch(es) to be appended towards the
+ * end of the converted output.
+ */
+ @Test
+ public void testConversionWithOverflow() throws IOException {
+ doTestConversion(true);
+ }
+ private void doTestConversion(boolean testConversionOverflow) throws IOException {
+ List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
+
+ Header[] headers = {new RecordHeader("headerKey1", "headerValue1".getBytes()),
+ new RecordHeader("headerKey2", "headerValue2".getBytes()),
+ new RecordHeader("headerKey3", "headerValue3".getBytes())};
+
+ List<SimpleRecord> records = asList(
+ new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
+ new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
+ new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
+ new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
+ new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
+ new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
+ new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
+ new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes(), headers),
+ new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
+ new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes(), headers));
+ assertEquals("incorrect test setup", offsets.size(), records.size());
+
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+ TimestampType.CREATE_TIME, 0L);
+ for (int i = 0; i < 3; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L);
+ for (int i = 3; i < 6; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L);
+ for (int i = 6; i < 10; i++)
+ builder.appendWithOffset(offsets.get(i), records.get(i));
+ builder.close();
+ buffer.flip();
+
+ MemoryRecords recordsToConvert = MemoryRecords.readableRecords(buffer);
+ int numBytesToConvert = recordsToConvert.sizeInBytes();
+ if (testConversionOverflow)
+ numBytesToConvert *= 2;
+
+ MemoryRecords convertedRecords = convertRecords(recordsToConvert, toMagic, numBytesToConvert);
+ verifyDownConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
+ }
+ }
+
+ private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte toMagic, int bytesToConvert) throws IOException {
try (FileRecords inputRecords = FileRecords.open(tempFile())) {
- MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
- inputRecords.append(memoryRecords);
+ inputRecords.append(recordsToConvert);
inputRecords.flush();
LazyDownConversionRecords lazyRecords = new LazyDownConversionRecords(new TopicPartition("test", 1),
@@ -123,50 +162,27 @@ public class LazyDownConversionRecordsTest {
File outputFile = tempFile();
FileChannel channel = new RandomAccessFile(outputFile, "rw").getChannel();
- // Size of lazy records is at least as much as the size of underlying records
- assertTrue(lazyRecords.sizeInBytes() >= inputRecords.sizeInBytes());
-
- int toWrite;
int written = 0;
- List<SimpleRecord> recordsBeingConverted;
- List<Long> offsetsOfRecords;
- switch (test) {
- case DEFAULT:
- toWrite = inputRecords.sizeInBytes();
- recordsBeingConverted = records;
- offsetsOfRecords = offsets;
- break;
- case OVERFLOW:
- toWrite = inputRecords.sizeInBytes() * 2;
- recordsBeingConverted = records;
- offsetsOfRecords = offsets;
- break;
- default:
- throw new IllegalArgumentException();
- }
- while (written < toWrite)
- written += lazySend.writeTo(channel, written, toWrite - written);
+ while (written < bytesToConvert)
+ written += lazySend.writeTo(channel, written, bytesToConvert - written);
FileRecords convertedRecords = FileRecords.open(outputFile, true, (int) channel.size(), false);
ByteBuffer convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes());
convertedRecords.readInto(convertedRecordsBuffer, 0);
- MemoryRecords convertedMemoryRecords = MemoryRecords.readableRecords(convertedRecordsBuffer);
- verifyDownConvertedRecords(recordsBeingConverted, offsetsOfRecords, convertedMemoryRecords, compressionType, toMagic);
+ // cleanup
convertedRecords.close();
channel.close();
- }
- }
- private String utf8(ByteBuffer buffer) {
- return Utils.utf8(buffer, buffer.remaining());
+ return MemoryRecords.readableRecords(convertedRecordsBuffer);
+ }
}
- private void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
- List<Long> initialOffsets,
- MemoryRecords downConvertedRecords,
- CompressionType compressionType,
- byte toMagic) {
+ private static void verifyDownConvertedRecords(List<SimpleRecord> initialRecords,
+ List<Long> initialOffsets,
+ MemoryRecords downConvertedRecords,
+ CompressionType compressionType,
+ byte toMagic) {
int i = 0;
for (RecordBatch batch : downConvertedRecords.batches()) {
assertTrue("Magic byte should be lower than or equal to " +
toMagic, batch.magic() <= toMagic);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 36b14a2..5d5221e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Random;
+import static org.apache.kafka.common.utils.Utils.utf8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -568,10 +569,6 @@ public class MemoryRecordsBuilderTest {
}
}
- private String utf8(ByteBuffer buffer) {
- return Utils.utf8(buffer, buffer.remaining());
- }
-
@Test
public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception {
ByteBuffer buffer = ByteBuffer.allocate(128);
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 6119536..89c90d1 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -119,6 +119,14 @@
</li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+Prefixed+ACLs">KIP-290</a> adds the ability
to define ACLs on prefixed resources, e.g. any topic starting with 'foo'.</li>
+ <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion">KIP-283</a> improves message down-conversion
+ handling on Kafka broker, which has typically been a memory-intensive operation. The KIP adds a mechanism by which the operation becomes less memory intensive
+ by down-converting chunks of partition data at a time which helps put an upper bound on memory consumption. With this improvement, there is a change in
+ <code>FetchResponse</code> protocol behavior where the broker could send an oversized message batch towards the end of the response with an invalid offset.
+ Such oversized messages must be ignored by consumer clients, as is done by <code>KafkaConsumer</code>.
+ <p>KIP-283 also adds new topic and broker configurations <code>message.downconversion.enable</code> and <code>log.message.downconversion.enable</code> respectively
+ to control whether down-conversion is enabled. When disabled, broker does not perform any down-conversion and instead sends an <code>UNSUPPORTED_VERSION</code>
+ error to the client.</p></li>
</ul>
<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>
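
To make the KIP-283 note above concrete, here is a rough, self-contained sketch of the overflow batch a
broker may append at the end of a down-converted fetch response: an 8-byte base offset of -1 followed by
a 4-byte length that points past the bytes actually sent, so a consumer treats it like any other batch that
does not fit in the fetch size and ignores it. The LOG_OVERHEAD constant and the reader check below are
simplified for illustration; the real construction lives in LazyDownConversionRecordsSend above.

import java.nio.ByteBuffer;

public class OverflowBatchSketch {
    static final int LOG_OVERHEAD = 12; // base offset (8 bytes) + length (4 bytes)

    public static void main(String[] args) {
        int remaining = 100; // bytes of response space left to fill

        ByteBuffer overflow = ByteBuffer.allocate(Math.max(LOG_OVERHEAD, remaining));
        overflow.putLong(-1L);          // invalid base offset
        overflow.putInt(remaining + 1); // declared length exceeds what is actually sent
        overflow.rewind();

        // A reader sees a batch whose declared size is larger than the bytes available
        // and therefore skips it, exactly like an ordinary oversized/truncated batch.
        long baseOffset = overflow.getLong();
        int declaredLength = overflow.getInt();
        boolean ignored = declaredLength > overflow.remaining();
        System.out.println("baseOffset=" + baseOffset + " length=" + declaredLength + " ignored=" + ignored);
    }
}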
--
To stop receiving notification emails like this one, please contact
[email protected].