iit2009060 commented on code in PR #14483:
URL: https://github.com/apache/kafka/pull/14483#discussion_r1348641063


##########
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##########
@@ -568,27 +621,26 @@ class RemoteIndexCacheTest {
   }
 
   private def verifyFetchIndexInvocation(count: Int,
-                                         indexTypes: Seq[IndexType] =
-                                         Seq(IndexType.OFFSET, 
IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = {
+                                         indexTypes: Seq[IndexType]): Unit = {
     for (indexType <- indexTypes) {
       verify(rsm, 
times(count)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), 
ArgumentMatchers.eq(indexType))
     }
   }
 
   private def createTxIndexForSegmentMetadata(metadata: 
RemoteLogSegmentMetadata): TransactionIndex = {
-    val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata)
+    val txnIdxFile = remoteTransactionIndexFile(new File(tpDir, DIR_NAME), 
metadata)
     txnIdxFile.createNewFile()
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
 
   private def createTimeIndexForSegmentMetadata(metadata: 
RemoteLogSegmentMetadata): TimeIndex = {
     val maxEntries = (metadata.endOffset() - 
metadata.startOffset()).asInstanceOf[Int]
-    new TimeIndex(remoteTimeIndexFile(tpDir, metadata), 
metadata.startOffset(), maxEntries * 12)
+    new TimeIndex(remoteTimeIndexFile(new File(tpDir, DIR_NAME), metadata), 
metadata.startOffset(), maxEntries * 12)

Review Comment:
   @jeel2420  
   There are three functions which is  used to create Indexes on tpDir storage. 
     ```
   private def createTimeIndexForSegmentMetadata(metadata: 
RemoteLogSegmentMetadata): TimeIndex 
     private def createOffsetIndexForSegmentMetadata(metadata: 
RemoteLogSegmentMetadata): OffsetIndex
     private def createTxIndexForSegmentMetadata(metadata: 
RemoteLogSegmentMetadata): TransactionIndex
   
   ```
   tpDir storage in the current  unit test act as a data place fetched from 
remote storage. 
   
   The cases where tpDir  has been used instead of remoteCacheDir is only when 
you are creating the spyEntry. But if you see the test cases where it has 
happened , they are just testing the cache functionality and not mixing it with 
the remote storage manager functionality. 
   To support the use cases mentioned above you can do this step
   
   **Suggestion:**
   Change the signature of the below  method to take Directory path also has an 
parameter , and refactor the existing test case
    ```
   private def createTimeIndexForSegmentMetadata(metadata: 
RemoteLogSegmentMetadata,dir:File): TimeIndex 
     private def createOffsetIndexForSegmentMetadata(metadata: 
RemoteLogSegmentMetadata,dir:File): OffsetIndex
     private def createTxIndexForSegmentMetadata(metadata: 
RemoteLogSegmentMetadata,dir:File):TransactionIndex
   ```
   Then you can pass the appropriate directory required for your test case. 
   
   cc @showuon  @divijvaidya 
   
   



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##########
@@ -193,7 +192,16 @@ public File cacheDir() {
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
-            internalCache.invalidate(key);
+            internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                try {
+                    v.markForCleanup();
+                    expiredIndexes.put(v);

Review Comment:
   @showuon  @jeel2420  
   Should not we use expiredIndxes.offer instead of expiredIndexes.put()
   as put will block the operation if queue size is full ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to