Repository: kafka
Updated Branches:
  refs/heads/trunk 3930dd7e7 -> 0f86dbe89


MINOR: Support auto-incrementing offsets in MemoryRecordsBuilder

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #2282 from hachikuji/builder-autoincrement-offsets


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0f86dbe8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0f86dbe8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0f86dbe8

Branch: refs/heads/trunk
Commit: 0f86dbe89da19ed1cc9142a5362cfa2fe3bc48ee
Parents: 3930dd7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Dec 21 00:07:10 2016 +0000
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Wed Dec 21 00:07:10 2016 +0000

----------------------------------------------------------------------
 .../clients/producer/internals/RecordBatch.java |  3 +-
 .../kafka/common/record/MemoryRecords.java      |  9 ++-
 .../common/record/MemoryRecordsBuilder.java     | 58 +++++++++++------
 .../clients/consumer/KafkaConsumerTest.java     |  4 +-
 .../clients/consumer/internals/FetcherTest.java | 22 +++----
 .../record/ByteBufferLogInputStreamTest.java    | 12 ++--
 .../common/record/MemoryRecordsBuilderTest.java | 66 +++++++++++++-------
 .../kafka/common/record/MemoryRecordsTest.java  | 34 +++++-----
 .../src/main/scala/kafka/log/LogValidator.scala |  2 +-
 9 files changed, 131 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index e9ef441..68b27d3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -45,7 +45,6 @@ public final class RecordBatch {
     public final ProduceRequestResult produceFuture;
     public long lastAppendTime;
     private final List<Thunk> thunks;
-    private long offsetCounter = 0L;
     private boolean retry;
     private final MemoryRecordsBuilder recordsBuilder;
 
@@ -69,7 +68,7 @@ public final class RecordBatch {
         if (!recordsBuilder.hasRoomFor(key, value)) {
             return null;
         } else {
-            long checksum = this.recordsBuilder.append(offsetCounter++, 
timestamp, key, value);
+            long checksum = this.recordsBuilder.append(timestamp, key, value);
             this.maxRecordSize = Math.max(this.maxRecordSize, 
Record.recordSize(key, value));
             this.lastAppendTime = now;
             FutureRecordMetadata future = new 
FutureRecordMetadata(this.produceFuture, this.recordCount,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 1485486..65d91c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -304,6 +304,13 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType,
+                                               long baseOffset) {
+        return builder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, 
timestampType, baseOffset, System.currentTimeMillis());
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
                                                byte magic,
                                                CompressionType compressionType,
                                                TimestampType timestampType,
@@ -390,7 +397,7 @@ public class MemoryRecords extends AbstractRecords {
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
compressionType, timestampType,
                 firstOffset, logAppendTime);
         for (LogEntry entry : entries)
-            builder.append(entry);
+            builder.appendWithOffset(entry.offset(), entry.record());
 
         return builder;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index b90a9e6..d60861b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -164,6 +164,8 @@ public class MemoryRecordsBuilder {
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             return new RecordsInfo(logAppendTime,  lastOffset);
+        else if (maxTimestamp == Record.NO_TIMESTAMP)
+            return new RecordsInfo(Record.NO_TIMESTAMP, lastOffset);
         else
             return new RecordsInfo(maxTimestamp, compressionType == 
CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
     }
@@ -208,16 +210,16 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Append a new record and offset to the buffer
+     * Append a new record at the given offset.
      * @param offset The absolute offset of the record in the log buffer
      * @param timestamp The record timestamp
      * @param key The record key
      * @param value The record value
      * @return crc of the record
      */
-    public long append(long offset, long timestamp, byte[] key, byte[] value) {
+    public long appendWithOffset(long offset, long timestamp, byte[] key, 
byte[] value) {
         try {
-            if (lastOffset > 0 && offset <= lastOffset)
+            if (lastOffset >= 0 && offset <= lastOffset)
                 throw new IllegalArgumentException(String.format("Illegal 
offset %s following previous offset %s (Offsets must increase monotonically).", 
offset, lastOffset));
 
             int size = Record.recordSize(magic, key, value);
@@ -234,17 +236,37 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Add the record, converting to the desired magic value if necessary.
+     * Append a new record at the next consecutive offset. If no records have 
been appended yet, use the base
+     * offset of this builder.
+     * @param timestamp The record timestamp
+     * @param key The record key
+     * @param value The record value
+     * @return crc of the record
+     */
+    public long append(long timestamp, byte[] key, byte[] value) {
+        return appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, 
timestamp, key, value);
+    }
+
+    /**
+     * Add the record at the next consecutive offset, converting to the 
desired magic value if necessary.
+     * @param record The record to add
+     */
+    public void convertAndAppend(Record record) {
+        convertAndAppendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 
1, record);
+    }
+
+    /**
+     * Add the record at the given offset, converting to the desired magic 
value if necessary.
      * @param offset The offset of the record
      * @param record The record to add
      */
-    public void convertAndAppend(long offset, Record record) {
+    public void convertAndAppendWithOffset(long offset, Record record) {
         if (magic == record.magic()) {
-            append(offset, record);
+            appendWithOffset(offset, record);
             return;
         }
 
-        if (lastOffset > 0 && offset <= lastOffset)
+        if (lastOffset >= 0 && offset <= lastOffset)
             throw new IllegalArgumentException(String.format("Illegal offset 
%s following previous offset %s (Offsets must increase monotonically).", 
offset, lastOffset));
 
         try {
@@ -278,28 +300,28 @@ public class MemoryRecordsBuilder {
     }
 
     /**
-     * Append the given log entry. The entry's record must have a magic which 
matches the magic use to
-     * construct this builder and the offset must be greater than the last 
appended entry.
-     * @param entry The entry to append
-     */
-    public void append(LogEntry entry) {
-        append(entry.offset(), entry.record());
-    }
-
-    /**
      * Add a record with a given offset. The record must have a magic which 
matches the magic use to
      * construct this builder and the offset must be greater than the last 
appended entry.
      * @param offset The offset of the record
      * @param record The record to add
      */
-    public void append(long offset, Record record) {
+    public void appendWithOffset(long offset, Record record) {
         if (record.magic() != magic)
             throw new IllegalArgumentException("Inner log entries must have 
matching magic values as the wrapper");
-        if (lastOffset > 0 && offset <= lastOffset)
+        if (lastOffset >= 0 && offset <= lastOffset)
             throw new IllegalArgumentException(String.format("Illegal offset 
%s following previous offset %s (Offsets must increase monotonically).", 
offset, lastOffset));
         appendUnchecked(offset, record);
     }
 
+    /**
+     * Append the record at the next consecutive offset. If no records have 
been appended yet, use the base
+     * offset of this builder.
+     * @param record The record to add
+     */
+    public void append(Record record) {
+        appendWithOffset(lastOffset < 0 ? baseOffset : lastOffset + 1, record);
+    }
+
     private long toInnerOffset(long offset) {
         // use relative offsets for compressed messages with magic v1
         if (magic > 0 && compressionType != CompressionType.NONE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index a4386f8..8240d05 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1325,9 +1325,9 @@ public class KafkaConsumerTest {
             TopicPartition partition = fetchEntry.getKey();
             long fetchOffset = fetchEntry.getValue().offset;
             int fetchCount = fetchEntry.getValue().count;
-            MemoryRecordsBuilder records = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME);
+            MemoryRecordsBuilder records = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME, fetchOffset);
             for (int i = 0; i < fetchCount; i++)
-                records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), 
("value-" + i).getBytes());
+                records.append(0L, ("key-" + i).getBytes(), ("value-" + 
i).getBytes());
             tpResponses.put(partition, new 
FetchResponse.PartitionData(Errors.NONE.code(), 0, records.build()));
         }
         return new FetchResponse(tpResponses, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 15075cb..272a5ee 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -106,15 +106,15 @@ public class FetcherTest {
         metadata.update(cluster, time.milliseconds());
         client.setNode(node);
 
-        MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME);
-        builder.append(1L, 0L, "key".getBytes(), "value-1".getBytes());
-        builder.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
-        builder.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
+        MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME, 1L);
+        builder.append(0L, "key".getBytes(), "value-1".getBytes());
+        builder.append(0L, "key".getBytes(), "value-2".getBytes());
+        builder.append(0L, "key".getBytes(), "value-3".getBytes());
         records = builder.build();
 
-        builder = MemoryRecords.builder(ByteBuffer.allocate(1024), 
CompressionType.NONE, TimestampType.CREATE_TIME);
-        builder.append(4L, 0L, "key".getBytes(), "value-4".getBytes());
-        builder.append(5L, 0L, "key".getBytes(), "value-5".getBytes());
+        builder = MemoryRecords.builder(ByteBuffer.allocate(1024), 
CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
+        builder.append(0L, "key".getBytes(), "value-4".getBytes());
+        builder.append(0L, "key".getBytes(), "value-5".getBytes());
         nextRecords = builder.build();
     }
 
@@ -293,9 +293,9 @@ public class FetcherTest {
         // this test verifies the fetcher updates the current fetched/consumed 
positions correctly for this case
 
         MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME);
-        builder.append(15L, 0L, "key".getBytes(), "value-1".getBytes());
-        builder.append(20L, 0L, "key".getBytes(), "value-2".getBytes());
-        builder.append(30L, 0L, "key".getBytes(), "value-3".getBytes());
+        builder.appendWithOffset(15L, 0L, "key".getBytes(), 
"value-1".getBytes());
+        builder.appendWithOffset(20L, 0L, "key".getBytes(), 
"value-2".getBytes());
+        builder.appendWithOffset(30L, 0L, "key".getBytes(), 
"value-3".getBytes());
         MemoryRecords records = builder.build();
 
         List<ConsumerRecord<byte[], byte[]>> consumerRecords;
@@ -618,7 +618,7 @@ public class FetcherTest {
             if (i > 1) {
                 MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME);
                 for (int v = 0; v < 3; v++) {
-                    builder.append((long) i * 3 + v, Record.NO_TIMESTAMP, 
"key".getBytes(), String.format("value-%d", v).getBytes());
+                    builder.appendWithOffset((long) i * 3 + v, 
Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes());
                 }
                 this.records = builder.build();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
index c8621cd..0fad9a4 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferLogInputStreamTest.java
@@ -31,8 +31,8 @@ public class ByteBufferLogInputStreamTest {
     public void iteratorIgnoresIncompleteEntries() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
-        builder.append(1L, 20L, "b".getBytes(), "2".getBytes());
+        builder.append(15L, "a".getBytes(), "1".getBytes());
+        builder.append(20L, "b".getBytes(), "2".getBytes());
 
         ByteBuffer recordsBuffer = builder.build().buffer();
         recordsBuffer.limit(recordsBuffer.limit() - 5);
@@ -49,7 +49,7 @@ public class ByteBufferLogInputStreamTest {
     public void testSetCreateTimeV1() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        builder.append(15L, "a".getBytes(), "1".getBytes());
         Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = 
builder.build().shallowEntries().iterator();
 
         assertTrue(iterator.hasNext());
@@ -66,7 +66,7 @@ public class ByteBufferLogInputStreamTest {
     public void testSetCreateTimeNotAllowedV0() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        builder.append(15L, "a".getBytes(), "1".getBytes());
         Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = 
builder.build().shallowEntries().iterator();
 
         assertTrue(iterator.hasNext());
@@ -80,7 +80,7 @@ public class ByteBufferLogInputStreamTest {
     public void testSetLogAppendTimeV1() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Record.MAGIC_VALUE_V1, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        builder.append(15L, "a".getBytes(), "1".getBytes());
         Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = 
builder.build().shallowEntries().iterator();
 
         assertTrue(iterator.hasNext());
@@ -97,7 +97,7 @@ public class ByteBufferLogInputStreamTest {
     public void testSetLogAppendTimeNotAllowedV0() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
Record.MAGIC_VALUE_V0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
-        builder.append(0L, 15L, "a".getBytes(), "1".getBytes());
+        builder.append(15L, "a".getBytes(), "1".getBytes());
         Iterator<ByteBufferLogInputStream.ByteBufferLogEntry> iterator = 
builder.build().shallowEntries().iterator();
 
         assertTrue(iterator.hasNext());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index a52976b..034faf6 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -55,10 +55,9 @@ public class MemoryRecordsBuilderTest {
                 TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
 
         int uncompressedSize = 0;
-        long offset = 0L;
         for (Record record : records) {
             uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
-            builder.append(offset++, record);
+            builder.append(record);
         }
 
         MemoryRecords built = builder.build();
@@ -86,10 +85,9 @@ public class MemoryRecordsBuilderTest {
                 TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
 
         int uncompressedSize = 0;
-        long offset = 0L;
         for (Record record : records) {
             uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
-            builder.append(offset++, record);
+            builder.append(record);
         }
 
         MemoryRecords built = builder.build();
@@ -110,9 +108,9 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
Record.MAGIC_VALUE_V1, compressionType,
                 TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, 
buffer.capacity());
-        builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
-        builder.append(1L, 0L, "b".getBytes(), "2".getBytes());
-        builder.append(2L, 0L, "c".getBytes(), "3".getBytes());
+        builder.append(0L, "a".getBytes(), "1".getBytes());
+        builder.append(0L, "b".getBytes(), "2".getBytes());
+        builder.append(0L, "c".getBytes(), "3".getBytes());
         MemoryRecords records = builder.build();
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
@@ -135,14 +133,13 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
Record.MAGIC_VALUE_V1, compressionType,
                 TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, 
buffer.capacity());
 
-        builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, 
"a".getBytes(), "1".getBytes()));
-        builder.convertAndAppend(1L, Record.create(Record.MAGIC_VALUE_V0, 0L, 
"b".getBytes(), "2".getBytes()));
-        builder.convertAndAppend(2L, Record.create(Record.MAGIC_VALUE_V0, 0L, 
"c".getBytes(), "3".getBytes()));
+        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, 
"a".getBytes(), "1".getBytes()));
+        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, 
"b".getBytes(), "2".getBytes()));
+        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, 
"c".getBytes(), "3".getBytes()));
         MemoryRecords records = builder.build();
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(logAppendTime, info.maxTimestamp);
-
         assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
         for (Record record : records.records()) {
@@ -159,9 +156,9 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
Record.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, 
buffer.capacity());
-        builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
-        builder.append(1L, 2L, "b".getBytes(), "2".getBytes());
-        builder.append(2L, 1L, "c".getBytes(), "3".getBytes());
+        builder.append(0L, "a".getBytes(), "1".getBytes());
+        builder.append(2L, "b".getBytes(), "2".getBytes());
+        builder.append(1L, "c".getBytes(), "3".getBytes());
         MemoryRecords records = builder.build();
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
@@ -188,11 +185,11 @@ public class MemoryRecordsBuilderTest {
         long logAppendTime = System.currentTimeMillis();
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
Record.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, 
buffer.capacity());
-        builder.append(0L, 0L, "a".getBytes(), "1".getBytes());
-        builder.append(1L, 1L, "b".getBytes(), "2".getBytes());
+        builder.append(0L, "a".getBytes(), "1".getBytes());
+        builder.append(1L, "b".getBytes(), "2".getBytes());
 
         assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes()));
-        builder.append(2L, 2L, "c".getBytes(), "3".getBytes());
+        builder.append(2L, "c".getBytes(), "3".getBytes());
         MemoryRecords records = builder.build();
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
@@ -206,6 +203,33 @@ public class MemoryRecordsBuilderTest {
         }
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testAppendAtInvalidOffset() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.position(bufferOffset);
+
+        long logAppendTime = System.currentTimeMillis();
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
Record.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, logAppendTime, 
buffer.capacity());
+
+        builder.appendWithOffset(0L, System.currentTimeMillis(), 
"a".getBytes(), null);
+
+        // offsets must increase monotonically
+        builder.appendWithOffset(0L, System.currentTimeMillis(), 
"b".getBytes(), null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAppendWithInvalidMagic() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        buffer.position(bufferOffset);
+
+        long logAppendTime = System.currentTimeMillis();
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
Record.MAGIC_VALUE_V1, compressionType,
+                TimestampType.CREATE_TIME, 0L, logAppendTime, 
buffer.capacity());
+
+        builder.append(Record.create(Record.MAGIC_VALUE_V0, 0L, 
"a".getBytes(), null));
+    }
+
     @Test
     public void convertUsingCreateTime() {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
@@ -215,14 +239,14 @@ public class MemoryRecordsBuilderTest {
         MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
Record.MAGIC_VALUE_V1, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, 
buffer.capacity());
 
-        builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, 
"a".getBytes(), "1".getBytes()));
-        builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, 
"b".getBytes(), "2".getBytes()));
-        builder.convertAndAppend(0L, Record.create(Record.MAGIC_VALUE_V0, 0L, 
"c".getBytes(), "3".getBytes()));
+        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, 
"a".getBytes(), "1".getBytes()));
+        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, 
"b".getBytes(), "2".getBytes()));
+        builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, 
"c".getBytes(), "3".getBytes()));
         MemoryRecords records = builder.build();
 
         MemoryRecordsBuilder.RecordsInfo info = builder.info();
         assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp);
-        assertEquals(0L, info.shallowOffsetOfMaxTimestamp);
+        assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
 
         for (Record record : records.records()) {
             assertEquals(TimestampType.CREATE_TIME, record.timestampType());

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index f2741ee..9c8ca7f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -60,8 +60,8 @@ public class MemoryRecordsTest {
 
         for (int i = 0; i < list.size(); i++) {
             Record r = list.get(i);
-            builder1.append(firstOffset + i, r);
-            builder2.append(firstOffset + i, i + 1, toNullableArray(r.key()), 
toNullableArray(r.value()));
+            builder1.append(r);
+            builder2.append(i + 1, toNullableArray(r.key()), 
toNullableArray(r.value()));
         }
 
         MemoryRecords recs1 = builder1.build();
@@ -85,7 +85,7 @@ public class MemoryRecordsTest {
     @Test
     public void testHasRoomForMethod() {
         MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, 
TimestampType.CREATE_TIME);
-        builder.append(0, Record.create(magic, 0L, "a".getBytes(), 
"1".getBytes()));
+        builder.append(Record.create(magic, 0L, "a".getBytes(), 
"1".getBytes()));
 
         assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
         builder.close();
@@ -95,23 +95,23 @@ public class MemoryRecordsTest {
     @Test
     public void testFilterTo() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
-        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
compression, TimestampType.CREATE_TIME);
-        builder.append(0L, 10L, null, "a".getBytes());
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
compression, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
         builder.close();
 
         builder = MemoryRecords.builder(buffer, magic, compression, 
TimestampType.CREATE_TIME, 1L);
-        builder.append(1L, 11L, "1".getBytes(), "b".getBytes());
-        builder.append(2L, 12L, null, "c".getBytes());
+        builder.append(11L, "1".getBytes(), "b".getBytes());
+        builder.append(12L, null, "c".getBytes());
         builder.close();
 
         builder = MemoryRecords.builder(buffer, magic, compression, 
TimestampType.CREATE_TIME, 3L);
-        builder.append(3L, 13L, null, "d".getBytes());
-        builder.append(4L, 20L, "4".getBytes(), "e".getBytes());
-        builder.append(5L, 15L, "5".getBytes(), "f".getBytes());
+        builder.append(13L, null, "d".getBytes());
+        builder.append(20L, "4".getBytes(), "e".getBytes());
+        builder.append(15L, "5".getBytes(), "f".getBytes());
         builder.close();
 
         builder = MemoryRecords.builder(buffer, magic, compression, 
TimestampType.CREATE_TIME, 6L);
-        builder.append(6L, 16L, "6".getBytes(), "g".getBytes());
+        builder.append(16L, "6".getBytes(), "g".getBytes());
         builder.close();
 
         buffer.flip();
@@ -175,18 +175,18 @@ public class MemoryRecordsTest {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, 
compression,
                 TimestampType.LOG_APPEND_TIME, 0L, logAppendTime);
-        builder.append(0L, 10L, null, "a".getBytes());
+        builder.append(10L, null, "a".getBytes());
         builder.close();
 
         builder = MemoryRecords.builder(buffer, magic, compression, 
TimestampType.LOG_APPEND_TIME, 1L, logAppendTime);
-        builder.append(1L, 11L, "1".getBytes(), "b".getBytes());
-        builder.append(2L, 12L, null, "c".getBytes());
+        builder.append(11L, "1".getBytes(), "b".getBytes());
+        builder.append(12L, null, "c".getBytes());
         builder.close();
 
         builder = MemoryRecords.builder(buffer, magic, compression, 
TimestampType.LOG_APPEND_TIME, 3L, logAppendTime);
-        builder.append(3L, 13L, null, "d".getBytes());
-        builder.append(4L, 14L, "4".getBytes(), "e".getBytes());
-        builder.append(5L, 15L, "5".getBytes(), "f".getBytes());
+        builder.append(13L, null, "d".getBytes());
+        builder.append(14L, "4".getBytes(), "e".getBytes());
+        builder.append(15L, "5".getBytes(), "f".getBytes());
         builder.close();
 
         buffer.flip();

http://git-wip-us.apache.org/repos/asf/kafka/blob/0f86dbe8/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 1713942..d99c2ad 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -156,7 +156,7 @@ private[kafka] object LogValidator {
       val record = logEntry.record
       validateKey(record, compactedTopic)
       validateTimestamp(record, now, timestampType, messageTimestampDiffMaxMs)
-      builder.convertAndAppend(offsetCounter.getAndIncrement(), record)
+      builder.convertAndAppendWithOffset(offsetCounter.getAndIncrement(), 
record)
     }
 
     builder.close()

Reply via email to