divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1233828378


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -117,51 +128,60 @@ class RemoteIndexCacheTest {
 
   @Test
   def testCacheEntryExpiry(): Unit = {
-    val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = 
logDir.toString)
+    // close existing cache created in test setup before creating a new one
+    Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")

Review Comment:
   No, it wouldn't fail the test but it will lead to memory & thread leaks. 
This is because in some tests, we are forcefully assigning a new value to 
member variable `cache` in the test. Hence, we end up with two instances of 
cache, one which was initialized in `@BeforeEach` and another which the test 
created by itself. If we don't explicitly close the cache  which is initialized 
at the `@BeforeEach`, it will keep on consuming heap and have stray threads 
which won't be released until process is closed.



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -170,44 +190,173 @@ class RemoteIndexCacheTest {
     assertThrows(classOf[IllegalStateException], () => 
cache.getIndexEntry(metadataList.head))
   }
 
+  @Test
+  def testCloseIsIdempotent(): Unit = {
+    val spyCleanerThread = spy(cache.cleanerThread)
+    cache.cleanerThread = spyCleanerThread
+    cache.close()
+    cache.close()
+    // verify that cleanup is only called once
+    verify(spyCleanerThread).initiateShutdown()
+  }
+
+  @Test
+  def testClose(): Unit = {
+    val spyInternalCache = spy(cache.internalCache)
+    val spyCleanerThread = spy(cache.cleanerThread)
+
+    // replace with new spy cache
+    cache.internalCache = spyInternalCache
+    cache.cleanerThread = spyCleanerThread
+
+    // use the cache
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+    val entry = cache.getIndexEntry(metadataList.head)
+
+    val spyTxnIndex = spy(entry.txnIndex)
+    val spyOffsetIndex = spy(entry.offsetIndex)
+    val spyTimeIndex = spy(entry.timeIndex)
+    // remove this entry and replace with spied entry
+    cache.internalCache.invalidateAll()
+    cache.internalCache.put(metadataList.head.remoteLogSegmentId().id(), new 
Entry(spyOffsetIndex, spyTimeIndex, spyTxnIndex))
+
+    // close the cache
+    cache.close()
+
+    // cleaner thread should be closed properly
+    verify(spyCleanerThread).initiateShutdown()
+    verify(spyCleanerThread).awaitShutdown()
+
+    // close for all index entries must be invoked
+    verify(spyTxnIndex).close()
+    verify(spyOffsetIndex).close()
+    verify(spyTimeIndex).close()
+
+    // index files must not be deleted
+    verify(spyTxnIndex, times(0)).deleteIfExists()
+    verify(spyOffsetIndex, times(0)).deleteIfExists()
+    verify(spyTimeIndex, times(0)).deleteIfExists()
+  }
+
+  @Test
+  def testConcurrentReadWriteAccessForCache(): Unit = {

Review Comment:
   Thank you :) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to