This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 1897af3ef92 KAFKA-15511: Handle CorruptIndexException in 
RemoteIndexCache (#14459)
1897af3ef92 is described below

commit 1897af3ef9251024c419e514506c8344114feaf5
Author: iit2009060 <[email protected]>
AuthorDate: Fri Sep 29 15:56:46 2023 +0530

    KAFKA-15511: Handle CorruptIndexException in RemoteIndexCache (#14459)
    
    A bug in RemoteIndexCache prevented the cache from replacing a corrupted
    on-disk index file with a new index instance fetched from remote storage:
    the code that loads an existing index file caught `CorruptRecordException`
    instead of `CorruptIndexException`. This commit fixes the bug by catching
    `CorruptIndexException`.
    
    Reviewers: Divij Vaidya <[email protected]>, Satish Duggana 
<[email protected]>, Kamal Chandraprakash <[email protected]>, 
Alexandre Dupriez <[email protected]>
---
 .../kafka/log/remote/RemoteIndexCacheTest.scala    | 28 +++++++++++++++++++++-
 .../storage/internals/log/RemoteIndexCache.java    |  3 +--
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala 
b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 2c0e6389221..93e3f01511f 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -32,7 +32,7 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{File, FileInputStream, IOException}
+import java.io.{File, FileInputStream, IOException, PrintWriter}
 import java.nio.file.Files
 import java.util
 import java.util.Collections
@@ -524,6 +524,23 @@ class RemoteIndexCacheTest {
     }
   }
 
+  @Test
+  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
+    // create Corrupt Offset Index File
+    createCorruptRemoteIndexCacheOffsetFile()
+    val entry = cache.getIndexEntry(rlsMetadata)
+    // Test would fail if it throws corrupt Exception
+    val expectedOffsetIndexFileName: String = 
remoteOffsetIndexFileName(rlsMetadata)
+    val offsetIndexFile = entry.offsetIndex.file().toPath
+
+    assertEquals(expectedOffsetIndexFileName, 
offsetIndexFile.getFileName.toString)
+    // assert that parent directory for the index files is correct
+    assertEquals(RemoteIndexCache.DIR_NAME, 
offsetIndexFile.getParent.getFileName.toString,
+      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+    // file is corrupted it should fetch from remote storage again
+    verifyFetchIndexInvocation(count = 1)
+  }
+
   private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
                                     = 
RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, 
baseOffset, lastOffset,
@@ -598,4 +615,13 @@ class RemoteIndexCacheTest {
       timeIndex.flush()
     }
   }
+
+  private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
+    val pw =  new PrintWriter(remoteOffsetIndexFile(new File(tpDir, 
RemoteIndexCache.DIR_NAME), rlsMetadata))
+    pw.write("Hello, world")
+    // The string written to the file is 12 bytes long, which is not a multiple
+    // of the offset index entry size (8 bytes) as a valid offset index requires.
+    pw.close()
+  }
+
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 455913947b3..2bbe9d76ecf 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -21,7 +21,6 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalCause;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -310,7 +309,7 @@ public class RemoteIndexCache implements Closeable {
         if (Files.exists(indexFile.toPath())) {
             try {
                 index = readIndex.apply(indexFile);
-            } catch (CorruptRecordException ex) {
+            } catch (CorruptIndexException ex) {
                 log.info("Error occurred while loading the stored index file 
{}", indexFile.getPath(), ex);
             }
         }

Reply via email to