[ https://issues.apache.org/jira/browse/KAFKA-4602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051313#comment-16051313 ]
Peter Davis commented on KAFKA-4602:
------------------------------------

Does this address increased memory used by decompression of compressed messages during down-conversion for an older consumer? Here's a stack trace from 0.10.2. As far as we can tell, a bad client using the old consumer made a FetchRequest with maxBytes=1000000000 (1 billion). Two problems:

* With socket.request.max.bytes=100MB on the broker, shouldn't the fetch size be limited regardless?
* The request was for a topic with large, highly compressed messages. A heap dump shows *6GB* of log records apparently held in memory for just this one request before it threw the OutOfMemoryError. (100MB -> 5GB would be consistent with our compression ratio.)

{code}
[2017-06-09 15:16:59,937] ERROR [KafkaApi-105] Error when handling request {replica_id=-1,max_wait_time=0,min_bytes=0,topics=[{topic=transportation.unclassified.legacy-export-consol.export-consol-changed-v2,partitions=[{partition=0,fetch_offset=19,max_bytes=1000000000}]}]} (kafka.server.KafkaApis)
java.lang.OutOfMemoryError: Java heap space
	at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:186)
	at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
	at org.apache.kafka.common.record.LogEntry.writeHeader(LogEntry.java:143)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendUnchecked(MemoryRecordsBuilder.java:302)
	at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:324)
	at org.apache.kafka.common.record.MemoryRecords.builderWithEntries(MemoryRecords.java:411)
	at org.apache.kafka.common.record.MemoryRecords.builderWithEntries(MemoryRecords.java:393)
	at org.apache.kafka.common.record.MemoryRecords.withLogEntries(MemoryRecords.java:378)
	at org.apache.kafka.common.record.MemoryRecords.withLogEntries(MemoryRecords.java:337)
	at org.apache.kafka.common.record.AbstractRecords.toMessageFormat(AbstractRecords.java:79)
	at kafka.server.KafkaApis$$anonfun$28.apply(KafkaApis.scala:481)
	at kafka.server.KafkaApis$$anonfun$28.apply(KafkaApis.scala:468)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$3(KafkaApis.scala:468)
	at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:538)
	at kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:538)
	at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:478)
	at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:530)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:81)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:62)
	at java.lang.Thread.run(Thread.java:745)
{code}

!screenshot-1.png!

> KIP-72 Allow putting a bound on memory consumed by Incoming requests
> --------------------------------------------------------------------
>
>                 Key: KAFKA-4602
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4602
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>            Reporter: radai rosenblatt
>            Assignee: radai rosenblatt
>             Fix For: 0.11.1.0
>
>         Attachments: screenshot-1.png
>
>
> this issue tracks the implementation of KIP-72, as outlined here -
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
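For reference, a hedged sketch of the configuration knobs that bound fetch sizes in this era. fetch.max.bytes and max.partition.fetch.bytes apply only to the new Java consumer (added by KIP-74 in 0.10.1) and would not have helped the old consumer described above; the values shown are illustrative, not recommendations:

{code}
# Consumer side (new Java consumer, 0.10.1+): cap total and per-partition fetch response sizes.
fetch.max.bytes=52428800
max.partition.fetch.bytes=1048576

# Broker side: cap the size of any single *incoming* request the broker will accept.
socket.request.max.bytes=104857600
{code}

Note that socket.request.max.bytes bounds the size of the incoming request, not the records materialized on the broker while building the response, so it would not by itself prevent a decompression blowup during down-conversion like the one reported above.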