This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new ebd63b54bd KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica (#12411) ebd63b54bd is described below commit ebd63b54bddae886f8125d904e1676333e8f4e58 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Mon Jul 25 13:24:40 2022 -0700 KAFKA-14078; Do leader/epoch validation in Fetch before checking for valid replica (#12411) After the fix for https://github.com/apache/kafka/pull/12150, if a follower receives a request from another replica, it will return UNKNOWN_LEADER_EPOCH even if the leader epoch matches. We need to do leader/epoch validation first before we check whether we have a valid replica. Reviewers: David Jacot <dja...@confluent.io> --- core/src/main/scala/kafka/cluster/Partition.scala | 36 +++++++++++++------ .../scala/unit/kafka/cluster/PartitionTest.scala | 41 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 319025226c..538c51f903 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1205,22 +1205,32 @@ class Partition(val topicPartition: TopicPartition, minOneMessage: Boolean, updateFetchState: Boolean ): LogReadInfo = { - def readFromLocalLog(): LogReadInfo = { + def readFromLocalLog(log: UnifiedLog): LogReadInfo = { readRecords( + log, fetchPartitionData.lastFetchedEpoch, fetchPartitionData.fetchOffset, fetchPartitionData.currentLeaderEpoch, maxBytes, fetchParams.isolation, - minOneMessage, - fetchParams.fetchOnlyLeader + minOneMessage ) } if (fetchParams.isFromFollower) { // Check that the request is from a valid replica before doing the read - val replica = followerReplicaOrThrow(fetchParams.replicaId, fetchPartitionData) - val logReadInfo = readFromLocalLog() + val (replica, logReadInfo) = inReadLock(leaderIsrUpdateLock) { 
val localLog = localLogWithEpochOrThrow( + fetchPartitionData.currentLeaderEpoch, + fetchParams.fetchOnlyLeader + ) + val replica = followerReplicaOrThrow( + fetchParams.replicaId, + fetchPartitionData + ) + val logReadInfo = readFromLocalLog(localLog) + (replica, logReadInfo) + } if (updateFetchState && logReadInfo.divergingEpoch.isEmpty) { updateFollowerFetchState( @@ -1234,7 +1244,13 @@ class Partition(val topicPartition: TopicPartition, logReadInfo } else { - readFromLocalLog() + inReadLock(leaderIsrUpdateLock) { + val localLog = localLogWithEpochOrThrow( + fetchPartitionData.currentLeaderEpoch, + fetchParams.fetchOnlyLeader + ) + readFromLocalLog(localLog) + } } } @@ -1270,16 +1286,14 @@ class Partition(val topicPartition: TopicPartition, } private def readRecords( + localLog: UnifiedLog, lastFetchedEpoch: Optional[Integer], fetchOffset: Long, currentLeaderEpoch: Optional[Integer], maxBytes: Int, fetchIsolation: FetchIsolation, - minOneMessage: Boolean, - fetchOnlyFromLeader: Boolean - ): LogReadInfo = inReadLock(leaderIsrUpdateLock) { - val localLog = localLogWithEpochOrThrow(currentLeaderEpoch, fetchOnlyFromLeader) - + minOneMessage: Boolean + ): LogReadInfo = { // Note we use the log end offset prior to the read. This ensures that any appends following // the fetch do not prevent a follower from coming into sync. 
val initialHighWatermark = localLog.highWatermark diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 65a6cdadf4..5038219579 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -238,6 +238,47 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(None, partition.futureLog) } + @Test + def testReplicaFetchToFollower(): Unit = { + val controllerEpoch = 3 + val followerId = brokerId + 1 + val leaderId = brokerId + 2 + val replicas = List[Integer](brokerId, followerId, leaderId).asJava + val isr = List[Integer](brokerId, followerId, leaderId).asJava + val leaderEpoch = 8 + val partitionEpoch = 1 + + assertTrue(partition.makeFollower(new LeaderAndIsrPartitionState() + .setControllerEpoch(controllerEpoch) + .setLeader(leaderId) + .setLeaderEpoch(leaderEpoch) + .setIsr(isr) + .setPartitionEpoch(partitionEpoch) + .setReplicas(replicas) + .setIsNew(true), + offsetCheckpoints, None + )) + + def assertFetchFromReplicaFails[T <: ApiException]( + expectedExceptionClass: Class[T], + leaderEpoch: Option[Int] + ): Unit = { + assertThrows(expectedExceptionClass, () => { + fetchFollower( + partition, + replicaId = followerId, + fetchOffset = 0L, + leaderEpoch = leaderEpoch + ) + }) + } + + assertFetchFromReplicaFails(classOf[NotLeaderOrFollowerException], None) + assertFetchFromReplicaFails(classOf[NotLeaderOrFollowerException], Some(leaderEpoch)) + assertFetchFromReplicaFails(classOf[UnknownLeaderEpochException], Some(leaderEpoch + 1)) + assertFetchFromReplicaFails(classOf[FencedLeaderEpochException], Some(leaderEpoch - 1)) + } + @Test def testFetchFromUnrecognizedFollower(): Unit = { val controllerEpoch = 3