This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new ec1c3c8  KAFKA-7786; Ignore OffsetsForLeaderEpoch response if epoch changed while request in flight (#6101)
ec1c3c8 is described below

commit ec1c3c8a272b0fe75041bc901b7f27365a382923
Author: Anna Povzner <[email protected]>
AuthorDate: Wed Jan 9 11:09:48 2019 -0800

    KAFKA-7786; Ignore OffsetsForLeaderEpoch response if epoch changed while request in flight (#6101)
    
    There is a race condition in ReplicaFetcherThread, where we can update
    PartitionFetchState with the new leader epoch (same leader) before handling
    the OffsetsForLeaderEpoch response with the FENCED_LEADER_EPOCH error. That
    causes the partition to be removed from partitionStates, which in turn means
    no fetching happens until the next LeaderAndIsr.
    
    This patch adds logic to check that the leader epoch has not changed while
    an OffsetsForLeaderEpoch request was in flight (which could happen with
    back-to-back leader elections). If it has changed, we ignore the response.
    
    Also added a toString() implementation to PartitionData, because some log
    messages did not show useful info, which I noticed while investigating the
    above system test failure.
    
    Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
---
 .../requests/OffsetsForLeaderEpochRequest.java     |  8 ++
 .../requests/OffsetsForLeaderEpochResponse.java    | 10 +++
 .../scala/kafka/server/AbstractFetcherThread.scala | 13 +++-
 .../kafka/server/AbstractFetcherThreadTest.scala   | 85 ++++++++++++++++++++++
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 13 +++-
 6 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index feeb875..57949a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -180,5 +180,13 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
             this.leaderEpoch = leaderEpoch;
         }
 
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(currentLeaderEpoch=").append(currentLeaderEpoch).
+                append(", leaderEpoch=").append(leaderEpoch).
+                append(")");
+            return bld.toString();
+        }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index 6f70850..d5d1265 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -166,4 +166,14 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
         responseStruct.set(TOPICS, topics.toArray());
         return responseStruct;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("(type=OffsetsForLeaderEpochResponse, ")
+            .append(", throttleTimeMs=").append(throttleTimeMs)
+            .append(", epochEndOffsetsByPartition=").append(epochEndOffsetsByPartition)
+            .append(")");
+        return bld.toString();
+    }
 }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 797b0f5..a78c2a1 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -179,8 +179,17 @@ abstract class AbstractFetcherThread(name: String,
       val fetchedEpochs = fetchEpochsFromLeader(epochRequests)
       //Ensure we hold a lock during truncation.
       inLock(partitionMapLock) {
-        //Check no leadership changes happened whilst we were unlocked, fetching epochs
-        val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
+        //Check no leadership and no leader epoch changes happened whilst we were unlocked, fetching epochs
+        val leaderEpochs = fetchedEpochs.filter { case (tp, _) =>
+          val curPartitionState = partitionStates.stateValue(tp)
+          val partitionEpochRequest = epochRequests.get(tp).getOrElse {
+            throw new IllegalStateException(
+              s"Leader replied with partition $tp not requested in OffsetsForLeaderEpoch request")
+          }
+          val leaderEpochInRequest = partitionEpochRequest.currentLeaderEpoch.get
+          curPartitionState != null && leaderEpochInRequest == curPartitionState.currentLeaderEpoch
+        }
+
         val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncate(leaderEpochs)
         handlePartitionsWithErrors(partitionsWithError)
         updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 77ba934..e9e4e33 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -39,6 +39,7 @@ import org.junit.{Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Set, mutable}
 import scala.util.Random
+import org.scalatest.Assertions.assertThrows
 
 class AbstractFetcherThreadTest {
 
@@ -499,6 +500,90 @@ class AbstractFetcherThreadTest {
     assertEquals(2L, replicaState.logEndOffset)
   }
 
+  @Test
+  def testLeaderEpochChangeDuringFencedFetchEpochsFromLeader(): Unit = {
+    // The leader is on the new epoch when the OffsetsForLeaderEpoch with old epoch is sent, so it
+    // returns the fence error. Validate that response is ignored if the leader epoch changes on
+    // the follower while OffsetsForLeaderEpoch request is in flight, but able to truncate and fetch
+    // in the next round of "doWork"
+    testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader = 1)
+  }
+
+  @Test
+  def testLeaderEpochChangeDuringSuccessfulFetchEpochsFromLeader(): Unit = {
+    // The leader is on the old epoch when the OffsetsForLeaderEpoch with old epoch is sent
+    // and returns the valid response. Validate that response is ignored if the leader epoch changes
+    // on the follower while OffsetsForLeaderEpoch request is in flight, but able to truncate and
+    // fetch once the leader is on the newer epoch (same as follower)
+    testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader = 0)
+  }
+
+  private def testLeaderEpochChangeDuringFetchEpochsFromLeader(leaderEpochOnLeader: Int): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val initialLeaderEpochOnFollower = 0
+    val nextLeaderEpochOnFollower = initialLeaderEpochOnFollower + 1
+
+    val fetcher = new MockFetcherThread {
+      var fetchEpochsFromLeaderOnce = false
+      override def fetchEpochsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+        val fetchedEpochs = super.fetchEpochsFromLeader(partitions)
+        if (!fetchEpochsFromLeaderOnce) {
+          // leader epoch changes while fetching epochs from leader
+          removePartitions(Set(partition))
+          setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = nextLeaderEpochOnFollower))
+          addPartitions(Map(partition -> offsetAndEpoch(0L, leaderEpoch = nextLeaderEpochOnFollower)))
+          fetchEpochsFromLeaderOnce = true
+        }
+        fetchedEpochs
+      }
+    }
+
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = initialLeaderEpochOnFollower))
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(0L, leaderEpoch = initialLeaderEpochOnFollower)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = initialLeaderEpochOnFollower, new SimpleRecord("c".getBytes)))
+    val leaderState = MockFetcherThread.PartitionState(leaderLog, leaderEpochOnLeader, highWatermark = 0L)
+    fetcher.setLeaderState(partition, leaderState)
+
+    // first round of truncation
+    fetcher.doWork()
+
+    // Since leader epoch changed, fetch epochs response is ignored due to partition being in
+    // truncating state with the updated leader epoch
+    assertEquals(Option(Truncating), fetcher.fetchState(partition).map(_.state))
+    assertEquals(Option(nextLeaderEpochOnFollower), fetcher.fetchState(partition).map(_.currentLeaderEpoch))
+
+    if (leaderEpochOnLeader < nextLeaderEpochOnFollower) {
+      fetcher.setLeaderState(
+        partition, MockFetcherThread.PartitionState(leaderLog, nextLeaderEpochOnFollower, highWatermark = 0L))
+    }
+
+    // make sure the fetcher is now able to truncate and fetch
+    fetcher.doWork()
+    assertEquals(fetcher.leaderPartitionState(partition).log, fetcher.replicaPartitionState(partition).log)
+  }
+
+  @Test
+  def testTruncationThrowsExceptionIfLeaderReturnsPartitionsNotRequestedInFetchEpochs(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread {
+      override def fetchEpochsFromLeader(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
+        val unrequestedTp = new TopicPartition("topic2", 0)
+        super.fetchEpochsFromLeader(partitions) + (unrequestedTp -> new EpochEndOffset(0, 0))
+      }
+    }
+
+    fetcher.setReplicaState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+    fetcher.addPartitions(Map(partition -> offsetAndEpoch(0L, leaderEpoch = 0)))
+    fetcher.setLeaderState(partition, MockFetcherThread.PartitionState(leaderEpoch = 0))
+
+    // first round of truncation should throw an exception
+    assertThrows[IllegalStateException] {
+      fetcher.doWork()
+    }
+  }
+
   object MockFetcherThread {
     class PartitionState(var log: mutable.Buffer[RecordBatch],
                          var leaderEpoch: Int,
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index c65c254..ef39aa5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -669,7 +669,7 @@ class ReplicaFetcherThreadTest {
     //Create the thread
     val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
     val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
-    thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t2p1 -> offsetAndEpoch(0L)))
+    thread.addPartitions(Map(t1p0 -> offsetAndEpoch(0L), t1p1 -> offsetAndEpoch(0L)))
 
     //Run thread 3 times
     (0 to 3).foreach { _ =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c4ca2cb..5fd7697 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -547,12 +547,13 @@ class ReplicaManagerTest {
     val controllerId = 0
     val controllerEpoch = 0
     var leaderEpoch = 1
+    val leaderEpochIncrement = 2
     val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
     val countDownLatch = new CountDownLatch(1)
 
     // Prepare the mocked components for the test
     val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
-      topicPartition, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch, expectTruncation = true)
 
     // Initialize partition state to follower, with leader = 1, leaderEpoch = 1
     val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, topicPartition))
@@ -563,7 +564,7 @@ class ReplicaManagerTest {
 
     // Make local partition a follower - because epoch increased by more than 1, truncation should
     // trigger even though leader does not change
-    leaderEpoch += 2
+    leaderEpoch += leaderEpochIncrement
     val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
       controllerId, controllerEpoch,
       collection.immutable.Map(new TopicPartition(topic, topicPartition) ->
@@ -578,7 +579,13 @@ class ReplicaManagerTest {
     EasyMock.verify(mockLogMgr)
   }
 
+  /**
+   * This method assumes that the test using the created ReplicaManager calls
+   * ReplicaManager.becomeLeaderOrFollower() once with a LeaderAndIsrRequest containing
+   * 'leaderEpochInLeaderAndIsr' leader epoch for partition 'topicPartition'.
+   */
   private def prepareReplicaManagerAndLogManager(topicPartition: Int,
+                                                 leaderEpochInLeaderAndIsr: Int,
                                                  followerBrokerId: Int,
                                                  leaderBrokerId: Int,
                                                  countDownLatch: CountDownLatch,
@@ -671,7 +678,7 @@ class ReplicaManagerTest {
               override def doWork() = {
                 // In case the thread starts before the partition is added by AbstractFetcherManager,
                 // add it here (it's a no-op if already added)
-                val initialOffset = OffsetAndEpoch(offset = 0L, leaderEpoch = 1)
+                val initialOffset = OffsetAndEpoch(offset = 0L, leaderEpoch = leaderEpochInLeaderAndIsr)
                 addPartitions(Map(new TopicPartition(topic, topicPartition) -> initialOffset))
                 super.doWork()
 
