kowshik commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r719132048



##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -77,8 +77,9 @@ class LogSegment private[log] (val log: FileRecords,
     timeIndex.resize(size)
   }
 
-  def sanityCheck(timeIndexFileNewlyCreated: Boolean): Unit = {
-    if (lazyOffsetIndex.file.exists) {
+  def sanityCheck(timeIndexFileNewlyCreated: Boolean, isActiveSegment: Boolean): Unit = {
+    // We allow for absence of offset index file only for an empty active segment.
+    if ((isActiveSegment && size == 0) || lazyOffsetIndex.file.exists) {

Review comment:
   @junrao: When the `UnifiedLog` is [flushed during clean shutdown](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/LogManager.scala#L527), we flush the `LocalLog` [up to the logEndOffset](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/UnifiedLog.scala#L1508). An empty active segment is not included in the list of candidate segments to be flushed. The reason is that during `LocalLog.flush()`, the `LogSegments.values(recoveryPoint, logEndOffset)` call [here](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/LocalLog.scala#L171) does not select the empty active segment ([doc](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/LogSegments.scala#L127-L131)), because the logEndOffset matches the base offset of the empty active segment, so that segment is omitted. As a result, if the empty active segment's offset index was never created before a clean shutdown, it will not be created during the shutdown either, because the empty active segment is never flushed.
   
   The above is shown in the following passing unit test:
   
   ```scala
   @Test
   def testFlushEmptyActiveSegmentDoesNotCreateOffsetIndex(): Unit = {
       // Create an empty log.
       val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
       val log = createLog(logDir, logConfig)
       val oneRecord = TestUtils.records(List(
         new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes)
       ))
   
       // Append a record and flush. Verify that there exists only 1 segment.
       log.appendAsLeader(oneRecord, leaderEpoch = 0)
       assertEquals(1, log.logEndOffset)
       log.flush()
       assertEquals(1, log.logSegments.size)
       assertTrue(UnifiedLog.logFile(logDir, 0).exists())
       assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
       assertFalse(UnifiedLog.logFile(logDir, 1).exists())
       assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
   
       // Roll the log and verify that the new active segment's offset index is missing.
       log.roll()
       assertEquals(2, log.logSegments.size)
       assertTrue(UnifiedLog.logFile(logDir, 0).exists())
       assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
       assertTrue(UnifiedLog.logFile(logDir, 1).exists())
       assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
   
       // Flush the log and once again verify that the active segment's offset index is still missing.
       log.flush()
       assertTrue(UnifiedLog.logFile(logDir, 0).exists())
       assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
       assertTrue(UnifiedLog.logFile(logDir, 1).exists())
       assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
   
       // Close the log and verify that the active segment's offset index is still missing.
       log.close()
       assertTrue(UnifiedLog.logFile(logDir, 0).exists())
       assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
       assertTrue(UnifiedLog.logFile(logDir, 1).exists())
       assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
   }
   ```
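
   To make the range-selection behaviour concrete, here is a minimal standalone sketch (not Kafka code: the `TreeMap` and the `candidateSegments` helper are hypothetical stand-ins for `LogSegments.values(from, to)`) of why a segment whose base offset equals the logEndOffset falls outside the flushed range:

   ```scala
   import scala.collection.immutable.TreeMap

   object FlushRangeSketch extends App {
     // Segments keyed by base offset; the values stand in for LogSegment objects.
     val segments = TreeMap[Long, String](
       0L -> "segment-0 (one record, covers offsets [0, 1))",
       1L -> "segment-1 (empty active segment, base offset 1)"
     )

     // Simplified stand-in for LogSegments.values(from, to): start from the
     // segment containing `from` and take base offsets strictly below `to`,
     // i.e. the upper bound is exclusive.
     def candidateSegments(from: Long, to: Long): Iterable[String] = {
       val floorBase = segments.rangeTo(from).lastOption.map(_._1).getOrElse(from)
       segments.range(floorBase, to).values
     }

     val recoveryPoint = 0L
     val logEndOffset = 1L // equals the empty active segment's base offset

     // Prints only segment-0: the empty active segment is skipped, so a flush
     // during clean shutdown never materializes its offset index.
     candidateSegments(recoveryPoint, logEndOffset).foreach(println)
   }
   ```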
   
   This PR mainly fixes a logging issue in the code. One situation where the issue shows up frequently is the following: imagine a topic with very low ingress traffic in some or all partitions, and suppose the retention settings cause all existing segments of a partition to expire and get removed. In that case, [we roll the log to create a new active segment](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/UnifiedLog.scala#L1352-L1354). This ensures there is at least one segment remaining in the `LocalLog` when the retention loop completes. However, we don't create the offset index for the new active segment until the first append operation. If the Kafka cluster is rolled before that first append, we will see [this false-negative corruption error message during recovery](https://github.com/apache/kafka/blob/c6aeb5c5546f34276879c1eef4199d208f1c23dc/core/src/main/scala/kafka/log/LogLoader.scala#L336-L337).
   
   This PR fixes the logging problem by ignoring the absence of the offset index file for an empty active segment during recovery.
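
   For illustration, here is a minimal self-contained sketch of the idea behind the changed condition (the `SegmentCheck` class, its fields, and the exception choice are simplified stand-ins, not the actual `LogSegment` implementation):

   ```scala
   import java.io.File
   import java.nio.file.NoSuchFileException

   // Simplified stand-in for the relevant parts of LogSegment.
   final class SegmentCheck(val size: Int, val offsetIndexFile: File) {

     // Mirrors the changed condition in the diff: a missing offset index file is
     // tolerated only for an empty active segment, e.g. one freshly rolled by
     // retention that has not yet received its first append.
     def sanityCheck(isActiveSegment: Boolean): Unit = {
       if ((isActiveSegment && size == 0) || offsetIndexFile.exists()) {
         // proceed with the remaining index sanity checks
       } else {
         // the caller treats this as a corrupted index and emits the recovery
         // warning that this change avoids for empty active segments
         throw new NoSuchFileException(
           s"Offset index file ${offsetIndexFile.getAbsolutePath} does not exist")
       }
     }
   }

   object SegmentCheckDemo extends App {
     val missingIndex = new File("00000000000000000001.index") // not present on disk
     new SegmentCheck(size = 0, offsetIndexFile = missingIndex).sanityCheck(isActiveSegment = true)
     println("empty active segment tolerated despite missing offset index")
   }
   ```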



