iit2009060 commented on code in PR #14483: URL: https://github.com/apache/kafka/pull/14483#discussion_r1364998096
########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -672,16 +673,84 @@ class RemoteIndexCacheTest { verifyFetchIndexInvocation(count = 1) } + @Test + def testConcurrentRemoveReadForCache(): Unit = { + // Create a spy Cache Entry + val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, + time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) + + val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) + val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) + val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) + + val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex)) + cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry) + + assertCacheSize(1) + + var entry: RemoteIndexCache.Entry = null + + val latchForCacheRead = new CountDownLatch(1) + val latchForCacheRemove = new CountDownLatch(1) + val latchForTestWait = new CountDownLatch(1) + + var markForCleanupCallCount = 0 + + doAnswer((invocation: InvocationOnMock) => { + markForCleanupCallCount += 1 + + if (markForCleanupCallCount == 1) { + // Signal the CacheRead to unblock itself + latchForCacheRead.countDown() + // Wait for signal to start renaming the files + latchForCacheRemove.await() + // Calling the markForCleanup() actual method to start renaming the files + invocation.callRealMethod() Review Comment: @showuon `For (1), the markForCleanUp is an injected method for controlling the invoking order. So there are latches wait/countdown.` Do you mean this is a mock method and no rename would happen in this case ? Effectively the functionality/logic of `markCleanUp` is called one time only ? I was thinking something like this ``` `val latchForTestWait = new CountDownLatch(2) val removeCache = (() => { cache.remove(rlsMetadata.remoteLogSegmentId().id()) latchForTestWait.countdown() }): Runnable val readCache = (() => { entry = cache.getIndexEntry(rlsMetadata) // Signal the CacheRemove to start renaming the files latchForTestWait.countDown() }): Runnable val executor = Executors.newFixedThreadPool(2) try { executor.submit(removeCache: Runnable) executor.submit(readCache: Runnable) // Wait for signal to complete the test latchForTestWait.await() // validate cache size ` ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org