hachikuji commented on a change in pull request #11189:
URL: https://github.com/apache/kafka/pull/11189#discussion_r685608200



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -2810,4 +2814,183 @@ class ReplicaManagerTest {
           Replicas.NONE, Replicas.NONE, 2, 123, 456)))),
     replicaManager.calculateDeltaChanges(TEST_DELTA))
   }
+
+  @Test
+  def testDeltaFromLeaderToFollower(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val numOfRecords = 3
+    val epoch = 100
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    // Make the local replica the leader
+    val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, 
epoch))
+    replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, 
epoch))
+
+    // Check the state of that partition and fetcher
+    val HostedPartition.Online(leaderPartition) = 
replicaManager.getPartition(topicPartition)
+    assertTrue(leaderPartition.isLeader)
+    assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+    assertEquals(epoch, leaderPartition.getLeaderEpoch)
+
+    assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+
+    // Send a produce request and advance the highwatermark
+    val leaderResponse = sendProducerAppend(replicaManager, topicPartition, 
numOfRecords)
+    fetchMessages(
+      replicaManager,
+      otherId,
+      topicPartition,
+      new PartitionData(numOfRecords, 0, Int.MaxValue, Optional.empty()),
+      Int.MaxValue,
+      IsolationLevel.READ_UNCOMMITTED,
+      None
+    )
+    assertEquals(Errors.NONE, leaderResponse.get.error)
+
+    // Change the local replica to follower
+    val followerMetadataImage = imageFromTopics(topicsImage(localId, false, 
epoch + 1))
+    replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, 
false, epoch + 1))
+
+    // Append on a follower should fail
+    val followerResponse = sendProducerAppend(replicaManager, topicPartition, 
numOfRecords)
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
+
+    // Check the state of that partition and fetcher
+    val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+    assertFalse(followerPartition.isLeader)
+    assertEquals(epoch + 1, followerPartition.getLeaderEpoch)
+
+    val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+    assertNotEquals(None, fetcher)
+    assertEquals(BrokerEndPoint(otherId, "localhost", 9093), 
fetcher.get.sourceBroker)
+  }
+
+  @Test
+  def testDeltaFromFollowerToLeader(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val numOfRecords = 3
+    val epoch = 100
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    // Make the local replica the follower
+    val followerMetadataImage = imageFromTopics(topicsImage(localId, false, 
epoch))
+    replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, 
false, epoch))
+
+    // Check the state of that partition and fetcher
+    val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+    assertFalse(followerPartition.isLeader)
+    assertEquals(epoch, followerPartition.getLeaderEpoch)
+
+    val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+    assertNotEquals(None, fetcher)
+    assertEquals(BrokerEndPoint(otherId, "localhost", 9093), 
fetcher.get.sourceBroker)
+
+    // Append on a follower should fail
+    val followerResponse = sendProducerAppend(replicaManager, topicPartition, 
numOfRecords)
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, followerResponse.get.error)
+
+    // Change the local replica to leader
+    val leaderMetadataImage = imageFromTopics(topicsImage(localId, true, epoch 
+ 1))
+    replicaManager.applyDelta(leaderMetadataImage, topicsDelta(localId, true, 
epoch + 1))
+
+    // Send a produce request and advance the highwatermark
+    val leaderResponse = sendProducerAppend(replicaManager, topicPartition, 
numOfRecords)
+    fetchMessages(
+      replicaManager,
+      otherId,
+      topicPartition,
+      new PartitionData(numOfRecords, 0, Int.MaxValue, Optional.empty()),
+      Int.MaxValue,
+      IsolationLevel.READ_UNCOMMITTED,
+      None
+    )
+    assertEquals(Errors.NONE, leaderResponse.get.error)
+
+    val HostedPartition.Online(leaderPartition) = 
replicaManager.getPartition(topicPartition)
+    assertTrue(leaderPartition.isLeader)
+    assertEquals(Set(localId, otherId), leaderPartition.inSyncReplicaIds)
+    assertEquals(epoch + 1, leaderPartition.getLeaderEpoch)
+
+    assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+  }
+
+  @Test
+  def testDeltaFollowerWithNoChange(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val epoch = 100
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+    // Make the local replica the follower
+    val followerMetadataImage = imageFromTopics(topicsImage(localId, false, 
epoch))
+    replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, 
false, epoch))
+
+    // Check the state of that partition and fetcher
+    val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+    assertFalse(followerPartition.isLeader)
+    assertEquals(epoch, followerPartition.getLeaderEpoch)
+
+    val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+    assertNotEquals(None, fetcher)
+    assertEquals(BrokerEndPoint(otherId, "localhost", 9093), 
fetcher.get.sourceBroker)
+
+    // Apply the same delta again
+    replicaManager.applyDelta(followerMetadataImage, topicsDelta(localId, 
false, epoch))
+
+    // Check that the state stays the same
+    val HostedPartition.Online(noChangePartition) = 
replicaManager.getPartition(topicPartition)
+    assertFalse(noChangePartition.isLeader)
+    assertEquals(epoch, noChangePartition.getLeaderEpoch)
+
+    val noChangeFetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+    assertNotEquals(None, noChangeFetcher)
+    assertEquals(BrokerEndPoint(otherId, "localhost", 9093), 
noChangeFetcher.get.sourceBroker)

Review comment:
       nit: maybe a simple way to write these two assertions:
   ```
   assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
noChangeFetcher.map(_.sourceBroker))
   ```

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2235,8 +2244,15 @@ class ReplicaManager(val config: KafkaConfig,
         }
       }
     }
-    updateLeaderAndFollowerMetrics(newFollowerTopicSet)
+
+    
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
+    stateChangeLogger.info(s"Stopped fetchers as part of become-follower for 
${partitionsToMakeFollower.size} partitions")
+
     replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
+    stateChangeLogger.info(s"Started fetchers as part of become-follower for 
${partitionsToMakeFollower.size} partitions")
+
+    updateLeaderAndFollowerMetrics(newFollowerTopicSet)
+

Review comment:
       nit: remove unneeded newline




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to