[ 
https://issues.apache.org/jira/browse/KAFKA-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051313#comment-16051313
 ] 

Peter Davis commented on KAFKA-4602:
------------------------------------

Does this address increased memory used by decompression of compressed messages 
during down-conversion for an older consumer?

Here's a stack trace from 0.10.2.  As far as we can tell, a bad client using 
the old consumer made a FetchRequest with maxBytes=1000000000 (1 billion).  Two 
problems:
* With socket.request.max.bytes=100MB on the broker, shouldn't the fetch size 
be limited, regardless?
* The request was for a topic with large, highly compressed messages.  A heap 
dump shows *6GB* of log records for apparently held in memory for just this one 
request before it threw the OutOfMemoryError.  (100MB -> 5GB would be 
consistent without our compression ratio.)

{code}
[2017-06-09 15:16:59,937] ERROR [KafkaApi-105] Error when handling request 
{replica_id=-1,max_wait_time=0,min_bytes=0,topics=[{topic=transportation.unclassified.legacy-export-consol.export-consol-changed-v2,partitions=[{partition=0,fetch
_offset=19,max_bytes=1000000000}]}]} (kafka.server.KafkaApis)
java.lang.OutOfMemoryError: Java heap space
        at 
java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:186)
        at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
        at 
org.apache.kafka.common.record.LogEntry.writeHeader(LogEntry.java:143)
        at 
org.apache.kafka.common.record.MemoryRecordsBuilder.appendUnchecked(MemoryRecordsBuilder.java:302)
        at 
org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:324)
        at 
org.apache.kafka.common.record.MemoryRecords.builderWithEntries(MemoryRecords.java:411)
        at 
org.apache.kafka.common.record.MemoryRecords.builderWithEntries(MemoryRecords.java:393)
        at 
org.apache.kafka.common.record.MemoryRecords.withLogEntries(MemoryRecords.java:378)
        at 
org.apache.kafka.common.record.MemoryRecords.withLogEntries(MemoryRecords.java:337)
        at 
org.apache.kafka.common.record.AbstractRecords.toMessageFormat(AbstractRecords.java:79)
        at kafka.server.KafkaApis$$anonfun$28.apply(KafkaApis.scala:481)
        at kafka.server.KafkaApis$$anonfun$28.apply(KafkaApis.scala:468)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$3(KafkaApis.scala:468)
        at 
kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:538)
        at 
kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:538)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:478)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:530)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:81)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62)
        at java.lang.Thread.run(Thread.java:745)
{code}
!screenshot-1.png!

> KIP-72 Allow putting a bound on memory consumed by Incoming requests
> --------------------------------------------------------------------
>
>                 Key: KAFKA-4602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4602
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>            Reporter: radai rosenblatt
>            Assignee: radai rosenblatt
>             Fix For: 0.11.1.0
>
>         Attachments: screenshot-1.png
>
>
> this issue tracks the implementation of KIP-72, as outlined here - 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to