This is an automated email from the ASF dual-hosted git repository.
divijv pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 1897af3ef92 KAFKA-15511: Handle CorruptIndexException in
RemoteIndexCache (#14459)
1897af3ef92 is described below
commit 1897af3ef9251024c419e514506c8344114feaf5
Author: iit2009060 <[email protected]>
AuthorDate: Fri Sep 29 15:56:46 2023 +0530
KAFKA-15511: Handle CorruptIndexException in RemoteIndexCache (#14459)
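A bug in the RemoteIndexCache leads to a situation where the cache does not
replace the corrupted index with a new index instance fetched from remote
storage. The catch clause around loading an existing index file named
`CorruptRecordException` instead of `CorruptIndexException`. This commit
fixes the bug by adding correct handling for `CorruptIndexException`.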
Reviewers: Divij Vaidya <[email protected]>, Satish Duggana
<[email protected]>, Kamal Chandraprakash <[email protected]>,
Alexandre Dupriez <[email protected]>
---
.../kafka/log/remote/RemoteIndexCacheTest.scala | 28 +++++++++++++++++++++-
.../storage/internals/log/RemoteIndexCache.java | 3 +--
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 2c0e6389221..93e3f01511f 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -32,7 +32,7 @@ import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}
-import java.io.{File, FileInputStream, IOException}
+import java.io.{File, FileInputStream, IOException, PrintWriter}
import java.nio.file.Files
import java.util
import java.util.Collections
@@ -524,6 +524,23 @@ class RemoteIndexCacheTest {
}
}
+ @Test
+ def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
+ // put a corrupt offset index file on disk before the cache is asked for this segment
+ createCorruptRemoteIndexCacheOffsetFile()
+ val entry = cache.getIndexEntry(rlsMetadata)
+ // the getIndexEntry call above must not propagate a corrupt-index exception; if it did, the test would fail here
+ val expectedOffsetIndexFileName: String =
remoteOffsetIndexFileName(rlsMetadata)
+ val offsetIndexFile = entry.offsetIndex.file().toPath
+
+ assertEquals(expectedOffsetIndexFileName,
offsetIndexFile.getFileName.toString)
+ // the offset index file must sit directly under the remote index cache directory
+ assertEquals(RemoteIndexCache.DIR_NAME,
offsetIndexFile.getParent.getFileName.toString,
+ s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+ // the on-disk file was corrupt, so the index is expected to be fetched from remote storage (count = 1)
+ verifyFetchIndexInvocation(count = 1)
+ }
+
private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
=
RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId,
baseOffset, lastOffset,
@@ -598,4 +615,13 @@ class RemoteIndexCacheTest {
timeIndex.flush()
}
}
+
+ private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
+ val pw = new PrintWriter(remoteOffsetIndexFile(new File(tpDir,
RemoteIndexCache.DIR_NAME), rlsMetadata))
+ pw.write("Hello, world")
+ // "Hello, world" is 12 bytes long, which is not a multiple of the offset index
+ // entry size (8 bytes), so the file written here is not a well-formed offset index.
+ pw.close()
+ }
+
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 455913947b3..2bbe9d76ecf 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -21,7 +21,6 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -310,7 +309,7 @@ public class RemoteIndexCache implements Closeable {
if (Files.exists(indexFile.toPath())) {
try {
index = readIndex.apply(indexFile);
- } catch (CorruptRecordException ex) {
+ } catch (CorruptIndexException ex) {
log.info("Error occurred while loading the stored index file
{}", indexFile.getPath(), ex);
}
}