[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-12-02 Thread GitBox


hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r534424325



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -426,21 +451,35 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
+      currentState
+    } else if (initialFetchState.initOffset < 0) {
+      fetchOffsetAndTruncate(tp, initialFetchState.currentLeaderEpoch)
+    } else if (isTruncationOnFetchSupported) {
+      val lastFetchedEpoch = latestEpoch(tp)
+      val state = if (lastFetchedEpoch.exists(_ != EpochEndOffset.UNDEFINED_EPOCH)) Fetching else Truncating

Review comment:
   Hmm.. Do we actually return `Some(EpochEndOffset.UNDEFINED_EPOCH)` from `latestEpoch`? That seems surprising.
   
   Might be worth a comment here that we still go through the `Truncating` state when the message format is old.
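
For illustration, a minimal sketch of the state choice being discussed, assuming `latestEpoch` yields `None` (rather than `Some(UNDEFINED_EPOCH)`) when the message format predates leader epochs; the helper name is made up:

```scala
// Minimal sketch, assuming `latestEpoch` returns None when no leader epoch is available.
object InitialStateSketch {
  def initialState(lastFetchedEpoch: Option[Int]): String =
    lastFetchedEpoch match {
      case Some(_) => "Fetching"   // epoch known: truncate via diverging epochs in fetch responses
      case None    => "Truncating" // old message format: still truncate to the high watermark first
    }
}
```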

##
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##
@@ -64,8 +64,8 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
       fetcherThreadMap.forKeyValue { (id, thread) =>
-        val removedPartitions = thread.partitionsAndOffsets
-        removeFetcherForPartitions(removedPartitions.keySet)
+        val removedPartitions = thread.removeAllPartitions()
+        removeFetcherForPartitions(removedPartitions.keySet) // clear state for removed partitions

Review comment:
   This reads a bit odd following `removeAllPartitions`. I guess what we 
get from `removeFetcherForPartitions` is the clearing of `failedPartitions` and 
de-registration from `fetcherLagStats`. Not super important, but wonder if it's 
worth trying to consolidate a little. Maybe `removeFetcherForPartitions` could 
return the initial fetch offsets or something.
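
As a rough illustration of the consolidation idea, a sketch with simplified stand-in types in which `removeFetcherForPartitions` hands back the removed fetch state (clearing `failedPartitions` and de-registering from `fetcherLagStats` are elided):

```scala
// Sketch only: simplified stand-ins for the Kafka types named above.
object RemoveFetcherSketch {
  final case class InitialFetchState(currentLeaderEpoch: Int, initOffset: Long)

  private var currentStates = Map.empty[String, InitialFetchState]

  // Returns the state that was removed, so callers like resizeThreadPool would not need
  // a separate removeAllPartitions call.
  def removeFetcherForPartitions(partitions: Set[String]): Map[String, InitialFetchState] = {
    val removed = currentStates.filter { case (tp, _) => partitions.contains(tp) }
    currentStates --= partitions
    // ... clear failed-partition state and lag metrics here ...
    removed
  }
}
```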

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1691,6 +1692,18 @@ class ReplicaManager(val config: KafkaConfig,
     partitionsToMakeFollower
   }
 
+  /**
+   * From IBP 2.7 onwards, we send latest fetch epoch in the request and truncate if a
+   * diverging epoch is returned in the response, avoiding the need for a separate
+   * OffsetForLeaderEpoch request.
+   */
+  private def initialFetchOffset(log: Log): Long = {

Review comment:
   I think this could be saved for a follow-up, but I wonder if we should 
consider similarly letting the initial offset be determined by the fetcher 
thread on initialization rather than being passed in. I find it confusing that 
we expect this to be the high watermark in some cases. It seems a little 
slippery the way we rely on it in 
`AbstractFetcherThread.truncateToHighWatermark`.

##
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##
@@ -453,6 +466,107 @@ class ReplicaFetcherThreadTest {
       truncateToCapture.getValues.asScala.contains(101))
   }
 
+  @Test
+  def shouldTruncateIfLeaderRepliesWithDivergingEpochNotKnownToFollower(): Unit = {
+
+    // Create a capture to track what partitions/offsets are truncated
+    val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
+
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
+
+    // Setup all dependencies
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val log: Log = createNiceMock(classOf[Log])
+    val partition: Partition = createNiceMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
+
+    val initialLEO = 200
+    var latestLogEpoch: Option[Int] = Some(5)
+
+    // Stubs
+    expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
+    expect(partition.localLogOrException).andReturn(log).anyTimes()
+    expect(log.highWatermark).andReturn(115).anyTimes()
+    expect(log.latestEpoch).andAnswer(() =>

[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-12-01 Thread GitBox


hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r533648722



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
+      currentState
+    } else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
+      (currentState == null || currentState.state == Fetching)) {
+      PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+        state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
   This check is still a little hard to follow. I think we expect that if `initOffset` is negative, then `lastFetchedEpoch` will be empty and we will hit the `fetchOffsetAndTruncate` case below. Is that right? On the other hand, if `lastFetchedEpoch` is empty, then `initOffset` could still be non-negative if we have an old message format, which means we need to enter `Truncating` so that we can truncate to the high watermark.
   
   One case that is not so clear is when `currentState` is non-null. Then we will enter the `Truncating` state below regardless of whether `isTruncationOnFetchSupported` is set or not. Is that what we want?
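
To make the cases concrete, here is a small sketch with simplified stand-in types and an assumed branch ordering; it only spells out the case analysis above, it is not the PR's code:

```scala
// Sketch only: FetchState and the branch order are illustrative assumptions.
object PartitionFetchStateSketch {
  sealed trait ReplicaState
  case object Fetching extends ReplicaState
  case object Truncating extends ReplicaState

  final case class InitialFetchState(initOffset: Long, currentLeaderEpoch: Int, lastFetchedEpoch: Option[Int])
  final case class FetchState(offset: Long, currentLeaderEpoch: Int, state: ReplicaState, lastFetchedEpoch: Option[Int])

  def nextState(initial: InitialFetchState,
                current: Option[FetchState],
                truncationOnFetchSupported: Boolean): FetchState =
    current match {
      case Some(state) if state.currentLeaderEpoch == initial.currentLeaderEpoch =>
        state                                                            // same leader epoch: keep existing state
      case _ if initial.initOffset < 0 =>
        FetchState(0L, initial.currentLeaderEpoch, Truncating, None)     // stands in for fetchOffsetAndTruncate
      case _ if truncationOnFetchSupported && initial.lastFetchedEpoch.nonEmpty =>
        FetchState(initial.initOffset, initial.currentLeaderEpoch, Fetching, initial.lastFetchedEpoch)
      case _ =>
        FetchState(initial.initOffset, initial.currentLeaderEpoch, Truncating, None)  // old format: truncate to HW first
    }
}
```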

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -669,11 +714,18 @@ abstract class AbstractFetcherThread(name: String,
     Option(partitionStates.stateValue(topicPartition))
   }
 
+  /**
+   * Returns current fetch state for each partition assigned to this thread. This is used to reassign
+   * partitions when thread pool is resized. We return `lastFetchedEpoch=None` to ensure we go through

Review comment:
   This is probably ok. I guess an alternative would be to not take the 
initial last fetched epoch from `InitialFetchState`, but instead use 
`latestEpoch`.

##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -102,6 +103,7 @@ class ReplicaFetcherThread(name: String,
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
   private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
+  private val brokerSupportsTruncationOnFetch = ApiVersion.isTruncationOnFetchSupported(brokerConfig.interBrokerProtocolVersion)

Review comment:
   nit: I don't think we need this. We can override 
`isTruncationOnFetchSupported` with a `val`
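
A minimal sketch of the nit, using stand-in classes rather than the real `AbstractFetcherThread`/`ReplicaFetcherThread`:

```scala
// Sketch only: the base-class declaration is an assumption.
object TruncationFlagSketch {
  abstract class AbstractFetcher {
    protected val isTruncationOnFetchSupported: Boolean = false
  }

  class ReplicaFetcher(ibpSupportsTruncationOnFetch: Boolean) extends AbstractFetcher {
    // the `val` override replaces the extra private field added in the diff above
    override protected val isTruncationOnFetchSupported: Boolean = ibpSupportsTruncationOnFetch
  }
}
```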





[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-27 Thread GitBox


hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r512370043



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1388,6 +1390,7 @@ class Log(@volatile private var _dir: File,
     var validBytesCount = 0
     var firstOffset: Option[Long] = None
     var lastOffset = -1L
+    var lastLeaderEpoch: Option[Int] = None

Review comment:
   nit: not sure how much it matters, but maybe we can avoid the extra 
garbage and just use an integer until we're ready to build the result?
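
A sketch of the suggestion, with illustrative names (the real loop lives in `Log`'s record validation): track the epoch as a plain `Int` sentinel and wrap it in an `Option` only once at the end:

```scala
// Sketch only: NoEpoch mirrors the idea of RecordBatch.NO_PARTITION_LEADER_EPOCH.
object LastEpochSketch {
  private val NoEpoch = -1

  final case class ScanResult(lastOffset: Long, lastLeaderEpoch: Option[Int])

  def scan(batches: Iterable[(Long, Int)]): ScanResult = {
    var lastOffset = -1L
    var lastLeaderEpoch = NoEpoch          // no per-batch Option allocation
    batches.foreach { case (offset, epoch) =>
      lastOffset = offset
      lastLeaderEpoch = epoch
    }
    // wrap once, when the result is built
    ScanResult(lastOffset, if (lastLeaderEpoch >= 0) Some(lastLeaderEpoch) else None)
  }
}
```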

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -426,21 +451,34 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.currentLeaderEpoch) {
+      currentState
+    } else if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty &&
+      (currentState == null || currentState.state == Fetching)) {
+      PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+        state = Fetching, initialFetchState.lastFetchedEpoch)

Review comment:
   I am wondering in what situation we would find `currentState` non-null 
here. The current logic in `ReplicaManager.makeFollowers` always calls 
`removeFetcherForPartitions` before adding the partition back. The reason I ask 
is that I wasn't sure we should be taking the last fetched epoch from the 
initial state or if we should keep the current one. It seems like the latter 
might be more current?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -221,7 +223,15 @@ abstract class AbstractFetcherThread(name: String,
 
       val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, latestEpochsForPartitions)
       handlePartitionsWithErrors(partitionsWithError, "truncateToEpochEndOffsets")
-      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, isTruncationOnFetchSupported)
+    }
+  }
+
+  private def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
+    inLock(partitionMapLock) {
+      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+      handlePartitionsWithErrors(partitionsWithError, "truncateOnFetchResponse")
+      updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets, maySkipTruncation = false)

Review comment:
   It's not clear to me why we set `maySkipTruncation` to false here. If 
the truncation is not complete, wouldn't that put us in the `Truncating` state?

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -454,15 +492,23 @@ abstract class AbstractFetcherThread(name: String,
    * truncation completed if their offsetTruncationState indicates truncation completed
    *
    * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
+   * @param maySkipTruncation true if we can stay in Fetching mode and perform truncation later based on
+   *                          diverging epochs from fetch responses.
    */
-  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
+  private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState],
+                                                              maySkipTruncation: Boolean): Unit = {
     val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
       .map { case (topicPartition, currentFetchState) =>
         val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
           case Some(offsetTruncationState) =>
-            val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
+            val (state, lastFetchedEpoch) = if (offsetTruncationState.truncationCompleted)
+              (Fetching, latestEpoch(topicPartition))
+            else if

[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-21 Thread GitBox


hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r509482332



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
         val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
           case Some(offsetTruncationState) =>
             val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating
+            // Resetting `lastFetchedEpoch` since we are truncating and don't expect diverging epoch in the next fetch

Review comment:
   This is a little unclear to me. I guess it is safe to reset 
`lastFetchedEpoch` as long as we reinitialize it after the next leader change. 
On the other hand, it seems safer to always retain the value.

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -426,21 +454,42 @@ abstract class AbstractFetcherThread(name: String,
     warn(s"Partition $topicPartition marked as failed")
   }
 
-  def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition] = {
+  /**
+   * Returns initial partition fetch state based on current state and the provided `initialFetchState`.
+   * From IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+   * For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch.
+   */
+  private def partitionFetchState(tp: TopicPartition, initialFetchState: InitialFetchState, currentState: PartitionFetchState): PartitionFetchState = {
+    if (isTruncationOnFetchSupported && initialFetchState.initOffset >= 0 && initialFetchState.lastFetchedEpoch.nonEmpty) {
+      if (currentState == null) {
+        return PartitionFetchState(initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
+          state = Fetching, initialFetchState.lastFetchedEpoch)
+      }
+      // If we are in `Fetching` state can continue to fetch regardless of current leader epoch and truncate
+      // if necessary based on diverging epochs returned by the leader. If we are currently in Truncating state,
+      // fall through and handle based on current epoch.
+      if (currentState.state == Fetching) {
+        return currentState

Review comment:
   Is it not possible that the `InitialFetchState` has a bump to the 
current leader epoch? We will still need the latest epoch in order to continue 
fetching.
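
A sketch of the concern, with stand-in types: if the `InitialFetchState` carries a bumped leader epoch, the existing `Fetching` state would at least need to adopt it before fetching continues:

```scala
// Sketch only: FetchState is a simplified stand-in for PartitionFetchState.
object EpochBumpSketch {
  final case class FetchState(offset: Long, currentLeaderEpoch: Int, lastFetchedEpoch: Option[Int])

  def continueFetching(current: FetchState, newLeaderEpoch: Int): FetchState =
    if (current.currentLeaderEpoch == newLeaderEpoch) current
    else current.copy(currentLeaderEpoch = newLeaderEpoch)  // keep fetch progress, adopt the epoch bump
}
```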

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -341,11 +352,18 @@ abstract class AbstractFetcherThread(name: String,
                 // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
                 if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                   // Update partitionStates only if there is no exception during processPartitionData
-                  val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
+                  val newFetchState = PartitionFetchState(nextOffset, Some(lag),
+                    currentFetchState.currentLeaderEpoch, state = Fetching,
+                    Some(currentFetchState.currentLeaderEpoch))

Review comment:
   This doesn't seem right. The last fetched epoch is supposed to represent 
the epoch of the last fetched batch. The fetcher could be fetching the data 
from an older epoch here.
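
A sketch of the distinction, with assumed names: the new `lastFetchedEpoch` would come from the local log's latest epoch after the batches are appended, not from `currentLeaderEpoch`:

```scala
// Sketch only: latestEpochInLog stands in for AbstractFetcherThread.latestEpoch(tp)
// evaluated after processPartitionData has appended the fetched batches.
object LastFetchedEpochSketch {
  def newLastFetchedEpoch(latestEpochInLog: Option[Int]): Option[Int] =
    latestEpochInLog  // NOT Some(currentLeaderEpoch): fetched data may belong to an older epoch
}
```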

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -461,8 +510,9 @@ abstract class AbstractFetcherThread(name: String,
         val maybeTruncationComplete = fetchOffsets.get(topicPartition) match {
           case Some(offsetTruncationState) =>
             val state = if (offsetTruncationState.truncationCompleted) Fetching else Truncating

Review comment:
   Do we need to adjust this? I think we want to remain in the `Fetching` 
state if truncation detection is through `Fetch`.
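
A sketch of the suggested adjustment, with simplified names: when diverging-epoch truncation happens via fetch responses, an incomplete truncation no longer forces the partition back into `Truncating`:

```scala
// Sketch only: the state choice the comment argues for, not the PR's final code.
object TruncationStateSketch {
  sealed trait ReplicaState
  case object Fetching extends ReplicaState
  case object Truncating extends ReplicaState

  def nextState(truncationCompleted: Boolean, truncationOnFetch: Boolean): ReplicaState =
    if (truncationCompleted || truncationOnFetch) Fetching else Truncating
}
```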

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -629,7 +680,9 @@ abstract class AbstractFetcherThread(name: String,
 
       val initialLag = leaderEndOffset - offsetToFetch
       fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
-      PartitionFetchState(offsetToFetch, Some(initialLag), currentLeaderEpoch, state = Fetching)
+      // We don't expect diverging epochs from the next fetch request, so resetting `lastFetchedEpoch`

Review comment:
   Again it seems safe to keep `lastFetchedEpoch` in sync with the local 
log. If we have done a full truncation above, then `lastFetchedEpoch` will be 
`None`, but otherwise it seems like we should set it.

##
File path: 

[GitHub] [kafka] hachikuji commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-10-13 Thread GitBox


hachikuji commented on a change in pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#discussion_r504331562



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -813,8 +852,9 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
   override def toString: String = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted)
 }
 
-case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) {
+case class OffsetAndEpoch(offset: Long, leaderEpoch: Int, lastFetchedEpoch: Option[Int] = None) {

Review comment:
   Wondering if it might be better not to change this type since it is used in contexts where `lastFetchedEpoch` is not relevant. Following the types through here, we first have `InitialFetchState` in `AbstractFetcherManager`:
   ```scala
   def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState])
   ```
   We then convert to `OffsetAndEpoch`, which gets passed down to `AbstractFetcherThread`:
   ```scala
   def addPartitions(initialFetchStates: Map[TopicPartition, OffsetAndEpoch]): Set[TopicPartition]
   ```
   Then this gets converted to `PartitionFetchState`. I wonder if it's possible to skip the conversion through `OffsetAndEpoch` and use `InitialFetchState` consistently? Perhaps the only reason the current code doesn't do that is that `InitialFetchState` includes the broker end point, which is not really relevant to the fetcher thread. Maybe that's not such a big deal?
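
A sketch of the suggested simplification, using stand-in types keyed by a plain string instead of `TopicPartition`: pass `InitialFetchState` down directly and let the fetcher thread ignore the end point:

```scala
// Sketch only: the field names mirror the diff but are assumptions, not the final API.
object InitialFetchStateSketch {
  final case class BrokerEndPoint(id: Int, host: String, port: Int)
  final case class InitialFetchState(leader: BrokerEndPoint, currentLeaderEpoch: Int,
                                     initOffset: Long, lastFetchedEpoch: Option[Int] = None)

  // proposed shape of addPartitions, skipping the OffsetAndEpoch conversion entirely
  def addPartitions(initialFetchStates: Map[String, InitialFetchState]): Set[String] = {
    // ... build PartitionFetchState directly from each InitialFetchState ...
    initialFetchStates.keySet
  }
}
```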

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -432,14 +455,22 @@ abstract class AbstractFetcherThread(name: String,
       failedPartitions.removeAll(initialFetchStates.keySet)
 
       initialFetchStates.forKeyValue { (tp, initialFetchState) =>
-        // We can skip the truncation step iff the leader epoch matches the existing epoch
+        // For IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses.
+        // For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch
         val currentState = partitionStates.stateValue(tp)
-        val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) {
+        val updatedState = if (initialFetchState.offset >= 0 && isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty) {
+          if (currentState != null)
+            currentState
+          else
+            PartitionFetchState(initialFetchState.offset, None, initialFetchState.leaderEpoch,
+              state = Fetching, initialFetchState.lastFetchedEpoch)
+        } else if (currentState != null && (currentState.currentLeaderEpoch == initialFetchState.leaderEpoch)) {

Review comment:
   nit: unnecessary parentheses

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -225,6 +227,20 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  private def truncateOnFetchResponse(responseData: Map[TopicPartition, FetchData]): Unit = {
+    val epochEndOffsets = responseData
+      .filter { case (tp, fetchData) => fetchData.error == Errors.NONE && fetchData.divergingEpoch.isPresent }
+      .map { case (tp, fetchData) =>
+        val divergingEpoch = fetchData.divergingEpoch.get
+        tp -> new EpochEndOffset(Errors.NONE, divergingEpoch.epoch, divergingEpoch.endOffset)
+      }.toMap
+    inLock(partitionMapLock) {

Review comment:
   Borderline overkill perhaps, but we could check if `epochEndOffsets` is 
non-empty before acquiring the lock
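
A sketch of the minor optimization; `inLock` and the lock below are local stand-ins for the `CoreUtils` helper and `partitionMapLock`:

```scala
// Sketch only: the truncation body itself is elided.
object TruncateLockSketch {
  import java.util.concurrent.locks.ReentrantLock

  private val partitionMapLock = new ReentrantLock()

  private def inLock[T](lock: ReentrantLock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  def truncateOnFetchResponse(epochEndOffsets: Map[String, Long]): Unit = {
    if (epochEndOffsets.nonEmpty) {        // nothing to truncate: avoid taking the lock at all
      inLock(partitionMapLock) {
        // ... maybeTruncateToEpochEndOffsets / fetch-offset updates would run here ...
      }
    }
  }
}
```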

##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -225,6 +227,20 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  private def truncateOnFetchResponse(responseData: Map[TopicPartition, FetchData]): Unit = {
+    val epochEndOffsets = responseData

Review comment:
   Rather than doing an additional pass over the response partitions, would 
it be reasonable to build `epochEndOffsets` inline with the other error 
handling in `processFetchRequest`?
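
A sketch of the alternative, with stand-in types: accumulate the diverging epochs during the single pass over the response in `processFetchRequest`, instead of a separate filter/map afterwards:

```scala
// Sketch only: FetchData/DivergingEpoch are simplified stand-ins.
object DivergingEpochCollectSketch {
  import scala.collection.mutable

  final case class DivergingEpoch(epoch: Int, endOffset: Long)
  final case class FetchData(hasError: Boolean, divergingEpoch: Option[DivergingEpoch])

  def handleResponse(responseData: Map[String, FetchData]): Map[String, DivergingEpoch] = {
    val epochEndOffsets = mutable.Map.empty[String, DivergingEpoch]
    responseData.foreach { case (tp, data) =>
      // ... existing per-partition processing and error handling would happen here ...
      if (!data.hasError)
        data.divergingEpoch.foreach(d => epochEndOffsets += tp -> d)
    }
    epochEndOffsets.toMap
  }
}
```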

##
File path: core/src/main/scala/kafka/server/DelayedFetch.scala
##
@@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long,
    * Case E: This broker is the leader, but the requested epoch is now fenced
    * Case F: The fetch offset locates not on the last segment of the log
    * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
+   * Case H: A diverging epoch was found, return response to trigger truncation

Review comment:
   Good catch here and in `FetchSession`. Do you think we should consider 
doing these fixes separately so that we can get them into 2.7? Otherwise it 
might be difficult to tie this behavior to the 2.7 IBP.

##
File path: