showuon commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1236592041


##########
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##########
@@ -167,64 +235,76 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM
   init()
 
   // Start cleaner thread that will clean the expired entries
-  val cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") {
+  private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(remoteLogIndexCacheCleanerThread) {
     setDaemon(true)
 
     override def doWork(): Unit = {
-      while (!closed) {
-        try {
+      try {
+        while (!isRemoteIndexCacheClosed.get()) {
           val entry = expiredIndexes.take()
-          info(s"Cleaning up index entry $entry")
+          debug(s"Cleaning up index entry $entry")
           entry.cleanup()
-        } catch {
-          case ex: InterruptedException => info("Cleaner thread was interrupted", ex)
-          case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
         }
+      } catch {
+        case ex: InterruptedException =>
+          // cleaner thread should only be interrupted when cache is being closed, else it's an error
+          if (!isRemoteIndexCacheClosed.get()) {
+            error("Cleaner thread received interruption but remote index cache is not closed", ex)
+            throw ex
+          } else {
+            debug("Cleaner thread was interrupted on cache shutdown")
+          }
+        case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
       }
     }
   }
+
   cleanerThread.start()
 
   def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = {
-    if(closed) throw new IllegalStateException("Instance is already closed.")
-
-    def loadIndexFile[T](fileName: String,
-                         suffix: String,
-                         fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream,
-                         readIndex: File => T): T = {
-      val indexFile = new File(cacheDir, fileName + suffix)
-
-      def fetchAndCreateIndex(): T = {
-        val tmpIndexFile = new File(cacheDir, fileName + suffix + RemoteIndexCache.TmpFileSuffix)
-
-        val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
-        try {
-          Files.copy(inputStream, tmpIndexFile.toPath)
-        } finally {
-          if (inputStream != null) {
-            inputStream.close()
-          }
-        }
+    if (isRemoteIndexCacheClosed.get()) {
+      throw new IllegalStateException(s"Unable to fetch index for " +
+        s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.")
+    }
 
-        Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath, false)
-        readIndex(indexFile)
-      }
+    inReadLock(lock) {
+      val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
+      internalCache.get(cacheKey, (uuid: Uuid) => {

Review Comment:
   Should we use `writeLock` here, since `get` means:
   > If the specified key is not already associated with a value, attempts to compute its value using the given mapping function and enters it into this cache unless {@code null}. The entire method invocation is performed atomically, so the function is applied at most once per key. Some attempted update operations on this cache by other threads may be blocked while the computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this cache.
   
   https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Cache.java#L60-L65
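   
   For illustration, here is a minimal, self-contained sketch (not the PR's code; `CaffeineAtomicGetDemo` and the key/value names are made up, assuming only that Caffeine is on the classpath) showing the behavior the quoted Javadoc describes: `Cache.get(key, mappingFunction)` applies the mapping function at most once per key, and concurrent callers for that key block until it completes:
   
   ```scala
   import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
   import java.util.concurrent.atomic.AtomicInteger
   import java.util.function.{Function => JFunction}
   
   object CaffeineAtomicGetDemo extends App {
     val loads = new AtomicInteger(0)
   
     val cache: Cache[String, String] = Caffeine.newBuilder()
       .maximumSize(1024)
       .build[String, String]()
   
     // The loader increments a counter so we can observe how often it actually ran.
     val loader: JFunction[String, String] = (_: String) => {
       loads.incrementAndGet()
       Thread.sleep(50) // simulate fetching index files from remote storage
       "loaded-index"
     }
   
     // Eight threads race on the same key; get() runs the mapping function
     // at most once, and the other callers block until it completes.
     val threads = (1 to 8).map { _ =>
       new Thread(() => assert(cache.get("segment-uuid", loader) == "loaded-index"))
     }
     threads.foreach(_.start())
     threads.foreach(_.join())
   
     println(s"mapping function invoked ${loads.get()} time(s)") // expected: 1
   }
   ```
   
   If that per-key atomicity is all the loading path needs, the surrounding `inReadLock` arguably only has to fence against a concurrent `close()`; taking the write lock would additionally serialize loads of different keys against each other.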


