Kyle Ambroff created KAFKA-5297:
-----------------------------------
Summary: Broker can take a long time to shut down if there are
many active log segments
Key: KAFKA-5297
URL: https://issues.apache.org/jira/browse/KAFKA-5297
Project: Kafka
Issue Type: Improvement
Reporter: Kyle Ambroff
Priority: Minor
Attachments: shutdown-flame-graph.png
After the changes for KIP-33 were merged, we started noticing that our cluster
restart times were quite a bit longer. In some cases it was taking four times
as long as expected to do a rolling restart of every broker in the cluster.
This meant that doing a deploy to one of our Kafka clusters went from taking
about 3 hours to more than 12 hours!
We looked into this and collected data from a couple of runs with a sampling
profiler. It turns out that it isn't unusual for a broker to sit in
kafka.log.Log#close for up to 30 minutes if it has been running for several
weeks. There are simply so many active log segments that it takes a long time
to truncate all of their indexes.
I've attached a flame graph that was generated from 10 minutes of stack samples
collected during shutdown of a broker that took about 30 minutes total to shut
down cleanly.
* About 60% of the time was spent in kafka.log.AbstractIndex#resize, where
every index and timeindex file is truncated to the size of the number of
entries in that index.
* Another big chunk of time is spent reading the last entry from the index,
which is used to make any final updates to the timeindex file. This is
something that could be cached. For a broker that has been running for a long
time, the bulk of these indexes are not likely to be in the page cache anymore.
We already cache largestTimestamp and offsetOfLargestTimestamp in LogSegment,
so we could add a cache for this as well.
Looking at these changes and considering KIP-33, it isn't surprising that the
broker shutdown time has increased so dramatically. The extra index plus the
extra reads have increased the amount of work performed by kafka.log.Log#close
by about 4x (in terms of system calls and potential page faults). Breaking down
what this function does (a simplified sketch follows the list):
# Read the max timestamp from the timeindex. Could lead to a disk seek.
# Read the max offset from the index. Could lead to a disk seek.
# Append the timestamp and offset of the most recently written message to the
timeindex if it hasn't already been written there.
# Truncate the index file
## Get the position in the index of the last entry written
## If on Windows then unmap and close the index
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the resize. Leads to an
lseek() system call.
## Close the newly reopened and mapped index
# Same thing as #4 but for the timeindex.
## Get the position in the timeindex of the last entry written
## If on Windows then unmap and close the timeindex
## reopen
## truncate to the number of entries * entry size. (ftruncate() system call)
## mmap()
## Set the position back to where it was before the resize. Leads to an
lseek() system call.
## Close the newly reopened and mapped timeindex
# Finalize the log segment
## Invoke java.nio.channels.FileChannel#force, which leads to a fsync() for
that log segment.
## Truncate the log segment if it doesn't have enough messages written to fill
up the whole thing. Potentially leads to a ftruncate() system call.
## Set the position to the end of the segment after truncation. Leads to a
lseek() system call.
## Close and unmap the channel.
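To make the list above concrete, here is a minimal Java sketch of the
per-segment work, with comments marking where system calls can happen. The
names are illustrative stand-ins for the real Kafka classes, not the actual
code:
{noformat}
import java.io.IOException;
import java.nio.channels.FileChannel;

// Illustrative sketch of the per-segment close sequence described above.
// The names here are hypothetical stand-ins, not the actual Kafka classes.
class SegmentCloseSketch {
    void closeSegment(FileChannel logChannel, long validBytes,
                      IndexSketch index, IndexSketch timeIndex) throws IOException {
        long maxTimestamp = timeIndex.lastEntryTimestamp(); // read; possible disk seek
        long maxOffset = index.lastEntryOffset();           // read; possible disk seek
        timeIndex.maybeAppend(maxTimestamp, maxOffset);     // final timeindex entry
        index.trimToValidSize();      // ftruncate() + mmap() + reposition (details below)
        timeIndex.trimToValidSize();  // same again for the timeindex
        logChannel.force(true);          // fsync()
        logChannel.truncate(validBytes); // ftruncate() if the segment isn't full
        logChannel.position(validBytes); // lseek()
        logChannel.close();
    }
}

// Hypothetical interface standing in for kafka.log.AbstractIndex and friends.
interface IndexSketch {
    long lastEntryTimestamp() throws IOException;
    long lastEntryOffset() throws IOException;
    void maybeAppend(long timestamp, long offset) throws IOException;
    void trimToValidSize() throws IOException;
}
{noformat}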
Looking into the current implementation of kafka.log.AbstractIndex#resize, it
appears to do quite a bit of extra work to avoid keeping an instance of
RandomAccessFile around. It has to reopen the file, truncate it, mmap() it, and
potentially perform an additional disk seek, all before immediately closing the
file again.
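For reference, here is a minimal Java sketch of that resize sequence (the real
implementation lives in kafka.log.AbstractIndex#resize; the names below are
illustrative):
{noformat}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Sketch of the churn described above: reopen, truncate, remap, reposition, close.
class IndexResizeSketch {
    MappedByteBuffer resize(File file, MappedByteBuffer mmap, int entrySize, int entries)
            throws IOException {
        int position = mmap.position();      // where the last entry was written
        int newSize = entries * entrySize;   // entries * entry size
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {            // reopen
            raf.setLength(newSize);                                                // ftruncate()
            MappedByteBuffer remapped =
                raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newSize);  // mmap()
            remapped.position(Math.min(position, newSize));  // restore the old position
            return remapped;
        }   // the RandomAccessFile is closed again immediately
    }
}
{noformat}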
You wouldn't think this would amount to much, but I put together a benchmark
using jmh to measure the difference between the current code and a new
implementation that didn't have to recreate the page mapping during resize(),
and the difference is pretty dramatic.
{noformat}
Result "currentImplementation":
2063.386 ±(99.9%) 81.758 ops/s [Average]
(min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)
Result "optimizedImplementation":
3497.354 ±(99.9%) 31.575 ops/s [Average]
(min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)
# Run complete. Total time: 00:03:37
Benchmark                                     Mode  Cnt     Score     Error  Units
LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ±  81.758  ops/s
LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ±  31.575  ops/s
{noformat}
I ran this benchmark on a Linux workstation. It just measures the throughput of
Log#close after 20 segments have been created. Not having to reopen the file
amounts to a 70% increase in throughput.
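The benchmark isn't attached here, but a JMH harness shaped roughly like the
following produces output in the format above. The method bodies and iteration
counts below are placeholders rather than the exact code I ran:
{noformat}
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

// Simplified stand-in for the benchmark described above: each invocation
// builds 20 segments and measures how fast they can be closed.
@State(Scope.Thread)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 20)
@Fork(3)
public class LogSegmentBenchmark {

    @Setup
    public void setup() {
        // Create a log with 20 segments on local disk (details omitted).
    }

    @Benchmark
    public void currentImplementation() {
        // Close the segments using the existing AbstractIndex#resize path.
    }

    @Benchmark
    public void optimizedImplementation() {
        // Close the segments with an index that keeps its RandomAccessFile open.
    }
}
{noformat}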
I think there are two totally valid approaches to making this better:
* Preemptively truncate index files when log rotation happens. Once a log is
rotated, jobs could be added to an ExecutorService which truncates indexes so
that they don't all have to be truncated on shutdown (see the sketch after this
list). The new shutdown code would enqueue the indexes of any remaining active
segments and then drain the queue.
* Alternatively we could just add a RandomAccessFile instance variable to
AbstractIndex so that it doesn't have to recreate the page mapping on resize().
This means an extra file handle for each segment, but that doesn't seem like a
big deal to me.
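Here is a rough sketch of what the first approach could look like, assuming a
hypothetical trimToValidSize() hook on the index (this is not existing Kafka
code):
{noformat}
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an index that can be truncated to its final size.
interface TrimmableIndex {
    void trimToValidSize() throws IOException;
}

// Sketch of the first approach: trim index files in the background as segments
// roll, so shutdown only has to deal with the segments that are still active.
class BackgroundIndexTrimmer {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Queue<TrimmableIndex> pending = new ConcurrentLinkedQueue<>();

    // Called when a segment is rolled: its indexes will never grow again,
    // so they can be truncated to their final size right away.
    void onSegmentRolled(TrimmableIndex index, TrimmableIndex timeIndex) {
        pending.add(index);
        pending.add(timeIndex);
        executor.submit(this::drainOne);
    }

    private void drainOne() {
        TrimmableIndex next = pending.poll();
        if (next == null)
            return;
        try {
            next.trimToValidSize();
        } catch (IOException e) {
            // Real code would log and retry or surface the failure.
        }
    }

    // On shutdown, the caller enqueues the indexes of the still-active segments
    // and then drains whatever is left in the queue.
    void shutdown() throws InterruptedException, IOException {
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        TrimmableIndex remaining;
        while ((remaining = pending.poll()) != null)
            remaining.trimToValidSize();
    }
}
{noformat}
The nice property of this approach is that the truncation work is spread out
over the life of the broker instead of all landing in Log#close.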
No matter what, we should add a cache for kafka.log.OffsetIndex#lastEntry.
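For illustration, here is a minimal sketch of that cache, using hypothetical
names (the real index keeps its entries in a mapped buffer; the relative-offset
encoding is simplified here):
{noformat}
import java.nio.MappedByteBuffer;

// Hypothetical sketch of the cache: remember the last appended entry in plain
// fields so that reading it during close never has to touch the mapped buffer
// (and possibly fault in a cold page) just to re-read something we wrote earlier.
class OffsetIndexSketch {
    private final MappedByteBuffer mmap;
    private final long baseOffset;
    private long cachedLastOffset;
    private int cachedLastPosition;

    OffsetIndexSketch(MappedByteBuffer mmap, long baseOffset) {
        this.mmap = mmap;
        this.baseOffset = baseOffset;
        this.cachedLastOffset = baseOffset;
    }

    void append(long offset, int position) {
        mmap.putInt((int) (offset - baseOffset)); // entry goes into the mapped file as usual
        mmap.putInt(position);
        cachedLastOffset = offset;                // ...and is remembered in memory
        cachedLastPosition = position;
    }

    // Served from the cache; no read of the mmap (and no page fault) required.
    long lastOffset() {
        return cachedLastOffset;
    }

    int lastPosition() {
        return cachedLastPosition;
    }
}
{noformat}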