jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r565825661



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
      throw new IllegalArgumentException("Attempt to append an empty record set")
 
-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
      throw new IllegalArgumentException("Attempt to append an empty record set")
 
-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+    }
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot 
($snapshotId). " +
+            s"Expected the snapshot's end offset to match the log's end offset 
($logEndOffset) " +
+            s"and the log start offset ($startOffset)"
+          )
+        }
+      }.orElse(0)
+    }
   }
 
  override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
    val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch =>
-      new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
+      if (oldestSnapshotId.isPresent() &&
+        offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+        offsetAndEpoch.leaderEpoch == leaderEpoch) {

Review comment:
   First, thanks a lot for thinking through this code and providing such a detailed comment. This code is important to get right.
   
   > the requested epoch is larger than any known epoch.
   
   For this case I decided to throw an exception because the Fetch request handling code already checks for this condition and returns an error Fetch response when the invariant `lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch` is violated. In other words, based on the current implementation, I think it is a bug if `endOffsetForEpoch` returns `Optional.empty()`.
   
   1. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L951-L954
   2. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1618-L1621
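
   To make that invariant concrete, here is a minimal sketch of the leader-side check (illustrative class and method names; the real check is in `KafkaRaftClient`, linked above):

   ```java
   // Illustrative sketch, not the actual KafkaRaftClient code: the leader
   // only serves a Fetch request when the follower's lastFetchedEpoch
   // respects lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch;
   // otherwise it replies with an error Fetch response.
   final class FetchEpochInvariant {
       static boolean isValidLastFetchedEpoch(int lastFetchedEpoch, int quorumEpoch) {
           return lastFetchedEpoch <= quorumEpoch;
       }
   }
   ```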
   
   > the requested epoch is less than any known epoch we have
   
   When thinking through this case I convinced myself that the leader can determine whether it should send a snapshot simply by comparing the "fetch offset" and "last fetched epoch" against the `oldestSnapshotId`, as sketched below. The `oldestSnapshotId` is the snapshot whose end offset equals the log start offset.
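
   A minimal sketch of that decision, assuming hypothetical names for the snapshot fields (this is not the PR's code):

   ```java
   // Illustrative sketch: the leader decides whether a snapshot must be sent
   // by comparing the follower's fetch position against the oldest snapshot
   // id, whose end offset equals the log start offset.
   final class SnapshotFetchDecision {
       static boolean shouldSendSnapshot(
           long fetchOffset, int lastFetchedEpoch,
           long oldestSnapshotEndOffset, int oldestSnapshotEpoch
       ) {
           // A fetch position below the oldest snapshot's end offset, or with
           // an epoch older than the snapshot's epoch, can only be served by
           // sending the snapshot itself.
           return fetchOffset < oldestSnapshotEndOffset ||
               lastFetchedEpoch < oldestSnapshotEpoch;
       }
   }
   ```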
   
   > The current epoch cache implementation handles this by returning the 
requested epoch with an end offset equal to the log start offset. So we detect 
the case here by checking that the returned epoch matches the requested epoch 
and the end offset matches the offset corresponding to the oldest snapshot, 
which should be the same as the log start offset. Right so far?
   
   Correct. My comment here assumes that the fetch offset is between the log start offset and the log end offset, and that sending a snapshot is not required. When thinking through the code in `ReplicatedLog::endOffsetForEpoch`, I always interpreted it as trying to find the largest offset at which the follower could have diverged. I would argue that the lower bound for this is the `oldestSnapshotId`, hence the modification when the epoch cache couldn't find the end offset for the given epoch (see the sketch below).
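
   A sketch of that interpretation (illustrative types and names, not the PR's Scala code):

   ```java
   // Illustrative sketch: when the epoch cache has no end offset for the
   // requested epoch, clamp the divergence point to the oldest snapshot id,
   // the lowest offset at which the follower could have diverged.
   import java.util.Optional;

   final class DivergenceBound {
       record OffsetAndEpoch(long offset, int epoch) {}

       static OffsetAndEpoch largestDivergencePoint(
           Optional<OffsetAndEpoch> fromEpochCache,
           OffsetAndEpoch oldestSnapshotId
       ) {
           return fromEpochCache.orElse(oldestSnapshotId);
       }
   }
   ```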
   
   > However, we can handle both cases the same. We can send the latest 
snapshot id to the follower, which will cause it to truncate.
   
   Regarding the "last fetched epoch" > `quorum.epoch()` case:
   Even though this cannot happen based on my earlier comment, why not return 
`ValidatedFetchOffsetAndEpoch.diverging(logEndOffset, quorum.epoch())`? This is 
much easier for me to reason about and I think it will have the same effect on 
the follower. The follower will truncate to the end offset where the largest 
local epoch <= the leader epoch?
   
   For example, sending `largestSnapshotId` will not cause the follower to `maybeTruncateFullyToLatestSnapshot` because `largestSnapshotId.epoch < log.latestEpoch` on the follower, as sketched below.
   
   I need to think about it some more, but I think the reality is that `KafkaRaftClient` doesn't handle the case very well when `quorum.epoch()` is not strictly increasing after a leader change.
   
   ### Proposed Changes
   
   I think we can make this code much easier to understand if we remove `ReplicatedLog::endOffsetForEpoch` and instead add a method to `ReplicatedLog` with the following signature: `ValidatedFetchOffsetAndEpoch validateOffsetAndEpoch(long offset, int epoch)`. We can move the code in `KafkaRaftClient::validateFetchOffsetAndEpoch` to this new method. I'll also rename `ValidatedFetchOffsetAndEpoch` to `ValidOffsetAndEpoch`, but let me know if you have a better name. :)
   
   ```java
   /**
    * Validate the given offset and epoch against the log and oldest snapshot.
    *
    * Returns the largest valid offset and epoch given `offset` and `epoch` as the upper bound.
    * This can result in three possible values returned:
    *
    * 1. ValidOffsetAndEpoch.valid if the given offset and epoch is valid in the log.
    *
    * 2. ValidOffsetAndEpoch.diverging if the given offset and epoch is not valid; and the
    * largest valid offset and epoch is in the log.
    *
    * 3. ValidOffsetAndEpoch.snapshot if the given offset and epoch is not valid; and the largest
    * valid offset and epoch is less than the oldest snapshot.
    *
    * @param offset the offset to validate
    * @param epoch the epoch of the record at offset - 1
    * @return the largest valid offset and epoch
    */
   ValidOffsetAndEpoch validateOffsetAndEpoch(long offset, int epoch);
   ```
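
   For illustration, an implementation could branch over the three cases roughly like this (a sketch assuming `oldestSnapshotId`, `endOffsetForEpoch`, and the `ValidOffsetAndEpoch` factory methods as helpers; not the final code):

   ```java
   ValidOffsetAndEpoch validateOffsetAndEpoch(long offset, int epoch) {
       // Assumed helper: the snapshot whose end offset equals the log start offset.
       OffsetAndEpoch oldestSnapshot = oldestSnapshotId().orElseThrow(
           () -> new IllegalStateException("Expected a snapshot at the log start offset")
       );

       if (offset < oldestSnapshot.offset || epoch < oldestSnapshot.epoch) {
           // Case 3: the largest valid offset and epoch is older than the
           // oldest snapshot, so the follower needs to fetch the snapshot.
           return ValidOffsetAndEpoch.snapshot(oldestSnapshot);
       }

       OffsetAndEpoch endOffsetAndEpoch = endOffsetForEpoch(epoch);
       if (endOffsetAndEpoch.epoch != epoch || endOffsetAndEpoch.offset < offset) {
           // Case 2: the given offset and epoch diverge from the log; report
           // the largest valid offset and epoch so the follower can truncate.
           return ValidOffsetAndEpoch.diverging(endOffsetAndEpoch);
       }

       // Case 1: the given offset and epoch are consistent with the log.
       return ValidOffsetAndEpoch.valid();
   }
   ```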
   
   I like this design because it would also allow us to move some of the tests from `KafkaRaftClientSnapshotTest` to `KafkaMetadataLogTest`. Tests in `KafkaMetadataLogTest` are much easier to write and read. This design will also require the code to handle all possible values for `offset` and `epoch`, since it lives in `ReplicatedLog` and we cannot assume a specific implementation of `RaftClient`.
   
   What do you think?



