[ 
https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365989#comment-16365989
 ] 

ASF GitHub Bot commented on KAFKA-6512:
---------------------------------------

rajinisivaram closed pull request #4570: KAFKA-6512: Discard references to 
buffers used for compression
URL: https://github.com/apache/kafka/pull/4570
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
 
b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
index 8cfc37be826..591ab169364 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -34,7 +33,7 @@
  *
  * This class is not thread-safe.
  */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+public final class KafkaLZ4BlockOutputStream extends OutputStream {
 
     public static final int MAGIC = 0x184D2204;
     public static final int LZ4_MAX_HEADER_LENGTH = 19;
@@ -52,9 +51,10 @@
     private final boolean useBrokenFlagDescriptorChecksum;
     private final FLG flg;
     private final BD bd;
-    private final byte[] buffer;
-    private final byte[] compressedBuffer;
     private final int maxBlockSize;
+    private OutputStream out;
+    private byte[] buffer;
+    private byte[] compressedBuffer;
     private int bufferOffset;
     private boolean finished;
 
@@ -71,7 +71,7 @@
      * @throws IOException
      */
     public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean 
blockChecksum, boolean useBrokenFlagDescriptorChecksum) throws IOException {
-        super(out);
+        this.out = out;
         compressor = LZ4Factory.fastestInstance().fastCompressor();
         checksum = XXHashFactory.fastestInstance().hash32();
         this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
@@ -204,7 +204,6 @@ private void writeBlock() throws IOException {
     private void writeEndMark() throws IOException {
         ByteUtils.writeUnsignedIntLE(out, 0);
         // TODO implement content checksum, update flg.validate()
-        finished = true;
     }
 
     @Override
@@ -259,15 +258,26 @@ private void ensureNotFinished() {
 
     @Override
     public void close() throws IOException {
-        if (!finished) {
-            // basically flush the buffer writing the last block
-            writeBlock();
-            // write the end block and finish the stream
-            writeEndMark();
-        }
-        if (out != null) {
-            out.close();
-            out = null;
+        try {
+            if (!finished) {
+                // basically flush the buffer writing the last block
+                writeBlock();
+                // write the end block
+                writeEndMark();
+            }
+        } finally {
+            try {
+                if (out != null) {
+                    try (OutputStream outStream = out) {
+                        outStream.flush();
+                    }
+                }
+            } finally {
+                out = null;
+                buffer = null;
+                compressedBuffer = null;
+                finished = true;
+            }
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a9b57ac22df..6f6404fa2d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -23,6 +23,7 @@
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import static org.apache.kafka.common.utils.Utils.wrapNullable;
@@ -38,11 +39,15 @@
  */
 public class MemoryRecordsBuilder {
     private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    private static final DataOutputStream CLOSED_STREAM = new 
DataOutputStream(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {
+            throw new IllegalStateException("MemoryRecordsBuilder is closed 
for record appends");
+        }
+    });
 
     private final TimestampType timestampType;
     private final CompressionType compressionType;
-    // Used to append records, may compress data on the fly
-    private final DataOutputStream appendStream;
     // Used to hold a reference to the underlying ByteBuffer so that we can 
write the record batch header and access
     // the written bytes. ByteBufferOutputStream allocates a new ByteBuffer if 
the existing one is not large enough,
     // so it's not safe to hold a direct reference to the underlying 
ByteBuffer.
@@ -60,7 +65,8 @@
     // from previous batches before appending any records.
     private float estimatedCompressionRatio = 1.0F;
 
-    private boolean appendStreamIsClosed = false;
+    // Used to append records, may compress data on the fly
+    private DataOutputStream appendStream;
     private boolean isTransactional;
     private long producerId;
     private short producerEpoch;
@@ -265,12 +271,13 @@ public void overrideLastOffset(long lastOffset) {
      * possible to update the RecordBatch header.
      */
     public void closeForRecordAppends() {
-        if (!appendStreamIsClosed) {
+        if (appendStream != CLOSED_STREAM) {
             try {
                 appendStream.close();
-                appendStreamIsClosed = true;
             } catch (IOException e) {
                 throw new KafkaException(e);
+            } finally {
+                appendStream = CLOSED_STREAM;
             }
         }
     }
@@ -663,7 +670,7 @@ private void recordWritten(long offset, long timestamp, int 
size) {
     }
 
     private void ensureOpenForRecordAppend() {
-        if (appendStreamIsClosed)
+        if (appendStream == CLOSED_STREAM)
             throw new IllegalStateException("Tried to append a record, but 
MemoryRecordsBuilder is closed for record appends");
     }
 
@@ -738,7 +745,7 @@ public boolean isClosed() {
     public boolean isFull() {
         // note that the write limit is respected only after the first record 
is added which ensures we can always
         // create non-empty batches (this is used to disable batching when the 
producer's batch size is set to 0).
-        return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit 
<= estimatedBytesWritten());
+        return appendStream == CLOSED_STREAM || (this.numRecords > 0 && 
this.writeLimit <= estimatedBytesWritten());
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c713d17f615..a90fb299c52 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -28,6 +28,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -644,6 +645,39 @@ public void 
shouldThrowIllegalStateExceptionOnAppendWhenAborted() throws Excepti
         return values;
     }
 
+    @Test
+    public void testBuffersDereferencedOnClose() {
+        Runtime runtime = Runtime.getRuntime();
+        int payloadLen = 1024 * 1024;
+        ByteBuffer buffer = ByteBuffer.allocate(payloadLen * 2);
+        byte[] key = new byte[0];
+        byte[] value = new byte[payloadLen];
+        new Random().nextBytes(value); // Use random payload so that 
compressed buffer is large
+        List<MemoryRecordsBuilder> builders = new ArrayList<>(100);
+        long startMem = 0;
+        long memUsed = 0;
+        int iterations =  0;
+        while (iterations++ < 100) {
+            buffer.rewind();
+            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, 
RecordBatch.MAGIC_VALUE_V2, compressionType,
+                    TimestampType.CREATE_TIME, 0L, 0L, 
RecordBatch.NO_PRODUCER_ID,
+                    RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 
false, false,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH, 0);
+            builder.append(1L, new byte[0], value);
+            builder.build();
+            builders.add(builder);
+
+            System.gc();
+            memUsed = runtime.totalMemory() - runtime.freeMemory() - startMem;
+            // Ignore memory usage during initialization
+            if (iterations == 2)
+                startMem = memUsed;
+            else if (iterations > 2 && memUsed < (iterations - 2) * 1024)
+                break;
+        }
+        assertTrue("Memory usage too high: " + memUsed, iterations < 100);
+    }
+
     private void verifyRecordsProcessingStats(RecordsProcessingStats 
processingStats, int numRecords,
             int numRecordsConverted, long finalBytes, long preConvertedBytes) {
         assertNotNull("Records processing info is null", processingStats);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Java Producer: Excessive memory usage with compression enabled
> --------------------------------------------------------------
>
>                 Key: KAFKA-6512
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6512
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.0
>         Environment: Windows 10
>            Reporter: Kyle Tinker
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.2.0
>
>         Attachments: KafkaSender.java
>
>
> h2. User Story
> As a user of the Java producer, I want a predictable memory usage for the 
> Kafka client so that I can ensure that my system is sized appropriately and 
> will be stable even under heavy usage.
> As a user of the Java producer, I want a smaller memory footprint so that my 
> systems don't consume as many resources.
> h2. Acceptance Criteria
>  * Enabling Compression in Kafka should not significantly increase the memory 
> usage of Kafka
>  * The memory usage of Kafka's Java Producer should be roughly in line with 
> the buffer size (buffer.memory) and the number of producers declared.
> h2. Additional Information
> I've observed high memory usage in the producer when enabling compression 
> (gzip or lz4).  I don't observe the behavior with compression off, but with 
> it on I'll run out of heap (2GB).  Using a Java profiler, I see the data is 
> in the KafkaLZ4BlockOutputStream (or related class for gzip).   I see that 
> MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but 
> is not successful.  I'm most likely network bottlenecked, so I expect the 
> producer buffers to be full while the job is running and potentially a lot of 
> unacknowledged records.
> I've tried using the default buffer.memory with 20 producers (across 20 
> threads) and sending data as quickly as I can.  I've also tried 1MB of 
> buffer.memory, which seemed to reduce memory consumption but I could still 
> run out of memory in certain cases.  I have max.in.flight.requests.per.connection set 
> to 1.  In short, I should only have ~20 MB (20 * 1 MB) of data in buffers, but 
> I can easily exhaust the 2000 MB heap with memory used by Kafka.
> In looking at the code more, it looks like the KafkaLZ4BlockOutputStream 
> doesn't clear the compressedBuffer or buffer when close() is called.  In my 
> heap dump, both of those are ~65k size each, meaning that each batch is 
> taking up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 
> and messages are 1k each until the batch fills).
> Kafka tries to manage memory usage by calling 
> MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release 
> resources required for record appends (e.g. compression buffers)".  However, 
> this method doesn't actually clear those buffers because 
> KafkaLZ4BlockOutputStream.close() only writes the block and end mark and 
> closes the output stream.  It doesn't actually clear the buffer and 
> compressedBuffer in KafkaLZ4BlockOutputStream.  Those stay allocated in RAM 
> until the block is acknowledged by the broker, processed in 
> Sender:handleProduceResponse(), and the batch is deallocated.  This memory 
> usage therefore increases, possibly without bound.  In my test program, the 
> program died with approximately 345 unprocessed batches per producer (20 
> producers), despite having max.in.flight.requests.per.connection=1.
> h2. Steps to Reproduce
>  # Create a topic test with plenty of storage
>  # Use a connection with a very fast upload pipe and limited download.  This 
> allows the outbound data to go out, but acknowledgements to be delayed 
> flowing in.
>  # Download KafkaSender.java (attached to this ticket)
>  # Set line 17 to reference your Kafka broker
>  # Run the program with a 1GB Xmx value
> h2. Possible solutions
> There are a few possible optimizations I can think of:
>  # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as 
> non-final and null them in the close() method
>  # We could declare the MemoryRecordsBuilder.appendStream non-final and null 
> it in the closeForRecordAppends() method
>  # We could have the ProducerBatch discard the recordsBuilder in 
> closeForRecordAppends(), however, this is likely a bad idea because the 
> recordsBuilder contains significant metadata that is likely needed after the 
> stream is closed.  It is also final.
>  # We could try to limit the number of non-acknowledged batches in flight.  
> This would bound the maximum memory usage but may negatively impact 
> performance.
>  
> Fix #1 would only improve the LZ4 algorithm, and not any other algorithms.
> Fix #2 would improve all algorithms, compression and otherwise.  Of the 3 
> proposed here, it seems the best.  This would also involve having to check 
> appendStreamIsClosed in every usage of appendStream within 
> MemoryRecordsBuilder to avoid NPEs.
> Fix #4 is likely necessary if we want to bound the maximum memory usage of 
> Kafka.  Removing the buffers in Fix 1 or 2 will reduce the memory usage by 
> ~90%, but theoretically there is still no limit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to