hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r564976547



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2118,7 +2139,7 @@ class Log(@volatile private var _dir: File,
    *
    *  @param newOffset The new offset to start the log with
    */
-  private[log] def truncateFullyAndStartAt(newOffset: Long): Unit = {
+  def truncateFullyAndStartAt(newOffset: Long): Unit = {

Review comment:
       It is a little unfortunate to expose the truncation APIs from `Log`. For 
logs which are managed by `LogManager`, truncation calls are expected to go 
through `LogManager`. That makes me think it might be useful to have a 
distinction between the low-level `Log` object and that which is managed by 
`LogManager`. For example, maybe we could use `ManagedLog` or something like 
that. Not something to address here, just food for thought in the way of future 
improvement.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1910,7 +1931,7 @@ class Log(@volatile private var _dir: File,
         in the header.
       */
       appendInfo.firstOffset match {
-        case Some(firstOffset) => roll(Some(firstOffset))
+        case Some(firstOffset) => roll(Some(firstOffset.messageOffset))

Review comment:
       nit: might be able to simplify this a little bit. Maybe something like 
this:
   ```scala
   val rollOffset = appendInfo.firstOffset.map(_.messageOffset)
     .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE)
   roll(Some(rollOffset))
   ```

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    
handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: 
kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+    }
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == 
logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot 
($snapshotId). " +
+            s"Expected the snapshot's end offset to match the log's end offset 
($logEndOffset) " +
+            s"and the log start offset ($startOffset)"
+          )
+        }
+      }.orElse(0)
+    }
   }
 
   override def endOffsetForEpoch(leaderEpoch: Int): 
Optional[raft.OffsetAndEpoch] = {
     val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch 
=>
-      new raft.OffsetAndEpoch(offsetAndEpoch.offset, 
offsetAndEpoch.leaderEpoch)
+      if (oldestSnapshotId.isPresent() &&
+        offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+        offsetAndEpoch.leaderEpoch == leaderEpoch) {
+
+        // The leaderEpoch is smaller thant the smallest epoch on the log. 
Overide the diverging
+        // epoch to the oldest snapshot which should be the snapshot at the 
log start offset
+        val snapshotId = oldestSnapshotId().get();

Review comment:
       nit: remove the semicolon here and below. Also, there are some unnecessary 
parentheses in here.

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    
handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: 
kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")

Review comment:
       The `LogAppendInfo` contains an `errorMessage` field. Perhaps we can 
include it here?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    
handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: 
kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+    }
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == 
logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot 
($snapshotId). " +
+            s"Expected the snapshot's end offset to match the log's end offset 
($logEndOffset) " +
+            s"and the log start offset ($startOffset)"
+          )
+        }
+      }.orElse(0)
+    }
   }
 
   override def endOffsetForEpoch(leaderEpoch: Int): 
Optional[raft.OffsetAndEpoch] = {
     val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch 
=>
-      new raft.OffsetAndEpoch(offsetAndEpoch.offset, 
offsetAndEpoch.leaderEpoch)
+      if (oldestSnapshotId.isPresent() &&
+        offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+        offsetAndEpoch.leaderEpoch == leaderEpoch) {

Review comment:
       Always tough to think my way through this logic. So here we are handling 
the case where the client has requested the end offset for an epoch which is 
smaller than the epoch of any entry remaining in the log. The current epoch cache 
implementation handles this by returning the requested epoch with an end offset 
equal to the log start offset. So we detect the case here by checking that the 
returned epoch matches the requested epoch and the end offset matches the log 
start offset, which we assume to be equal to the offset corresponding to the 
oldest snapshot. Right so far?
   
   So there are still a couple cases that are possible after verifying this:
   
   1. The epoch matches the snapshot epoch. We know then that the end offset 
must be equal to the snapshot offset. This is a case where there is a new epoch 
which begins on the first offset in the log.
   2. The epoch is less than the snapshot epoch. The follower might have been 
offline for a long time and is requesting an ancient epoch which we no longer 
have any knowledge of.
   
   I'm convinced about the handling for the first case, but less sure about the 
second one. The guarantee that this API provides is that it will return the 
largest epoch which is less than or equal to the requested epoch and its last 
offset. But for this case, the snapshot epoch is larger than the request epoch. 
In fact, we have no way of satisfying this API because we have lost the old 
data.
   
   Thinking in terms of what we want to happen might be helpful. We want to 
detect this as an out of range fetch which causes the follower to fetch from 
the latest snapshot. What if we return `Optional.empty` since we cannot 
determine the end offset for the requested epoch? Currently we do the following 
in `KafkaRaftClient`:
   
   ```java
          OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> {
             return new IllegalStateException(
                   String.format(
                       "Expected to find an end offset for epoch %s since it 
must be less than the current epoch %s",
                       lastFetchedEpoch,
                       quorum.epoch()
                   )
               );
           });
   ```
   
   There are two cases we want to handle: the requested epoch is less than any 
known epoch we have and the requested epoch is larger than any known epoch. The 
latter case might be possible if there was an "unclean" election of some kind. 
However, we can handle both cases the same. We can send the latest snapshot id 
to the follower, which will cause it to truncate. The follower may see this as 
a loss of committed data and fail, but that is ok. At least we will get a clear 
message about the failure.
   
   

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record 
set")
 
-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    
handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: 
kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+    }
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == 
logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot 
($snapshotId). " +
+            s"Expected the snapshot's end offset to match the log's end offset 
($logEndOffset) " +
+            s"and the log start offset ($startOffset)"
+          )
+        }
+      }.orElse(0)
+    }
   }
 
   override def endOffsetForEpoch(leaderEpoch: Int): 
Optional[raft.OffsetAndEpoch] = {
     val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch 
=>
-      new raft.OffsetAndEpoch(offsetAndEpoch.offset, 
offsetAndEpoch.leaderEpoch)
+      if (oldestSnapshotId.isPresent() &&

Review comment:
       nit: might consider converting to scala Option and changing this to a 
`match`. A little easier on the eyes

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -16,28 +16,41 @@
  */
 package kafka.raft
 
+import java.nio.file.Files
 import java.nio.file.NoSuchFileException
+import java.util.NoSuchElementException
 import java.util.Optional
+import java.util.concurrent.ConcurrentSkipListSet
 
-import kafka.log.{AppendOrigin, Log}
+import kafka.log.{AppendOrigin, Log, SnapshotGenerated}
 import kafka.server.{FetchHighWatermark, FetchLogEnd}
 import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.raft
-import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, 
Isolation, ReplicatedLog}
+import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, 
Isolation, OffsetMetadata, ReplicatedLog}
 import org.apache.kafka.snapshot.FileRawSnapshotReader
 import org.apache.kafka.snapshot.FileRawSnapshotWriter
 import org.apache.kafka.snapshot.RawSnapshotReader
 import org.apache.kafka.snapshot.RawSnapshotWriter
+import org.apache.kafka.snapshot.Snapshots
 
 import scala.compat.java8.OptionConverters._
 
-class KafkaMetadataLog(
+final class KafkaMetadataLog private (
   log: Log,
+  // This object needs to be thread-safe because the polling thread in the 
KafkaRaftClient implementation
+  // and other threads will access this object. This object is used to 
efficiently notify the polling thread
+  // when snapshots are created.
+  snapshotIds: ConcurrentSkipListSet[raft.OffsetAndEpoch],
   topicPartition: TopicPartition,
-  maxFetchSizeInBytes: Int = 1024 * 1024
+  maxFetchSizeInBytes: Int
 ) extends ReplicatedLog {
 
+  private[this] var oldestSnapshotId = snapshotIds

Review comment:
       Hmm... It might be helpful to document this fact somewhere since it 
seems non-obvious. It surprised me anyway.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to