[ https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee Dongjin reassigned KAFKA-8546:
----------------------------------

    Assignee: Lee Dongjin

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8546
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8546
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 2.0.1
>            Reporter: Badai Aqrandista
>            Assignee: Lee Dongjin
>            Priority: Minor
>         Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, you can sometimes hit GC pauses greater than the zookeeper.session.timeout.ms default of 6000 ms. This is due to a memory leak described in JDK-6293787 ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which in turn is rooted in JDK-4797189 ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>
> In summary, this is what happens:
> * The Inflater class contains a finalizer method.
> * Whenever a class with a finalizer method is instantiated, a Finalizer object is created.
> * The GC finalizer thread is responsible for processing all Finalizer objects.
> * If the rate of Finalizer object creation exceeds the rate at which the GC finalizer thread can process them, the number of Finalizer objects grows continuously and eventually triggers a full GC (because they are stored in the Old Gen); see the sketch below this list.
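>
> To make the mechanism concrete, here is a minimal, self-contained sketch (JDK 8 semantics; the class name and payload are made up for illustration, and this is not Kafka code). Every Inflater constructed in the loop registers a java.lang.ref.Finalizer entry that only the single finalizer thread can retire:
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.util.zip.Deflater;
> import java.util.zip.Inflater;
>
> public class FinalizerPressure {
>     public static void main(String[] args) throws Exception {
>         // Pre-compress a tiny payload so inflate() has valid input.
>         byte[] raw = "payload".getBytes(StandardCharsets.UTF_8);
>         Deflater deflater = new Deflater();
>         deflater.setInput(raw);
>         deflater.finish();
>         byte[] compressed = new byte[64];
>         int len = deflater.deflate(compressed);
>         deflater.end();
>
>         byte[] out = new byte[64];
>         for (long i = 0; i < 100_000_000L; i++) {
>             // On JDK 8, Inflater overrides finalize(), so the JVM registers
>             // a java.lang.ref.Finalizer entry at construction time. end()
>             // frees the native buffer, but the Finalizer entry is retired
>             // only after GC enqueues it and the finalizer thread runs.
>             Inflater inflater = new Inflater();
>             inflater.setInput(compressed, 0, len);
>             inflater.inflate(out);
>             inflater.end();
>         }
>         // A heap dump taken while this runs shows java.lang.ref.Finalizer
>         // entries accumulating whenever allocation outpaces the finalizer
>         // thread, which is exactly the pressure described above.
>     }
> }
> {code}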
>
> The following stack trace shows what happens when a process is frozen doing a full GC:
> {code:java}
> kafka-request-handler-13 Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch) LogValidator.scala:267
>   kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object) LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
>   kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
>   kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
>   kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:647
>   kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2) ReplicaManager.scala:745
>   kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) ReplicaManager.scala:733
>   scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
>   scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
>   scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(DefaultEntry) HashMap.scala:130
>   scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(Object) HashMap.scala:130
>   scala.collection.mutable.HashTable$class.foreachEntry(HashTable, Function1) HashTable.scala:236
>   scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
>   scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:130
>   scala.collection.TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:234
>   scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
>   kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:733
>   kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:471
>   kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:489
>   kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:113
>   kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
>   java.lang.Thread.run() Thread.java:748
> {code}
>
> The attached screenshot shows that the process heap is full of Finalizer objects.
> I think Kafka needs to manually call the System#runFinalization method after closing the GZIP input stream to avoid this full GC.
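>
> A minimal sketch of that workaround (a hypothetical helper with made-up class and method names, for illustration only; this is not the attached KAFKA-8546.patch):
> {code:java}
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.util.zip.GZIPInputStream;
>
> public class RunFinalizationAfterGzip {
>     // Hypothetical helper: decompress a gzip payload, then nudge the JVM
>     // into draining pending Finalizer objects once the stream's Inflater
>     // has been closed.
>     static int drainGzip(byte[] compressed) throws IOException {
>         int total = 0;
>         byte[] buf = new byte[8192];
>         try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
>             int n;
>             while ((n = in.read(buf)) != -1) {
>                 total += n;
>             }
>         }
>         // The proposed call: run pending finalizers now, on the calling
>         // thread, instead of letting Finalizer objects pile up in the old
>         // generation until a full GC is forced.
>         System.runFinalization();
>         return total;
>     }
> }
> {code}
> Note that System#runFinalization is not free, so invoking it on every stream close may cost throughput; how often to call it is a trade-off the actual patch has to weigh.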