hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r512370043



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1388,6 +1390,7 @@ class Log(@volatile private var _dir: File,
     var validBytesCount = 0
     var firstOffset: Option[Long] = None
     var lastOffset = -1L
+    var lastLeaderEpoch: Option[Int] = None

Review comment:
       nit: not sure how much it matters, but maybe we can avoid the extra 
garbage and just use an integer until we're ready to build the result?
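   For illustration, a minimal sketch of what that could look like (the sentinel constant, variable names and loop placement are assumptions for the example, not the actual `Log.scala` code):
   ```scala
   // Sketch: track the epoch as a plain Int with a sentinel while scanning the
   // batches, and only allocate the Option once the scan is done.
   val NoEpoch = -1
   var lastLeaderEpochRaw = NoEpoch

   // inside the per-batch loop (illustrative only):
   // lastLeaderEpochRaw = batch.partitionLeaderEpoch

   // build the Option only when assembling the result
   val lastLeaderEpoch: Option[Int] =
     if (lastLeaderEpochRaw != NoEpoch) Some(lastLeaderEpochRaw) else None
   ```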

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
+      currentState
+    } else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
+              (currentState == null || currentState.state == Fetching)) {
+      PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+          state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
       I am wondering in what situation we would find `currentState` non-null 
here. The current logic in `ReplicaManager.makeFollowers` always calls 
`removeFetcherForPartitions` before adding the partition back. The reason I ask 
is that I wasn't sure we should be taking the last fetched epoch from the 
initial state or if we should keep the current one. It seems like the latter 
might be more current?
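   To make the alternative concrete, here is a rough sketch of preferring the existing state's epoch when one is present (names mirror the diff above; this is an untested illustration, not a proposed patch):
   ```scala
   // Sketch: if an existing state is found, keep its lastFetchedEpoch instead of
   // the one carried in initialFetchState, on the assumption that it is newer.
   val lastFetchedEpoch: Option[Int] =
     if (currentState != null) currentState.lastFetchedEpoch
     else initialFetchState.lastFetchedEpoch

   PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
     state = Fetching, lastFetchedEpoch)
   ```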

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -221,7 +223,15 @@ abstract class AbstractFetcherThread(name: String,
 
      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
      handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
-      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, isTruncationOnFetchSupported)
+    }
+  }
+
+  private def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
+    inLock(partitionMapLock) {
+      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+      handlePartitionsWithErrors(partitionsWithError, "truncateOnFetchResponse")
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, maySkipTruncation = false)

Review comment:
       It's not clear to me why we set `maySkipTruncation` to false here. If 
the truncation is not complete, wouldn't that put us in the `Truncating` state?

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##########
@@ -454,15 +492,23 @@ abstract class AbstractFetcherThread(name: String,
    * truncation completed if their offsetTruncationState indicates truncation completed
    *
    * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
+    * @param maySkipTruncation true if we can stay in Fetching mode and perform truncation later based on
+   *                           diverging epochs from fetch responses.
    */
-  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
+  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState],
+                                                              maySkipTruncation: Boolean): Unit = {
     val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
       .map { case (topicPartition, currentFetchState) =>
        val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
          case Some(offsetTruncationState) =>
-            val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
+            val (state, lastFetchedEpoch) = if (offsetTruncationState.truncationCompleted)
+              (Fetching, latestEpoch(topicPartition))
+            else if (maySkipTruncation && currentFetchState.lastFetchedEpoch.nonEmpty)
+              (Fetching, currentFetchState.lastFetchedEpoch)

Review comment:
       I'm a little uncertain about this case. If we have truncated to an 
earlier offset, wouldn't we also need to reset last fetched epoch? I am 
thinking we should remove this check and modify the first one:
   ```scala
   val (state, lastFetchedEpoch) = if (maySkipTruncation || offsetTruncationState.truncationCompleted)
     (Fetching, latestEpoch(topicPartition))
   ```
   I might be missing something though.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1667,8 +1667,9 @@ class ReplicaManager(val config: KafkaConfig,
        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
          val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
            .brokerEndPoint(config.interBrokerListenerName)
-          val fetchOffset = partition.localLogOrException.highWatermark
-          partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
+          val log = partition.localLogOrException
+          val (fetchOffset, lastFetchedEpoch) = initialFetchOffsetAndEpoch(log)
+          partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset, lastFetchedEpoch)

Review comment:
       nit: line below is misaligned

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -770,7 +770,7 @@ class ReplicaManager(val config: KafkaConfig,
             logManager.abortAndPauseCleaning(topicPartition)
 
            val initialFetchState = InitialFetchState(BrokerEndPoint(config.brokerId, "localhost", -1),
-              partition.getLeaderEpoch, futureLog.highWatermark)
+              partition.getLeaderEpoch, futureLog.highWatermark, lastFetchedEpoch = None)

Review comment:
       Sounds fair.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

