divijvaidya commented on code in PR #13850:
URL: https://github.com/apache/kafka/pull/13850#discussion_r1233828378
##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -117,51 +128,60 @@ class RemoteIndexCacheTest {
@Test
def testCacheEntryExpiry(): Unit = {
- val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir =
logDir.toString)
+ // close existing cache created in test setup before creating a new one
+ Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
Review Comment:
No, it wouldn't fail the test, but it would lead to memory and thread leaks.
This is because some tests forcefully assign a new value to the member
variable `cache`. Hence, we end up with two instances of the cache: one that
was initialized in `@BeforeEach` and another that the test created itself.
If we don't explicitly close the cache that was initialized in `@BeforeEach`,
it will keep consuming heap and leave stray threads that won't be released
until the process exits.
##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -170,44 +190,173 @@ class RemoteIndexCacheTest {
assertThrows(classOf[IllegalStateException], () =>
cache.getIndexEntry(metadataList.head))
}
+ @Test
+ def testCloseIsIdempotent(): Unit = {
+ val spyCleanerThread = spy(cache.cleanerThread)
+ cache.cleanerThread = spyCleanerThread
+ cache.close()
+ cache.close()
+ // verify that cleanup is only called once
+ verify(spyCleanerThread).initiateShutdown()
+ }
+
+ @Test
+ def testClose(): Unit = {
+ val spyInternalCache = spy(cache.internalCache)
+ val spyCleanerThread = spy(cache.cleanerThread)
+
+ // replace with new spy cache
+ cache.internalCache = spyInternalCache
+ cache.cleanerThread = spyCleanerThread
+
+ // use the cache
+ val tpId = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0))
+ val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId)
+ val entry = cache.getIndexEntry(metadataList.head)
+
+ val spyTxnIndex = spy(entry.txnIndex)
+ val spyOffsetIndex = spy(entry.offsetIndex)
+ val spyTimeIndex = spy(entry.timeIndex)
+ // remove this entry and replace with spied entry
+ cache.internalCache.invalidateAll()
+ cache.internalCache.put(metadataList.head.remoteLogSegmentId().id(), new
Entry(spyOffsetIndex, spyTimeIndex, spyTxnIndex))
+
+ // close the cache
+ cache.close()
+
+ // cleaner thread should be closed properly
+ verify(spyCleanerThread).initiateShutdown()
+ verify(spyCleanerThread).awaitShutdown()
+
+ // close for all index entries must be invoked
+ verify(spyTxnIndex).close()
+ verify(spyOffsetIndex).close()
+ verify(spyTimeIndex).close()
+
+ // index files must not be deleted
+ verify(spyTxnIndex, times(0)).deleteIfExists()
+ verify(spyOffsetIndex, times(0)).deleteIfExists()
+ verify(spyTimeIndex, times(0)).deleteIfExists()
+ }
+
+ @Test
+ def testConcurrentReadWriteAccessForCache(): Unit = {
Review Comment:
Thank you :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]