ijuma commented on a change in pull request #8812:
URL: https://github.com/apache/kafka/pull/8812#discussion_r436180609



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -826,8 +832,16 @@ class Log(@volatile private var _dir: File,
         preallocate = config.preallocate))
     }
 
-    recoveryPoint = activeSegment.readNextOffset
-    recoveryPoint
+    // Update the recovery point if there was a clean shutdown and we did not perform any changes
+    // to the segment. Otherwise, we just ensure that the recovery point is not ahead of the log
+    // end offset. To ensure correctness and to make it easier to reason about, it's best to only
+    // advance the recovery point in flush(Long).
+    if (hasCleanShutdownFile)
+      logEndOffsetOption.foreach(recoveryPoint = _)
+    else
+      recoveryPoint = Math.min(recoveryPoint, logEndOffset)

Review comment:
       I meant to use `logEndOffsetOption`, so this is a bug. :) It probably 
indicates that the variable name is bad (I kept it from before). If I had used 
the right variable, it would be:
   
   ```scala
    def readNextOffset: Long = {
      val fetchData = read(offsetIndex.lastOffset, log.sizeInBytes)
      if (fetchData == null)
        baseOffset
      else
        fetchData.records.batches.asScala.lastOption
          .map(_.nextOffset)
          .getOrElse(baseOffset)
    }
   ```
   
   The idea is that if we delete a bunch of segments, then the recovery point 
we passed to the `Log` constructor could be ahead of what remains.
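   
   To make the scenario concrete, here is a minimal standalone sketch (not Kafka code; `clampRecoveryPoint` is a hypothetical helper) showing why the recovery point must be clamped to the log end offset after segment deletion:
   
   ```scala
   object RecoveryPointSketch {
     // After deleting segments, the recovery point passed to the Log
     // constructor may point past the remaining data, so we clamp it
     // to the current log end offset.
     def clampRecoveryPoint(recoveryPoint: Long, logEndOffset: Long): Long =
       math.min(recoveryPoint, logEndOffset)
   
     def main(args: Array[String]): Unit = {
       // Suppose the constructor received recoveryPoint = 500, but segment
       // deletion left the log end offset at 300: the stale value 500 would
       // skip recovery of offsets 300..500 that no longer exist.
       println(clampRecoveryPoint(500L, 300L)) // prints 300
       // With no deletion, the recovery point is unchanged.
       println(clampRecoveryPoint(200L, 300L)) // prints 200
     }
   }
   ```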




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
