[ https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajini Sivaram resolved KAFKA-6512.
-----------------------------------
    Resolution: Fixed
 Fix Version/s:     (was: 1.2.0)
                1.1.0

Implemented options 1) and 2) from the description.

> Java Producer: Excessive memory usage with compression enabled
> --------------------------------------------------------------
>
>                 Key: KAFKA-6512
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6512
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.0
>         Environment: Windows 10
>            Reporter: Kyle Tinker
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.0
>
>         Attachments: KafkaSender.java
>
>
> h2. User Story
> As a user of the Java producer, I want predictable memory usage for the Kafka client so that I can ensure that my system is sized appropriately and will be stable even under heavy usage.
> As a user of the Java producer, I want a smaller memory footprint so that my systems don't consume as many resources.
> h2. Acceptance Criteria
> * Enabling compression in Kafka should not significantly increase the memory usage of Kafka.
> * The memory usage of Kafka's Java producer should be roughly in line with the buffer size (buffer.memory) and the number of producers declared.
> h2. Additional Information
> I've observed high memory usage in the producer when enabling compression (gzip or lz4). I don't observe the behavior with compression off, but with it on I'll run out of heap (2 GB). Using a Java profiler, I see the data is held in KafkaLZ4BlockOutputStream (or the related class for gzip). I see that MemoryRecordsBuilder.closeForRecordAppends() is trying to deal with this, but is not successful. I'm most likely network-bottlenecked, so I expect the producer buffers to be full while the job is running, with potentially a lot of unacknowledged records.
> I've tried using the default buffer.memory with 20 producers (across 20 threads) and sending data as quickly as I can.
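For reference, the setup described above corresponds roughly to producer settings like the following. The config keys are standard Kafka producer settings, but the helper class and the broker address are hypothetical placeholders, not part of the attached KafkaSender.java:

```java
import java.util.Properties;

// Hypothetical helper mirroring the reported setup; for illustration only.
class ReproConfig {
    static Properties reproProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("compression.type", "lz4");  // gzip reproduces the issue as well
        props.put("buffer.memory", "1000000"); // ~1 MB accumulator per producer
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties p = reproProps();
        // With 20 producers, the accumulators alone should cap out near 20 MB,
        // which is the "expected" footprint the report contrasts with the 2 GB observed.
        long expectedBytes = 20L * Long.parseLong(p.getProperty("buffer.memory"));
        System.out.println(expectedBytes + " bytes expected across 20 producers");
    }
}
```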
> I've also tried 1 MB of buffer.memory, which seemed to reduce memory consumption, but I could still run out of memory in certain cases. I have max.in.flight.requests.per.connection set to 1. In short, I should have only ~20 MB (20 * 1 MB) of data in buffers, but I can easily exhaust the 2000 MB used by Kafka.
> Looking at the code more closely, it appears that KafkaLZ4BlockOutputStream doesn't clear the compressedBuffer or buffer when close() is called. In my heap dump, both of those are ~65k each, meaning that each batch takes up ~148k of space, of which 131k is buffers (buffer.memory=1,000,000 and messages are 1k each until the batch fills).
> Kafka tries to manage memory usage by calling MemoryRecordsBuilder.closeForRecordAppends(), which is documented as "Release resources required for record appends (e.g. compression buffers)". However, this method doesn't actually clear those buffers, because KafkaLZ4BlockOutputStream.close() only writes the final block and end mark and closes the output stream. It doesn't clear the buffer and compressedBuffer fields in KafkaLZ4BlockOutputStream. Those stay allocated in RAM until the block is acknowledged by the broker, processed in Sender.handleProduceResponse(), and the batch is deallocated. Memory usage therefore grows, possibly without bound. In my test, the program died with approximately 345 unprocessed batches per producer (20 producers), despite having max.in.flight.requests.per.connection=1.
> h2. Steps to Reproduce
> # Create a topic test with plenty of storage
> # Use a connection with a very fast upload pipe and limited download. This allows the outbound data to go out while acknowledgements are delayed flowing in.
> # Download KafkaSender.java (attached to this ticket)
> # Set line 17 to reference your Kafka broker
> # Run the program with a 1 GB Xmx value
> h2. Possible solutions
> There are a few possible optimizations I can think of:
> # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as non-final and null them in the close() method.
> # We could declare MemoryRecordsBuilder.appendStream non-final and null it in the closeForRecordAppends() method.
> # We could have the ProducerBatch discard the recordsBuilder in closeForRecordAppends(). However, this is likely a bad idea, because the recordsBuilder contains significant metadata that is likely needed after the stream is closed. It is also final.
> # We could try to limit the number of non-acknowledged batches in flight. This would bound the maximum memory usage but may negatively impact performance.
>
> Fix #1 would only improve the LZ4 path, not the other algorithms. Fix #2 would improve all algorithms, compression and otherwise; of the first three proposed here, it seems the best. It would also involve checking appendStreamIsClosed at every use of appendStream within MemoryRecordsBuilder to avoid NPEs.
> Fix #4 is likely necessary if we want to bound the maximum memory usage of Kafka. Removing the buffers in fix #1 or #2 will reduce memory usage by ~90%, but theoretically there is still no limit.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
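The shape of fixes #1 and #2 above (which the resolution says were implemented) can be sketched with a simplified, hypothetical stream class; this is not the actual Kafka patch, only an illustration of the idea: the buffer fields are made non-final and released in close(), and later writes check the closed state instead of hitting an NPE.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical simplified analogue of KafkaLZ4BlockOutputStream, illustrating
// the fix: a closed (but still retained, unacknowledged) batch no longer pins
// its block buffers in memory.
class BufferedCompressionStream extends OutputStream {
    private final OutputStream out;
    private byte[] buffer;            // non-final so close() can drop it
    private byte[] compressedBuffer;  // unused in this sketch; kept to mirror the report
    private int pos;

    BufferedCompressionStream(OutputStream out, int blockSize) {
        this.out = out;
        this.buffer = new byte[blockSize];
        this.compressedBuffer = new byte[blockSize];
    }

    @Override
    public void write(int b) throws IOException {
        // Analogue of checking appendStreamIsClosed before each use.
        if (buffer == null) throw new IOException("stream is closed");
        buffer[pos++] = (byte) b; // a real impl would compress and flush full blocks
    }

    @Override
    public void close() throws IOException {
        out.write(buffer, 0, pos); // stand-in for writing the final block + end mark
        out.flush();
        buffer = null;             // the fix: release block buffers eagerly on close
        compressedBuffer = null;
        pos = 0;
    }

    boolean buffersReleased() {
        return buffer == null && compressedBuffer == null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BufferedCompressionStream s = new BufferedCompressionStream(sink, 65536);
        s.write('k');
        s.close();
        System.out.println("buffers released: " + s.buffersReleased()); // buffers released: true
    }
}
```

With ~148k batches of which ~131k is buffers, releasing the two 65k buffers at close() is where the reported ~90% reduction comes from; bounding total memory still requires something like option #4.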