showuon commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1364955572


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -672,16 +673,84 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Calling the markForCleanup() actual method to start renaming the files
+        invocation.callRealMethod()

Review Comment:
   > There are two times when markForCleanUp is called.
   >   1.  remove function which we are calling in removeCache Runnable.
   >   2. One at invocation.callRealMethod() 708 line no
   
   You're right, but these are two "different" `markForCleanup` calls.
   For (1), `markForCleanup` is the stubbed (injected) method, used only to control the order of invocation.
   For (2), it's the real `markForCleanup` method, which renames the cache files.
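   
   To illustrate the difference between (1) and (2), here is a minimal standalone sketch of the same latch pattern without Mockito. `DemoEntry`, `LatchOrderingDemo`, and the latch names are hypothetical and are not Kafka classes; the injected hook plays the role of the stubbed call, and the assignment after it stands in for the real file renaming.
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
   import java.util.concurrent.atomic.AtomicBoolean
   
   // Hypothetical stand-in for a cache entry; NOT the real RemoteIndexCache.Entry.
   class DemoEntry(onCleanup: () => Unit) {
     @volatile var cleanedUp = false
   
     def markForCleanup(): Unit = {
       onCleanup()      // (1) injected hook: lets the test pause the remover here
       cleanedUp = true // (2) stands in for the real work (renaming the files)
     }
   }
   
   object LatchOrderingDemo {
     // Returns (reader saw the entry while removal was paused, cache empty at the end).
     def run(): (Boolean, Boolean) = {
       val readMayStart = new CountDownLatch(1)
       val removeMayProceed = new CountDownLatch(1)
       val cache = new ConcurrentHashMap[String, DemoEntry]()
   
       val entry = new DemoEntry(() => {
         readMayStart.countDown()                    // unblock the reader
         removeMayProceed.await(5, TimeUnit.SECONDS) // pause until the reader is done
         ()
       })
       cache.put("segment", entry)
   
       val seenDuringRemoval = new AtomicBoolean(false)
   
       val remover = new Thread(() => {
         cache.get("segment").markForCleanup()
         cache.remove("segment")
         ()
       })
       val reader = new Thread(() => {
         readMayStart.await(5, TimeUnit.SECONDS)
         seenDuringRemoval.set(cache.containsKey("segment"))
         removeMayProceed.countDown() // let the remover continue
       })
   
       remover.start()
       reader.start()
       remover.join(5000)
       reader.join(5000)
       (seenDuringRemoval.get, cache.isEmpty)
     }
   }
   ```
   
   In this sketch the hook only forces the read to happen while the removal is paused; `LatchOrderingDemo.run()` is expected to return `(true, true)`.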
   
   The goal is to simulate the race condition that occurred in [KAFKA-15481](https://issues.apache.org/jira/browse/KAFKA-15481).  
   
   > Even i tried running your test case locally it always assert with 
cacheSize 0 , as it is eventually getting deleted.
   
   Yes, I think so. But in some cases it could be 1, if `getEntry` runs after the removal. Thread scheduling is decided by the OS, so we can't guarantee which one goes first, right? 
   
   I think the goal of this test is to make sure the issue in 
[KAFKA-15481](https://issues.apache.org/jira/browse/KAFKA-15481) will not 
happen again. That's why I added this 
[comment](https://github.com/apache/kafka/pull/14483#discussion_r1364844466).
   
   > IMO we should read and remove concurrently in the separate thread and 
validate the cacheSize based on the order of execution. 
   
   I'm not following you here. What this test does is exactly `read and remove concurrently in the separate thread`. As for `validate the cacheSize based on the order of execution`: since we can't guarantee which thread runs first, we can't do this, right? If we could decide the execution order, the threads would no longer be running _concurrently_, is that right?
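   
   As a rough sketch of what an order-independent check could look like (an assumption for illustration, not what this PR's test asserts): both threads start from a shared gate with no imposed order, and the assertion only covers what must hold in every interleaving. `SketchEntry` and `UnorderedRaceSketch` are hypothetical names, not Kafka classes.
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
   
   // Hypothetical entry type for illustration; not a Kafka class.
   class SketchEntry { @volatile var cleanedUp = false }
   
   object UnorderedRaceSketch {
     private val key = "segment"
   
     // Returns (final cache size, whether any entry left in the cache is still usable).
     def run(): (Int, Boolean) = {
       val cache = new ConcurrentHashMap[String, SketchEntry]()
       cache.put(key, new SketchEntry)
       val startGate = new CountDownLatch(1)
   
       val reader = new Thread(() => {
         startGate.await()
         // Creates a fresh entry only if the remover already won the race.
         cache.computeIfAbsent(key, (_: String) => new SketchEntry)
         ()
       })
       val remover = new Thread(() => {
         startGate.await()
         // Unlink first, then mark: an entry still in the map is never marked.
         val removed = cache.remove(key)
         if (removed != null) removed.cleanedUp = true
       })
   
       reader.start()
       remover.start()
       startGate.countDown() // release both threads; the OS decides who runs first
       reader.join()
       remover.join()
   
       val remaining = cache.get(key)
       (cache.size, remaining == null || !remaining.cleanedUp)
     }
   }
   ```
   
   Here the size may legitimately be 0 or 1 depending on scheduling, so the only stable assertions are that it is at most 1 and that any entry still in the cache has not been marked for cleanup.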
   
   > We should not need to call explicitly for the scenario.
   
   Could you show us what test you would write instead? Some pseudocode is enough. Thank you.
   
   



