[ https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lee Dongjin reassigned KAFKA-8546:
----------------------------------

    Assignee: Lee Dongjin

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8546
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8546
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 2.0.1
>            Reporter: Badai Aqrandista
>            Assignee: Lee Dongjin
>            Priority: Minor
>         Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, you can 
> sometimes hit GC pauses longer than zookeeper.session.timeout.ms (6000 ms). 
> This is due to a memory leak described in JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which in 
> turn stems from JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class has a finalize() method.
>  * Whenever an instance of a class with a finalizer is created, the JVM also 
> creates a Finalizer object for it.
>  * The GC finalizer thread is responsible for processing all Finalizer objects.
>  * If Finalizer objects are created faster than the finalizer thread can 
> process them, their number grows continuously and eventually triggers a full 
> GC (because they are stored in the old generation). A minimal sketch of this 
> mechanism is shown below.
>  
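> As a minimal sketch of that mechanism (illustrative only, not Kafka code; the 
> class name FinalizerPressure and the loop are made up for this example, while 
> java.util.zip.Inflater and java.lang.ref.Finalizer are real JDK classes):
> {code:java}
> import java.util.zip.Inflater;
> 
> public class FinalizerPressure {
>     public static void main(String[] args) {
>         // On JDK 8, Inflater overrides finalize(), so the JVM registers a
>         // java.lang.ref.Finalizer entry for every Inflater at construction.
>         for (long i = 0; i < 100_000_000L; i++) {
>             Inflater inflater = new Inflater(true);
>             // end() frees the native zlib memory immediately, but the
>             // Finalizer entry must still be drained by the single finalizer
>             // thread before the heap object can be reclaimed.
>             inflater.end();
>         }
>     }
> }
> {code}
> While this runs, a heap histogram (for example, jmap -histo on the process) 
> should show the java.lang.ref.Finalizer instance count climbing whenever 
> allocation outpaces the finalizer thread.
> 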
> The following stack trace shows what happens when the process freezes during 
> a full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch) LogValidator.scala:267
>   kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object) LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
>   kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
>   kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
>   kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:647
>   kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2) ReplicaManager.scala:745
>   kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) ReplicaManager.scala:733
>   scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
>   scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
>   scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(DefaultEntry) HashMap.scala:130
>   scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(Object) HashMap.scala:130
>   scala.collection.mutable.HashTable$class.foreachEntry(HashTable, Function1) HashTable.scala:236
>   scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
>   scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:130
>   scala.collection.TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:234
>   scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
>   kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:733
>   kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:471
>   kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:489
>   kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:113
>   kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
>   java.lang.Thread.run() Thread.java:748
> {code}
>  
> The attached screenshot shows that the process heap is full of Finalizer 
> objects.
> I think Kafka needs to manually call the System#runFinalization method after 
> closing the GZIP input stream to avoid these full GCs.
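> A minimal sketch of that proposal (illustrative only; the class and method 
> names GzipWithFinalization and decompress are made up here, and the real 
> change would go wherever Kafka closes its decompression streams):
> {code:java}
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.zip.GZIPInputStream;
> 
> public class GzipWithFinalization {
>     // Hypothetical helper: decompress a gzipped payload, then nudge the
>     // JVM to drain the pending Finalizer queue.
>     public static byte[] decompress(byte[] compressed) throws IOException {
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         try (GZIPInputStream in =
>                  new GZIPInputStream(new ByteArrayInputStream(compressed))) {
>             byte[] buf = new byte[8192];
>             int n;
>             while ((n = in.read(buf)) != -1) {
>                 out.write(buf, 0, n);
>             }
>         } finally {
>             // Proposed mitigation: run pending finalizers right after the
>             // stream (and its underlying Inflater) is closed, so Finalizer
>             // entries do not pile up in the old generation until a full GC.
>             System.runFinalization();
>         }
>         return out.toByteArray();
>     }
> }
> {code}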



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
