Repository: kafka
Updated Branches:
  refs/heads/trunk 040fde8ec -> b9b2cfc28
MINOR: Close the producer batch append stream when the batch gets full to free up resources

Of particular importance are compression buffers (64 KB for LZ4, for example).

Author: Apurva Mehta <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes #2796 from apurvam/idempotent-producer-close-data-stream

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9b2cfc2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9b2cfc2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9b2cfc2

Branch: refs/heads/trunk
Commit: b9b2cfc28c055745f606ba1a20c5e8e428acefcf
Parents: 040fde8
Author: Apurva Mehta <[email protected]>
Authored: Tue Apr 4 00:13:47 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Tue Apr 4 00:13:56 2017 +0100

----------------------------------------------------------------------
 .../producer/internals/ProducerBatch.java       |  8 ++++
 .../producer/internals/RecordAccumulator.java   | 14 ++++--
 .../common/record/MemoryRecordsBuilder.java     | 47 +++++++++++++++++---
 core/src/main/scala/kafka/log/Log.scala         |  2 -
 core/src/main/scala/kafka/log/LogCleaner.scala  |  2 +-
 5 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
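For context on the retention problem being fixed: the batch's append path writes through a compressing stream, and that stream pins its internal buffers for as long as it stays open. Below is a minimal sketch of the same effect using the JDK's java.util.zip stream (Kafka's own LZ4 stream and its roughly 64 KB of buffers are the real target, but are not used here; the class name CompressionBufferDemo is hypothetical):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;

public class CompressionBufferDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // The compression stream allocates internal buffers that live as long as
        // the stream is open (for the producer's LZ4 path, on the order of 64 KB).
        DataOutputStream appendStream = new DataOutputStream(new DeflaterOutputStream(sink));
        appendStream.write("record payload".getBytes(StandardCharsets.UTF_8));
        // Closing the append stream flushes the compressor and releases its buffers;
        // the underlying byte sink remains readable afterwards.
        appendStream.close();
        System.out.println("compressed size: " + sink.size());
    }
}

Closing the append stream as soon as a batch fills up means a full accumulator holds only the record bytes, rather than one set of compression buffers per batch waiting to be sent.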
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 4f68c20..9621794 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -234,6 +234,14 @@ public final class ProducerBatch {
         recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
     }
 
+    /**
+     * Release resources required for record appends (e.g. compression buffers). Once this method is called, it's only
+     * possible to update the RecordBatch header.
+     */
+    public void closeForRecordAppends() {
+        recordsBuilder.closeForRecordAppends();
+    }
+
     public void close() {
         recordsBuilder.close();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 299356e..be03142 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -237,16 +237,22 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Try to append to a ProducerBatch. If it is full, we return null and a new batch is created. If the existing batch is
-     * full, it will be closed right before send, or if it is expired, or when the producer is closed, whichever
-     * comes first.
+     * Try to append to a ProducerBatch.
+     *
+     * If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
+     * resources like compression buffers. The batch will be fully closed (i.e. the record batch headers will be written
+     * and memory records built) in one of the following cases (whichever comes first): right before send,
+     * if it is expired, or when the producer is closed.
      */
     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<ProducerBatch> deque) {
         ProducerBatch last = deque.peekLast();
         if (last != null) {
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
-            if (future != null)
+            if (future == null)
+                last.closeForRecordAppends();
+            else
                 return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
+
         }
         return null;
     }
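The control-flow consequence of the RecordAccumulator change, sketched with a toy stand-in for ProducerBatch (Batch, its 10-character "full" limit, and TryAppendSketch are all hypothetical names for illustration, not Kafka code):

import java.util.ArrayDeque;
import java.util.Deque;

// A full batch rejects the append, and the caller immediately releases the
// batch's append-side resources instead of waiting for send/expiry/close.
class Batch {
    private final StringBuilder records = new StringBuilder();
    private boolean openForAppends = true;

    /** Returns false when the batch is "full" (here: a toy 10-char limit). */
    boolean tryAppend(String record) {
        if (!openForAppends || records.length() + record.length() > 10)
            return false;
        records.append(record);
        return true;
    }

    /** Analogous to ProducerBatch.closeForRecordAppends(). */
    void closeForRecordAppends() {
        openForAppends = false; // the real code closes the compressed append stream here
    }
}

public class TryAppendSketch {
    static boolean tryAppend(String record, Deque<Batch> deque) {
        Batch last = deque.peekLast();
        if (last != null) {
            if (last.tryAppend(record))
                return true;
            // Append failed: the batch is full, so free its append-side buffers now.
            last.closeForRecordAppends();
        }
        return false; // caller allocates a new batch, as RecordAccumulator does
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        deque.add(new Batch());
        System.out.println(tryAppend("hello", deque));   // true
        System.out.println(tryAppend("world!!", deque)); // false: full, closed for appends
    }
}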
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index d238093..7f66193 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -30,6 +30,10 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  * This class is used to write new log data in memory, i.e. this is the write path for {@link MemoryRecords}.
  * It transparently handles compression and exposes methods for appending new records, possibly with message
  * format conversion.
+ *
+ * In cases where keeping memory retention low is important and there's a gap between the time that record appends stop
+ * and the builder is closed (e.g. the Producer), it's important to call `closeForRecordAppends` when the former happens.
+ * This will release resources like compression buffers that can be relatively large (64 KB for LZ4).
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
@@ -50,7 +54,11 @@ public class MemoryRecordsBuilder {
     private final TimestampType timestampType;
     private final CompressionType compressionType;
 
+    // Used to append records, may compress data on the fly
     private final DataOutputStream appendStream;
+    // Used to hold a reference to the underlying ByteBuffer so that we can write the record batch header and access
+    // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if the existing one is not large enough,
+    // so it's not safe to hold a direct reference to the underlying ByteBuffer.
     private final ByteBufferOutputStream bufferStream;
     private final byte magic;
     private final int initPos;
@@ -61,6 +69,7 @@ public class MemoryRecordsBuilder {
     private final int writeLimit;
     private final int initialCapacity;
 
+    private boolean appendStreamIsClosed = false;
     private long producerId;
     private short producerEpoch;
     private int baseSequence;
@@ -206,15 +215,26 @@ public class MemoryRecordsBuilder {
         this.baseSequence = baseSequence;
     }
 
+    /**
+     * Release resources required for record appends (e.g. compression buffers). Once this method is called, it's only
+     * possible to update the RecordBatch header.
+     */
+    public void closeForRecordAppends() {
+        if (!appendStreamIsClosed) {
+            try {
+                appendStream.close();
+                appendStreamIsClosed = true;
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+        }
+    }
+
     public void close() {
         if (builtRecords != null)
             return;
 
-        try {
-            appendStream.close();
-        } catch (IOException e) {
-            throw new KafkaException(e);
-        }
+        closeForRecordAppends();
 
         if (numRecords == 0L) {
             buffer().position(initPos);
@@ -233,6 +253,7 @@ public class MemoryRecordsBuilder {
     }
 
     private void writeDefaultBatchHeader() {
+        ensureOpenForRecordBatchWrite();
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
         buffer.position(initPos);
@@ -257,6 +278,7 @@ public class MemoryRecordsBuilder {
     }
 
     private void writeLegacyCompressedWrapperHeader() {
+        ensureOpenForRecordBatchWrite();
         ByteBuffer buffer = bufferStream.buffer();
         int pos = buffer.position();
         buffer.position(initPos);
@@ -416,6 +438,7 @@ public class MemoryRecordsBuilder {
      * @param record The record to add
      */
     public void appendUncheckedWithOffset(long offset, LegacyRecord record) {
+        ensureOpenForRecordAppend();
         try {
             int size = record.sizeInBytes();
             AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);
@@ -469,6 +492,7 @@ public class MemoryRecordsBuilder {
 
     private long appendDefaultRecord(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                                      ByteBuffer value, Header[] headers) throws IOException {
+        ensureOpenForRecordAppend();
         int offsetDelta = (int) (offset - baseOffset);
         long timestampDelta = timestamp - baseTimestamp;
         long crc = DefaultRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value, headers);
@@ -478,6 +502,7 @@ public class MemoryRecordsBuilder {
     }
 
     private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
+        ensureOpenForRecordAppend();
         if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
             timestamp = logAppendTime;
 
@@ -515,6 +540,18 @@ public class MemoryRecordsBuilder {
         }
     }
 
+    private void ensureOpenForRecordAppend() {
+        if (appendStreamIsClosed)
+            throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed for record appends");
+        if (isClosed())
+            throw new IllegalStateException("Tried to append a record, but MemoryRecordsBuilder is closed");
+    }
+
+    private void ensureOpenForRecordBatchWrite() {
+        if (isClosed())
+            throw new IllegalStateException("Tried to write record batch header, but MemoryRecordsBuilder is closed");
+    }
+
     /**
      * Get an estimate of the number of bytes written (based on the estimation factor hard-coded in {@link CompressionType}).
      * @return The estimated number of bytes written
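A stripped-down model of the builder's two-stage close introduced above: stage one (closeForRecordAppends) releases the compressing append stream, stage two (close) finalizes the batch, and appends after stage one fail fast. GZIP stands in for the builder's pluggable compression codec, and TwoStageCloseSketch is an illustrative class, not the Kafka one:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class TwoStageCloseSketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutputStream appendStream;
    private boolean appendStreamIsClosed = false;
    private boolean closed = false;

    public TwoStageCloseSketch() throws IOException {
        appendStream = new DataOutputStream(new GZIPOutputStream(buffer));
    }

    public void append(byte[] record) {
        ensureOpenForRecordAppend();
        try {
            appendStream.write(record);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stage one: drop compression buffers; only header updates remain possible. */
    public void closeForRecordAppends() {
        if (!appendStreamIsClosed) {
            try {
                appendStream.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            appendStreamIsClosed = true;
        }
    }

    /** Stage two: full close; idempotent with respect to stage one. */
    public void close() {
        closeForRecordAppends();
        closed = true; // the real builder writes the batch header and builds MemoryRecords here
    }

    private void ensureOpenForRecordAppend() {
        if (appendStreamIsClosed)
            throw new IllegalStateException("closed for record appends");
        if (closed)
            throw new IllegalStateException("closed");
    }

    public static void main(String[] args) throws IOException {
        TwoStageCloseSketch b = new TwoStageCloseSketch();
        b.append("v0".getBytes(StandardCharsets.UTF_8));
        b.closeForRecordAppends();   // batch is full: free compression buffers now
        try {
            b.append("v1".getBytes(StandardCharsets.UTF_8));
        } catch (IllegalStateException e) {
            System.out.println("append rejected: " + e.getMessage());
        }
        b.close();                   // later, right before send
    }
}

The guard mirrors ensureOpenForRecordAppend() in the diff: an append after closeForRecordAppends() is a programming error, so it surfaces as an IllegalStateException rather than a silent write to a closed stream.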
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 95a6896..a052a9e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -578,8 +578,6 @@ class Log(@volatile var dir: File,
     val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
 
     for (batch <- records.batches.asScala) {
-      if (isFromClient && batch.magic >= RecordBatch.MAGIC_VALUE_V2 && shallowMessageCount > 0)
-        throw new InvalidRecordException("Client produce requests should not have more than one batch")
       // update the first offset if on the first message. For magic versions older than 2, we use the last offset
       // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
       // For magic version 2, we can get the first offset directly from the batch header.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b9b2cfc2/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 830f906..d0e8ec4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -499,7 +499,7 @@ private[log] class Cleaner(val id: Int,
     if (record.isControlRecord)
       return true
 
-    // retain the entry if it is the last one produced by an active idempotent producer to ensure that
+    // retain the record if it is the last one produced by an active idempotent producer to ensure that
     // the PID is not removed from the log before it has been expired
     if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset))
       return true
