[ https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Badai Aqrandista updated KAFKA-8546:
------------------------------------
    Description: 
When a heavily used broker uses gzip compression on all topics, you can 
sometimes hit GC pauses longer than the zookeeper.session.timeout.ms of 
6000 ms. This is caused by a memory leak described in JDK-6293787 
([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which in 
turn is caused by JDK-4797189 
([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).

 

In summary, this is what happens (a minimal reproducer sketch follows this list):
 * The Inflater class has a finalize() method.
 * Whenever an instance of a class with a finalize() method is created, the 
JVM also creates a Finalizer object for it.
 * A single GC finalizer thread is responsible for processing all Finalizer 
objects.
 * If Finalizer objects are created faster than the GC finalizer thread can 
process them, their number grows continuously and eventually triggers a full 
GC (because they accumulate in the Old Gen).
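
The backlog is easy to demonstrate outside Kafka. Below is a minimal, 
hypothetical reproducer (not Kafka code) that allocates Inflater instances in 
a tight loop, much like per-batch decompression does, and uses the standard 
MemoryMXBean#getObjectPendingFinalizationCount to watch the finalizer queue:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.zip.Inflater;

// Hypothetical reproducer, not Kafka code: allocate Inflater instances
// rapidly and watch the pending-finalization count climb whenever the
// single finalizer thread cannot keep up.
public class FinalizerBacklog {
    public static void main(String[] args) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        for (int i = 1; i <= 10_000_000; i++) {
            Inflater inflater = new Inflater(true);
            inflater.end(); // frees native memory, but the Finalizer object
                            // still has to be processed after GC discovers it
            if (i % 1_000_000 == 0) {
                System.out.printf("allocated=%d pendingFinalization=%d%n",
                        i, memory.getObjectPendingFinalizationCount());
            }
        }
    }
}
{code}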

 

The following stack trace shows what a request handler thread was doing when the process froze in a full GC:

 
{code:java}
kafka-request-handler-13  Runnable Thread ID: 79
  java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
  java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
  java.util.zip.InflaterInputStream.read(byte[], int, int) InflaterInputStream.java:152
  java.util.zip.GZIPInputStream.read(byte[], int, int) GZIPInputStream.java:117
  java.io.BufferedInputStream.fill() BufferedInputStream.java:246
  java.io.BufferedInputStream.read() BufferedInputStream.java:265
  java.io.DataInputStream.readByte() DataInputStream.java:265
  org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) ByteUtils.java:168
  org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, long, int, Long) DefaultRecord.java:292
  org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, int, Long) DefaultRecordBatch.java:264
  org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:563
  org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:532
  org.apache.kafka.common.record.DefaultRecordBatch.iterator() DefaultRecordBatch.java:327
  scala.collection.convert.Wrappers$JIterableWrapper.iterator() Wrappers.scala:54
  scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
  kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch) LogValidator.scala:267
  kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object) LogValidator.scala:259
  scala.collection.Iterator$class.foreach(Iterator, Function1) Iterator.scala:891
  scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
  scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
  kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:259
  kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:70
  kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, LongRef, long) Log.scala:771
  kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
  kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
  kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
  kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
  kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
  kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
  kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
  kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
  kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
  kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:647
  kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2) ReplicaManager.scala:745
  kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) ReplicaManager.scala:733
  scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
  scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
  scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(DefaultEntry) HashMap.scala:130
  scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(Object) HashMap.scala:130
  scala.collection.mutable.HashTable$class.foreachEntry(HashTable, Function1) HashTable.scala:236
  scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
  scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:130
  scala.collection.TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:234
  scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
  kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:733
  kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:471
  kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:489
  kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:113
  kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
  java.lang.Thread.run() Thread.java:748
{code}
 
The attached screenshot shows that the process heap is full of Finalizer 
objects.

I think Kafka needs to call System#runFinalization manually after closing the 
GZIP input stream, to avoid this full GC.
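
A minimal sketch of where such a call could go, assuming a hypothetical 
wrapper around the broker's decompression path (the class, method names, and 
throttling interval below are illustrative, not an existing Kafka API):

{code:java}
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

// Hypothetical helper, not an existing Kafka class: decompress a record
// batch and occasionally nudge the finalizer thread so Finalizer objects
// for the short-lived Inflater instances do not pile up in the Old Gen.
public class GzipBatchStreams {
    private static final int RUN_FINALIZATION_EVERY = 10_000; // illustrative
    private static long streamsClosed = 0; // illustrative; not thread-safe

    public static DataInputStream open(InputStream compressed) throws IOException {
        return new DataInputStream(new GZIPInputStream(compressed));
    }

    public static void close(DataInputStream stream) throws IOException {
        stream.close(); // GZIPInputStream.close() also calls Inflater.end()
        if (++streamsClosed % RUN_FINALIZATION_EVERY == 0) {
            // Proposed mitigation: drain the pending Finalizer queue before
            // it grows large enough to force a full GC.
            System.runFinalization();
        }
    }
}
{code}

Note that System#runFinalization waits for outstanding finalizations to 
complete, so how often to call it would be a latency/throughput trade-off.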



> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8546
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8546
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.1
>            Reporter: Badai Aqrandista
>            Priority: Major
>         Attachments: Screen Shot 2019-05-30 at 1.27.25 pm.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
