hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r564989172



##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
      throw new IllegalArgumentException("Attempt to append an empty record set")
 
-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
      throw new IllegalArgumentException("Attempt to append an empty record set")
 
-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+    }
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+            s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " +
+            s"and the log start offset ($startOffset)"
+          )
+        }
+      }.orElse(0)
+    }
   }
 
   override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
     val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch =>
-      new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
+      if (oldestSnapshotId.isPresent() &&
+        offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+        offsetAndEpoch.leaderEpoch == leaderEpoch) {

Review comment:
       Always tough to think my way through this logic. So here we are handling the case where the client has requested the end offset for an epoch which is older than any epoch still remaining in the log. The current epoch cache implementation handles this by returning the requested epoch with an end offset equal to the log start offset. So we detect the case here by checking that the returned epoch matches the requested epoch and the end offset matches the log start offset, which we assume to be equal to the offset of the oldest snapshot. Right so far?
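   
   To restate that detection in code (just my reading of the check, written as a Java fragment to match the snippet further down; `oldestSnapshotId` and the field names come from the diff above):
   
   ```java
   // The epoch cache answers a request for an epoch older than the log with
   // (requestedEpoch, logStartOffset), so we can recognize that case by comparing
   // against the oldest snapshot id, whose offset we assume equals the log start offset.
   boolean requestedEpochPrecedesLog =
       oldestSnapshotId.isPresent()
           && offsetAndEpoch.offset == oldestSnapshotId.get().offset // end offset == log start offset
           && offsetAndEpoch.leaderEpoch == leaderEpoch;             // cache echoed the requested epoch
   ```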
   
   So there are still a couple cases that are possible after verifying this:
   
   1. The requested epoch matches the snapshot epoch. We know then that the end 
offset must be equal to the snapshot offset. This is a case where there is a 
new epoch which begins on the first offset in the log.
   2. The requested epoch is less than the snapshot epoch. The follower might have been offline for a long time and is requesting an ancient epoch which we no longer have any knowledge of. (A toy sketch of both cases follows this list.)
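   
   For concreteness, a toy sketch of the two cases (made-up numbers and names; not the real code):
   
   ```java
   import java.util.OptionalLong;
   
   public class OldEpochCases {
       // Hypothetical state: oldest snapshot id = (offset 100, epoch 5), so the
       // log starts at offset 100 and the epoch cache knows nothing older.
       static final long SNAPSHOT_OFFSET = 100L;
       static final int SNAPSHOT_EPOCH = 5;
   
       // End offset we can truthfully report for a requested epoch at or below
       // the snapshot epoch, or empty when the answer was lost with the old data.
       static OptionalLong endOffsetForOldEpoch(int requestedEpoch) {
           if (requestedEpoch == SNAPSHOT_EPOCH) {
               // Case 1: the epoch ends exactly where the snapshot ends.
               return OptionalLong.of(SNAPSHOT_OFFSET);
           } else {
               // Case 2: requestedEpoch < snapshot epoch; its true end offset was
               // in the data we deleted, so there is no honest answer.
               return OptionalLong.empty();
           }
       }
   
       public static void main(String[] args) {
           System.out.println(endOffsetForOldEpoch(5)); // OptionalLong[100]
           System.out.println(endOffsetForOldEpoch(3)); // OptionalLong.empty
       }
   }
   ```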
   
   I'm convinced about the handling for the first case, but less sure about the second one. The guarantee that this API provides is that it will return the largest epoch which is less than or equal to the requested epoch, along with that epoch's last offset. But in this case, the snapshot epoch is larger than the requested epoch. In fact, we have no way of satisfying this API because we have lost the old data.
   
   Thinking in terms of what we want to happen might be helpful. We want to detect this as an out-of-range fetch, which causes the follower to fetch from the latest snapshot. What if we return `Optional.empty` since we cannot determine the end offset for the requested epoch? Currently we do the following in `KafkaRaftClient`:
   
   ```java
   OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> {
       return new IllegalStateException(
           String.format(
               "Expected to find an end offset for epoch %s since it must be less than the current epoch %s",
               lastFetchedEpoch,
               quorum.epoch()
           )
       );
   });
   ```
   
   There are two cases we want to handle: the requested epoch is less than any epoch we know of, or the requested epoch is larger than any epoch we know of. The latter case might be possible if there was an "unclean" election of some kind. However, we can handle both cases the same way: we can send the latest snapshot id to the follower, which will cause it to truncate. The follower may or may not see this as a loss of committed data and fail, but that is ok. At least we will get a clear message about the failure.
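   
   If `endOffsetForEpoch` returned `Optional.empty` here, the call site might look roughly like this (a sketch only; `buildSnapshotFetchResponse` and `latestSnapshotId` on the log are hypothetical names for illustration):
   
   ```java
   Optional<OffsetAndEpoch> endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch);
   if (!endOffsetAndEpoch.isPresent()) {
       // The requested epoch is older (or newer) than anything we know about.
       // Point the follower at the latest snapshot id so it truncates and
       // fetches the snapshot instead of failing with an IllegalStateException.
       return buildSnapshotFetchResponse(log.latestSnapshotId()); // hypothetical helper
   }
   ```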
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

