showuon commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1360317032


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -592,16 +593,75 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    doAnswer((invocation: InvocationOnMock) => {
+      // Signal the CacheRead to unblock itself
+      latchForCacheRead.countDown()
+      // Wait for signal to start renaming the files
+      latchForCacheRemove.await()
+      // Calling the markForCleanup() actual method to start renaming the files
+      invocation.callRealMethod()

Review Comment:
   > 3. readCache executed and finished because no lock is pending on the remove operation
   
   No, that's not right. The lock lives in the concurrent map inside the cache.
   Looking at it again, though, I found that the lock is only held while "updating the entry". That is, we have 2 threads trying to:
   1. create a new entry for the key
   2. remove the entry for the key
   We can't be sure which one will complete first. All we can be sure of is that the 2 operations are atomic. So, after L646:
   ```
         // Wait for signal to complete the test
         latchForTestWait.await()

         // At this point we can't be sure whether the cache size is 0 or 1.
         // All we can verify is that either the cache size is 1 and the file exists (create ran later),
         // or the cache size is 0 and the file does not exist (remove ran later).

         // So maybe we verify with this:
         if (Files.exists(entry.offsetIndex().file().toPath)) {
           assertCacheSize(1)
         } else {
           assertCacheSize(0)
         }
   ```
   
   WDYT?
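
To illustrate the reasoning, here is a minimal standalone sketch, not the actual `RemoteIndexCache` code. It assumes a plain `ConcurrentHashMap` in place of the real cache and an `AtomicBoolean` in place of the index file on disk, and it assumes the file side effect runs inside the atomic per-key update. `RaceInvariantSketch`, `runOnce`, and the `"segment"` key are hypothetical names used only for this example. The order of the two operations is not deterministic, but the final state is always one of the two consistent pairs.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.BiFunction

object RaceInvariantSketch {
  // Returns (cache size, whether the "file" exists) after a create and a
  // remove for the same key have raced against each other.
  def runOnce(): (Int, Boolean) = {
    // Hypothetical stand-ins: `cache` for the internal cache,
    // `fileExists` for the index file on disk.
    val cache = new ConcurrentHashMap[String, String]()
    val fileExists = new AtomicBoolean(true)
    cache.put("segment", "entry")

    val start = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(2)

    // "Create": atomically (re)insert the entry and make the file exist.
    val create = new BiFunction[String, String, String] {
      override def apply(key: String, old: String): String = {
        fileExists.set(true)
        "entry"
      }
    }
    // "Remove": atomically drop the entry (returning null removes the
    // mapping) and delete the file.
    val remove = new BiFunction[String, String, String] {
      override def apply(key: String, old: String): String = {
        fileExists.set(false)
        null
      }
    }

    pool.execute(new Runnable {
      override def run(): Unit = { start.await(); cache.compute("segment", create) }
    })
    pool.execute(new Runnable {
      override def run(): Unit = { start.await(); cache.compute("segment", remove) }
    })

    start.countDown() // release both threads at the same time
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    (cache.size(), fileExists.get())
  }
}
```

Whichever thread wins, `runOnce()` ends with either size 1 and the file present (create ran later) or size 0 and the file absent (remove ran later), which is the same either/or shape as the assertion proposed above.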


