Kyle Tinker created KAFKA-6512:
----------------------------------
Summary: Java Producer: Excessive memory usage with compression enabled
Key: KAFKA-6512
URL: https://issues.apache.org/jira/browse/KAFKA-6512
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 1.0.0
Environment: Windows 10
Reporter: Kyle Tinker
Attachments: KafkaSender.java
h2. User Story
As a user of the Java producer, I want a predictable memory usage for the Kafka
client so that I can ensure that my system is sized appropriately and will be
stable even under heavy usage.
As a user of the Java producer, I want a smaller memory footprint so that my
systems don't consume as many resources.
h2. Acceptance Criteria
* Enabling compression should not significantly increase the memory usage of
the Kafka producer
* The memory usage of Kafka's Java Producer should be roughly in line with the
buffer size (buffer.memory) and the number of producers declared.
h2. Additional Information
I've observed high memory usage in the producer when enabling compression (gzip
or lz4). I don't observe the behavior with compression off, but with it on
I'll run out of heap (2GB). Using a Java profiler, I see the data is in the
KafkaLZ4BlockOutputStream (or related class for gzip). I see that
MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but
is not successful. I'm most likely network bottlenecked, so I expect the
producer buffers to be full while the job is running and potentially a lot of
unacknowledged records.
I've tried using the default buffer.memory with 20 producers (across 20
threads) and sending data as quickly as I can. I've also tried 1MB of
buffer.memory, which seemed to reduce memory consumption but I could still run
OOM in certain cases. I have max.in.flight.requests.per.connection set to 1.
In short, I should only have ~20 MB (20 × 1 MB) of data in buffers, but I can
easily see the producer exhaust 2000 MB.
In looking at the code more, it looks like KafkaLZ4BlockOutputStream doesn't
clear the compressedBuffer or buffer when close() is called. In my heap dump,
both of those are ~65 KB each, meaning that each batch takes up ~148 KB of
space, of which ~131 KB is the two buffers. (buffer.memory=1,000,000 and
messages are 1 KB each until the batch fills.)
Kafka tries to manage memory usage by calling
MemoryRecordsBuilder:closeForRecordAppends(), which is documented as "Release
resources required for record appends (e.g. compression buffers)". However,
this method doesn't actually clear those buffers because
KafkaLZ4BlockOutputStream.close() only writes the block and end mark and closes
the output stream. It doesn't actually clear the buffer and compressedBuffer
in KafkaLZ4BlockOutputStream. Those stay allocated in RAM until the block is
acknowledged by the broker, processed in Sender:handleProduceResponse(), and
the batch is deallocated. This memory usage therefore increases, possibly
without bound. In my test program, the program died with approximately 345
unprocessed batches per producer (20 producers), despite having
max.in.flight.requests.per.connection=1.
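To illustrate the retention pattern, here is a hypothetical, simplified stand-in (not the real Kafka class): a block output stream whose close() only flushes the final block and closes the sink keeps its buffer fields allocated for as long as the stream object itself is reachable, e.g. from a batch awaiting broker acknowledgement.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for KafkaLZ4BlockOutputStream: close() writes the
// final block and closes the downstream sink, but the two ~64 KB buffer
// fields stay on the heap until the stream object becomes unreachable.
class BlockOutputStreamSketch extends OutputStream {
    static final int BLOCK_SIZE = 64 * 1024;
    final byte[] buffer = new byte[BLOCK_SIZE];           // uncompressed block
    final byte[] compressedBuffer = new byte[BLOCK_SIZE]; // compressed block
    private final OutputStream out;
    private int pos;

    BlockOutputStreamSketch(OutputStream out) { this.out = out; }

    @Override
    public void write(int b) throws IOException {
        buffer[pos++] = (byte) b;
        if (pos == BLOCK_SIZE)
            flushBlock();
    }

    private void flushBlock() throws IOException {
        out.write(buffer, 0, pos); // the real code compresses into compressedBuffer
        pos = 0;
    }

    @Override
    public void close() throws IOException {
        flushBlock(); // last block + end mark
        out.close();
        // buffer and compressedBuffer are NOT released here, so ~128 KB per
        // closed-but-unacknowledged batch remains pinned on the heap.
    }
}
```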
h2. Steps to Reproduce
# Create a topic named "test" with plenty of storage
# Use a connection with a very fast upload pipe and limited download. This
allows the outbound data to go out, but acknowledgements to be delayed flowing
in.
# Download KafkaSender.java (attached to this ticket)
# Set line 17 to reference your Kafka broker
# Run the program with a 1GB Xmx value
h2. Possible solutions
There are a few possible optimizations I can think of:
# We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as
non-final and null them in the close() method
# We could declare the MemoryRecordsBuilder.appendStream non-final and null it
in the closeForRecordAppends() method
# We could have the ProducerBatch discard the recordsBuilder in
closeForRecordAppends(), however, this is likely a bad idea because the
recordsBuilder contains significant metadata that is likely needed after the
stream is closed. It is also final.
# We could try to limit the number of non-acknowledged batches in flight.
This would bound the maximum memory usage but may negatively impact performance.
Fix #1 would only help LZ4, and not any other compression codec. Fix #2 would
improve all algorithms, compression and otherwise. Of the first three
proposed here, it seems the best. It would also involve checking
appendStreamIsClosed at every usage of appendStream within MemoryRecordsBuilder
to avoid NPEs.
Fix #4 is likely necessary if we want to bound the maximum memory usage of
Kafka. Removing the buffers in Fix #1 or #2 will reduce the memory usage by
~90%, but theoretically there is still no limit.
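As a sketch of Fix #2 (the class below is a simplified stand-in, not the real MemoryRecordsBuilder): make appendStream non-final, null it in closeForRecordAppends(), and guard each subsequent use so the compression buffers become unreachable as soon as appends are closed rather than when the broker acknowledges the batch.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Simplified stand-in for MemoryRecordsBuilder illustrating Fix #2:
// appendStream is non-final and nulled once record appends are closed.
class RecordsBuilderSketch {
    private final ByteArrayOutputStream underlying = new ByteArrayOutputStream();
    private DataOutputStream appendStream =   // non-final so it can be released
        new DataOutputStream(underlying);

    void append(byte[] record) throws IOException {
        if (appendStream == null)             // guard every use to avoid NPEs
            throw new IllegalStateException("records builder is closed");
        appendStream.write(record);
    }

    void closeForRecordAppends() throws IOException {
        if (appendStream != null) {
            appendStream.close();
            appendStream = null;              // release compression buffers now
        }
    }

    byte[] builtRecords() {                   // built bytes survive the close
        return underlying.toByteArray();
    }
}
```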
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)