showuon commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1363633722


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -672,16 +673,84 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new 
File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, 
txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Calling the markForCleanup() actual method to start renaming the 
files
+        invocation.callRealMethod()
+        // Signal TestWait to unblock itself so that test can be completed
+        latchForTestWait.countDown()
+      }
+    }).when(spyEntry).markForCleanup()
+
+    val removeCache = (() => {
+      cache.remove(rlsMetadata.remoteLogSegmentId().id())
+    }): Runnable
+
+    val readCache = (() => {
+      // Wait for signal to start CacheRead
+      latchForCacheRead.await()
+      entry = cache.getIndexEntry(rlsMetadata)
+      // Signal the CacheRemove to start renaming the files
+      latchForCacheRemove.countDown()
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      executor.submit(removeCache: Runnable)
+      executor.submit(readCache: Runnable)
+
+      // Wait for signal to complete the test
+      latchForTestWait.await()
+      if (getIndexFileFromRemoteCacheDir(cache, 
LogFileUtils.INDEX_FILE_SUFFIX).isPresent) {
+        assertCacheSize(1)
+      } else {
+        assertCacheSize(0)
+      }

Review Comment:
   It's worth adding some comments here to explain why we need to verify 2 
different cases. Ex:
   `We can't determine read thread or remove thread will go first, but we have 
to want to make sure when cache existed, the cache file should exist, too, and 
if cache is non-existed, the cache file should not exist.` 
   
   Something like it. You can make it more clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to