iit2009060 commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1364998096


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -672,16 +673,84 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Call the actual markForCleanup() method to start renaming the files
+        invocation.callRealMethod()

Review Comment:
   @showuon  
   `For (1), the markForCleanUp is an injected method for controlling the invoking order. So there are latches wait/countdown.`
   Do you mean this is a mocked method and no rename would happen in this case?
   Effectively, the actual logic of `markForCleanup` is invoked only once?
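   
   (For reference, one minimal way to confirm that on the spy, assuming Mockito's `verify`/`times` are in scope in this test, would be something like:)
   
   ```
   // hypothetical check, after the concurrent part of the test finishes:
   // assert the real markForCleanup logic ran exactly once on the spied entry
   verify(spyEntry, times(1)).markForCleanup()
   ```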
   
   I was thinking of something like this:
   
   ```
   val latchForTestWait = new CountDownLatch(2)
   
   val removeCache = (() => {
     cache.remove(rlsMetadata.remoteLogSegmentId().id())
     latchForTestWait.countDown()
   }): Runnable
   
   val readCache = (() => {
     entry = cache.getIndexEntry(rlsMetadata)
     // Signal that the read has completed
     latchForTestWait.countDown()
   }): Runnable
   
   val executor = Executors.newFixedThreadPool(2)
   try {
     executor.submit(removeCache: Runnable)
     executor.submit(readCache: Runnable)
   
     // Wait for both tasks to complete
     latchForTestWait.await()
   
     // Validate the cache size based on whether the index files still exist:
     // - if the offset index file exists (execution order was remove, read),
     //   an RSM fetch call should have happened
     // - if the cache size == 0 (execution order was read, remove),
     //   no RSM fetch call should have happened
   } finally {
     executor.shutdownNow()
   }
   ```
   In the test case mentioned in the JIRA [KAFKA-15481](https://issues.apache.org/jira/browse/KAFKA-15481), the execution order is remove, read, and the overall result is a cache size of 0, which is wrong because of the time gap between removing the entry and renaming the files. Here we are validating the same thing with the RSM call count: if removal and renaming were atomic, the RSM fetch should happen and the files should be restored.
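   
   A rough sketch of those order-dependent assertions (treat the cache-size check via `internalCache.asMap()` and the exact counts as assumptions; `verifyFetchIndexInvocation` is the helper already used earlier in this test class):
   
   ```
   // hypothetical validation, placed after latchForTestWait.await():
   if (cache.internalCache.asMap().isEmpty) {
     // execution order was read, then remove: the read hit the cache,
     // so no extra fetch from the RSM should have happened
     verifyFetchIndexInvocation(count = 0)
   } else {
     // execution order was remove, then read: the read missed the cache and
     // should have gone back to the RSM to fetch the indexes again
     verifyFetchIndexInvocation(count = 1)
   }
   ```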
   


