[
https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kyle Ambroff updated KAFKA-5297:
--------------------------------
Attachment: LogSegmentBenchmark.java
> Broker can take a long time to shut down if there are many active log segments
> ------------------------------------------------------------------------------
>
> Key: KAFKA-5297
> URL: https://issues.apache.org/jira/browse/KAFKA-5297
> Project: Kafka
> Issue Type: Improvement
> Reporter: Kyle Ambroff
> Priority: Minor
> Attachments: LogSegmentBenchmark.java, shutdown-flame-graph.png
>
>
> After the changes for KIP-33 were merged, we started noticing that our
> cluster restart times were quite a bit longer. In some cases it was taking
> four times as long as expected to do a rolling restart of every broker in the
> cluster. This meant that doing a deploy to one of our Kafka clusters went
> from taking about 3 hours to more than 12 hours!
> We looked into this and collected some data from a couple of runs with a
> sampling profiler. It turns out that it isn't unusual for a broker that has
> been running for several weeks to sit in kafka.log.Log#close for up to 30
> minutes. There are so many active log segments that truncating all of their
> indexes just takes a long time.
> I've attached a flame graph that was generated from 10 minutes of stack
> samples collected during shutdown of a broker that took about 30 minutes
> total to shut down cleanly.
> * About 60% of the time was spent in kafka.log.AbstractIndex#resize, where
> every index and timeindex file is truncated down to the number of entries it
> actually contains.
> * Another big chunk of time is spent reading the last entry from the index,
> which is used to make any final updates to the timeindex file. This is
> something that can be cached. For a broker that's been running for a long
> time, the bulk of these indexes are not likely to be in the page cache
> anymore. We already cache the largestTimestamp and offsetOfLargestTimestamp
> in LogSegment, so we could add a cache for this as well.
> Looking at these changes and considering KIP-33, it isn't surprising that the
> broker shutdown time has increased so dramatically. The extra index plus the
> extra reads have increased the amount of work performed by
> kafka.log.Log#close by about 4x (in terms of system calls and potential page
> faults). Breaking down what this function does:
> # Read the max timestamp from the timeindex. Could lead to a disk seek.
> # Read the max offset from the index. Could lead to a disk seek.
> # Append the timestamp and offset of the most recently written message to the
> timeindex if it hasn't been written there for some reason.
> # Truncate the index file
> ## Get the position in the index of the last entry written
> ## If on Windows then unmap and close the index
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the truncation. Leads to an
> lseek() system call.
> ## Close the newly reopened and mapped index
> # Same thing as #4 but for the timeindex.
> ## Get the position in the timeindex of the last entry written
> ## If on Windows then unmap and close the timeindex
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the truncation. Leads to an
> lseek() system call.
> ## Close the newly reopened and mapped timeindex
> # Finalize the log segment
> ## Invoke java.nio.channels.FileChannel#force, which leads to a fsync() for
> that log segment.
> ## Truncate the log segment if it doesn't have enough messages written to
> fill up the whole thing. Potentially leads to a ftruncate() system call.
> ## Set the position to the end of the segment after truncation. Leads to a
> lseek() system call.
> ## Close and unmap the channel.
> Looking into the current implementation of kafka.log.AbstractIndex#resize,
> it appears to do quite a bit of extra work to avoid keeping an instance of
> RandomAccessFile around. It has to reopen the file, truncate, mmap(), and
> potentially perform an additional disk seek, all before immediately closing
> the file.
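> For illustration, here's a minimal Java sketch of that sequence (the class
> and field names are hypothetical stand-ins, not the actual Kafka code):
> {code:java}
> import java.io.File;
> import java.io.RandomAccessFile;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
>
> // Hypothetical stand-in for the resize() dance described above; the real
> // logic lives in kafka.log.AbstractIndex.
> class IndexResizeSketch {
>     private final File file;
>     private MappedByteBuffer mmap;
>
>     IndexResizeSketch(File file, MappedByteBuffer mmap) {
>         this.file = file;
>         this.mmap = mmap;
>     }
>
>     void resize(int newSizeBytes) throws Exception {
>         int position = mmap.position();          // remember where we were
>         // (on Windows the old mapping would have to be unmapped first)
>         RandomAccessFile raf = new RandomAccessFile(file, "rw"); // reopen
>         try {
>             raf.setLength(newSizeBytes);          // ftruncate()
>             mmap = raf.getChannel()               // mmap() all over again
>                       .map(FileChannel.MapMode.READ_WRITE, 0, newSizeBytes);
>             mmap.position(position);              // restore the position
>         } finally {
>             raf.close();                          // and close immediately
>         }
>     }
> }
> {code}
> All of that work happens once per index and timeindex per segment, just to
> shrink a file.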
> You wouldn't think this would amount to much, but I put together a benchmark
> using jmh to measure the difference between the current code and a new
> implementation that doesn't have to recreate the page mapping during
> resize(). The difference is pretty dramatic.
> {noformat}
> Result "currentImplementation":
> 2063.386 ±(99.9%) 81.758 ops/s [Average]
> (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
> CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)
> Result "optimizedImplementation":
> 3497.354 ±(99.9%) 31.575 ops/s [Average]
> (min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
> CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)
> # Run complete. Total time: 00:03:37
> Benchmark                                     Mode  Cnt     Score    Error  Units
> LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ± 81.758  ops/s
> LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ± 31.575  ops/s
> {noformat}
> I ran this benchmark on a Linux workstation. It just measures the throughput
> of Log#close after 20 segments have been created. Not having to reopen the
> file amounts to a 70% increase in throughput.
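> For reference, the attached LogSegmentBenchmark.java follows the usual jmh
> shape; roughly sketched below (the benchmark bodies here are placeholders,
> not the attached code):
> {code:java}
> import java.util.concurrent.TimeUnit;
> import org.openjdk.jmh.annotations.*;
>
> // Simplified skeleton of a jmh throughput comparison; see the attached
> // LogSegmentBenchmark.java for the real benchmark.
> @State(Scope.Thread)
> @BenchmarkMode(Mode.Throughput)
> @OutputTimeUnit(TimeUnit.SECONDS)
> public class LogSegmentBenchmarkSketch {
>
>     @Setup(Level.Invocation)
>     public void setUp() {
>         // Build a log with 20 segments so close() has real work to do.
>     }
>
>     @Benchmark
>     public void currentImplementation() {
>         // Close the log using the existing resize() path
>         // (reopen + ftruncate() + mmap() on every index).
>     }
>
>     @Benchmark
>     public void optimizedImplementation() {
>         // Close the log using a resize() that reuses the already-open
>         // file handle and page mapping.
>     }
> }
> {code}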
> I think there are two totally valid approaches to making this better:
> * Preemptively truncate index files when log rotation happens. Once a log is
> rotated, jobs could be added to an ExecutorService which truncates indexes so
> that they don't all have to be truncated on shutdown. The new shutdown code
> would enqueue all remaining active indexes and then drain the queue.
> * Alternatively, we could just add a RandomAccessFile instance variable to
> AbstractIndex so that it doesn't have to recreate the page mapping on
> resize() (sketched below). This means an extra file handle for each segment,
> but that doesn't seem like a big deal to me.
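> A rough sketch of that second approach (hypothetical names, not a patch
> against the actual AbstractIndex):
> {code:java}
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
>
> // Hypothetical sketch: keep the RandomAccessFile open for the lifetime of
> // the index so resize() never has to reopen the file or redo the mapping.
> class PersistentHandleIndexSketch implements AutoCloseable {
>     private final RandomAccessFile raf;  // the one extra fd per segment
>     private final MappedByteBuffer mmap;
>
>     PersistentHandleIndexSketch(File file, int maxSizeBytes) throws IOException {
>         this.raf = new RandomAccessFile(file, "rw");
>         this.mmap = raf.getChannel()
>                        .map(FileChannel.MapMode.READ_WRITE, 0, maxSizeBytes);
>     }
>
>     void resize(int newSizeBytes) throws IOException {
>         // ftruncate() on the handle we already have. The existing mapping
>         // is left in place, so there is no unmap/mmap round trip; we just
>         // have to be careful never to touch pages past the new length.
>         raf.setLength(newSizeBytes);
>     }
>
>     @Override
>     public void close() throws IOException {
>         raf.close();  // releases the channel and the file descriptor
>     }
> }
> {code}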
> No matter what, we should add a cache for kafka.log.OffsetIndex#lastEntry.
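> A minimal sketch of what that cache could look like (again hypothetical,
> not the actual OffsetIndex code):
> {code:java}
> // Hypothetical sketch: remember the last appended entry in memory so that
> // close() never has to page the tail of the index back in just to read it.
> class CachedLastEntrySketch {
>     static final class IndexEntry {
>         final long offset;
>         final int position;
>         IndexEntry(long offset, int position) {
>             this.offset = offset;
>             this.position = position;
>         }
>     }
>
>     private volatile IndexEntry lastEntry = new IndexEntry(-1L, -1);
>
>     void append(long offset, int position) {
>         // ... write the entry to the mapped index file (not shown) ...
>         lastEntry = new IndexEntry(offset, position);  // keep cache current
>     }
>
>     IndexEntry lastEntry() {
>         return lastEntry;  // served from memory: no disk seek on shutdown
>     }
> }
> {code}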
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)