iit2009060 commented on code in PR #14483: URL: https://github.com/apache/kafka/pull/14483#discussion_r1348641063
########## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ########## @@ -568,27 +621,26 @@ class RemoteIndexCacheTest { } private def verifyFetchIndexInvocation(count: Int, - indexTypes: Seq[IndexType] = - Seq(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = { + indexTypes: Seq[IndexType]): Unit = { for (indexType <- indexTypes) { verify(rsm, times(count)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType)) } } private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex = { - val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata) + val txnIdxFile = remoteTransactionIndexFile(new File(tpDir, DIR_NAME), metadata) txnIdxFile.createNewFile() new TransactionIndex(metadata.startOffset(), txnIdxFile) } private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = { val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int] - new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12) + new TimeIndex(remoteTimeIndexFile(new File(tpDir, DIR_NAME), metadata), metadata.startOffset(), maxEntries * 12) Review Comment: @jeel2420 There are three functions which are used to create indexes in tpDir storage: ``` private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): OffsetIndex private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex ``` In the current unit test, tpDir storage acts as the location for data fetched from remote storage. The cases where tpDir has been used instead of remoteCacheDir are only when you are creating the spyEntry. But if you look at the test cases where that has happened, they are just testing the cache functionality and not mixing it with the remote storage manager functionality. 
To support the use cases mentioned above, you can do the following. **Suggestion:** Change the signatures of the methods below to also take the directory path as a parameter, and refactor the existing test cases accordingly: ``` private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TimeIndex private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): OffsetIndex private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TransactionIndex ``` Then you can pass the appropriate directory required for your test case. cc @showuon @divijvaidya ########## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ########## @@ -193,7 +192,16 @@ public File cacheDir() { public void remove(Uuid key) { lock.readLock().lock(); try { - internalCache.invalidate(key); + internalCache.asMap().computeIfPresent(key, (k, v) -> { + try { + v.markForCleanup(); + expiredIndexes.put(v); Review Comment: @showuon @jeel2420 Shouldn't we use expiredIndexes.offer() instead of expiredIndexes.put(), since put will block the operation if the queue is full? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org