hachikuji commented on a change in pull request #9434:
URL: https://github.com/apache/kafka/pull/9434#discussion_r505678344



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -1162,6 +1162,13 @@ class Partition(val topicPartition: TopicPartition,
     localLog.fetchOffsetSnapshot
   }
 
+  def hasDivergingEpoch(currentLeaderEpoch: Optional[Integer],
+                        lastFetchedEpoch: Int,
+                        fetchOffset: Long): Boolean = {
+    val epochEndOffset = lastOffsetForLeaderEpoch(currentLeaderEpoch, 
lastFetchedEpoch, fetchOnlyFromLeader = false)

Review comment:
       Do we need to check the error? Or are you relying on the check below 
failing if `UNDEFINED_EPOCH` is returned?

##########
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##########
@@ -96,6 +98,7 @@ class DelayedFetch(delayMs: Long,
               case FetchTxnCommitted => offsetSnapshot.lastStableOffset
             }
 
+

Review comment:
       nit: unneeded newline

##########
File path: core/src/main/scala/kafka/server/FetchSession.scala
##########
@@ -77,7 +77,8 @@ class CachedPartition(val topic: String,
                       var highWatermark: Long,
                       var leaderEpoch: Optional[Integer],
                       var fetcherLogStartOffset: Long,
-                      var localLogStartOffset: Long)
+                      var localLogStartOffset: Long,
+                      var lastFetchedEpoch: Optional[Integer] = 
Optional.empty[Integer])

Review comment:
       Do we need to provide a default here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to