[ https://issues.apache.org/jira/browse/KAFKA-14213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucas Wang updated KAFKA-14213: ------------------------------- Description: We found that the StopReplica request's local processing time may be quite long due to the reasons explained below. The impact of a time-consuming StopReplica request is that all subsequent requests from the controller are blocked, causing slow convergence on the metadata plane. The long local processing time is because the control-plane-kafka-request-handler thread is blocked on a lock to abort logCleaning with the following stack trace: {code:java} "control-plane-kafka-request-handler-0" java.lang.Thread.State: WAITING at java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method) at java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240) at java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267) at kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266) at kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205) at kafka.log.LogManager.asyncDelete(LogManager.scala:1039) {code} In the meantime, the kafka-log-cleaner-thread is holding the lock and busy figuring out the filthiest compacted log, which can be a very time-consuming task. 
Our periodic thread dumps captured many snapshots with the kafka-log-cleaner-thread in either of the following 2 stack traces: {code:java} "kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at kafka.log.TimeIndex.<init>(TimeIndex.scala:57) at kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109) at kafka.log.LazyIndex$$$Lambda$1871/0x0000000800f4d840.apply(Unknown Source) at kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63) at kafka.log.LazyIndex.get(LazyIndex.scala:60) at kafka.log.LogSegment.timeIndex(LogSegment.scala:66) at kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107) at kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113) at kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640) at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617) at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616) at kafka.log.LogCleanerManager$$$Lambda$2215/0x000000080104d040.apply(Unknown Source) at scala.collection.Iterator.find(Iterator.scala:993) at scala.collection.Iterator.find$(Iterator.scala:990) at scala.collection.AbstractIterator.find(Iterator.scala:1429) at scala.collection.IterableLike.find(IterableLike.scala:81) at scala.collection.IterableLike.find$(IterableLike.scala:80) at scala.collection.AbstractIterable.find(Iterable.scala:56) at kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616) at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186) at kafka.log.LogCleanerManager$$Lambda$2212/0x000000080104f040.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.TraversableLike$$Lambda$891/0x0000000800a89840.apply(Unknown Source) at scala.collection.immutable.List.foreach(List.scala:392) {code} {code:java} "kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at 
java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method) at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54) at java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274) at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245) at java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811) at java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114) at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087) at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69) at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42) at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35) at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24) at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79) at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45) at kafka.log.LogSegment.loadFirstBatchTimestamp(LogSegment.scala:516) at kafka.log.LogSegment.getFirstBatchTimestamp(LogSegment.scala:543) at kafka.log.Log.$anonfun$getFirstBatchTimestampForSegments$1(Log.scala:2201) at kafka.log.Log.$anonfun$getFirstBatchTimestampForSegments$1$adapted(Log.scala:2200) at kafka.log.Log$$Lambda$2223/0x000000080105a040.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.TraversableLike$$Lambda$891/0x0000000800a89840.apply(Unknown Source) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at kafka.log.Log.getFirstBatchTimestampForSegments(Log.scala:2199) at kafka.log.LogCleanerManager$.maxCompactionDelay(LogCleanerManager.scala:552) at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:190) {code} Upon inspection of the grabFilthiestCompactedLog method, it seems the most time consuming part can be performed without holding the lock. was: We found that the StopReplica request's local processing time may be quite long due to the reasons explained below. The impact of a time-consuming StopReplica request is that all subsequent requests from the controller are blocked, causing slow convergence on the metadata plane. The long local processing time is because the control-plane-kafka-request-handler thread is blocked on a lock to abort logCleaning with the following stack trace: {code:java} "control-plane-kafka-request-handler-0" java.lang.Thread.State: WAITING at java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method) at java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240) at java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267) at kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266) at kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205) at 
kafka.log.LogManager.asyncDelete(LogManager.scala:1039) {code} In the mean time, the kafka-log-cleaner-thread is holding the lock and busy figuring out the filthiest compacted log, which can be a very time consuming task. Our periodic thread dumps captured many snapshots with the kafka-log-cleaner-thread in either of the following 2 stack traces: {code:java} "kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at kafka.log.TimeIndex.<init>(TimeIndex.scala:57) at kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109) at kafka.log.LazyIndex$$$Lambda$1871/0x0000000800f4d840.apply(Unknown Source) at kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63) at kafka.log.LazyIndex.get(LazyIndex.scala:60) at kafka.log.LogSegment.timeIndex(LogSegment.scala:66) at kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107) at kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113) at kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640) at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617) at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616) at kafka.log.LogCleanerManager$$$Lambda$2215/0x000000080104d040.apply(Unknown Source) at scala.collection.Iterator.find(Iterator.scala:993) at scala.collection.Iterator.find$(Iterator.scala:990) at scala.collection.AbstractIterator.find(Iterator.scala:1429) at scala.collection.IterableLike.find(IterableLike.scala:81) at scala.collection.IterableLike.find$(IterableLike.scala:80) at scala.collection.AbstractIterable.find(Iterable.scala:56) at kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616) at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186) at kafka.log.LogCleanerManager$$Lambda$2212/0x000000080104f040.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at 
scala.collection.TraversableLike$$Lambda$891/0x0000000800a89840.apply(Unknown Source) at scala.collection.immutable.List.foreach(List.scala:392) {code} {code:java} "kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method) at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54) at java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274) at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245) at java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811) at java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114) at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087) at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69) at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42) at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35) at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24) at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79) at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45) at kafka.log.LogSegment.loadFirstBatchTimestamp(LogSegment.scala:516) at kafka.log.LogSegment.getFirstBatchTimestamp(LogSegment.scala:543) at kafka.log.Log.$anonfun$getFirstBatchTimestampForSegments$1(Log.scala:2201) at kafka.log.Log.$anonfun$getFirstBatchTimestampForSegments$1$adapted(Log.scala:2200) at kafka.log.Log$$Lambda$2223/0x000000080105a040.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.TraversableLike$$Lambda$891/0x0000000800a89840.apply(Unknown Source) at scala.collection.Iterator.foreach(Iterator.scala:941) at 
scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at kafka.log.Log.getFirstBatchTimestampForSegments(Log.scala:2199) at kafka.log.LogCleanerManager$.maxCompactionDelay(LogCleanerManager.scala:552) at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:190) {code} Upon inspection of the grabFilthiestCompactedLog method, it seems the most time consuming part can be performed without holding the lock. > Reduce lock contention between control-plane-kafka-request-handler and > kafka-log-cleaner-thread > ----------------------------------------------------------------------------------------------- > > Key: KAFKA-14213 > URL: https://issues.apache.org/jira/browse/KAFKA-14213 > Project: Kafka > Issue Type: Improvement > Reporter: Lucas Wang > Priority: Major > > We found that the StopReplica request's local processing time may be quite > long due to the reasons explained below. The impact of a time-consuming > StopReplica request is that all subsequent requests from the controller are > blocked, causing slow convergence on the metadata plane. 
> > The long local processing time is because the > control-plane-kafka-request-handler thread is blocked on a lock to abort > logCleaning with the following stack trace: > > {code:java} > "control-plane-kafka-request-handler-0" > java.lang.Thread.State: WAITING > at java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method) > at > java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) > at > java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885) > at > java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917) > at > java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240) > at > java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267) > at > kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266) > at kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205) > at kafka.log.LogManager.asyncDelete(LogManager.scala:1039) {code} > > > In the mean time, the kafka-log-cleaner-thread is holding the lock and busy > figuring out the filthiest compacted log, which can be a very time consuming > task. 
Our periodic thread dumps captured many snapshots with the > kafka-log-cleaner-thread in either of the following 2 stack traces: > > > {code:java} > "kafka-log-cleaner-thread-0" > java.lang.Thread.State: RUNNABLE > at kafka.log.TimeIndex.<init>(TimeIndex.scala:57) > at kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109) > at kafka.log.LazyIndex$$$Lambda$1871/0x0000000800f4d840.apply(Unknown > Source) > at kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63) > at kafka.log.LazyIndex.get(LazyIndex.scala:60) > at kafka.log.LogSegment.timeIndex(LogSegment.scala:66) > at > kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107) > at kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113) > at kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640) > at > kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617) > at > kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616) > at > kafka.log.LogCleanerManager$$$Lambda$2215/0x000000080104d040.apply(Unknown > Source) > at scala.collection.Iterator.find(Iterator.scala:993) > at scala.collection.Iterator.find$(Iterator.scala:990) > at scala.collection.AbstractIterator.find(Iterator.scala:1429) > at scala.collection.IterableLike.find(IterableLike.scala:81) > at scala.collection.IterableLike.find$(IterableLike.scala:80) > at scala.collection.AbstractIterable.find(Iterable.scala:56) > at > kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616) > at > kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186) > at > kafka.log.LogCleanerManager$$Lambda$2212/0x000000080104f040.apply(Unknown > Source) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > at > scala.collection.TraversableLike$$Lambda$891/0x0000000800a89840.apply(Unknown > Source) > at scala.collection.immutable.List.foreach(List.scala:392) {code} > > {code:java} > 
"kafka-log-cleaner-thread-0" > java.lang.Thread.State: RUNNABLE > at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native > Method) > at > java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54) > at > java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274) > at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245) > at > java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811) > at > java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) > at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114) > at > org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087) > at > org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69) > at > org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42) > at > org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35) > at > org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24) > at > org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79) > at > org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45) > at kafka.log.LogSegment.loadFirstBatchTimestamp(LogSegment.scala:516) > at kafka.log.LogSegment.getFirstBatchTimestamp(LogSegment.scala:543) > at > kafka.log.Log.$anonfun$getFirstBatchTimestampForSegments$1(Log.scala:2201) > at > kafka.log.Log.$anonfun$getFirstBatchTimestampForSegments$1$adapted(Log.scala:2200) > at kafka.log.Log$$Lambda$2223/0x000000080105a040.apply(Unknown Source) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) > at > scala.collection.TraversableLike$$Lambda$891/0x0000000800a89840.apply(Unknown > Source) > at scala.collection.Iterator.foreach(Iterator.scala:941) > at scala.collection.Iterator.foreach$(Iterator.scala:941) > at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableLike.map(TraversableLike.scala:238) > at scala.collection.TraversableLike.map$(TraversableLike.scala:231) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at kafka.log.Log.getFirstBatchTimestampForSegments(Log.scala:2199) > at > kafka.log.LogCleanerManager$.maxCompactionDelay(LogCleanerManager.scala:552) > at > kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:190) > {code} > > Upon inspection of the grabFilthiestCompactedLog method, it seems the most > time consuming part can be performed without holding the lock. -- This message was sent by Atlassian Jira (v8.20.10#820010)