showuon commented on code in PR #13850: URL: https://github.com/apache/kafka/pull/13850#discussion_r1236592041
########## core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala: ########## @@ -167,64 +235,76 @@ class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageM init() // Start cleaner thread that will clean the expired entries - val cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") { + private[remote] var cleanerThread: ShutdownableThread = new ShutdownableThread(remoteLogIndexCacheCleanerThread) { setDaemon(true) override def doWork(): Unit = { - while (!closed) { - try { + try { + while (!isRemoteIndexCacheClosed.get()) { val entry = expiredIndexes.take() - info(s"Cleaning up index entry $entry") + debug(s"Cleaning up index entry $entry") entry.cleanup() - } catch { - case ex: InterruptedException => info("Cleaner thread was interrupted", ex) - case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex) } + } catch { + case ex: InterruptedException => + // cleaner thread should only be interrupted when cache is being closed, else it's an error + if (!isRemoteIndexCacheClosed.get()) { + error("Cleaner thread received interruption but remote index cache is not closed", ex) + throw ex + } else { + debug("Cleaner thread was interrupted on cache shutdown") + } + case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex) } } } + cleanerThread.start() def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = { - if(closed) throw new IllegalStateException("Instance is already closed.") - - def loadIndexFile[T](fileName: String, - suffix: String, - fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream, - readIndex: File => T): T = { - val indexFile = new File(cacheDir, fileName + suffix) - - def fetchAndCreateIndex(): T = { - val tmpIndexFile = new File(cacheDir, fileName + suffix + RemoteIndexCache.TmpFileSuffix) - - val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata) - try { - Files.copy(inputStream, 
tmpIndexFile.toPath) - } finally { - if (inputStream != null) { - inputStream.close() - } - } + if (isRemoteIndexCacheClosed.get()) { + throw new IllegalStateException(s"Unable to fetch index for " + + s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}. Index instance is already closed.") + } - Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath, false) - readIndex(indexFile) - } + inReadLock(lock) { + val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id() + internalCache.get(cacheKey, (uuid: Uuid) => { Review Comment: Should we use `writeLock` here, since the javadoc for `get` says: > If the specified key is not already associated with a value, attempts to compute its value using the given mapping function and enters it into this cache unless {@code null}. The entire method invocation is performed atomically, so the function is applied at most once per key. Some attempted update operations on this cache by other threads may be blocked while the computation is in progress, so the computation should be short and simple, and must not attempt to update any other mappings of this cache. https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Cache.java#L60-L65 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org