This is an automated email from the ASF dual-hosted git repository.

sriharsha pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a61594d  KAFKA-6432: Make index lookup more cache friendly (#5346)
a61594d is described below

commit a61594dee143ba089300546d1994b875a16ba521
Author: ying-zheng <zheng.y...@rocketmail.com>
AuthorDate: Fri Jul 27 14:00:19 2018 -0700

    KAFKA-6432: Make index lookup more cache friendly (#5346)
    
    For each topic-partition, the Kafka broker maintains two indices: one for
    message offsets and one for message timestamps. By default, a new index
    entry is appended to each index for every 4KB of messages. Index lookup is
    a simple binary search. The indices are mmapped files, cached by the Linux
    page cache.
    
    Both consumer fetches and follower fetches have to do an offset lookup
    before accessing the actual message data. The simple binary search used to
    look up the index is not cache friendly, and may cause page faults even on
    high-QPS topic-partitions.
    
    In a normal Kafka broker, all follower fetch requests and most consumer
    fetch requests only need to look up the last few entries of the index. We
    can make the index lookup more cache friendly by searching the last one or
    two pages of the index first.
    
    Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Guozhang Wang
    <wangg...@gmail.com>, Ted Yu <yuzhih...@gmail.com>, Ismael Juma
    <git...@juma.me.uk>, Sriharsha Chintalapani <srihar...@apache.org>
---
 core/src/main/scala/kafka/log/AbstractIndex.scala | 101 ++++++++++++++++++----
 1 file changed, 84 insertions(+), 17 deletions(-)
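
As a quick illustration of the page-access pattern the commit message describes, the following is a small, standalone Scala sketch (not part of the patch; the object name, `touchedPages`, and the 512-entries-per-page figure are assumptions based on a 4KB page and an 8-byte offset-index entry). It replays the probes of a plain binary search when looking up the newest index entry and reports which pages they land on:

    object BinarySearchPageTouches {
      // Replay the probes of a plain largest-lower-bound binary search (the same
      // mid = ceil(hi/2 + lo/2) step used in AbstractIndex) when looking up the
      // entry in slot targetSlot, and record which page each probed entry lives on.
      def touchedPages(numEntries: Int, targetSlot: Int, entriesPerPage: Int): Seq[Int] = {
        val pages = scala.collection.mutable.LinkedHashSet[Int]()
        pages += 0                          // the lookup first reads entry 0 (smallest-offset check)
        var lo = 0
        var hi = numEntries - 1
        while (lo < hi) {
          val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
          pages += mid / entriesPerPage     // page holding the probed entry
          if (mid > targetSlot) hi = mid - 1
          else if (mid < targetSlot) lo = mid
          else lo = hi                      // probe hit the target slot: done
        }
        pages.toSeq
      }

      def main(args: Array[String]): Unit = {
        val entriesPerPage = 512            // assumed: 4096-byte page / 8-byte offset-index entry
        // Pages read when looking up the newest entry of a 13-page and then a 14-page index:
        println(touchedPages(13 * entriesPerPage, 13 * entriesPerPage - 1, entriesPerPage))
        println(touchedPages(14 * entriesPerPage, 14 * entriesPerPage - 1, entriesPerPage))
      }
    }

The first call reports pages 0, 6, 9, 11, 12 and the second 0, 7, 10, 12, 13, matching the worked example in the comment added by the patch below: growing by one page suddenly pulls in pages that have been cold for a long time.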

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index e653802..ec9d55f 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -44,9 +44,67 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   // Length of the index file
   @volatile
   private var _length: Long = _
-
   protected def entrySize: Int
 
+  /*
+   Kafka mmaps index files into memory, and all read / write operations on the index go through the OS page cache. This
+   avoids blocked disk I/O in most cases.
+
+   To the best of our knowledge, all modern operating systems use an LRU policy or one of its variants to manage the
+   page cache. Kafka always appends to the end of the index file, and almost all index lookups (typically from in-sync
+   followers or consumers) are very close to the end of the index. So, the LRU cache replacement policy should work
+   very well with Kafka's index access pattern.
+
+   However, when looking up the index, the standard binary search algorithm is not cache friendly, and can cause
+   unnecessary page faults (the thread blocks, waiting for some index entries to be read from disk, because those
+   entries are not cached in the page cache).
+
+   For example, in an index with 13 pages, to look up an entry in the last page (page #12), the standard binary search
+   algorithm will read index entries in pages #0, 6, 9, 11, and 12.
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
+   steps:       |1| | | | | |3| | |4|  |5 |2/6|
+   Each page holds hundreds of index entries, corresponding to hundreds to thousands of Kafka messages. As the index
+   gradually grows from the 1st entry in page #12 to the last entry in page #12, all the write (append) operations go
+   to page #12, and all the in-sync follower / consumer lookups read pages #0, 6, 9, 11, 12. Since these pages are
+   touched in every in-sync lookup, we can assume they are fairly recently used and very likely to be in the page
+   cache. When the index grows to page #13, the pages needed in an in-sync lookup change to #0, 7, 10, 12, and 13:
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
+   steps:       |1| | | | | | |3| | | 4|5 | 6|2/7|
+   Page #7 and page #10 have not been used for a very long time, so they are much less likely to be in the page cache
+   than the other pages. The 1st lookup after the 1st index entry in page #13 is appended is likely to have to read
+   page #7 and page #10 from disk (page faults), which can take more than a second. In our tests, this can cause the
+   at-least-once produce latency to jump from a few ms to about 1 second.
+
+   Here, we use a more cache-friendly lookup algorithm:
+   if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
+      binarySearch(end - N, end)
+   else
+      binarySearch(begin, end - N)
+
+   If possible, we only look up in the last N entries of the index. By choosing a proper constant N, all the in-sync
+   lookups should go to the 1st branch. We call the last N entries the "warm" section. As we frequently look up in this
+   relatively small section, the pages containing this section are more likely to be in the page cache.
+
+   We set the size of the warm section to 8192 bytes (N = _warmEntries = 8192 / entrySize), because
+   1. This number is small enough to guarantee that all the pages of the "warm" section are touched in every
+      warm-section lookup, so the entire warm section is really "warm".
+      When doing a warm-section lookup, the following 3 entries are always touched: indexEntry(end), indexEntry(end-N),
+      and indexEntry((end*2 -N)/2). If the page size is >= 4096, all the warm-section pages (3 or fewer) are touched
+      when we touch those 3 entries. As of 2018, 4096 is the smallest page size of all common processors (x86-32,
+      x86-64, MIPS, SPARC, Power, ARM etc.).
+   2. This number is large enough to guarantee that most of the in-sync lookups are in the warm section. With default
+      Kafka settings, an 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) of log messages.
+
+   We can't make the warm section (N * entrySize) larger than 8192 bytes, as there is no simple way to guarantee that
+   all the "warm" section pages are really warm (touched in every lookup) on a typical host with 4KB pages.
+
+   In the future, we may use a background thread to periodically touch the entire warm section, so that we can
+   1) support a larger warm section
+   2) make sure the warm section of low-QPS topic-partitions is really warm.
+ */
+  protected def _warmEntries: Int = 8192 / entrySize
+
   protected val lock = new ReentrantLock
 
   @volatile
@@ -311,26 +369,35 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
     if(_entries == 0)
       return (-1, -1)
 
+    def binarySearch(begin: Int, end: Int) : (Int, Int) = {
+      // binary search for the entry
+      var lo = begin
+      var hi = end
+      while(lo < hi) {
+        val mid = ceil(hi/2.0 + lo/2.0).toInt
+        val found = parseEntry(idx, mid)
+        val compareResult = compareIndexEntry(found, target, searchEntity)
+        if(compareResult > 0)
+          hi = mid - 1
+        else if(compareResult < 0)
+          lo = mid
+        else
+          return (mid, mid)
+      }
+      (lo, if (lo == _entries - 1) -1 else lo + 1)
+    }
+
+    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
+    // check if the target offset is in the warm section of the index
+    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
+      return binarySearch(firstHotEntry, _entries - 1)
+    }
+
     // check if the target offset is smaller than the least offset
     if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
       return (-1, 0)
 
-    // binary search for the entry
-    var lo = 0
-    var hi = _entries - 1
-    while(lo < hi) {
-      val mid = ceil(hi/2.0 + lo/2.0).toInt
-      val found = parseEntry(idx, mid)
-      val compareResult = compareIndexEntry(found, target, searchEntity)
-      if(compareResult > 0)
-        hi = mid - 1
-      else if(compareResult < 0)
-        lo = mid
-      else
-        return (mid, mid)
-    }
-
-    (lo, if (lo == _entries - 1) -1 else lo + 1)
+    return binarySearch(0, firstHotEntry)
   }
 
   private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
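
For experimenting with the approach outside the broker, here is a minimal sketch of the same two-branch lookup over a plain in-memory sorted array (the object `WarmLookupSketch`, its `lookup` signature, and the toy data are illustrative assumptions, not Kafka's API; the real code returns a (slot, next-slot) pair and reads entries from an mmapped buffer):

    object WarmLookupSketch {
      // Find the slot of the largest entry <= target, or -1 if target precedes the whole index.
      // index stands in for the mmapped offset index; warmEntries stands in for _warmEntries.
      def lookup(index: IndexedSeq[Long], target: Long, warmEntries: Int): Int = {
        if (index.isEmpty) return -1

        // Same largest-lower-bound binary search as the patch, restricted to [begin, end].
        def binarySearch(begin: Int, end: Int): Int = {
          var lo = begin
          var hi = end
          while (lo < hi) {
            val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
            if (index(mid) > target) hi = mid - 1
            else if (index(mid) < target) lo = mid
            else return mid
          }
          lo
        }

        val firstWarmEntry = math.max(0, index.length - 1 - warmEntries)
        // Warm path: in-sync followers and consumers almost always land here, so only
        // the last page or two of the index needs to stay in the page cache.
        if (index(firstWarmEntry) < target)
          return binarySearch(firstWarmEntry, index.length - 1)

        if (index(0) > target) -1                       // target is older than the whole index
        else binarySearch(0, firstWarmEntry)            // cold path: full binary search of the head
      }

      def main(args: Array[String]): Unit = {
        val index = (0L until 20000L by 7L).toIndexedSeq            // toy sorted offsets
        println(lookup(index, target = 19999L, warmEntries = 1024)) // 2857: served by the warm branch
        println(lookup(index, target = 42L, warmEntries = 1024))    // 6: falls back to the cold branch
      }
    }

The structure mirrors the patch: probe the first warm entry, binary search the warm tail when the target is newer than it, and otherwise fall back to the cold head after the smallest-offset check.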
