divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1236598557
##########
core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala:
##########
@@ -167,64 +235,76 @@ class RemoteIndexCache(maxSize: Int = 1024,
remoteStorageManager: RemoteStorageM
init()
// Start cleaner thread that will clean the expired entries
- val cleanerThread: ShutdownableThread = new
ShutdownableThread("remote-log-index-cleaner") {
+ private[remote] var cleanerThread: ShutdownableThread = new
ShutdownableThread(remoteLogIndexCacheCleanerThread) {
setDaemon(true)
override def doWork(): Unit = {
- while (!closed) {
- try {
+ try {
+ while (!isRemoteIndexCacheClosed.get()) {
val entry = expiredIndexes.take()
- info(s"Cleaning up index entry $entry")
+ debug(s"Cleaning up index entry $entry")
entry.cleanup()
- } catch {
- case ex: InterruptedException => info("Cleaner thread was
interrupted", ex)
- case ex: Exception => error("Error occurred while fetching/cleaning
up expired entry", ex)
}
+ } catch {
+ case ex: InterruptedException =>
+ // cleaner thread should only be interrupted when cache is being
closed, else it's an error
+ if (!isRemoteIndexCacheClosed.get()) {
+ error("Cleaner thread received interruption but remote index cache
is not closed", ex)
+ throw ex
+ } else {
+ debug("Cleaner thread was interrupted on cache shutdown")
+ }
+ case ex: Exception => error("Error occurred while fetching/cleaning up
expired entry", ex)
}
}
}
+
cleanerThread.start()
def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry
= {
- if(closed) throw new IllegalStateException("Instance is already closed.")
-
- def loadIndexFile[T](fileName: String,
- suffix: String,
- fetchRemoteIndex: RemoteLogSegmentMetadata =>
InputStream,
- readIndex: File => T): T = {
- val indexFile = new File(cacheDir, fileName + suffix)
-
- def fetchAndCreateIndex(): T = {
- val tmpIndexFile = new File(cacheDir, fileName + suffix +
RemoteIndexCache.TmpFileSuffix)
-
- val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
- try {
- Files.copy(inputStream, tmpIndexFile.toPath)
- } finally {
- if (inputStream != null) {
- inputStream.close()
- }
- }
+ if (isRemoteIndexCacheClosed.get()) {
+ throw new IllegalStateException(s"Unable to fetch index for " +
+ s"segment id=${remoteLogSegmentMetadata.remoteLogSegmentId().id()}.
Index instance is already closed.")
+ }
- Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath,
false)
- readIndex(indexFile)
- }
+ inReadLock(lock) {
+ val cacheKey = remoteLogSegmentMetadata.remoteLogSegmentId().id()
+ internalCache.get(cacheKey, (uuid: Uuid) => {
Review Comment:
Yes, it will update the cache with a new entry. But since the internalCache
is thread-safe, we don't need to prevent any other thread from reading or
writing another entry in the cache; hence, we don't need to acquire a write
lock over the entire RemoteIndexCache.
Does this answer your question?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]