Repository: kafka
Updated Branches:
  refs/heads/0.11.0 ec5b22c8b -> 41fff32a4
KAFKA-5490; Cleaner should retain empty batch if needed to preserve producer last sequence

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com>, Jun Rao <jun...@gmail.com>

Closes #3406 from hachikuji/KAFKA-5490

(cherry picked from commit cb0325d484b957432048dd29419f0fa59c5f132d)
Signed-off-by: Jason Gustafson <ja...@confluent.io>

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41fff32a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41fff32a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41fff32a

Branch: refs/heads/0.11.0
Commit: 41fff32a4ebb27847e35e0754119a1cf73c3b0ed
Parents: ec5b22c
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jun 28 11:35:06 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Jun 28 11:35:33 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/record/DefaultRecordBatch.java |   4 +-
 .../kafka/common/record/MemoryRecords.java      |  87 ++++++-----
 .../common/utils/ByteBufferOutputStream.java    |  21 ++-
 .../clients/consumer/internals/FetcherTest.java |   7 +-
 .../common/record/DefaultRecordBatchTest.java   |   1 +
 .../kafka/common/record/MemoryRecordsTest.java  | 105 ++++++++++++-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  82 ++++++----
 .../src/main/scala/kafka/log/LogValidator.scala |   3 +
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 155 +++++++++++++++++--
 .../src/test/scala/unit/kafka/log/LogTest.scala |  50 +++++-
 .../scala/unit/kafka/log/LogValidatorTest.scala |  40 ++++-
 11 files changed, 462 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index e8eba8a..2262b33 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -409,8 +409,8 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
                                  boolean isControlRecord) {
         int offsetDelta = (int) (lastOffset - baseOffset);
         writeHeader(buffer, baseOffset, offsetDelta, DefaultRecordBatch.RECORD_BATCH_OVERHEAD, magic,
-                CompressionType.NONE, timestampType, timestamp, timestamp, producerId, producerEpoch, baseSequence,
-                isTransactional, isControlRecord, partitionLeaderEpoch, 0);
+                CompressionType.NONE, timestampType, RecordBatch.NO_TIMESTAMP, timestamp, producerId,
+                producerEpoch, baseSequence, isTransactional, isControlRecord, partitionLeaderEpoch, 0);
     }
 
     static void writeHeader(ByteBuffer buffer,
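For context on the writeEmptyHeader change above: an empty batch now records RecordBatch.NO_TIMESTAMP as its base timestamp while the max timestamp keeps the real value. A minimal sketch (not part of this commit) of writing such a header and reading the surviving producer state back; it assumes same-package access, since writeEmptyHeader is package-private and the tests below exercise it the same way:

    package org.apache.kafka.common.record;

    import java.nio.ByteBuffer;

    public class EmptyBatchHeaderSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
            // Header-only batch: offsets 10-11 and sequences 5-6 survive with no records.
            DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, 1L /* producerId */,
                    (short) 0 /* producerEpoch */, 5 /* baseSequence */, 10L /* baseOffset */,
                    11L /* lastOffset */, 0 /* partitionLeaderEpoch */, TimestampType.CREATE_TIME,
                    System.currentTimeMillis(), false /* isTransactional */, false /* isControlRecord */);
            buffer.flip();

            for (MutableRecordBatch batch : MemoryRecords.readableRecords(buffer).batches()) {
                // Producer state is recoverable from the header alone; the base timestamp is
                // NO_TIMESTAMP per the change above, while maxTimestamp keeps the real value.
                System.out.println("producerId=" + batch.producerId()
                        + " lastSequence=" + batch.lastSequence()
                        + " maxTimestamp=" + batch.maxTimestamp());
            }
        }
    }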

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 1427421..d3f2444 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.slf4j.Logger;
@@ -151,7 +152,8 @@ public class MemoryRecords extends AbstractRecords {
         for (MutableRecordBatch batch : batches) {
             bytesRead += batch.sizeInBytes();
 
-            if (filter.shouldDiscard(batch))
+            BatchRetention batchRetention = filter.checkBatchRetention(batch);
+            if (batchRetention == BatchRetention.DELETE)
                 continue;
 
             // We use the absolute offset to decide whether to retain the message or not. Due to KAFKA-4298, we have to
@@ -168,7 +170,7 @@ public class MemoryRecords extends AbstractRecords {
                     Record record = iterator.next();
                     messagesRead += 1;
 
-                    if (filter.shouldRetain(batch, record)) {
+                    if (filter.shouldRetainRecord(batch, record)) {
                         // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
                         // the corrupted batch with correct data.
                         if (!record.hasMagic(batchMagic))
@@ -184,33 +186,44 @@ public class MemoryRecords extends AbstractRecords {
                 }
             }
 
-            if (writeOriginalBatch) {
-                batch.writeTo(bufferOutputStream);
-                messagesRetained += retainedRecords.size();
-                bytesRetained += batch.sizeInBytes();
-                if (batch.maxTimestamp() > maxTimestamp) {
-                    maxTimestamp = batch.maxTimestamp();
-                    shallowOffsetOfMaxTimestamp = batch.lastOffset();
-                }
-            } else if (!retainedRecords.isEmpty()) {
-                MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
-                MemoryRecords records = builder.build();
-                int filteredBatchSize = records.sizeInBytes();
-
-                messagesRetained += retainedRecords.size();
-                bytesRetained += filteredBatchSize;
-
-                if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
-                    log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
-                            "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
-                            "increase their fetch sizes.",
-                            partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
-
-                MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                if (info.maxTimestamp > maxTimestamp) {
-                    maxTimestamp = info.maxTimestamp;
-                    shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
+            if (!retainedRecords.isEmpty()) {
+                if (writeOriginalBatch) {
+                    batch.writeTo(bufferOutputStream);
+                    messagesRetained += retainedRecords.size();
+                    bytesRetained += batch.sizeInBytes();
+                    if (batch.maxTimestamp() > maxTimestamp) {
+                        maxTimestamp = batch.maxTimestamp();
+                        shallowOffsetOfMaxTimestamp = batch.lastOffset();
+                    }
+                } else {
+                    MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
+                    MemoryRecords records = builder.build();
+                    int filteredBatchSize = records.sizeInBytes();
+
+                    messagesRetained += retainedRecords.size();
+                    bytesRetained += filteredBatchSize;
+
+                    if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
+                        log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
+                                "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
+                                "increase their fetch sizes.",
+                                partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
+
+                    MemoryRecordsBuilder.RecordsInfo info = builder.info();
+                    if (info.maxTimestamp > maxTimestamp) {
+                        maxTimestamp = info.maxTimestamp;
+                        shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
+                    }
                 }
+            } else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
+                if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
+                    throw new IllegalStateException("Empty batches are only supported for magic v2 and above");
+
+                bufferOutputStream.ensureRemaining(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+                DefaultRecordBatch.writeEmptyHeader(bufferOutputStream.buffer(), batchMagic, batch.producerId(),
+                        batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
+                        batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
+                        batch.isTransactional(), batch.isControlBatch());
             }
 
             // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
@@ -300,20 +313,24 @@
     }
 
     public static abstract class RecordFilter {
+        public enum BatchRetention {
+            DELETE, // Delete the batch without inspecting records
+            RETAIN_EMPTY, // Retain the batch even if it is empty
+            DELETE_EMPTY // Delete the batch if it is empty
+        }
+
         /**
          * Check whether the full batch can be discarded (i.e. whether we even need to
         * check the records individually).
         */
-        protected boolean shouldDiscard(RecordBatch batch) {
-            return false;
-        }
+        protected abstract BatchRetention checkBatchRetention(RecordBatch batch);
 
         /**
-         * Check whether a record should be retained in the log. Only records from
-         * batches which were not discarded with {@link #shouldDiscard(RecordBatch)}
-         * will be considered.
+         * Check whether a record should be retained in the log. Note that {@link #checkBatchRetention(RecordBatch)}
+         * is used prior to checking individual record retention. Only records from batches which were not
+         * explicitly discarded with {@link BatchRetention#DELETE} will be considered.
         */
-        protected abstract boolean shouldRetain(RecordBatch recordBatch, Record record);
+        protected abstract boolean shouldRetainRecord(RecordBatch recordBatch, Record record);
    }
 
    public static class FilterResult {
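The reworked RecordFilter above is the API the log cleaner builds on: checkBatchRetention decides a batch's fate up front, and shouldRetainRecord is consulted only for batches that were not dropped outright. A hedged usage sketch (the topic name and buffer size are illustrative; the filterTo call matches the FetcherTest usage further down):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.*;
    import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
    import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;

    public class FilterToSketch {
        public static MemoryRecords compact(MemoryRecords records) {
            ByteBuffer out = ByteBuffer.allocate(1024);
            records.filterTo(new TopicPartition("foo", 0), new RecordFilter() {
                @Override
                protected BatchRetention checkBatchRetention(RecordBatch batch) {
                    // No batch-level verdict here: inspect the records, and drop the
                    // batch if nothing in it survives.
                    return BatchRetention.DELETE_EMPTY;
                }

                @Override
                protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
                    return record.hasKey(); // compaction-style: keep only keyed records
                }
            }, out, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
            out.flip();
            return MemoryRecords.readableRecords(out);
        }
    }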

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
index 2b13e7e..43e3bba 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java
@@ -62,17 +62,17 @@ public class ByteBufferOutputStream extends OutputStream {
     }
 
     public void write(int b) {
-        maybeExpandBuffer(1);
+        ensureRemaining(1);
         buffer.put((byte) b);
     }
 
     public void write(byte[] bytes, int off, int len) {
-        maybeExpandBuffer(len);
+        ensureRemaining(len);
         buffer.put(bytes, off, len);
     }
 
     public void write(ByteBuffer sourceBuffer) {
-        maybeExpandBuffer(sourceBuffer.remaining());
+        ensureRemaining(sourceBuffer.remaining());
         buffer.put(sourceBuffer);
     }
 
@@ -93,7 +93,7 @@ public class ByteBufferOutputStream extends OutputStream {
     }
 
     public void position(int position) {
-        maybeExpandBuffer(position - buffer.position());
+        ensureRemaining(position - buffer.position());
         buffer.position(position);
     }
 
@@ -105,9 +105,16 @@ public class ByteBufferOutputStream extends OutputStream {
         return initialCapacity;
     }
 
-    private void maybeExpandBuffer(int remainingRequired) {
-        if (remainingRequired > buffer.remaining())
-            expandBuffer(remainingRequired);
+    /**
+     * Ensure there is enough space to write some number of bytes, expanding the underlying buffer if necessary.
+     * This can be used to avoid incremental expansions through calls to {@link #write(int)} when you know how
+     * many total bytes are needed.
+     *
+     * @param remainingBytesRequired The number of bytes required
+     */
+    public void ensureRemaining(int remainingBytesRequired) {
+        if (remainingBytesRequired > buffer.remaining())
+            expandBuffer(remainingBytesRequired);
     }
 
     private void expandBuffer(int remainingRequired) {
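ensureRemaining becomes public because the filterTo path above must reserve RECORD_BATCH_OVERHEAD bytes before writing an empty batch header directly into the underlying buffer. A small usage sketch (not from the commit; it relies only on the ByteBuffer-wrapping constructor, write, ensureRemaining, and the buffer() accessor the filterTo change itself uses):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class EnsureRemainingSketch {
        public static void main(String[] args) {
            ByteBufferOutputStream out = new ByteBufferOutputStream(ByteBuffer.allocate(16));

            // Reserve space once instead of expanding incrementally inside write(int).
            byte[] payload = new byte[1024];
            out.ensureRemaining(payload.length);
            for (byte b : payload)
                out.write(b);

            // Callers may also write straight into the (possibly re-allocated) buffer,
            // which is exactly what the empty-batch path in filterTo does.
            out.ensureRemaining(8);
            out.buffer().putLong(42L);
        }
    }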

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 813a277..0801979 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1641,7 +1641,12 @@ public class FetcherTest {
         // Remove the last record to simulate compaction
         MemoryRecords.FilterResult result = records.filterTo(tp1, new MemoryRecords.RecordFilter() {
             @Override
-            protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
+            protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                return BatchRetention.DELETE_EMPTY;
+            }
+
+            @Override
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return record.key() != null;
             }
         }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index 6129c04..587bf14 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -63,6 +63,7 @@ public class DefaultRecordBatchTest {
             assertEquals(isTransactional, batch.isTransactional());
             assertEquals(timestampType, batch.timestampType());
             assertEquals(timestamp, batch.maxTimestamp());
+            assertEquals(RecordBatch.NO_TIMESTAMP, batch.baseTimestamp());
             assertEquals(isControlBatch, batch.isControlBatch());
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 21f6d53..de00378 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
@@ -26,6 +27,7 @@ import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 
@@ -239,6 +241,94 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testFilterToEmptyBatchRetention() {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            for (boolean isTransactional : Arrays.asList(true, false)) {
+                ByteBuffer buffer = ByteBuffer.allocate(2048);
+                long producerId = 23L;
+                short producerEpoch = 5;
+                long baseOffset = 3L;
+                int baseSequence = 10;
+                int partitionLeaderEpoch = 293;
+
+                MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME,
+                        baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional,
+                        partitionLeaderEpoch);
+                builder.append(11L, "2".getBytes(), "b".getBytes());
+                builder.append(12L, "3".getBytes(), "c".getBytes());
+                builder.close();
+
+                ByteBuffer filtered = ByteBuffer.allocate(2048);
+                builder.build().filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
+                    @Override
+                    protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                        // retain all batches
+                        return BatchRetention.RETAIN_EMPTY;
+                    }
+
+                    @Override
+                    protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                        // delete the records
+                        return false;
+                    }
+                }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+                filtered.flip();
+                MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+                List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+                assertEquals(1, batches.size());
+
+                MutableRecordBatch batch = batches.get(0);
+                assertEquals(0, batch.countOrNull().intValue());
+                assertEquals(12L, batch.maxTimestamp());
+                assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+                assertEquals(baseOffset, batch.baseOffset());
+                assertEquals(baseOffset + 1, batch.lastOffset());
+                assertEquals(baseSequence, batch.baseSequence());
+                assertEquals(baseSequence + 1, batch.lastSequence());
+                assertEquals(isTransactional, batch.isTransactional());
+            }
+        }
+    }
+
+    @Test
+    public void testEmptyBatchDeletion() {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            for (final BatchRetention deleteRetention : Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) {
+                ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+                long producerId = 23L;
+                short producerEpoch = 5;
+                long baseOffset = 3L;
+                int baseSequence = 10;
+                int partitionLeaderEpoch = 293;
+
+                DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
+                        baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME,
+                        System.currentTimeMillis(), false, false);
+                buffer.flip();
+
+                ByteBuffer filtered = ByteBuffer.allocate(2048);
+                MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
+                    @Override
+                    protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                        return deleteRetention;
+                    }
+
+                    @Override
+                    protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                        return false;
+                    }
+                }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+                filtered.flip();
+                MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+                assertEquals(0, filteredRecords.sizeInBytes());
+            }
+        }
+    }
+
+    @Test
     public void testBuildEndTxnMarker() {
         if (magic >= RecordBatch.MAGIC_VALUE_V2) {
             long producerId = 73;
@@ -303,13 +393,15 @@ public class MemoryRecordsTest {
         ByteBuffer filtered = ByteBuffer.allocate(2048);
         MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
             @Override
-            protected boolean shouldDiscard(RecordBatch batch) {
+            protected BatchRetention checkBatchRetention(RecordBatch batch) {
                 // discard the second and fourth batches
-                return batch.lastOffset() == 2L || batch.lastOffset() == 6L;
+                if (batch.lastOffset() == 2L || batch.lastOffset() == 6L)
+                    return BatchRetention.DELETE;
+                return BatchRetention.DELETE_EMPTY;
             }
 
             @Override
-            protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                 return true;
             }
         }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
@@ -683,7 +775,12 @@ public class MemoryRecordsTest {
 
     private static class RetainNonNullKeysFilter extends MemoryRecords.RecordFilter {
         @Override
-        public boolean shouldRetain(RecordBatch batch, Record record) {
+        protected BatchRetention checkBatchRetention(RecordBatch batch) {
+            return BatchRetention.DELETE_EMPTY;
+        }
+
+        @Override
+        public boolean shouldRetainRecord(RecordBatch batch, Record record) {
             return record.hasKey();
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5222487..d8a86db 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
@@ -63,7 +64,22 @@ import scala.collection.JavaConverters._
 * The cleaner will only retain delete records for a period of time to avoid accumulating space indefinitely. This period of time is configurable on a per-topic
 * basis and is measured from the time the segment enters the clean portion of the log (at which point any prior message with that key has been removed).
 * Delete markers in the clean section of the log that are older than this time will not be retained when log segments are being recopied as part of cleaning.
- *
+ *
+ * Note that cleaning is more complicated with the idempotent/transactional producer capabilities. The following
+ * are the key points:
+ *
+ * 1. In order to maintain sequence number continuity for active producers, we always retain the last batch
+ *    from each producerId, even if all the records from the batch have been removed. The batch will be removed
+ *    once the producer either writes a new batch or is expired due to inactivity.
+ * 2. We do not clean beyond the last stable offset. This ensures that all records observed by the cleaner have
+ *    been decided (i.e. committed or aborted). In particular, this allows us to use the transaction index to
+ *    collect the aborted transactions ahead of time.
+ * 3. Records from aborted transactions are removed by the cleaner immediately without regard to record keys.
+ * 4. Transaction markers are retained until all record batches from the same transaction have been removed and
+ *    a sufficient amount of time has passed to reasonably ensure that an active consumer wouldn't consume any
+ *    data from the transaction prior to reaching the offset of the marker. This follows the same logic used for
+ *    tombstone deletion.
+ *
 * @param config Configuration parameters for the cleaner
 * @param logDirs The directories where offset checkpoints reside
 * @param logs The pool of logs
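Point 1 above is the heart of this commit. Expressed against the Java RecordFilter API, the cleaner's three-way decision looks roughly like the sketch below; it is a paraphrase of the Scala filter in the next hunk, with the constructor arguments as hypothetical stand-ins for the cleaner's activeProducers map and transaction metadata:

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.MemoryRecords.RecordFilter;
    import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;

    public class CleanerRetentionSketch extends RecordFilter {
        private final Map<Long, Integer> lastSequenceOfProducer; // stand-in for activeProducers
        private final Set<Long> discardableProducers;            // stand-in for aborted txns / removable markers

        public CleanerRetentionSketch(Map<Long, Integer> lastSequenceOfProducer, Set<Long> discardableProducers) {
            this.lastSequenceOfProducer = lastSequenceOfProducer;
            this.discardableProducers = discardableProducers;
        }

        @Override
        protected BatchRetention checkBatchRetention(RecordBatch batch) {
            Integer lastSeq = batch.hasProducerId() ? lastSequenceOfProducer.get(batch.producerId()) : null;
            if (lastSeq != null && lastSeq == batch.lastSequence())
                return BatchRetention.RETAIN_EMPTY; // rule 1: keep the producer's last sequence alive
            if (batch.hasProducerId() && discardableProducers.contains(batch.producerId()))
                return BatchRetention.DELETE;       // rules 3/4: aborted data, removable markers
            return BatchRetention.DELETE_EMPTY;     // ordinary compaction
        }

        @Override
        protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
            return record.hasKey(); // the real cleaner consults its offset map here
        }
    }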
@@ -478,34 +494,29 @@ private[log] class Cleaner(val id: Int,
                            activeProducers: Map[Long, ProducerIdEntry],
                            stats: CleanerStats) {
    val logCleanerFilter = new RecordFilter {
-      var retainLastBatchSequence: Boolean = false
-      var discardBatchRecords: Boolean = false
+      var discardBatchRecords: Boolean = _
 
-      override def shouldDiscard(batch: RecordBatch): Boolean = {
+      override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
        // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
        // note that we will never delete a marker until all the records from that transaction are removed.
        discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletes)
 
        // check if the batch contains the last sequence number for the producer. if so, we cannot
        // remove the batch just yet or the producer may see an out of sequence error.
-        if (batch.hasProducerId && activeProducers.get(batch.producerId).exists(_.lastSeq == batch.lastSequence)) {
-          retainLastBatchSequence = true
-          false
-        } else {
-          retainLastBatchSequence = false
-          discardBatchRecords
-        }
+        if (batch.hasProducerId && activeProducers.get(batch.producerId).exists(_.lastSeq == batch.lastSequence))
+          BatchRetention.RETAIN_EMPTY
+        else if (discardBatchRecords)
+          BatchRetention.DELETE
+        else
+          BatchRetention.DELETE_EMPTY
      }
 
-      override def shouldRetain(batch: RecordBatch, record: Record): Boolean = {
-        if (retainLastBatchSequence && batch.lastSequence == record.sequence)
-          // always retain the record with the last sequence number
-          true
-        else if (discardBatchRecords)
-          // remove the record if the batch would have otherwise been discarded
+      override def shouldRetainRecord(batch: RecordBatch, record: Record): Boolean = {
+        if (discardBatchRecords)
+          // The batch is only retained to preserve producer sequence information; the records can be removed
          false
        else
-          shouldRetainRecord(source, map, retainDeletes, batch, record, stats)
+          Cleaner.this.shouldRetainRecord(source, map, retainDeletes, batch, record, stats)
      }
    }
@@ -731,7 +742,8 @@ private[log] class Cleaner(val id: Int,
      } else {
        val isAborted = transactionMetadata.onBatchRead(batch)
        if (isAborted) {
-          // abort markers are supported in v2 and above, which means count is defined
+          // If the batch is aborted, do not bother populating the offset map.
+          // Note that abort markers are supported in v2 and above, which means count is defined.
          stats.indexMessagesRead(batch.countOrNull)
        } else {
          for (record <- batch.asScala) {
@@ -849,7 +861,7 @@ private[log] object CleanedTransactionMetadata {
 private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.PriorityQueue[AbortedTxn],
                                               val transactionIndex: Option[TransactionIndex] = None) {
  val ongoingCommittedTxns = mutable.Set.empty[Long]
-  val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTxn]
+  val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTransactionMetadata]
 
  /**
   * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
@@ -863,14 +875,16 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
    val producerId = controlBatch.producerId
    controlType match {
      case ControlRecordType.ABORT =>
-        val maybeAbortedTxn = ongoingAbortedTxns.remove(producerId)
-        maybeAbortedTxn.foreach { abortedTxn =>
-          transactionIndex.foreach(_.append(abortedTxn))
+        ongoingAbortedTxns.remove(producerId) match {
+          // Retain the marker until all batches from the transaction have been removed
+          case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined =>
+            transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+            false
+          case _ => true
        }
-        true
 
      case ControlRecordType.COMMIT =>
-        // this marker is eligible for deletion if we didn't traverse any records from the transaction
+        // This marker is eligible for deletion if we didn't traverse any batches from the transaction
        !ongoingCommittedTxns.remove(producerId)
 
      case _ => false
@@ -880,7 +894,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
 
  private def consumeAbortedTxnsUpTo(offset: Long): Unit = {
    while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) {
      val abortedTxn = abortedTransactions.dequeue()
-      ongoingAbortedTxns += abortedTxn.producerId -> abortedTxn
+      ongoingAbortedTxns += abortedTxn.producerId -> new AbortedTransactionMetadata(abortedTxn)
    }
  }
 
@@ -891,11 +905,13 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
  def onBatchRead(batch: RecordBatch): Boolean = {
    consumeAbortedTxnsUpTo(batch.lastOffset)
    if (batch.isTransactional) {
-      if (ongoingAbortedTxns.contains(batch.producerId))
-        true
-      else {
-        ongoingCommittedTxns += batch.producerId
-        false
+      ongoingAbortedTxns.get(batch.producerId) match {
+        case Some(abortedTransactionMetadata) =>
+          abortedTransactionMetadata.lastObservedBatchOffset = Some(batch.lastOffset)
+          true
+        case None =>
+          ongoingCommittedTxns += batch.producerId
+          false
      }
    } else {
      false
@@ -903,3 +919,7 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.P
  }
 }
+
+private class AbortedTransactionMetadata(val abortedTxn: AbortedTxn) {
+  var lastObservedBatchOffset: Option[Long] = None
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index e7d7963..b6b20e3 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -78,6 +78,9 @@ private[kafka] object LogValidator extends Logging {
 
      if (batch.isControlBatch)
        throw new InvalidRecordException("Clients are not allowed to write control records")
+
+      if (Option(batch.countOrNull).contains(0))
+        throw new InvalidRecordException("Record batches must contain at least one record")
    }
 
    if (batch.isTransactional && toMagic < RecordBatch.MAGIC_VALUE_V2)
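On the validation side, the new guard reads as "the count field is present and exactly zero": only the cleaner may produce empty batches, so clients writing one are rejected. A Java rendering of the same check (a sketch; IllegalArgumentException stands in for the InvalidRecordException the validator actually throws):

    import org.apache.kafka.common.record.RecordBatch;

    public class EmptyBatchGuardSketch {
        static void rejectEmptyBatch(RecordBatch batch) {
            // countOrNull() is null for magic < v2, where the header carries no record
            // count, so the check only fires for v2 batches with an explicit zero count.
            Integer count = batch.countOrNull();
            if (count != null && count == 0)
                throw new IllegalArgumentException("Record batches must contain at least one record");
        }
    }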

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index dabd2d6..19ea699 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 
 import java.io.File
 import java.nio._
-import java.nio.file.{Files, Paths}
+import java.nio.file.Paths
 import java.util.Properties
 
 import kafka.common._
@@ -107,8 +107,10 @@ class LogCleanerTest extends JUnitSuite {
    log.roll()
    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
-    assertEquals(List(2, 3, 3, 4, 1, 4), keysInLog(log))
-    assertEquals(List(1, 2, 3, 5, 6, 7), offsetsInLog(log))
+    assertEquals(List(2, 5, 7), lastOffsetsPerBatchInLog(log))
+    assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1), lastSequencesInLog(log))
+    assertEquals(List(2, 3, 1, 4), keysInLog(log))
+    assertEquals(List(1, 3, 6, 7), offsetsInLog(log))
 
    // we have to reload the log to validate that the cleaner maintained sequence numbers correctly
    def reloadLog(): Unit = {
@@ -137,8 +139,10 @@ class LogCleanerTest extends JUnitSuite {
    appendIdempotentAsLeader(log, pid4, producerEpoch)(Seq(2))
    log.roll()
    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
-    assertEquals(List(3, 3, 4, 1, 4, 2), keysInLog(log))
-    assertEquals(List(2, 3, 5, 6, 7, 8), offsetsInLog(log))
+    assertEquals(Map(pid1 -> 2, pid2 -> 2, pid3 -> 1, pid4 -> 0), lastSequencesInLog(log))
+    assertEquals(List(2, 5, 7, 8), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(3, 1, 4, 2), keysInLog(log))
+    assertEquals(List(3, 6, 7, 8), offsetsInLog(log))
 
    reloadLog()
 
@@ -266,12 +270,59 @@ class LogCleanerTest extends JUnitSuite {
    assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
 
    // clean again with large delete horizon and verify the marker is removed
-    cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
    assertEquals(List(2, 1, 3), keysInLog(log))
    assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
  }
 
  @Test
+  def testCommitMarkerRetentionWithEmptyBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+    appendProducer(Seq(2, 3)) // batch last offset is 1
+    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    log.appendAsLeader(record(2, 2), leaderEpoch = 0)
+    log.appendAsLeader(record(3, 3), leaderEpoch = 0)
+    log.roll()
+
+    // first time through the records are removed
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), keysInLog(log))
+    assertEquals(List(2, 3, 4), offsetsInLog(log)) // commit marker is retained
+    assertEquals(List(1, 2, 3, 4), lastOffsetsPerBatchInLog(log)) // empty batch is retained
+
+    // the empty batch remains if cleaned again because it still holds the last sequence
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), keysInLog(log))
+    assertEquals(List(2, 3, 4), offsetsInLog(log)) // commit marker is still retained
+    assertEquals(List(1, 2, 3, 4), lastOffsetsPerBatchInLog(log)) // empty batch is retained
+
+    // append a new record from the producer to allow cleaning of the empty batch
+    appendProducer(Seq(1))
+    log.roll()
+
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3, 1), keysInLog(log))
+    assertEquals(List(2, 3, 4, 5), offsetsInLog(log)) // commit marker is still retained
+    assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log)) // empty batch should be gone
+
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3, 1), keysInLog(log))
+    assertEquals(List(3, 4, 5), offsetsInLog(log)) // commit marker is gone
+    assertEquals(List(3, 4, 5), lastOffsetsPerBatchInLog(log)) // empty batch is gone
+  }
+
+  @Test
  def testAbortMarkerRemoval(): Unit = {
    val tp = new TopicPartition("test", 0)
    val cleaner = makeCleaner(Int.MaxValue)
@@ -301,6 +352,65 @@ class LogCleanerTest extends JUnitSuite {
    assertEquals(List(4, 5), offsetsInLog(log))
  }
 
+  @Test
+  def testAbortMarkerRetentionWithEmptyBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val producerId = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+    appendProducer(Seq(2, 3)) // batch last offset is 1
+    log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    log.roll()
+
+    def assertAbortedTransactionIndexed(): Unit = {
+      val abortedTxns = log.collectAbortedTransactions(0L, 100L)
+      assertEquals(1, abortedTxns.size)
+      assertEquals(producerId, abortedTxns.head.producerId)
+      assertEquals(0, abortedTxns.head.firstOffset)
+      assertEquals(2, abortedTxns.head.lastOffset)
+    }
+
+    assertAbortedTransactionIndexed()
+
+    // first time through the records are removed
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertAbortedTransactionIndexed()
+    assertEquals(List(), keysInLog(log))
+    assertEquals(List(2), offsetsInLog(log)) // abort marker is retained
+    assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
+
+    // the empty batch remains if cleaned again because it still holds the last sequence
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertAbortedTransactionIndexed()
+    assertEquals(List(), keysInLog(log))
+    assertEquals(List(2), offsetsInLog(log)) // abort marker is still retained
+    assertEquals(List(1, 2), lastOffsetsPerBatchInLog(log)) // empty batch is retained
+
+    // now update the last sequence so that the empty batch can be removed
+    appendProducer(Seq(1))
+    log.roll()
+
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertAbortedTransactionIndexed()
+    assertEquals(List(1), keysInLog(log))
+    assertEquals(List(2, 3), offsetsInLog(log)) // abort marker is not yet gone because we read the empty batch
+    assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log)) // but we do not preserve the empty batch
+
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(1), keysInLog(log))
+    assertEquals(List(3), offsetsInLog(log)) // abort marker is gone
+    assertEquals(List(3), lastOffsetsPerBatchInLog(log))
+
+    // we do not bother retaining the aborted transaction in the index
+    assertEquals(0, log.collectAbortedTransactions(0L, 100L).size)
+  }
+
  /**
   * Test log cleaning with logs containing messages larger than default message size
   */
@@ -404,8 +514,10 @@ class LogCleanerTest extends JUnitSuite {
    log.roll()
    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
-    assertEquals(List(0, 0, 1), keysInLog(log))
-    assertEquals(List(1, 3, 4), offsetsInLog(log))
+    assertEquals(List(1, 3, 4), lastOffsetsPerBatchInLog(log))
+    assertEquals(Map(1L -> 0, 2L -> 1, 3L -> 0), lastSequencesInLog(log))
+    assertEquals(List(0, 1), keysInLog(log))
+    assertEquals(List(3, 4), offsetsInLog(log))
  }
 
  @Test
@@ -425,8 +537,20 @@ class LogCleanerTest extends JUnitSuite {
    log.roll()
    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
-    assertEquals(List(3), keysInLog(log))
-    assertEquals(List(2, 3), offsetsInLog(log))
+    assertEquals(List(2, 3), lastOffsetsPerBatchInLog(log))
+    assertEquals(Map(producerId -> 2), lastSequencesInLog(log))
+    assertEquals(List(), keysInLog(log))
+    assertEquals(List(3), offsetsInLog(log))
+
+    // Append a new entry from the producer and verify that the empty batch is cleaned up
+    appendProducer(Seq(1, 5))
+    log.roll()
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+
+    assertEquals(List(3, 5), lastOffsetsPerBatchInLog(log))
+    assertEquals(Map(producerId -> 4), lastSequencesInLog(log))
+    assertEquals(List(1, 5), keysInLog(log))
+    assertEquals(List(3, 4, 5), offsetsInLog(log))
  }
 
  @Test
@@ -588,6 +712,17 @@ class LogCleanerTest extends JUnitSuite {
      yield TestUtils.readString(record.key).toInt
  }
 
+  def lastOffsetsPerBatchInLog(log: Log): Iterable[Long] = {
+    for (segment <- log.logSegments; batch <- segment.log.batches.asScala)
+      yield batch.lastOffset
+  }
+
+  def lastSequencesInLog(log: Log): Map[Long, Int] = {
+    (for (segment <- log.logSegments;
+          batch <- segment.log.batches.asScala if !batch.isControlBatch && batch.hasProducerId)
+      yield batch.producerId -> batch.lastSequence).toMap
+  }
+
  /* extract all the offsets from a log */
  def offsetsInLog(log: Log): Iterable[Long] =
    log.logSegments.flatMap(s => s.log.records.asScala.filter(_.hasValue).filter(_.hasKey).map(m => m.offset))

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 630cfcf..65a4eeb 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -32,6 +32,7 @@ import kafka.server.{BrokerTopicStats, KafkaConfig}
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
+import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.IsolationLevel
@@ -401,7 +402,8 @@ class LogTest {
 
    val filtered = ByteBuffer.allocate(2048)
    records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
-      override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
+      override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY
+      override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
    }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
    filtered.flip()
    val filteredRecords = MemoryRecords.readableRecords(filtered)
@@ -428,6 +430,49 @@ class LogTest {
  }
 
  @Test
+  def testRebuildProducerStateWithEmptyCompactedBatch() {
+    val log = createLog(2048)
+    val pid = 1L
+    val epoch = 0.toShort
+    val seq = 0
+    val baseOffset = 23L
+
+    // create an empty batch
+    val records = TestUtils.records(producerId = pid, producerEpoch = epoch, sequence = seq, baseOffset = baseOffset, records = List(
+      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "a".getBytes),
+      new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "b".getBytes)))
+    records.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
+
+    val filtered = ByteBuffer.allocate(2048)
+    records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
+      override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.RETAIN_EMPTY
+      override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = false
+    }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
+    filtered.flip()
+    val filteredRecords = MemoryRecords.readableRecords(filtered)
+
+    log.appendAsFollower(filteredRecords)
+
+    // append some more data and then truncate to force rebuilding of the PID map
+    val moreRecords = TestUtils.records(baseOffset = baseOffset + 2, records = List(
+      new SimpleRecord(System.currentTimeMillis(), "e".getBytes),
+      new SimpleRecord(System.currentTimeMillis(), "f".getBytes)))
+    moreRecords.batches.asScala.foreach(_.setPartitionLeaderEpoch(0))
+    log.appendAsFollower(moreRecords)
+
+    log.truncateTo(baseOffset + 2)
+
+    val activeProducers = log.activeProducers
+    assertTrue(activeProducers.contains(pid))
+
+    val entry = activeProducers(pid)
+    assertEquals(0, entry.firstSeq)
+    assertEquals(baseOffset, entry.firstOffset)
+    assertEquals(1, entry.lastSeq)
+    assertEquals(baseOffset + 1, entry.lastOffset)
+  }
+
+  @Test
  def testUpdatePidMapWithCompactedData() {
    val log = createLog(2048)
    val pid = 1L
@@ -445,7 +490,8 @@ class LogTest {
 
    val filtered = ByteBuffer.allocate(2048)
    records.filterTo(new TopicPartition("foo", 0), new RecordFilter {
-      override def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
+      override def checkBatchRetention(batch: RecordBatch): BatchRetention = RecordFilter.BatchRetention.DELETE_EMPTY
+      override def shouldRetainRecord(recordBatch: RecordBatch, record: Record): Boolean = !record.hasKey
    }, filtered, Int.MaxValue, BufferSupplier.NO_CACHING)
    filtered.flip()
    val filteredRecords = MemoryRecords.readableRecords(filtered)

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fff32a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index 3ab9732..2225ca6 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 import java.nio.ByteBuffer
 
 import kafka.common.LongRef
-import kafka.message.{DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
+import kafka.message.{CompressionCodec, DefaultCompressionCodec, GZIPCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
 import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.test.TestUtils
@@ -909,6 +909,44 @@ class LogValidatorTest {
      isFromClient = true)
  }
 
+  @Test(expected = classOf[InvalidRecordException])
+  def testCompressedBatchWithoutRecordsNotAllowed(): Unit = {
+    testBatchWithoutRecordsNotAllowed(DefaultCompressionCodec, DefaultCompressionCodec)
+  }
+
+  @Test(expected = classOf[InvalidRecordException])
+  def testUncompressedBatchWithoutRecordsNotAllowed(): Unit = {
+    testBatchWithoutRecordsNotAllowed(NoCompressionCodec, NoCompressionCodec)
+  }
+
+  @Test(expected = classOf[InvalidRecordException])
+  def testRecompressedBatchWithoutRecordsNotAllowed(): Unit = {
+    testBatchWithoutRecordsNotAllowed(NoCompressionCodec, DefaultCompressionCodec)
+  }
+
+  private def testBatchWithoutRecordsNotAllowed(sourceCodec: CompressionCodec, targetCodec: CompressionCodec): Unit = {
+    val offset = 1234567
+    val (producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch) =
+      (1324L, 10.toShort, 984, true, 40)
+    val buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD)
+    DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch,
+      baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(),
+      isTransactional, false)
+    buffer.flip()
+    val records = MemoryRecords.readableRecords(buffer)
+    LogValidator.validateMessagesAndAssignOffsets(records,
+      offsetCounter = new LongRef(offset),
+      now = System.currentTimeMillis(),
+      sourceCodec = sourceCodec,
+      targetCodec = targetCodec,
+      compactedTopic = false,
+      magic = RecordBatch.CURRENT_MAGIC_VALUE,
+      timestampType = TimestampType.CREATE_TIME,
+      timestampDiffMaxMs = 5000L,
+      partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH,
+      isFromClient = true)
+  }
+
  private def createRecords(magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
                            timestamp: Long = RecordBatch.NO_TIMESTAMP,
                            codec: CompressionType = CompressionType.NONE): MemoryRecords = {