kamalcph commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1349857381


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -524,23 +527,221 @@ class RemoteIndexCacheTest {
     }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", 
"TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit 
= {
+    // create Corrupted Index File in remote index cache
+    createCorruptedIndexFile(indexType, cache.cacheDir())
     val entry = cache.getIndexEntry(rlsMetadata)
     // Test would fail if it throws corrupt Exception

Review Comment:
   Can we update this comment to the following?
   
   ```
   Test would fail if it throws an exception other than CorruptIndexException
   ```



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -554,23 +557,221 @@ class RemoteIndexCacheTest {
     assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", 
"TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit 
= {
+    // create Corrupted Index File in remote index cache
+    createCorruptedIndexFile(indexType, cache.cacheDir())
     val entry = cache.getIndexEntry(rlsMetadata)
     // Test would fail if it throws corrupt Exception
-    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
     val offsetIndexFile = entry.offsetIndex.file().toPath
+    val txnIndexFile = entry.txnIndex.file().toPath
+    val timeIndexFile = entry.timeIndex.file().toPath
+
+    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
+    val expectedTimeIndexFileName: String = 
remoteTimeIndexFileName(rlsMetadata)
+    val expectedTxnIndexFileName: String = 
remoteTransactionIndexFileName(rlsMetadata)
 
     assertEquals(expectedOffsetIndexFileName, 
offsetIndexFile.getFileName.toString)
+    assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+    assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
     // assert that parent directory for the index files is correct
     assertEquals(RemoteIndexCache.DIR_NAME, 
offsetIndexFile.getParent.getFileName.toString,
-      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+      s"offsetIndex=$offsetIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, 
txnIndexFile.getParent.getFileName.toString,
+      s"txnIndex=$txnIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, 
timeIndexFile.getParent.getFileName.toString,
+      s"timeIndex=$timeIndexFile is created under incorrect parent")
+
     // file is corrupted it should fetch from remote storage again
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+    reset(rsm)
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), 
any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create corrupted index file
+        createCorruptTimeIndexOffsetFile(tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not 
accessed.
+        }
+      })
+
+    assertThrows(classOf[CorruptIndexException], () => 
cache.getIndexEntry(rlsMetadata))
+    
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+    // Current status
+    // (cache is null)
+    // RemoteCacheDir contain
+    // 1. Offset Index File is fine and not corrupted
+    // 2. Time Index File is corrupted
+    // What should be the code flow in next execution
+    // 1. No rsm call for fetching OffSet Index File.
+    // 2. Time index file should be fetched from remote storage again as it is 
corrupted in the first execution.
+    // 3. Transaction index file should be fetched from remote storage.
+    reset(rsm)
+    // delete all files created in tpDir
+    Files.walk(tpDir.toPath, 1)
+      .filter(Files.isRegularFile(_))
+      .forEach(path => Files.deleteIfExists(path))
+    // rsm should return no corrupted file in the 2nd execution
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), 
any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not 
accessed.
+        }
+      })
+    cache.getIndexEntry(rlsMetadata)
+    // rsm should not be called to fetch offset Index
+    verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET))
+    verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION))

Review Comment:
   The txn index file was not corrupted. Can we add a comment explaining why we 
are fetching it again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to