[ https://issues.apache.org/jira/browse/KAFKA-14213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Wang updated KAFKA-14213:
-------------------------------
    Description: 
We found that the StopReplica request's local processing time can be quite long, for the reasons explained below. A time-consuming StopReplica request blocks all subsequent requests from the controller, causing slow convergence on the metadata plane.

 

The long local processing time occurs because the control-plane-kafka-request-handler thread is blocked on a lock while trying to abort log cleaning, with the following stack trace:

 
{code:java}
"control-plane-kafka-request-handler-0"
   java.lang.Thread.State: WAITING
        at java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method)
        at java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
        at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
        at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
        at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
        at java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
        at kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266)
        at kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205)
        at kafka.log.LogManager.asyncDelete(LogManager.scala:1039) {code}
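The WAITING state above is the standard ReentrantLock wait path: lock() parks the caller with no timeout until the current holder releases. For illustration only, here is a minimal standalone sketch of that contention pattern (plain Scala, not Kafka code; the thread names and the 5-second scan are made up to mirror the thread dumps in this report):
{code:scala}
import java.util.concurrent.locks.ReentrantLock

// Minimal sketch of the contention pattern (not Kafka code): one thread holds
// a ReentrantLock for a long scan while another parks indefinitely in lock().
object LockContentionSketch {
  private val lock = new ReentrantLock()

  def main(args: Array[String]): Unit = {
    val cleaner = new Thread(() => {
      lock.lock()
      try Thread.sleep(5000) // stands in for a long grabFilthiestCompactedLog scan
      finally lock.unlock()
    }, "kafka-log-cleaner-thread-0")

    val handler = new Thread(() => {
      val start = System.nanoTime()
      lock.lock() // parks in WAITING, matching the stack trace above
      try println(s"handler waited ${(System.nanoTime() - start) / 1000000} ms")
      finally lock.unlock()
    }, "control-plane-kafka-request-handler-0")

    cleaner.start()
    Thread.sleep(100) // let the cleaner take the lock first
    handler.start()
    cleaner.join()
    handler.join()
  }
}
{code}
A thread dump taken while this sketch runs shows the handler thread parked in AbstractQueuedSynchronizer, exactly as in the trace above; nothing bounds how long it may wait.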
 

 

Meanwhile, the kafka-log-cleaner-thread is holding the lock, busy determining the filthiest compacted log, which can be a very time-consuming task. Our periodic thread dumps captured many snapshots of the kafka-log-cleaner-thread in one of the following two stack traces:

{code:java}
"kafka-log-cleaner-thread-0"
   java.lang.Thread.State: RUNNABLE
        at kafka.log.TimeIndex.<init>(TimeIndex.scala:57)
        at kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109)
        at kafka.log.LazyIndex$$$Lambda$1871/0x0000000800f4d840.apply(Unknown Source)
        at kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63)
        at kafka.log.LazyIndex.get(LazyIndex.scala:60)
        at kafka.log.LogSegment.timeIndex(LogSegment.scala:66)
        at kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107)
        at kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113)
        at kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640)
        at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617)
        at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616)
        at kafka.log.LogCleanerManager$$$Lambda$2215/0x000000080104d040.apply(Unknown Source)
        at scala.collection.Iterator.find(Iterator.scala:993)
        at scala.collection.Iterator.find$(Iterator.scala:990)
        at scala.collection.AbstractIterator.find(Iterator.scala:1429)
        at scala.collection.IterableLike.find(IterableLike.scala:81)
        at scala.collection.IterableLike.find$(IterableLike.scala:80)
        at scala.collection.AbstractIterable.find(Iterable.scala:56)
        at kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616)
        at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186)
        at kafka.log.LogCleanerManager$$Lambda$2212/0x000000080104f040.apply(Unknown Source)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.TraversableLike$$Lambda$891/0x0000000800a89840.apply(Unknown Source)
        at scala.collection.immutable.List.foreach(List.scala:392) {code}
 
{code:java}
"kafka-log-cleaner-thread-0"
   java.lang.Thread.State: RUNNABLE
        at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method)
        at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54)
        at java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274)
        at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245)
        at java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811)
        at java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796)
        at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114)
        at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087)
        at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69)
        at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42)
        at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
        at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
        at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
        at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
        at kafka.log.LogSegment.loadFirstBatchTimestamp(LogSegment.scala:516)
        at kafka.log.LogSegment.getFirstBatchTimestamp(LogSegment.scala:543)
        at kafka.log.Log.$anonfun$getFirstBatchTimestampForSegments$1(Log.scala:2201)
        at kafka.log.Log.$anonfun$getFirstBatchTimestampForSegments$1$adapted(Log.scala:2200)
        at kafka.log.Log$$Lambda$2223/0x000000080105a040.apply(Unknown Source)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.TraversableLike$$Lambda$891/0x0000000800a89840.apply(Unknown Source)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at kafka.log.Log.getFirstBatchTimestampForSegments(Log.scala:2199)
        at kafka.log.LogCleanerManager$.maxCompactionDelay(LogCleanerManager.scala:552)
        at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:190) {code}
 

Upon inspection of the grabFilthiestCompactedLog method, it appears that the most time-consuming part can be performed without holding the lock.
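
As a rough sketch of that restructuring (hypothetical, not the actual Kafka change; TopicPartition, LogStats, and computeStats below are placeholders for the real types and the real per-log scan): compute the expensive statistics first without the lock, then take the lock only long enough to discard logs whose cleaning state changed in the meantime and to pick the winner.
{code:scala}
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable
import scala.util.Random

// Placeholder types standing in for Kafka's internals.
case class TopicPartition(topic: String, partition: Int)
case class LogStats(tp: TopicPartition, dirtyRatio: Double)

class CleanerManagerSketch(lock: ReentrantLock,
                           inProgress: mutable.Set[TopicPartition]) {

  // Expensive phase: in the real code this reads time indexes and first-batch
  // timestamps (see the traces above). Performed WITHOUT holding the lock.
  private def computeStats(tp: TopicPartition): LogStats =
    LogStats(tp, dirtyRatio = Random.nextDouble()) // placeholder for the real scan

  def grabFilthiestCompactedLog(candidates: Seq[TopicPartition]): Option[TopicPartition] = {
    // Phase 1: heavy I/O and scanning outside the critical section.
    val stats = candidates.map(computeStats)

    // Phase 2: a short critical section that only validates and selects.
    // Re-checking inProgress under the lock guards against logs whose
    // cleaning state changed (e.g. was aborted) while we were scanning.
    lock.lock()
    try {
      val eligible = stats.filterNot(s => inProgress.contains(s.tp))
      if (eligible.isEmpty) None
      else {
        val winner = eligible.maxBy(_.dirtyRatio).tp
        inProgress += winner
        Some(winner)
      }
    } finally lock.unlock()
  }
}
{code}
The trade-off is that the precomputed statistics can be slightly stale by the time the lock is taken; that seems acceptable here because choosing the filthiest log is a heuristic, and the re-check under the lock preserves correctness. With this split, abortCleaning would only ever wait for the short selection phase rather than the full scan.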



> Reduce lock contention between control-plane-kafka-request-handler and 
> kafka-log-cleaner-thread
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14213
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14213
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Priority: Major
>


