[ https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16868842#comment-16868842 ]

ASF GitHub Bot commented on KAFKA-8546:
---------------------------------------

dongjinleekr commented on pull request #6976: KAFKA-8546: Call 
System#runFinalization to avoid memory leak caused by JDK-6293787
URL: https://github.com/apache/kafka/pull/6976
 
 
   This PR prevents the surge of `[Inflater, Deflater]` objects by calling 
`System#runFinalization` when closing `GZip[Input, Output]Stream`.
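   
   For illustration, a minimal sketch of the idea (the wrapper name below is 
hypothetical, not necessarily the shape of the actual patch):
   
   ```java
   import java.io.FilterInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.util.zip.GZIPInputStream;
   
   // Hypothetical wrapper (illustration only): drain pending finalizers when
   // the stream is closed, so Finalizer entries for the underlying Inflater
   // do not accumulate in the old generation.
   public class FinalizingGZIPInputStream extends FilterInputStream {
   
       public FinalizingGZIPInputStream(InputStream in) throws IOException {
           super(new GZIPInputStream(in));
       }
   
       @Override
       public void close() throws IOException {
           try {
               super.close(); // ends the underlying Inflater
           } finally {
               // Ask the JVM to process pending finalizers now instead of
               // leaving them all to the single finalizer thread.
               System.runFinalization();
           }
       }
   }
   ```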
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8546
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8546
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 2.0.1
>            Reporter: Badai Aqrandista
>            Priority: Minor
>         Attachments: Screen Shot 2019-05-30 at 1.27.25 pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, you can 
> sometimes hit GC pauses greater than the zookeeper.session.timeout.ms of 
> 6000 ms. This is caused by a memory leak described in JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which in 
> turn is caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class has a finalize() method.
>  * Whenever a class with a finalize() method is instantiated, a 
> java.lang.ref.Finalizer object is created.
>  * The GC finalizer thread is responsible for processing all Finalizer 
> objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the 
> GC finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC, because they accumulate in 
> the old generation (see the sketch after this list).
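>  
> For illustration only (not from the patch), a minimal Java sketch of the 
> churn, assuming JDK 8, where java.util.zip.Inflater still overrides 
> finalize():
> {code:java}
> // Every Inflater allocation also registers a java.lang.ref.Finalizer entry,
> // which only the single finalizer thread can retire. When allocation
> // outpaces that thread, the entries pile up in the old generation until a
> // full GC collects them.
> public class FinalizerChurn {
>     public static void main(String[] args) {
>         while (true) {
>             java.util.zip.Inflater inf = new java.util.zip.Inflater();
>             inf.end(); // frees native memory, but the Finalizer entry remains
>             // Observe with "jmap -histo <pid>": the instance count of
>             // java.lang.ref.Finalizer keeps growing under sustained load.
>         }
>     }
> }
> {code}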
>  
> The following stack trace shows what happens when the process is frozen in 
> a full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch) LogValidator.scala:267
>   kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object) LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
>   kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
>   kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
>   kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:647
>   kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2) ReplicaManager.scala:745
>   kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) ReplicaManager.scala:733
>   scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
>   scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
>   scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(DefaultEntry) HashMap.scala:130
>   scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(Object) HashMap.scala:130
>   scala.collection.mutable.HashTable$class.foreachEntry(HashTable, Function1) HashTable.scala:236
>   scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
>   scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:130
>   scala.collection.TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:234
>   scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
>   kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:733
>   kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:471
>   kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:489
>   kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:113
>   kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
>   java.lang.Thread.run() Thread.java:748
> {code}
>  
> The attached screenshot shows that the process heap is full of Finalizer 
> objects.
> I think Kafka needs to manually call the System#runFinalization method 
> after closing the GZIP input stream to avoid this full GC.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
