[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ray Chiang updated KAFKA-5297:
------------------------------
    Component/s: log

> Broker can take a long time to shut down if there are many active log segments
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5297
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5297
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>            Reporter: Kyle Ambroff-Kao
>            Priority: Minor
>              Labels: windows
>         Attachments: LogSegmentBenchmark.java, flame graph of broker during shut down.png, shutdown-flame-graph.png
>
>
> After the changes for KIP-33 were merged, we started noticing that our cluster restart times were quite a bit longer. In some cases it was taking four times as long as expected to do a rolling restart of every broker in the cluster. This meant that doing a deploy to one of our Kafka clusters went from taking about 3 hours to more than 12 hours!
> We looked into this and collected data from a couple of runs with a sampling profiler. It turns out that it isn't unusual for a broker that has been running for several weeks to sit in kafka.log.Log#close for up to 30 minutes. There are so many active log segments that it simply takes a long time to truncate all of the indexes.
> I've attached a flame graph that was generated from 10 minutes of stack samples collected during shutdown of a broker that took about 30 minutes total to shut down cleanly.
> * About 60% of the time was spent in kafka.log.AbstractIndex#resize, where every index and timeindex file is truncated down to the number of entries actually written to it.
> * Another big chunk of time is spent reading the last entry from the index, which is used to make any final updates to the timeindex file. This is something that can be cached. For a broker that's been running for a long time, the bulk of these indexes are not likely to be in the page cache anymore. We already cache largestTimestamp and offsetOfLargestTimestamp in LogSegment, so we could add a cache for this as well.
> Looking at these changes and considering KIP-33, it isn't surprising that the broker shutdown time has increased so dramatically. The extra index plus the extra reads have increased the amount of work performed by kafka.log.Log#close by about 4x (in terms of system calls and potential page faults). Breaking down what this function does (a sketch of the resize sequence in steps 4 and 5 follows this list):
> # Read the max timestamp from the timeindex. Could lead to a disk seek.
> # Read the max offset from the index. Could lead to a disk seek.
> # Append the timestamp and offset of the most recently written message to the timeindex, if for some reason it hasn't already been written there.
> # Truncate the index file:
> ## Get the position in the index of the last entry written.
> ## If on Windows, unmap and close the index.
> ## Reopen the file.
> ## Truncate it to the number of entries * entry size (ftruncate() system call).
> ## mmap() it again.
> ## Set the position back to where it was before the resize (lseek() system call).
> ## Close the newly reopened and mapped index.
> # Same as #4, but for the timeindex:
> ## Get the position in the timeindex of the last entry written.
> ## If on Windows, unmap and close the timeindex.
> ## Reopen the file.
> ## Truncate it to the number of entries * entry size (ftruncate() system call).
> ## mmap() it again.
> ## Set the position back to where it was before the resize (lseek() system call).
> ## Close the newly reopened and mapped timeindex.
> # Finalize the log segment:
> ## Invoke java.nio.channels.FileChannel#force, which leads to an fsync() for that log segment.
> ## Truncate the log segment if it doesn't have enough messages written to fill up the whole thing. Potentially leads to an ftruncate() system call.
> ## Set the position to the end of the segment after truncation. Leads to an lseek() system call.
> ## Close and unmap the channel.
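> To make the cost of steps 4 and 5 concrete, here is a rough sketch of the reopen/truncate/remap sequence. This is illustrative Java with made-up names, not the actual kafka.log.AbstractIndex code (which is Scala); it just shows the system calls that every segment pays on close.
> {noformat}
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
>
> // Illustrative sketch of the per-index resize sequence described above.
> public class IndexResizeSketch {
>
>     // Shrink an index file to exactly entries * entrySize bytes and re-map it.
>     // Because no file handle is kept around, every resize() pays an open(),
>     // an ftruncate(), an mmap() and a close(), even though the index was
>     // already open and mapped a moment ago.
>     public static MappedByteBuffer resize(File indexFile, int entries, int entrySize,
>                                           MappedByteBuffer oldMmap) throws IOException {
>         int oldPosition = oldMmap.position();               // remember the append position
>         long targetSize = (long) entries * entrySize;
>
>         // On Windows the existing mapping would have to be unmapped before the
>         // truncate; that step is omitted here since the JDK has no portable
>         // explicit unmap.
>         try (RandomAccessFile raf = new RandomAccessFile(indexFile, "rw")) {    // open()
>             raf.setLength(targetSize);                                          // ftruncate()
>             MappedByteBuffer newMmap = raf.getChannel()
>                     .map(FileChannel.MapMode.READ_WRITE, 0, targetSize);        // mmap()
>             newMmap.position((int) Math.min(oldPosition, targetSize));          // restore position
>             return newMmap;
>         }                                                                       // close() again
>     }
> }
> {noformat}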
> Looking into the current implementation of kafka.log.AbstractIndex#resize, it appears to do quite a bit of extra work to avoid keeping an instance of RandomAccessFile around. It has to reopen the file, truncate it, mmap() it, and potentially perform an additional disk seek, all before immediately closing the file again.
> You wouldn't think this would amount to much, but I put together a benchmark using JMH to measure the difference between the current code and a new implementation that doesn't have to recreate the page mapping during resize(), and the difference is pretty dramatic.
> {noformat}
> Result "currentImplementation":
>   2063.386 ±(99.9%) 81.758 ops/s [Average]
>   (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
>   CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)
>
> Result "optimizedImplementation":
>   3497.354 ±(99.9%) 31.575 ops/s [Average]
>   (min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
>   CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)
>
> # Run complete. Total time: 00:03:37
>
> Benchmark                                     Mode  Cnt     Score    Error  Units
> LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ± 81.758  ops/s
> LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ± 31.575  ops/s
> {noformat}
> I ran this benchmark on a Linux workstation. It just measures the throughput of Log#close after 20 segments have been created. Not having to reopen the file amounts to a roughly 70% increase in throughput.
> I think there are two valid approaches to making this better:
> * Preemptively truncate index files when log rotation happens. Once a log is rotated, jobs could be added to an ExecutorService which truncates the indexes so that they don't all have to be truncated on shutdown. The new shutdown code would enqueue all remaining active indexes and then drain the queue.
> * Alternatively, we could add a RandomAccessFile instance variable to AbstractIndex so that it doesn't have to recreate the page mapping on resize(). This means an extra file handle for each segment, but that doesn't seem like a big deal to me.
> No matter what, we should add a cache for kafka.log.OffsetIndex#lastEntry.
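> For the lastEntry cache, something along these lines would do it. Again this is an illustrative Java sketch with hypothetical names, not the real (Scala) kafka.log.OffsetIndex; the point is just that the last entry is remembered on append so that close() never has to fault the tail of the index back in.
> {noformat}
> import java.nio.MappedByteBuffer;
>
> // Illustrative sketch: keep the most recently appended entry in memory so
> // that shutdown can read it without touching the mapped file at all.
> public class OffsetIndexSketch {
>
>     public static final class Entry {
>         public final long offset;
>         public final int position;
>         Entry(long offset, int position) { this.offset = offset; this.position = position; }
>     }
>
>     private final MappedByteBuffer mmap;
>     private final long baseOffset;
>     private volatile Entry lastEntry;            // cached on every append
>
>     public OffsetIndexSketch(MappedByteBuffer mmap, long baseOffset) {
>         this.mmap = mmap;
>         this.baseOffset = baseOffset;
>         this.lastEntry = new Entry(baseOffset, 0);
>     }
>
>     public void append(long offset, int physicalPosition) {
>         mmap.putInt((int) (offset - baseOffset));        // 4-byte relative offset
>         mmap.putInt(physicalPosition);                   // 4-byte file position
>         lastEntry = new Entry(offset, physicalPosition); // keep the cache in sync
>     }
>
>     // close()/trimToValidSize() can use this instead of re-reading the last
>     // eight bytes of the index, avoiding a potential page fault and disk seek.
>     public Entry lastEntry() {
>         return lastEntry;
>     }
> }
> {noformat}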