showuon commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1347002486


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -538,7 +533,75 @@ class RemoteIndexCacheTest {
     assertEquals(RemoteIndexCache.DIR_NAME, 
offsetIndexFile.getParent.getFileName.toString,
       s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
     // file is corrupted it should fetch from remote storage again
-    verifyFetchIndexInvocation(count = 1)
+    verifyFetchIndexInvocation(count = 1, indexTypes = Seq(IndexType.OFFSET))
+  }
+
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, 
txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Calling the markForCleanup() actual method to start renaming the 
files
+        invocation.callRealMethod()
+        // Signal TestWait to unblock itself so that test can be completed
+        latchForTestWait.countDown()
+      } else {
+        // Subsequent call for markForCleanup method
+        latchForCacheRead.countDown()
+        latchForCacheRemove.countDown()
+      }
+    }).when(spyEntry).markForCleanup()
+
+    val removeCache = (() => {
+      cache.remove(rlsMetadata.remoteLogSegmentId().id())
+    }): Runnable
+
+    val readCache = (() => {
+      // Wait for signal to start CacheRead
+      latchForCacheRead.await()
+      cache.getIndexEntry(rlsMetadata)
+      // Signal the CacheRemove to start renaming the files
+      latchForCacheRemove.countDown()
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      executor.submit(removeCache: Runnable)
+      executor.submit(readCache: Runnable)
+
+      // Wait for signal to complete the test
+      latchForTestWait.await()
+      assertCacheSize(1)
+      val entry = cache.getIndexEntry(rlsMetadata)
+      assertTrue(Files.exists(entry.offsetIndex().file().toPath))

Review Comment:
   Please help me understand these lines. My understanding is:
   In L598, we want to verify that `cache.getIndexEntry` runs after 
`cache.remove` has completed. That's where the expected cache size of 1 comes from.
   If so, why do we call `getIndexEntry` again in L599 before asserting 
L600?
   



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -538,7 +533,75 @@ class RemoteIndexCacheTest {
     assertEquals(RemoteIndexCache.DIR_NAME, 
offsetIndexFile.getParent.getFileName.toString,
       s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
     // file is corrupted it should fetch from remote storage again
-    verifyFetchIndexInvocation(count = 1)
+    verifyFetchIndexInvocation(count = 1, indexTypes = Seq(IndexType.OFFSET))
+  }
+
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy Cache Entry
+    val rlsMetadata = new 
RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), 
baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, 
Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, 
txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the CacheRead to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for signal to start renaming the files
+        latchForCacheRemove.await()
+        // Calling the markForCleanup() actual method to start renaming the 
files
+        invocation.callRealMethod()
+        // Signal TestWait to unblock itself so that test can be completed
+        latchForTestWait.countDown()
+      } else {
+        // Subsequent call for markForCleanup method
+        latchForCacheRead.countDown()
+        latchForCacheRemove.countDown()
+      }

Review Comment:
   When will this `else` branch (a subsequent `markForCleanup` call) be invoked?



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -132,7 +133,7 @@ class RemoteIndexCacheTest {
     // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
     val resultPosition = cache.lookupOffset(rlsMetadata, 
offsetPosition1.offset)
     assertEquals(offsetPosition1.position, resultPosition)
-    verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, 
IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET))

Review Comment:
   I'm not following this change. Do you mean it will fail if we keep 
`Seq(IndexType.OFFSET, IndexType.TIMESTAMP)`? My understanding is that all 3 
index types are fetched once on a cache miss. So I think we can just call 
`verifyFetchIndexInvocation(count = 1)` to verify all 3 types.



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -568,27 +631,26 @@ class RemoteIndexCacheTest {
   }
 
   private def verifyFetchIndexInvocation(count: Int,
-                                         indexTypes: Seq[IndexType] =
-                                         Seq(IndexType.OFFSET, 
IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = {
+                                         indexTypes: Seq[IndexType]): Unit = {

Review Comment:
   I don't think this change is necessary. When we fetch indexes, we always fetch 
all 3 index types. So why not verify all of them by default?



##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -281,14 +282,8 @@ class RemoteIndexCacheTest {
     // no expired entries yet
     assertEquals(0, cache.expiredIndexes.size, "expiredIndex queue should be 
zero at start of test")
 
-    // invalidate the cache. it should async mark the entry for removal
-    cache.internalCache.invalidate(internalIndexKey)
-
-    // wait until entry is marked for deletion
-    TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
-      "Failed to mark cache entry for cleanup after invalidation")
-    TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
-      "Failed to cleanup cache entry after invalidation")
+    // call remove function to mark the entry for removal
+    cache.remove(internalIndexKey)
 
     // first it will be marked for cleanup, second time markForCleanup is 
called when cleanup() is called
     verify(cacheEntry, times(2)).markForCleanup()

Review Comment:
   In L291, we want to verify that `cleanup` is invoked. However, with my suggestion to 
remove `cleanup` from `internalCache.asMap().computeIfPresent`, we now need to 
add the following before verifying it:
   ```
       TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
         "Failed to cleanup cache entry after invalidation")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to