Repository: kafka Updated Branches: refs/heads/trunk 3930dd7e7 -> 0f86dbe89
MINOR: Support auto-incrementing offsets in MemoryRecordsBuilder Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #2282 from hachikuji/builder-autoincrement-offsets Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0f86dbe8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0f86dbe8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0f86dbe8 Branch: refs/heads/trunk Commit: 0f86dbe89da19ed1cc9142a5362cfa2fe3bc48ee Parents: 3930dd7 Author: Jason Gustafson <ja...@confluent.io> Authored: Wed Dec 21 00:07:10 2016 +0000 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Wed Dec 21 00:07:10 2016 +0000 ---------------------------------------------------------------------- .../clients/producer/internals/RecordBatch.java | 3 +- .../kafka/common/record/MemoryRecords.java | 9 ++- .../common/record/MemoryRecordsBuilder.java | 58 +++++++++++------ .../clients/consumer/KafkaConsumerTest.java | 4 +- .../clients/consumer/internals/FetcherTest.java | 22 +++---- .../record/ByteBufferLogInputStreamTest.java | 12 ++-- .../common/record/MemoryRecordsBuilderTest.java | 66 +++++++++++++------- .../kafka/common/record/MemoryRecordsTest.java | 34 +++++----- .../src/main/scala/kafka/log/LogValidator.scala | 2 +- 9 files changed, 131 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index e9ef441..68b27d3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -45,7 +45,6 @@ public final class RecordBatch { public final ProduceRequestResult produceFuture; public long lastAppendTime; private final List<Thunk> thunks; - private long offsetCounter = 0L; private boolean retry; private final MemoryRecordsBuilder recordsBuilder; @@ -69,7 +68,7 @@ public final class RecordBatch { if (!recordsBuilder.hasRoomFor(key, value)) { return null; } else { - long checksum = this.recordsBuilder.append(offsetCounter++, timestamp, key, value); + long checksum = this.recordsBuilder.append(timestamp, key, value); this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value)); this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 1485486..65d91c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -304,6 +304,13 @@ public class MemoryRecords extends AbstractRecords { } public static MemoryRecordsBuilder builder(ByteBuffer buffer, + CompressionType compressionType, + TimestampType timestampType, + long baseOffset) { + return builder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset, System.currentTimeMillis()); + } + + public static MemoryRecordsBuilder builder(ByteBuffer buffer, byte magic, CompressionType compressionType, TimestampType timestampType, @@ -390,7 +397,7 @@ public class MemoryRecords extends AbstractRecords { MemoryRecordsBuilder builder = 
MemoryRecords.builder(buffer, magic, compressionType, timestampType, firstOffset, logAppendTime); for (LogEntry entry : entries) - builder.append(entry); + builder.appendWithOffset(entry.offset(), entry.record()); return builder; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index b90a9e6..d60861b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -164,6 +164,8 @@ public class MemoryRecordsBuilder { public RecordsInfo info() { if (timestampType == TimestampType.LOG_APPEND_TIME) return new RecordsInfo(logAppendTime, lastOffset); + else if (maxTimestamp == Record.NO_TIMESTAMP) + return new RecordsInfo(Record.NO_TIMESTAMP, lastOffset); else return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset); } @@ -208,16 +210,16 @@ public class MemoryRecordsBuilder { } /** - * Append a new record and offset to the buffer + * Append a new record at the given offset. 
* @param offset The absolute offset of the record in the log buffer * @param timestamp The record timestamp * @param key The record key * @param value The record value * @return crc of the record */ - public long append(long offset, long timestamp, byte[] key, byte[] value) { + public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) { try { - if (lastOffset > 0 && offset <= lastOffset) + if (lastOffset >= 0 && offset <= lastOffset) throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); int size = Record.recordSize(magic, key, value); @@ -234,17 +236,37 @@ public class MemoryRecordsBuilder { } /** - * Add the record, converting to the desired magic value if necessary. + * Append a new record at the next consecutive offset. If no records have been appended yet, use the base + * offset of this builder. + * @param timestamp The record timestamp + * @param key The record key + * @param value The record value + * @return crc of the record + */ + public long append(long timestamp, byte[] key, byte[] value) { + return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, timestamp, key, value); + } + + /** + * Add the record at the next consecutive offset, converting to the desired magic value if necessary. + * @param record The record to add + */ + public void convertAndAppend(Record record) { + convertAndAppendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record); + } + + /** + * Add the record at the given offset, converting to the desired magic value if necessary. 
* @param offset The offset of the record * @param record The record to add */ - public void convertAndAppend(long offset, Record record) { + public void convertAndAppendWithOffset(long offset, Record record) { if (magic == record.magic()) { - append(offset, record); + appendWithOffset(offset, record); return; } - if (lastOffset > 0 && offset <= lastOffset) + if (lastOffset >= 0 && offset <= lastOffset) throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); try { @@ -278,28 +300,28 @@ } /** - * Append the given log entry. The entry's record must have a magic which matches the magic used to - * construct this builder and the offset must be greater than the last appended entry. - * @param entry The entry to append - */ - public void append(LogEntry entry) { - append(entry.offset(), entry.record()); - } - - /** * Add a record with a given offset. The record must have a magic which matches the magic used to * construct this builder and the offset must be greater than the last appended entry. * @param offset The offset of the record * @param record The record to add */ - public void append(long offset, Record record) { + public void appendWithOffset(long offset, Record record) { if (record.magic() != magic) throw new IllegalArgumentException("Inner log entries must have matching magic values as the wrapper"); - if (lastOffset > 0 && offset <= lastOffset) + if (lastOffset >= 0 && offset <= lastOffset) throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); appendUnchecked(offset, record); } + /** + * Append the record at the next consecutive offset. If no records have been appended yet, use the base + * offset of this builder.
+ * @param record The record to add + */ + public void append(Record record) { + appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record); + } + private long toInnerOffset(long offset) { // use relative offsets for compressed messages with magic v1 if (magic > 0 && compressionType != CompressionType.NONE) http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index a4386f8..8240d05 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1325,9 +1325,9 @@ public class KafkaConsumerTest { TopicPartition partition = fetchEntry.getKey(); long fetchOffset = fetchEntry.getValue().offset; int fetchCount = fetchEntry.getValue().count; - MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, fetchOffset); for (int i = 0; i < fetchCount; i++) - records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes()); + records.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes()); tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.build())); } return new FetchResponse(tpResponses, 0); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 15075cb..272a5ee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -106,15 +106,15 @@ public class FetcherTest { metadata.update(cluster, time.milliseconds()); client.setNode(node); - MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); - builder.append(1L, 0L, "key".getBytes(), "value-1".getBytes()); - builder.append(2L, 0L, "key".getBytes(), "value-2".getBytes()); - builder.append(3L, 0L, "key".getBytes(), "value-3".getBytes()); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L); + builder.append(0L, "key".getBytes(), "value-1".getBytes()); + builder.append(0L, "key".getBytes(), "value-2".getBytes()); + builder.append(0L, "key".getBytes(), "value-3".getBytes()); records = builder.build(); - builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); - builder.append(4L, 0L, "key".getBytes(), "value-4".getBytes()); - builder.append(5L, 0L, "key".getBytes(), "value-5".getBytes()); + builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L); + builder.append(0L, "key".getBytes(), "value-4".getBytes()); + builder.append(0L, "key".getBytes(), "value-5".getBytes()); nextRecords = builder.build(); } @@ -293,9 +293,9 @@ public class FetcherTest { // this test verifies the fetcher updates the current fetched/consumed positions correctly for this case MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); - builder.append(15L, 0L, 
"key".getBytes(), "value-1".getBytes()); - builder.append(20L, 0L, "key".getBytes(), "value-2".getBytes()); - builder.append(30L, 0L, "key".getBytes(), "value-3".getBytes()); + builder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes()); + builder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes()); + builder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes()); MemoryRecords records = builder.build(); List<ConsumerRecord<byte[], byte[]>> consumerRecords; @@ -618,7 +618,7 @@ public class FetcherTest { if (i > 1) { MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); for (int v = 0; v < 3; v++) { - builder.append((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); + builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); } this.records = builder.build(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java index c8621cd..0fad9a4 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java @@ -31,8 +31,8 @@ public class ByteBufferLogInputStreamTest { public void iteratorIgnoresIncompleteEntries() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); - builder.append(1L, 
20L, "b".getBytes(), "2".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); + builder.append(20L, "b".getBytes(), "2".getBytes()); ByteBuffer recordsBuffer = builder.build().buffer(); recordsBuffer.limit(recordsBuffer.limit() - 5); @@ -49,7 +49,7 @@ public class ByteBufferLogInputStreamTest { public void testSetCreateTimeV1() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator(); assertTrue(iterator.hasNext()); @@ -66,7 +66,7 @@ public class ByteBufferLogInputStreamTest { public void testSetCreateTimeNotAllowedV0() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator(); assertTrue(iterator.hasNext()); @@ -80,7 +80,7 @@ public class ByteBufferLogInputStreamTest { public void testSetLogAppendTimeV1() { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator(); assertTrue(iterator.hasNext()); @@ -97,7 +97,7 @@ public class ByteBufferLogInputStreamTest { public void testSetLogAppendTimeNotAllowedV0() { 
ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L); - builder.append(0L, 15L, "a".getBytes(), "1".getBytes()); + builder.append(15L, "a".getBytes(), "1".getBytes()); Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = builder.build().shallowEntries().iterator(); assertTrue(iterator.hasNext()); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index a52976b..034faf6 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -55,10 +55,9 @@ public class MemoryRecordsBuilderTest { TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity()); int uncompressedSize = 0; - long offset = 0L; for (Record record : records) { uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD; - builder.append(offset++, record); + builder.append(record); } MemoryRecords built = builder.build(); @@ -86,10 +85,9 @@ public class MemoryRecordsBuilderTest { TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity()); int uncompressedSize = 0; - long offset = 0L; for (Record record : records) { uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD; - builder.append(offset++, record); + builder.append(record); } MemoryRecords built = builder.build(); @@ -110,9 +108,9 @@ public class MemoryRecordsBuilderTest { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, 
TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity()); - builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); - builder.append(1L, 0L, "b".getBytes(), "2".getBytes()); - builder.append(2L, 0L, "c".getBytes(), "3".getBytes()); + builder.append(0L, "a".getBytes(), "1".getBytes()); + builder.append(0L, "b".getBytes(), "2".getBytes()); + builder.append(0L, "c".getBytes(), "3".getBytes()); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); @@ -135,14 +133,13 @@ public class MemoryRecordsBuilderTest { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity()); - builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); - builder.convertAndAppend(1L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); - builder.convertAndAppend(2L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); assertEquals(logAppendTime, info.maxTimestamp); - assertEquals(2L, info.shallowOffsetOfMaxTimestamp); for (Record record : records.records()) { @@ -159,9 +156,9 @@ public class MemoryRecordsBuilderTest { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); - builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); - builder.append(1L, 2L, "b".getBytes(), "2".getBytes()); 
- builder.append(2L, 1L, "c".getBytes(), "3".getBytes()); + builder.append(0L, "a".getBytes(), "1".getBytes()); + builder.append(2L, "b".getBytes(), "2".getBytes()); + builder.append(1L, "c".getBytes(), "3".getBytes()); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); @@ -188,11 +185,11 @@ public class MemoryRecordsBuilderTest { long logAppendTime = System.currentTimeMillis(); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); - builder.append(0L, 0L, "a".getBytes(), "1".getBytes()); - builder.append(1L, 1L, "b".getBytes(), "2".getBytes()); + builder.append(0L, "a".getBytes(), "1".getBytes()); + builder.append(1L, "b".getBytes(), "2".getBytes()); assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes())); - builder.append(2L, 2L, "c".getBytes(), "3".getBytes()); + builder.append(2L, "c".getBytes(), "3".getBytes()); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); @@ -206,6 +203,33 @@ public class MemoryRecordsBuilderTest { } } + @Test(expected = IllegalArgumentException.class) + public void testAppendAtInvalidOffset() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); + + builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null); + + // offsets must increase monotonically + builder.appendWithOffset(0L, System.currentTimeMillis(), "b".getBytes(), null); + } + + @Test(expected = IllegalArgumentException.class) + public void testAppendWithInvalidMagic() { + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.position(bufferOffset); + + long logAppendTime = 
System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, + TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); + + builder.append(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), null)); + } + @Test public void convertUsingCreateTime() { ByteBuffer buffer = ByteBuffer.allocate(1024); @@ -215,14 +239,14 @@ public class MemoryRecordsBuilderTest { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity()); - builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); - builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); - builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes())); + builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes())); MemoryRecords records = builder.build(); MemoryRecordsBuilder.RecordsInfo info = builder.info(); assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp); - assertEquals(0L, info.shallowOffsetOfMaxTimestamp); + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); for (Record record : records.records()) { assertEquals(TimestampType.CREATE_TIME, record.timestampType()); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 
f2741ee..9c8ca7f 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -60,8 +60,8 @@ public class MemoryRecordsTest { for (int i = 0; i < list.size(); i++) { Record r = list.get(i); - builder1.append(firstOffset + i, r); - builder2.append(firstOffset + i, i + 1, toNullableArray(r.key()), toNullableArray(r.value())); + builder1.append(r); + builder2.append(i + 1, toNullableArray(r.key()), toNullableArray(r.value())); } MemoryRecords recs1 = builder1.build(); @@ -85,7 +85,7 @@ public class MemoryRecordsTest { @Test public void testHasRoomForMethod() { MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME); - builder.append(0, Record.create(magic, 0L, "a".getBytes(), "1".getBytes())); + builder.append(Record.create(magic, 0L, "a".getBytes(), "1".getBytes())); assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes())); builder.close(); @@ -95,23 +95,23 @@ public class MemoryRecordsTest { @Test public void testFilterTo() { ByteBuffer buffer = ByteBuffer.allocate(2048); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME); - builder.append(0L, 10L, null, "a".getBytes()); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L); + builder.append(10L, null, "a".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L); - builder.append(1L, 11L, "1".getBytes(), "b".getBytes()); - builder.append(2L, 12L, null, "c".getBytes()); + builder.append(11L, "1".getBytes(), "b".getBytes()); + builder.append(12L, null, "c".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L); - builder.append(3L, 13L, null, "d".getBytes()); - builder.append(4L, 20L, 
"4".getBytes(), "e".getBytes()); - builder.append(5L, 15L, "5".getBytes(), "f".getBytes()); + builder.append(13L, null, "d".getBytes()); + builder.append(20L, "4".getBytes(), "e".getBytes()); + builder.append(15L, "5".getBytes(), "f".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L); - builder.append(6L, 16L, "6".getBytes(), "g".getBytes()); + builder.append(16L, "6".getBytes(), "g".getBytes()); builder.close(); buffer.flip(); @@ -175,18 +175,18 @@ public class MemoryRecordsTest { ByteBuffer buffer = ByteBuffer.allocate(2048); MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 0L, logAppendTime); - builder.append(0L, 10L, null, "a".getBytes()); + builder.append(10L, null, "a".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime); - builder.append(1L, 11L, "1".getBytes(), "b".getBytes()); - builder.append(2L, 12L, null, "c".getBytes()); + builder.append(11L, "1".getBytes(), "b".getBytes()); + builder.append(12L, null, "c".getBytes()); builder.close(); builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime); - builder.append(3L, 13L, null, "d".getBytes()); - builder.append(4L, 14L, "4".getBytes(), "e".getBytes()); - builder.append(5L, 15L, "5".getBytes(), "f".getBytes()); + builder.append(13L, null, "d".getBytes()); + builder.append(14L, "4".getBytes(), "e".getBytes()); + builder.append(15L, "5".getBytes(), "f".getBytes()); builder.close(); buffer.flip(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/core/src/main/scala/kafka/log/LogValidator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala index 1713942..d99c2ad 100644 --- 
a/core/src/main/scala/kafka/log/LogValidator.scala +++ b/core/src/main/scala/kafka/log/LogValidator.scala @@ -156,7 +156,7 @@ private[kafka] object LogValidator { val record = logEntry.record validateKey(record, compactedTopic) validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs) - builder.convertAndAppend(offsetCounter.getAndIncrement(), record) + builder.convertAndAppendWithOffset(offsetCounter.getAndIncrement(), record) } builder.close()