Repository: kafka
Updated Branches:
  refs/heads/0.11.0 ec5b22c8b -> 41fff32a4


KAFKA-5490; Cleaner should retain empty batch if needed to preserve producer last sequence

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>, Jun Rao <jun...@gmail.com>

Closes #3406 from hachikuji/KAFKA-5490

(cherry picked from commit cb0325d484b957432048dd29419f0fa59c5f132d)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41fff32a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41fff32a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41fff32a

Branch: refs/heads/0.11.0
Commit: 41fff32a4ebb27847e35e0754119a1cf73c3b0ed
Parents: ec5b22c
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jun 28 11:35:06 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Jun 28 11:35:33 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/record/DefaultRecordBatch.java |   4 +-
 .../kafka/common/record/MemoryRecords.java      |  87 ++++++-----
 .../common/utils/ByteBufferOutputStream.java    |  21 ++-
 .../clients/consumer/internals/FetcherTest.java |   7 +-
 .../common/record/DefaultRecordBatchTest.java   |   1 +
 .../kafka/common/record/MemoryRecordsTest.java  | 105 ++++++++++++-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  82 ++++++----
 .../src/main/scala/kafka/log/LogValidator.scala |   3 +
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 155 +++++++++++++++++--
 .../src/test/scala/unit/kafka/log/LogTest.scala |  50 +++++-
 .../scala/unit/kafka/log/LogValidatorTest.scala |  40 ++++-
 11 files changed, 462 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index e8eba8a..2262b33 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -409,8 +409,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                                         boolean isControlRecord) {
         int offsetDelta = (int) (lastOffset - baseOffset);
         writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
-                CompressionType.NONE, timestampType, timestamp, timestamp, producerId, producerEpoch, baseSequence,
-                isTransactional, isControlRecord, partitionLeaderEpoch, 0);
+                CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
+                producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
     }
 
     static void writeHeader(ByteBuffer buffer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 1427421..d3f2444 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.slf4j.Logger;
@@ -151,7 +152,8 @@ public class MemoryRecords extends AbstractRecords {
         for (MutableRecordBatch batch : batches) {
             bytesRead += batch.sizeInBytes();
 
-            if (filter.shouldDiscard(batch))
+            BatchRetention batchRetention = filter.checkBatchRetention(batch);
+            if (batchRetention == BatchRetention.DELETE)
                 continue;
 
             // We use the absolute offset to decide whether to retain the message or not. Due to KAFKA-4298, we have to
@@ -168,7 +170,7 @@ public class MemoryRecords extends AbstractRecords {
                     Record record = iterator.next();
                     messagesRead += 1;
 
-                    if (filter.shouldRetain(batch, record)) {
+                    if (filter.shouldRetainRecord(batch, record)) {
                         // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
                         // the corrupted batch with correct data.
                         if (!record.hasMagic(batchMagic))
@@ -184,33 +186,44 @@ public class MemoryRecords extends AbstractRecords {
                 }
             }
 
-            if (writeOriginalBatch) {
-                batch.writeTo(bufferOutputStream);
-                messagesRetained += retainedRecords.size();
-                bytesRetained += batch.sizeInBytes();
-                if (batch.maxTimestamp() > maxTimestamp) {
-                    maxTimestamp = batch.maxTimestamp();
-                    shallowOffsetOfMaxTimestamp = batch.lastOffset();
-                }
-            } else if (!retainedRecords.isEmpty()) {
-                MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
-                MemoryRecords records = builder.build();
-                int filteredBatchSize = records.sizeInBytes();
-
-                messagesRetained += retainedRecords.size();
-                bytesRetained += filteredBatchSize;
-
-                if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
-                    log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
-                                    "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
-                                    "increase their fetch sizes.",
-                            partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
-
-                MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                if (info.maxTimestamp > maxTimestamp) {
-                    maxTimestamp = info.maxTimestamp;
-                    shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
+            if (!retainedRecords.isEmpty()) {
+                if (writeOriginalBatch) {
+                    batch.writeTo(bufferOutputStream);
+                    messagesRetained += retainedRecords.size();
+                    bytesRetained += batch.sizeInBytes();
+                    if (batch.maxTimestamp() > maxTimestamp) {
+                        maxTimestamp = batch.maxTimestamp();
+                        shallowOffsetOfMaxTimestamp = batch.lastOffset();
+                    }
+                } else {
+                    MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+                    MemoryRecords records = builder.build();
+                    int filteredBatchSize = records.sizeInBytes();
+
+                    messagesRetained += retainedRecords.size();
+                    bytesRetained += filteredBatchSize;
+
+                    if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
+                        log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
+                                        "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
+                                        "increase their fetch sizes.",
+                                partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
+
+                    MemoryRecordsBuilder.RecordsInfo info = builder.info();
+                    if (info.maxTimestamp > maxTimestamp) {
+                        maxTimestamp = info.maxTimestamp;
+                        shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
+                    }
                 }
+            } else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
+                if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
+                    throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
+
+                bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+                DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
+                        batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
+                        batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
+                        batch.isTransactional(), batch.isControlBatch());
             }

             // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
@@ -300,20 +313,24 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public static abstract class RecordFilter {
+        public enum BatchRetention {
+            DELETE, // Delete the batch without inspecting records
+            RETAIN_EMPTY, // Retain the batch even if it is empty
+            DELETE_EMPTY  // Delete the batch if it is empty
+        }
+
         /**
         * Check whether the full batch can be discarded (i.e. whether we even need to
         * check the records individually).
         */
-        protected boolean shouldDiscard(RecordBatch batch) {
-            return false;
-        }
+        protected abstract BatchRetention checkBatchRetention(RecordBatch batch);

         /**
-         * Check whether a record should be retained in the log. Only records from
-         * batches which were not discarded with {@link #shouldDiscard(RecordBatch)}
-         * will be considered.
+         * Check whether a record should be retained in the log. Note that {@link #checkBatchRetention(RecordBatch)}
+         * is used prior to checking individual record retention. Only records from batches which were not
+         * explicitly discarded with {@link BatchRetention#DELETE} will be considered.
          */
-        protected abstract boolean shouldRetain(RecordBatch recordBatch, Record record);
+        protected abstract boolean shouldRetainRecord(RecordBatch recordBatch, Record record);
     }
 
     public static class FilterResult {
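
For reference, here is a minimal sketch of a filter written against the new two-stage contract above and passed to filterTo the same way the tests in this patch do. The class name, topic, and buffer size are illustrative, and the BufferSupplier import location is assumed from the 0.11 clients module:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.BufferSupplier; // package assumed; adjust to the 0.11 source tree
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;

    public class RetainEmptyBatchFilterSketch {

        // Drop records without a key (as compaction would), but never delete the
        // batch itself: a batch whose records are all removed keeps an empty header.
        static final MemoryRecords.RecordFilter FILTER = new MemoryRecords.RecordFilter() {
            @Override
            protected BatchRetention checkBatchRetention(RecordBatch batch) {
                return BatchRetention.RETAIN_EMPTY;
            }

            @Override
            protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
                return record.hasKey();
            }
        };

        static MemoryRecords filter(MemoryRecords records) {
            ByteBuffer out = ByteBuffer.allocate(1024);
            records.filterTo(new TopicPartition("foo", 0), FILTER, out,
                    Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
            out.flip();
            return MemoryRecords.readableRecords(out);
        }
    }

With RETAIN_EMPTY, a batch whose records are all filtered out is rewritten as a records-free v2 header rather than dropped, which is what allows the cleaner to preserve a producer's last sequence number.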

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
index 2b13e7e..43e3bba 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
@@ -62,17 +62,17 @@ public class ByteBufferOutputStream extends OutputStream {
     }
 
     public void write(int b) {
-        maybeExpandBuffer(1);
+        ensureRemaining(1);
         buffer.put((byte) b);
     }
 
     public void write(byte[] bytes, int off, int len) {
-        maybeExpandBuffer(len);
+        ensureRemaining(len);
         buffer.put(bytes, off, len);
     }
 
     public void write(ByteBuffer sourceBuffer) {
-        maybeExpandBuffer(sourceBuffer.remaining());
+        ensureRemaining(sourceBuffer.remaining());
         buffer.put(sourceBuffer);
     }
 
@@ -93,7 +93,7 @@ public class ByteBufferOutputStream extends OutputStream {
     }
 
     public void position(int position) {
-        maybeExpandBuffer(position - buffer.position());
+        ensureRemaining(position - buffer.position());
         buffer.position(position);
     }
 
@@ -105,9 +105,16 @@ public class ByteBufferOutputStream extends OutputStream {
         return initialCapacity;
     }
 
-    private void maybeExpandBuffer(int remainingRequired) {
-        if (remainingRequired > buffer.remaining())
-            expandBuffer(remainingRequired);
+    /**
+     * Ensure there is enough space to write some number of bytes, expanding the underlying buffer if necessary.
+     * This can be used to avoid incremental expansions through calls to {@link #write(int)} when you know how
+     * many total bytes are needed.
+     *
+     * @param remainingBytesRequired The number of bytes required
+     */
+    public void ensureRemaining(int remainingBytesRequired) {
+        if (remainingBytesRequired > buffer.remaining())
+            expandBuffer(remainingBytesRequired);
     }
 
     private void expandBuffer(int remainingRequired) {
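
A minimal sketch of the newly public ensureRemaining method used the same way filterTo uses it above: reserve the fixed v2 batch header size in one step, then write an empty header directly into the stream's underlying buffer. The producer and offset literals are illustrative:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.DefaultRecordBatch;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;
    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class EnsureRemainingSketch {
        public static void main(String[] args) {
            // Start deliberately small so ensureRemaining has to expand the buffer.
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(16));

            // Reserve the fixed v2 batch header size in a single step...
            out.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);

            // ...then write an empty batch header straight into the (possibly
            // re-allocated) underlying buffer, as filterTo does for a batch that is
            // retained only to preserve the producer's last sequence number.
            // producerId=23, epoch=5, baseSequence=10, offsets 3..4 are illustrative.
            DefaultRecordBatch.writeEmptyHeader(out.buffer(), RecordBatch.MAGIC_VALUE_V2,
                    23L, (short) 5, 10, 3L, 4L, 0,
                    TimestampType.CREATE_TIME, System.currentTimeMillis(),
                    false, false);
        }
    }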

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 813a277..0801979 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1641,7 +1641,12 @@ public class FetcherTest {
         // Remove the last record to simulate compaction
         MemoryRecords.FilterResult result = records.filterTo(tp1, new MemoryRecords.RecordFilter() {
             @Override
-            protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
+            protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                return BatchRetention.DELETE_EMPTY;
+            }
+
+            @Override
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
         }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index 6129c04..587bf14 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -63,6 +63,7 @@ public class DefaultRecordBatchTest {
                     assertEquals(isTransactional, batch.isTransactional());
                     assertEquals(timestampType, batch.timestampType());
                     assertEquals(timestamp, batch.maxTimestamp());
+                    assertEquals(RecordBatch.NO_TIMESTAMP, batch.baseTimestamp());
                     assertEquals(isControlBatch, batch.isControlBatch());
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 21f6d53..de00378 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -26,6 +27,7 @@ import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@@ -239,6 +241,94 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testFilterToEmptyBatchRetention() {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            for (boolean isTransactional : Arrays.asList(true, false)) {
+                ByteBuffer buffer = ByteBuffer.allocate(2048);
+                long producerId = 23L;
+                short producerEpoch = 5;
+                long baseOffset = 3L;
+                int baseSequence = 10;
+                int partitionLeaderEpoch = 293;
+
+                MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME,
+                        baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional,
+                        partitionLeaderEpoch);
+                builder.append(11L, "2".getBytes(), "b".getBytes());
+                builder.append(12L, "3".getBytes(), "c".getBytes());
+                builder.close();
+
+                ByteBuffer filtered = ByteBuffer.allocate(2048);
+                builder.build().filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
+                    @Override
+                    protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                        // retain all batches
+                        return BatchRetention.RETAIN_EMPTY;
+                    }
+
+                    @Override
+                    protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                        // delete the records
+                        return false;
+                    }
+                }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+                filtered.flip();
+                MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+                List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+                assertEquals(1, batches.size());
+
+                MutableRecordBatch batch = batches.get(0);
+                assertEquals(0, batch.countOrNull().intValue());
+                assertEquals(12L, batch.maxTimestamp());
+                assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+                assertEquals(baseOffset, batch.baseOffset());
+                assertEquals(baseOffset + 1, batch.lastOffset());
+                assertEquals(baseSequence, batch.baseSequence());
+                assertEquals(baseSequence + 1, batch.lastSequence());
+                assertEquals(isTransactional, batch.isTransactional());
+            }
+        }
+    }
+
+    @Test
+    public void testEmptyBatchDeletion() {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            for (final BatchRetention deleteRetention : Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) {
+                ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+                long producerId = 23L;
+                short producerEpoch = 5;
+                long baseOffset = 3L;
+                int baseSequence = 10;
+                int partitionLeaderEpoch = 293;
+
+                DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
+                        baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME,
+                        System.currentTimeMillis(), false, false);
+                buffer.flip();
+
+                ByteBuffer filtered = ByteBuffer.allocate(2048);
+                MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
+                    @Override
+                    protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                        return deleteRetention;
+                    }
+
+                    @Override
+                    protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                        return false;
+                    }
+                }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+                filtered.flip();
+                MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+                assertEquals(0, filteredRecords.sizeInBytes());
+            }
+        }
+    }
+
+    @Test
     public void testBuildEndTxnMarker() {
         if (magic >= RecordBatch.MAGIC_VALUE_V2) {
             long producerId = 73;
@@ -303,13 +393,15 @@ public class MemoryRecordsTest {
             ByteBuffer filtered = ByteBuffer.allocate(2048);
             MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
                 @Override
-                protected boolean shouldDiscard(RecordBatch batch) {
+                protected BatchRetention checkBatchRetention(RecordBatch batch) {
                     // discard the second and fourth batches
-                    return batch.lastOffset() == 2L || batch.lastOffset() == 6L;
+                    if (batch.lastOffset() == 2L || batch.lastOffset() == 6L)
+                        return BatchRetention.DELETE;
+                    return BatchRetention.DELETE_EMPTY;
                 }

                 @Override
-                protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
+                protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                     return true;
                 }
             }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
@@ -683,7 +775,12 @@ public class MemoryRecordsTest {
 
     private static class RetainNonNullKeysFilter extends MemoryRecords.RecordFilter {
         @Override
-        public boolean shouldRetain(RecordBatch batch, Record record) {
+        protected BatchRetention checkBatchRetention(RecordBatch batch) {
+            return BatchRetention.DELETE_EMPTY;
+        }
+
+        @Override
+        public boolean shouldRetainRecord(RecordBatch batch, Record record) {
             return record.hasKey();
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5222487..d8a86db 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -63,7 +64,22 @@ import scala.collection.JavaConverters._
  * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
  * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
  * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
- * 
+ *
+ * Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following
+ * are the key points:
+ *
+ * 1. In order to maintain sequence number continuity for active producers, we always retain the last batch
+ *    from each producerId, even if all the records from the batch have been removed. The batch will be removed
+ *    once the producer either writes a new batch or is expired due to inactivity.
+ * 2. We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have
+ *    been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to
+ *    collect the aborted transactions ahead of time.
+ * 3. Records from aborted transactions are removed by the cleaner immediately without regard to record keys.
+ * 4. Transaction markers are retained until all record batches from the same transaction have been removed and
+ *    a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any
+ *    data from the transaction prior to reaching the offset of the marker. This follows the same logic used for
+ *    tombstone deletion.
+ *
  * @param config Configuration parameters for the cleaner
  * @param logDirs The directories where offset checkpoints reside
  * @param logs The pool of logs
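
Purely as an illustration of points 1 and 3 above, here is a small Java sketch of the per-batch retention decision; the actual logic is the Scala checkBatchRetention override further down in this patch, and lastSeqByProducer / recordsShouldBeDiscarded are hypothetical stand-ins for the cleaner's activeProducers snapshot and the shouldDiscardBatch result:

    import java.util.Map;

    import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
    import org.apache.kafka.common.record.RecordBatch;

    public class CleanerRetentionSketch {

        // lastSeqByProducer maps producerId -> last sequence number known to the broker.
        static BatchRetention retentionFor(RecordBatch batch,
                                           Map<Long, Integer> lastSeqByProducer,
                                           boolean recordsShouldBeDiscarded) {
            // Point 1: never drop the batch carrying a producer's last sequence,
            // even if every record in it is eligible for removal.
            if (batch.hasProducerId()) {
                Integer lastSeq = lastSeqByProducer.get(batch.producerId());
                if (lastSeq != null && lastSeq == batch.lastSequence())
                    return BatchRetention.RETAIN_EMPTY;
            }
            // Point 3: batches from aborted transactions (or expired tombstone windows)
            // can be deleted wholesale without inspecting individual records.
            if (recordsShouldBeDiscarded)
                return BatchRetention.DELETE;
            // Otherwise keep the batch only if some record in it survives filtering.
            return BatchRetention.DELETE_EMPTY;
        }
    }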
@@ -478,34 +494,29 @@ private[log] class Cleaner(val id: Int,
                              activeProducers: Map[Long, ProducerIdEntry],
                              stats: CleanerStats) {
     val logCleanerFilter = new RecordFilter {
-      var retainLastBatchSequence: Boolean = false
-      var discardBatchRecords: Boolean = false
+      var discardBatchRecords: Boolean = _
 
-      override def shouldDiscard(batch: RecordBatch): Boolean = {
+      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
        // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
        // note that we will never delete a marker until all the records from that transaction are removed.
        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletes)

        // check if the batch contains the last sequence number for the producer. if so, we cannot
        // remove the batch just yet or the producer may see an out of sequence error.
-        if (batch.hasProducerId && activeProducers.get(batch.producerId).exists(_.lastSeq == batch.lastSequence)) {
-          retainLastBatchSequence = true
-          false
-        } else {
-          retainLastBatchSequence = false
-          discardBatchRecords
-        }
+        if (batch.hasProducerId && activeProducers.get(batch.producerId).exists(_.lastSeq == batch.lastSequence))
+          BatchRetention.RETAIN_EMPTY
+        else if (discardBatchRecords)
+          BatchRetention.DELETE
+        else
+          BatchRetention.DELETE_EMPTY
       }

-      override def shouldRetain(batch: RecordBatch, record: Record): Boolean = {
-        if (retainLastBatchSequence && batch.lastSequence == record.sequence)
-          // always retain the record with the last sequence number
-          true
-        else if (discardBatchRecords)
-          // remove the record if the batch would have otherwise been discarded
+      override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
+        if (discardBatchRecords)
+          // The batch is only retained to preserve producer sequence information; the records can be removed
           false
         else
-          shouldRetainRecord(source, map, retainDeletes, batch, record, stats)
+          Cleaner.this.shouldRetainRecord(source, map, retainDeletes, batch, record, stats)
       }
     }
 
@@ -731,7 +742,8 @@ private[log] class Cleaner(val id: Int,
         } else {
           val isAborted = transactionMetadata.onBatchRead(batch)
           if (isAborted) {
-            // abort markers are supported in v2 and above, which means count is defined
+            // If the batch is aborted, do not bother populating the offset map.
+            // Note that abort markers are supported in v2 and above, which means count is defined.
             stats.indexMessagesRead(batch.countOrNull)
           } else {
             for (record <- batch.asScala) {
@@ -849,7 +861,7 @@ private[log] object CleanedTransactionMetadata {
 private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.PriorityQueue[AbortedTxn],
                                               val transactionIndex: Option[TransactionIndex] = None) {
   val ongoingCommittedTxns = mutable.Set.empty[Long]
-  val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTxn]
+  val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
 
   /**
    * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
@@ -863,14 +875,16 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
     val producerId = controlBatch.producerId
     controlType match {
       case ControlRecordType.ABORT =>
-        val maybeAbortedTxn = ongoingAbortedTxns.remove(producerId)
-        maybeAbortedTxn.foreach { abortedTxn =>
-          transactionIndex.foreach(_.append(abortedTxn))
+        ongoingAbortedTxns.remove(producerId) match {
+          // Retain the marker until all batches from the transaction have been removed
+          case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
+            transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+            false
+          case _ => true
         }
-        true

       case ControlRecordType.COMMIT =>
-        // this marker is eligible for deletion if we didn't traverse any records from the transaction
+        // This marker is eligible for deletion if we didn't traverse any batches from the transaction
         !ongoingCommittedTxns.remove(producerId)
 
       case _ => false
@@ -880,7 +894,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
   private def consumeAbortedTxnsUpTo(offset: Long): Unit = {
     while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) {
       val abortedTxn = abortedTransactions.dequeue()
-      ongoingAbortedTxns += abortedTxn.producerId -> abortedTxn
+      ongoingAbortedTxns += abortedTxn.producerId -> new AbortedTransactionMetadata(abortedTxn)
     }
   }
 
@@ -891,11 +905,13 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
   def onBatchRead(batch: RecordBatch): Boolean = {
     consumeAbortedTxnsUpTo(batch.lastOffset)
     if (batch.isTransactional) {
-      if (ongoingAbortedTxns.contains(batch.producerId))
-        true
-      else {
-        ongoingCommittedTxns += batch.producerId
-        false
+      ongoingAbortedTxns.get(batch.producerId) match {
+        case Some(abortedTransactionMetadata) =>
+          abortedTransactionMetadata.lastObservedBatchOffset = Some(batch.lastOffset)
+          true
+        case None =>
+          ongoingCommittedTxns += batch.producerId
+          false
       }
     } else {
       false
@@ -903,3 +919,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
   }
 
 }
+
+private class AbortedTransactionMetadata(val abortedTxn: AbortedTxn) {
+  var lastObservedBatchOffset: Option[Long] = None
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index e7d7963..b6b20e3 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -78,6 +78,9 @@ private[kafka] object LogValidator extends Logging {
 
       if (batch.isControlBatch)
        throw new InvalidRecordException("Clients are not allowed to write control records")
+
+      if (Option(batch.countOrNull).contains(0))
+        throw new InvalidRecordException("Record batches must contain at least one record")
     }
 
     if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2)

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index dabd2d6..19ea699 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 
 import java.io.File
 import java.nio._
-import java.nio.file.{Files, Paths}
+import java.nio.file.Paths
 import java.util.Properties
 
 import kafka.common._
@@ -107,8 +107,10 @@ class LogCleanerTest extends JUnitSuite {
 
     log.roll()
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
-    assertEquals(List(2, 3, 3, 4, 1, 4), keysInLog(log))
-    assertEquals(List(1, 2, 3, 5, 6, 7), offsetsInLog(log))
+    assertEquals(List(2, 5, 7), lastOffsetsPerBatchInLog(log))
+    assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1), lastSequencesInLog(log))
+    assertEquals(List(2, 3, 1, 4), keysInLog(log))
+    assertEquals(List(1, 3, 6, 7), offsetsInLog(log))
 
     // we have to reload the log to validate that the cleaner maintained sequence numbers correctly
     def reloadLog(): Unit = {
@@ -137,8 +139,10 @@ class LogCleanerTest extends JUnitSuite {
     appendIdempotentAsLeader(log, pid4, producerEpoch)(Seq(2))
     log.roll()
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
-    assertEquals(List(3, 3, 4, 1, 4, 2), keysInLog(log))
-    assertEquals(List(2, 3, 5, 6, 7, 8), offsetsInLog(log))
+    assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1, pid4 -> 0), lastSequencesInLog(log))
+    assertEquals(List(2, 5, 7, 8), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(3, 1, 4, 2), keysInLog(log))
+    assertEquals(List(3, 6, 7, 8), offsetsInLog(log))
 
     reloadLog()
 
@@ -266,12 +270,59 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
     // clean again with large delete horizon and verify the marker is removed
-    cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 1, 3), keysInLog(log))
     assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
   }
 
   @Test
+  def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, 
producerEpoch)
+
+    appendProducer(Seq(2, 3)) // batch last offset is 1
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 
0, isFromClient = false)
+    log.roll()
+
+    log.appendAsLeader(record(2, 2), leaderEpoch = 0)
+    log.appendAsLeader(record(3, 3), leaderEpoch = 0)
+    log.roll()
+
+    // first time through the records are removed
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), keysInLog(log))
+    assertEquals(List(2, 3, 4), offsetsInLog(log)) // commit marker is retained
+    assertEquals(List(1, 2, 3, 4), lastOffsetsPerBatchInLog(log)) // empty 
batch is retained
+
+    // the empty batch remains if cleaned again because it still holds the 
last sequence
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), keysInLog(log))
+    assertEquals(List(2, 3, 4), offsetsInLog(log)) // commit marker is still 
retained
+    assertEquals(List(1, 2, 3, 4), lastOffsetsPerBatchInLog(log)) // empty 
batch is retained
+
+    // append a new record from the producer to allow cleaning of the empty 
batch
+    appendProducer(Seq(1))
+    log.roll()
+
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3, 1), keysInLog(log))
+    assertEquals(List(2, 3, 4, 5), offsetsInLog(log)) // commit marker is 
still retained
+    assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log)) // empty 
batch should be gone
+
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3, 1), keysInLog(log))
+    assertEquals(List(3, 4, 5), offsetsInLog(log)) // commit marker is gone
+    assertEquals(List(3, 4, 5), lastOffsetsPerBatchInLog(log)) // empty batch 
is gone
+  }
+
+  @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
@@ -301,6 +352,65 @@ class LogCleanerTest extends JUnitSuite {
     assertEquals(List(4, 5), offsetsInLog(log))
   }
 
+  @Test
+  def testAbortMarkerRetentionWithEmptyBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, 
logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, 
producerEpoch)
+
+    appendProducer(Seq(2, 3)) // batch last offset is 1
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 
0, isFromClient = false)
+    log.roll()
+
+    def assertAbortedTransactionIndexed(): Unit = {
+      val abortedTxns = log.collectAbortedTransactions(0L, 100L)
+      assertEquals(1, abortedTxns.size)
+      assertEquals(producerId, abortedTxns.head.producerId)
+      assertEquals(0, abortedTxns.head.firstOffset)
+      assertEquals(2, abortedTxns.head.lastOffset)
+    }
+
+    assertAbortedTransactionIndexed()
+
+    // first time through the records are removed
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertAbortedTransactionIndexed()
+    assertEquals(List(), keysInLog(log))
+    assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
+    assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is 
retained
+
+    // the empty batch remains if cleaned again because it still holds the 
last sequence
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertAbortedTransactionIndexed()
+    assertEquals(List(), keysInLog(log))
+    assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
+    assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is 
retained
+
+    // now update the last sequence so that the empty batch can be removed
+    appendProducer(Seq(1))
+    log.roll()
+
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertAbortedTransactionIndexed()
+    assertEquals(List(1), keysInLog(log))
+    assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet 
gone because we read the empty batch
+    assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not 
preserve the empty batch
+
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), 
deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(1), keysInLog(log))
+    assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
+    assertEquals(List(3), lastOffsetsPerBatchInLog(log))
+
+    // we do not bother retaining the aborted transaction in the index
+    assertEquals(0, log.collectAbortedTransactions(0L, 100L).size)
+  }
+
   /**
    * Test log cleaning with logs containing messages larger than default message size
    */
@@ -404,8 +514,10 @@ class LogCleanerTest extends JUnitSuite {
     log.roll()
 
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
-    assertEquals(List(0, 0, 1), keysInLog(log))
-    assertEquals(List(1, 3, 4), offsetsInLog(log))
+    assertEquals(List(1, 3, 4), lastOffsetsPerBatchInLog(log))
+    assertEquals(Map(1L -> 0, 2L -> 1, 3L -> 0), lastSequencesInLog(log))
+    assertEquals(List(0, 1), keysInLog(log))
+    assertEquals(List(3, 4), offsetsInLog(log))
   }
 
   @Test
@@ -425,8 +537,20 @@ class LogCleanerTest extends JUnitSuite {
     log.roll()
 
     cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
-    assertEquals(List(3), keysInLog(log))
-    assertEquals(List(2, 3), offsetsInLog(log))
+    assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log))
+    assertEquals(Map(producerId -> 2), lastSequencesInLog(log))
+    assertEquals(List(), keysInLog(log))
+    assertEquals(List(3), offsetsInLog(log))
+
+    // Append a new entry from the producer and verify that the empty batch is cleaned up
+    appendProducer(Seq(1, 5))
+    log.roll()
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+
+    assertEquals(List(3, 5), lastOffsetsPerBatchInLog(log))
+    assertEquals(Map(producerId -> 4), lastSequencesInLog(log))
+    assertEquals(List(1, 5), keysInLog(log))
+    assertEquals(List(3, 4, 5), offsetsInLog(log))
   }
 
   @Test
@@ -588,6 +712,17 @@ class LogCleanerTest extends JUnitSuite {
       yield TestUtils.readString(record.key).toInt
   }
 
+  def lastOffsetsPerBatchInLog(log: Log): Iterable[Long] = {
+    for (segment <- log.logSegments; batch <- segment.log.batches.asScala)
+      yield batch.lastOffset
+  }
+
+  def lastSequencesInLog(log: Log): Map[Long, Int] = {
+    (for (segment <- log.logSegments;
+          batch <- segment.log.batches.asScala if !batch.isControlBatch && batch.hasProducerId)
+      yield batch.producerId -> batch.lastSequence).toMap
+  }
+
   /* extract all the offsets from a log */
   def offsetsInLog(log: Log): Iterable[Long] =
    log.logSegments.flatMap(s => s.log.records.asScala.filter(_.hasValue).filter(_.hasKey).map(m => m.offset))

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 630cfcf..65a4eeb 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -32,6 +32,7 @@ import kafka.server.{BrokerTopicStats, KafkaConfig}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.IsolationLevel
@@ -401,7 +402,8 @@ class LogTest {
 
     val filtered = ByteBuffer.allocate(2048)
     records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
-      override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
+      override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY
+      override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
     }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)
@@ -428,6 +430,49 @@ class LogTest {
   }
 
   @Test
+  def testRebuildProducerStateWithEmptyCompactedBatch() {
+    val log = createLog(2048)
+    val pid = 1L
+    val epoch = 0.toShort
+    val seq = 0
+    val baseOffset = 23L
+
+    // create an empty batch
+    val records = TestUtils.records(producerId = pid, producerEpoch = epoch, 
sequence = seq, baseOffset = baseOffset, records = List(
+      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, 
"a".getBytes),
+      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, 
"b".getBytes)))
+    records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
+
+    val filtered = ByteBuffer.allocate(2048)
+    records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
+      override def checkBatchRetention(batch: RecordBatch): BatchRetention = 
RecordFilter.BatchRetention.RETAIN_EMPTY
+      override def shouldRetainRecord(recordBatch: RecordBatch, record: 
Record): Boolean = false
+    }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
+    filtered.flip()
+    val filteredRecords = MemoryRecords.readableRecords(filtered)
+
+    log.appendAsFollower(filteredRecords)
+
+    // append some more data and then truncate to force rebuilding of the PID 
map
+    val moreRecords = TestUtils.records(baseOffset = baseOffset + 2, records = 
List(
+      new SimpleRecord(System.currentTimeMillis(), "e".getBytes),
+      new SimpleRecord(System.currentTimeMillis(), "f".getBytes)))
+    moreRecords.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
+    log.appendAsFollower(moreRecords)
+
+    log.truncateTo(baseOffset + 2)
+
+    val activeProducers = log.activeProducers
+    assertTrue(activeProducers.contains(pid))
+
+    val entry = activeProducers(pid)
+    assertEquals(0, entry.firstSeq)
+    assertEquals(baseOffset, entry.firstOffset)
+    assertEquals(1, entry.lastSeq)
+    assertEquals(baseOffset + 1, entry.lastOffset)
+  }
+
+  @Test
   def testUpdatePidMapWithCompactedData() {
     val log = createLog(2048)
     val pid = 1L
@@ -445,7 +490,8 @@ class LogTest {
 
     val filtered = ByteBuffer.allocate(2048)
     records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
-      override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
+      override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY
+      override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
     }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
     filtered.flip()
     val filteredRecords = MemoryRecords.readableRecords(filtered)

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 3ab9732..2225ca6 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 import java.nio.ByteBuffer
 
 import kafka.common.LongRef
-import kafka.message.{DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
+import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.test.TestUtils
@@ -909,6 +909,44 @@ class LogValidatorTest {
       isFromClient = true)
   }
 
+  @Test(expected = classOf[InvalidRecordException])
+  def testCompressedBatchWithoutRecordsNotAllowed(): Unit = {
+    testBatchWithoutRecordsNotAllowed(DefaultCompressionCodec, 
DefaultCompressionCodec)
+  }
+
+  @Test(expected = classOf[InvalidRecordException])
+  def testUncompressedBatchWithoutRecordsNotAllowed(): Unit = {
+    testBatchWithoutRecordsNotAllowed(NoCompressionCodec, NoCompressionCodec)
+  }
+
+  @Test(expected = classOf[InvalidRecordException])
+  def testRecompressedBatchWithoutRecordsNotAllowed(): Unit = {
+    testBatchWithoutRecordsNotAllowed(NoCompressionCodec, 
DefaultCompressionCodec)
+  }
+
+  private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, 
targetCodec: CompressionCodec): Unit = {
+    val offset = 1234567
+    val (producerId, producerEpoch, baseSequence, isTransactional, 
partitionLeaderEpoch) =
+      (1324L, 10.toShort, 984, true, 40)
+    val buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD)
+    DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch,
+      baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, 
System.currentTimeMillis(),
+      isTransactional, false)
+    buffer.flip()
+    val records = MemoryRecords.readableRecords(buffer)
+    LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = sourceCodec,
+      targetCodec = targetCodec,
+      compactedTopic = false,
+      magic = RecordBatch.CURRENT_MAGIC_VALUE,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true)
+  }
+
   private def createRecords(magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
                             timestamp: Long = RecordBatch.NO_TIMESTAMP,
                            codec: CompressionType = CompressionType.NONE): MemoryRecords = {
