divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1236598557


##########
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##########
@@ -167,64 +235,76 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
   init()
 
   // Start cleaner thread that will clean the expired entries
-  val cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") {
+  private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(remoteLogIndexCacheCleanerThread) {
     setDaemon(true)
 
     override def doWork(): Unit = {
-      while (!closed) {
-        try {
+      try {
+        while (!isRemoteIndexCacheClosed.get()) {
           val entry = expiredIndexes.take()
-          info(s"Cleaning up index entry $entry")
+          debug(s"Cleaning up index entry $entry")
           entry.cleanup()
-        } catch {
-          case ex: InterruptedException => info("Cleaner thread was interrupted", ex)
-          case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
         }
+      } catch {
+        case ex: InterruptedException =>
+          // cleaner thread should only be interrupted when cache is being closed, else it's an error
+          if (!isRemoteIndexCacheClosed.get()) {
+            error("Cleaner thread received interruption but remote index cache 
is not closed", ex)
+            throw ex
+          } else {
+            debug("Cleaner thread was interrupted on cache shutdown")
+          }
+        case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
       }
     }
   }
+
   cleanerThread.start()
 
  def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = {
-    if(closed) throw new IllegalStateException("Instance is already closed.")
-
-    def loadIndexFile[T](fileName: String,
-                         suffix: String,
-                         fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream,
-                         readIndex: File => T): T = {
-      val indexFile = new File(cacheDir, fileName + suffix)
-
-      def fetchAndCreateIndex(): T = {
-        val tmpIndexFile = new File(cacheDir, fileName + suffix + RemoteIndexCache.TmpFileSuffix)
-
-        val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
-        try {
-          Files.copy(inputStream, tmpIndexFile.toPath)
-        } finally {
-          if (inputStream != null) {
-            inputStream.close()
-          }
-        }
+    if (isRemoteIndexCacheClosed.get()) {
+      throw new IllegalStateException(s"Unable to fetch index for " +
+        s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. 
Index instance is already closed.")
+    }
 
-        Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath, false)
-        readIndex(indexFile)
-      }
+    inReadLock(lock) {
+      val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
+      internalCache.get(cacheKey, (uuid: Uuid) => {

Review Comment:
   Yes, it will update the cache with a new entry. But since the internalCache is thread safe, we don't need to prevent other threads from reading or writing another entry in the cache; hence, we don't need to acquire a write lock over the entire RemoteIndexCache.
   
   Does this answer your question?
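   
   To make the reasoning concrete, here is a minimal, self-contained sketch of the locking scheme being described (the class, method, and value names below are hypothetical illustrations, not the PR's actual code). It assumes a Caffeine-backed cache, whose `get(key, mappingFunction)` performs an atomic per-key load, so concurrent readers only need the shared read lock; the write lock would be reserved for `close()`, which must exclude all readers:
   
   ```scala
   import java.util.concurrent.atomic.AtomicBoolean
   import java.util.concurrent.locks.ReentrantReadWriteLock
   
   import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
   
   // Hypothetical, simplified stand-in for RemoteIndexCache, only to show the lock usage.
   class IndexCacheSketch(maxSize: Long = 1024) {
     private val closed = new AtomicBoolean(false)
     private val lock = new ReentrantReadWriteLock()
   
     // Caffeine runs the mapping function at most once per key at a time,
     // so individual entry loads need no extra synchronization from us.
     private val internalCache: Cache[String, String] =
       Caffeine.newBuilder().maximumSize(maxSize).build[String, String]()
   
     // Many threads may load/read entries concurrently: the shared read lock
     // only excludes close(), never other readers.
     def getEntry(key: String): String = {
       lock.readLock().lock()
       try {
         if (closed.get()) throw new IllegalStateException("Cache is already closed")
         internalCache.get(key, (k: String) => s"loaded-$k") // thread-safe per-key load
       } finally lock.readLock().unlock()
     }
   
     // close() is the only operation needing mutual exclusion: the write lock
     // waits for in-flight reads and blocks new ones during teardown.
     def close(): Unit = {
       lock.writeLock().lock()
       try {
         if (closed.compareAndSet(false, true)) internalCache.invalidateAll()
       } finally lock.writeLock().unlock()
     }
   }
   ```
   
   With this split, two threads fetching different segment ids can proceed in parallel under the read lock, while `close()` still gets a consistent point at which no reads are in flight.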


