[ https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Badai Aqrandista updated KAFKA-8546:
------------------------------------
Description:
When a heavily used broker uses gzip compression on all topics, you can sometimes hit GC pauses greater than the zookeeper.session.timeout.ms of 6000ms. This is caused by a memory leak described in JDK-6293787 ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in turn caused by JDK-4797189 ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).

In summary, this is what happens:
* The Inflater class contains a finalize method.
* Whenever a class with a finalize method is instantiated, a Finalizer object is created.
* The GC finalizer thread is responsible for processing all Finalizer objects.
* If the rate of Finalizer object creation exceeds the rate at which the GC finalizer thread can process them, the number of Finalizer objects grows continuously and eventually triggers a full GC (because they are stored in the Old Gen).

The following stack trace shows what happens when a process is frozen doing a full GC:

{code:java}
kafka-request-handler-13 Runnable Thread ID: 79
java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
java.util.zip.InflaterInputStream.read(byte[], int, int) InflaterInputStream.java:152
java.util.zip.GZIPInputStream.read(byte[], int, int) GZIPInputStream.java:117
java.io.BufferedInputStream.fill() BufferedInputStream.java:246
java.io.BufferedInputStream.read() BufferedInputStream.java:265
java.io.DataInputStream.readByte() DataInputStream.java:265
org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) ByteUtils.java:168
org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, long, int, Long) DefaultRecord.java:292
org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, int, Long) DefaultRecordBatch.java:264
org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:563
org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() DefaultRecordBatch.java:532
org.apache.kafka.common.record.DefaultRecordBatch.iterator() DefaultRecordBatch.java:327
scala.collection.convert.Wrappers$JIterableWrapper.iterator() Wrappers.scala:54
scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch) LogValidator.scala:267
kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object) LogValidator.scala:259
scala.collection.Iterator$class.foreach(Iterator, Function1) Iterator.scala:891
scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
scala.collection.IterableLike$class.foreach(IterableLike, Function1) IterableLike.scala:72
scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:259
kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, TimestampType, long, int, boolean) LogValidator.scala:70
kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, LongRef, long) Log.scala:771
kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) CoreUtils.scala:257
kafka.cluster.Partition.appendRecordsToLeader(MemoryRecords, boolean, int) Partition.scala:647
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Tuple2) ReplicaManager.scala:745
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(Object) ReplicaManager.scala:733
scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
scala.collection.TraversableLike$$anonfun$map$1.apply(Object) TraversableLike.scala:234
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(DefaultEntry) HashMap.scala:130
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(Object) HashMap.scala:130
scala.collection.mutable.HashTable$class.foreachEntry(HashTable, Function1) HashTable.scala:236
scala.collection.mutable.HashMap.foreachEntry(Function1) HashMap.scala:40
scala.collection.mutable.HashMap.foreach(Function1) HashMap.scala:130
scala.collection.TraversableLike$class.map(TraversableLike, Function1, CanBuildFrom) TraversableLike.scala:234
scala.collection.AbstractTraversable.map(Function1, CanBuildFrom) Traversable.scala:104
kafka.server.ReplicaManager.appendToLocalLog(boolean, boolean, Map, short) ReplicaManager.scala:733
kafka.server.ReplicaManager.appendRecords(long, short, boolean, boolean, Map, Function1, Option, Function1) ReplicaManager.scala:471
kafka.server.KafkaApis.handleProduceRequest(RequestChannel$Request) KafkaApis.scala:489
kafka.server.KafkaApis.handle(RequestChannel$Request) KafkaApis.scala:113
kafka.server.KafkaRequestHandler.run() KafkaRequestHandler.scala:69
java.lang.Thread.run() Thread.java:748
{code}

And the attached screenshot shows that the process heap is full of Finalizer objects.

I think Kafka needs to manually call the System#runFinalization method after closing the GZip input stream to avoid this full GC.
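To illustrate the proposal, here is a minimal sketch of the suggested workaround. This is hypothetical illustration code, not Kafka's actual decompression path: the class and method names are invented, and it simply shows calling System#runFinalization after a GZIPInputStream is closed, so that the Finalizer objects created for the stream's internal Inflater are drained promptly instead of piling up in the Old Gen until a full GC.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of the proposed mitigation (not Kafka source code).
public class GzipFinalizationSketch {

    // Helper for the demo: gzip-compress a byte array.
    static byte[] gzip(byte[] plain) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(plain);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Decompress, then hint the JVM to process pending Finalizer objects.
    static byte[] gunzip(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in =
                 new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[4096];
            for (int n = in.read(buf); n != -1; n = in.read(buf)) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Proposed mitigation: run pending finalizers now, after the stream
        // (and its finalizable Inflater) is closed, rather than waiting for
        // a full GC. Note this is only a hint to the JVM, not a guarantee.
        System.runFinalization();
        return out.toByteArray();
    }
}
```

A more deterministic alternative, where the code manages the Inflater directly, is to call Inflater#end() explicitly so the native resource is released without relying on finalization at all.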
> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8546
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8546
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.1
>            Reporter: Badai Aqrandista
>            Priority: Major
>         Attachments: Screen Shot 2019-05-30 at 1.27.25 pm.png

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)