[ 
https://issues.apache.org/jira/browse/KAFKA-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16244022#comment-16244022
 ] 

Ismael Juma commented on KAFKA-6185:
------------------------------------

[~rsivaram] and I have been looking at this and the current thinking is that it 
is a regression caused by 
https://github.com/apache/kafka/commit/47ee8e954df62b9a79099e944ec4be29afe046f6#diff-d71b50516bd2143d208c14563842390aR92
 (KIP-72). `KafkaChannel` references are retained by `explicitlyMutedChannels` 
even after the channel is closed. If down conversion is taking place, the whole 
record batch will be retained by the channel and this can easily lead to an OOM 
(500 10 MB messages would exhaust a 5 GB heap). Verification of the theory and 
a fix are in progress.

> Selector memory leak with high likelihood of OOM in case of down conversion
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6185
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6185
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>         Environment: Ubuntu 14.04.5 LTS
> 5 brokers: 1&2 on 1.0.0 3,4,5 on 0.11.0.1
> inter.broker.protocol.version=0.11.0.1
> log.message.format.version=0.11.0.1
> clients a mix of 0.9, 0.10, 0.11
>            Reporter: Brett Rann
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>              Labels: regression
>             Fix For: 1.0.1
>
>         Attachments: Kafka_Internals___Datadog.png, 
> Kafka_Internals___Datadog.png
>
>
> We are testing 1.0.0 in a couple of environments.
> Both have about 5 brokers, with two 1.0.0 brokers and the rest 0.11.0.1 
> brokers.
> One is using on disk message format 0.9.0.1, the other 0.11.0.1
> we have 0.9, 0.10, and 0.11 clients connecting.
> The cluster on the 0.9.0.1 format is running fine for a week.
> But the cluster on the 0.11.0.1 format is consistently having memory issues, 
> only on the two upgraded brokers running 1.0.0.
> The first occurrence of the error comes along with this stack trace
> {noformat}
> {"timestamp":"2017-11-06 
> 14:22:32,402","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-7","message":"[KafkaApi-1]
>  Error when handling request 
> {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=maxwell.users,partitions=[{partition=0,fetch_offset=227537,max_bytes=11000000},{partition=4,fetch_offset=354468,max_bytes=11000000},{partition=5,fetch_offset=266524,max_bytes=11000000},{partition=8,fetch_offset=324562,max_bytes=11000000},{partition=10,fetch_offset=292931,max_bytes=11000000},{partition=12,fetch_offset=325718,max_bytes=11000000},{partition=15,fetch_offset=229036,max_bytes=11000000}]}]}"}
> java.lang.OutOfMemoryError: Java heap space
>         at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>         at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>         at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
>         at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
>         at scala.Option.map(Option.scala:146)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
>         at scala.Option.flatMap(Option.scala:171)
>         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
>         at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
>         at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
>         at 
> kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
>         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
>         at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
>         at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
>         at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
>         at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>         at 
> kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
>         at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
>         at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
> {noformat}
> And then after a few of those it settles into this kind of pattern
> {noformat}
> {"timestamp":"2017-11-06 
> 15:06:48,114","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-1","message":"[KafkaApi-1]
>  Error when handling request 
> {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=maxwell.accounts,partitions=[{partition=4,fetch_offset=560631,max_bytes=11000000},{partition=8,fetch_offset=557589,max_bytes=11000000},{partition=12,fetch_offset=551712,max_bytes=11000000}]}]}"}
> java.lang.OutOfMemoryError: Java heap space
> {"timestamp":"2017-11-06 
> 15:06:48,811","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-7","message":"[KafkaApi-1]
>  Error when handling request 
> {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=maxwell.accounts,partitions=[{partition=4,fetch_offset=560631,max_bytes=11000000},{partition=8,fetch_offset=557589,max_bytes=11000000},{partition=12,fetch_offset=551712,max_bytes=11000000}]}]}"}
> java.lang.OutOfMemoryError: Java heap space
> {noformat}
> I've attached the heap use graphs. It steadily increases to max at which time 
> the error starts appearing.
> I've tripled the heap space for one of the 1.0.0 hosts to see what happens, 
> and it similarly climbs to near 6, then similarly starts having 
> java.lang.OutOfMemoryError errors. I've attached those heap space graphs 
> also, where the line that starts climbing from 2gb was when it was restarted 
> with 6gb heap. The out of memory error started right at the peak of the 
> flatline.
> Here's a snippit from the broker logs: 
> https://gist.github.com/brettrann/4bb8041e884a299b7b0b12645a04492d
> I've redacted some group names because I'd need to check with the teams about 
> making them public. Let me know what more is needed and I can gather it. This 
> is a test cluster and the problem appears reproducible easily enough. Happy 
> to gather as much info as needed.
> Our config is: 
> {noformat}
> broker.id=2
> delete.topic.enable=true
> auto.create.topics.enable=false
> auto.leader.rebalance.enable=true
> inter.broker.protocol.version=0.11.0.1
> log.message.format.version=0.11.0.1
> group.max.session.timeout.ms = 300000
> port=9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> replica.fetch.max.bytes=10485760
> log.dirs=/data/kafka/logs
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> log.retention.hours=168
> offsets.retention.minutes=10080
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> log.cleaner.enable=true
> zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181/kafka
> zookeeper.connection.timeout.ms=6000
> {noformat}
> This was also reported attached to the end of this ticket 
> https://issues.apache.org/jira/browse/KAFKA-6042 which is a broker lockup/FD 
> issue, but a new ticket was requested.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to