This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new d33dc18  KAFKA-10706; Ensure leader epoch cache is cleaned after truncation to end offset (#9633)
d33dc18 is described below

commit d33dc1869ba09da822dd065e17c54bfb461f8c47
Author: Jason Gustafson <[email protected]>
AuthorDate: Sat Nov 21 09:25:54 2020 -0800

    KAFKA-10706; Ensure leader epoch cache is cleaned after truncation to end offset (#9633)
    
    This patch fixes a liveness bug which prevents follower truncation from
    completing after a leader election. If there are consecutive leader
    elections without writing any data entries, then the leader and follower
    may have conflicting epoch entries at the end of the log.
    
    The problem is the shortcut return in `Log.truncateTo` when the truncation
    offset is larger than or equal to the end offset, which prevents the
    conflicting entries from being resolved. Here we change this case to ensure
    `LeaderEpochFileCache.truncateFromEnd` is still called.
    
    Reviewers: Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/log/Log.scala          |  9 +++++++
 core/src/test/scala/unit/kafka/log/LogTest.scala | 31 ++++++++++++++++++++++++
 2 files changed, 40 insertions(+)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 36f4859..f316bb4 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2052,6 +2052,15 @@ class Log(@volatile var dir: File,
         throw new IllegalArgumentException(s"Cannot truncate partition 
$topicPartition to a negative offset (%d).".format(targetOffset))
       if (targetOffset >= logEndOffset) {
         info(s"Truncating to $targetOffset has no effect as the largest offset 
in the log is ${logEndOffset - 1}")
+
+        // Always truncate epoch cache since we may have a conflicting epoch 
entry at the
+        // end of the log from the leader. This could happen if this broker 
was a leader
+        // and inserted the first start offset entry, but then failed to 
append any entries
+        // before another leader was elected.
+        lock synchronized {
+          leaderEpochCache.foreach(_.truncateFromEnd(logEndOffset))
+        }
+
         false
       } else {
         info(s"Truncating to offset $targetOffset")
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 41718da..4a669bd 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -469,6 +469,37 @@ class LogTest {
     assertEquals(101L, log.logEndOffset)
   }
 
+  @Test
+  def testTruncateToEndOffsetClearsEpochCache(): Unit = {
+    val log = createLog(logDir, LogConfig())
+
+    // Seed two records at offsets 27-28 in leader epoch 19 (log end offset becomes 29)
+    val records = TestUtils.records(List(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)),
+      baseOffset = 27)
+    appendAsFollower(log, records, leaderEpoch = 19)
+    assertEquals(Some(19), log.leaderEpochCache.flatMap(_.latestEpoch))
+    assertEquals(29, log.logEndOffset)
+
+    def verifyTruncationClearsEpochCache(epoch: Int, truncationOffset: Long): Unit = {
+      // Simulate becoming leader: add an epoch entry at the log end offset with no data after it
+      log.maybeAssignEpochStartOffset(leaderEpoch = epoch, startOffset = log.logEndOffset)
+      assertEquals(Some(epoch), log.leaderEpochCache.flatMap(_.latestEpoch))
+      assertEquals(29, log.logEndOffset)
+
+      // Now become a follower and truncate to an offset >= the log end offset.
+      // No data is removed, but the empty epoch entry added above must still be
+      // dropped so that the latest cached epoch falls back to 19.
+      log.truncateTo(truncationOffset)
+      assertEquals(Some(19), log.leaderEpochCache.flatMap(_.latestEpoch))
+      assertEquals(29, log.logEndOffset)
+    }
+
+    // Truncating exactly to the log end offset, or past it, must both
+    // clear the trailing epoch entry from the cache
+    verifyTruncationClearsEpochCache(epoch = 20, truncationOffset = log.logEndOffset)
+    verifyTruncationClearsEpochCache(epoch = 24, truncationOffset = log.logEndOffset + 1)
+  }
+
   /**
    * Test the values returned by the logSegments call
    */

Reply via email to