Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-16 Thread via GitHub


showuon merged PR #14511:
URL: https://github.com/apache/kafka/pull/14511


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-16 Thread via GitHub


showuon commented on PR #14511:
URL: https://github.com/apache/kafka/pull/14511#issuecomment-1763872414

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 11 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testFollowerCompleteDelayedFetchesOnReplication(String).quorum=kraft
   ```




Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-16 Thread via GitHub


showuon commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1360211831


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -525,7 +525,7 @@ class RemoteIndexCacheTest {
   .filter(path => path.getFileName.toString.endsWith(suffix))
   .findAny()
   } catch {
-case _: FileNotFoundException => Optional.empty()
+case _: NoSuchFileException => Optional.empty()

Review Comment:
   Wait, I checked the history: catching `FileNotFoundException` there was a mistake, and it should be `NoSuchFileException`. OK, makes sense. Thanks.





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-16 Thread via GitHub


showuon commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1360207445


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -525,7 +525,7 @@ class RemoteIndexCacheTest {
   .filter(path => path.getFileName.toString.endsWith(suffix))
   .findAny()
   } catch {
-case _: FileNotFoundException => Optional.empty()
+case _: NoSuchFileException => Optional.empty()

Review Comment:
   In this case, we could catch `IOException` directly, so that a `FileNotFoundException` would also be handled if it is thrown. WDYT?
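   A note on the `IOException` suggestion, based on the stack trace hudeqi posted in this thread: `Files.walk` throws `NoSuchFileException` (a subclass of `IOException`) directly when the starting directory is missing, but when a file disappears while the stream is being consumed, the iterator wraps it in `java.io.UncheckedIOException`. That class extends `RuntimeException`, not `IOException`, so catching `IOException` alone would not cover the mid-walk case. The sketch below classifies both shapes; `MissingFileCheck` and `isMissingFile` are hypothetical names, not part of the Kafka test.

```scala
import java.io.{FileNotFoundException, IOException, UncheckedIOException}
import java.nio.file.NoSuchFileException

object MissingFileCheck {
  // True when `t` signals that a file or directory vanished, whether thrown
  // directly (e.g. Files.walk on a missing start directory) or wrapped in an
  // UncheckedIOException by the stream's iterator while walking.
  def isMissingFile(t: Throwable): Boolean = t match {
    case _: NoSuchFileException   => true
    case _: FileNotFoundException => true
    case e: UncheckedIOException  => isMissingFile(e.getCause)
    case _                        => false
  }

  // NoSuchFileException is an IOException, so `case _: IOException` catches it...
  val noSuchFileIsAnIOException: Boolean =
    classOf[IOException].isAssignableFrom(classOf[NoSuchFileException])

  // ...but UncheckedIOException is a RuntimeException, so it slips past that catch.
  val uncheckedIsAnIOException: Boolean =
    classOf[IOException].isAssignableFrom(classOf[UncheckedIOException])
}
```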





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460675


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val entry0 = cache.getIndexEntry(metadataList(0))
+val entry1 = cache.getIndexEntry(metadataList(1))
+cache.getIndexEntry(metadataList(2))
+assertCacheSize(2)
+val missingMetadata = metadataList(0)
+val missingEntry = entry0
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup,
+  "Failed to mark evicted cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted,
+  "Failed to cleanup evicted cache entry after resizing cache.")
+// verify no index files for `missingEntry` on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent,
+  s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent,
+  s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent,
+  s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent,
+  s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
+
+// Reduce cache capacity to only store 1 entries
+cache.resizeCacheSize(1 * estimateEntryBytesSize)
+assertCacheSize(1)
+
+val nextMissingMetadata = metadataList(1)
+val nextMissingEntry = entry1
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup,
+  "Failed to mark 

Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460627


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val entry0 = cache.getIndexEntry(metadataList(0))
+val entry1 = cache.getIndexEntry(metadataList(1))
+cache.getIndexEntry(metadataList(2))
+assertCacheSize(2)
+val missingMetadata = metadataList(0)
+val missingEntry = entry0
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup,
+  "Failed to mark evicted cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted,
+  "Failed to cleanup evicted cache entry after resizing cache.")
+// verify no index files for `missingEntry` on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent,
+  s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent,
+  s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent,
+  s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent,
+  s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
+
+// Reduce cache capacity to only store 1 entries
+cache.resizeCacheSize(1 * estimateEntryBytesSize)
+assertCacheSize(1)
+
+val nextMissingMetadata = metadataList(1)
+val nextMissingEntry = entry1
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup,
+  "Failed to mark 

Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460606


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)

Review Comment:
   updated





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460521


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}

Review Comment:
   updated





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-14 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1359435180


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -525,7 +525,7 @@ class RemoteIndexCacheTest {
   .filter(path => path.getFileName.toString.endsWith(suffix))
   .findAny()
   } catch {
-case _: FileNotFoundException => Optional.empty()
+case _: NoSuchFileException => Optional.empty()

Review Comment:
   @showuon Ah, I think `FileNotFoundException` may not be the exception that is actually thrown here. This is what I found after merging the latest trunk:
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14511/4/tests
   
   Error
   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/kafka-RemoteIndexCacheTest256653951450751431/x1qGOc3yRE6shFMJVKArpw:foo-0/remote-log-index-cache/0_TGK1Vu0nTCCcb0TN379YFA.timeindex.deleted
   Stack trace
   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/kafka-RemoteIndexCacheTest256653951450751431/x1qGOc3yRE6shFMJVKArpw:foo-0/remote-log-index-cache/0_TGK1Vu0nTCCcb0TN379YFA.timeindex.deleted
       at java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:88)
       at java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:104)
       at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
       at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
       at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
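   To illustrate why `FileNotFoundException` does not match here: it belongs to the older `java.io` API, while NIO's `Files.walk` reports a missing path with `java.nio.file.NoSuchFileException`. The sketch below walks a directory that was never created and reports which exception surfaced; `WalkMissingDir`, `exceptionFromWalking` and `demo` are hypothetical names. It only covers the missing-start-directory case; the mid-walk deletion in the stack trace above depends on timing with the cleaner and is not reproduced.

```scala
import java.io.FileNotFoundException
import java.nio.file.{Files, NoSuchFileException, Path}

object WalkMissingDir {
  // Walks `dir` and names the exception type that surfaced, if any.
  def exceptionFromWalking(dir: Path): String =
    try {
      val stream = Files.walk(dir) // throws right away if `dir` does not exist
      try "no exception" finally stream.close()
    } catch {
      case _: NoSuchFileException   => "NoSuchFileException"
      case _: FileNotFoundException => "FileNotFoundException"
    }

  // Creates a temp parent, walks a child that was never created, then cleans up.
  // WalkMissingDir.demo() returns "NoSuchFileException".
  def demo(): String = {
    val parent = Files.createTempDirectory("walk-demo")
    try exceptionFromWalking(parent.resolve("remote-log-index-cache"))
    finally Files.delete(parent)
  }
}
```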





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-13 Thread via GitHub


showuon commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1358195654


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val entry0 = cache.getIndexEntry(metadataList(0))
+val entry1 = cache.getIndexEntry(metadataList(1))
+cache.getIndexEntry(metadataList(2))
+assertCacheSize(2)
+val missingMetadata = metadataList(0)
+val missingEntry = entry0
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup,
+  "Failed to mark evicted cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted,
+  "Failed to cleanup evicted cache entry after resizing cache.")
+// verify no index files for `missingEntry` on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent,
+  s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent,
+  s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent,
+  s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent,
+  s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
+
+// Reduce cache capacity to only store 1 entries
+cache.resizeCacheSize(1 * estimateEntryBytesSize)
+assertCacheSize(1)
+
+val nextMissingMetadata = metadataList(1)
+val nextMissingEntry = entry1
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup,
+  "Failed to mark 

Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-13 Thread via GitHub


showuon commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1358164148


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}

Review Comment:
   Looks like this helper is duplicated across tests. Could we extract it as a class method?
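   One possible class-level version of the duplicated helper is sketched below. Assumptions: it takes the directory as a parameter (in the test this would be `cache.cacheDir().toPath`), the name `findFileWithSuffix` is hypothetical, and, unlike the inline helpers, it closes the `Files.walk` stream, which the `Files.walk` Javadoc asks callers to do because the stream holds open directories. It returns `Optional.empty()` both when the directory is gone and when a file vanishes mid-walk (the `UncheckedIOException` case from the CI failure in this thread).

```scala
import java.io.UncheckedIOException
import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.Optional

object IndexFileLookup {
  // Hypothetical shared replacement for the local getIndexFileFromRemoteCacheDir helpers.
  def findFileWithSuffix(dir: Path, suffix: String): Optional[Path] =
    try {
      val paths = Files.walk(dir)
      // The walk stream keeps directory handles open, so always close it.
      try {
        paths
          .filter(p => Files.isRegularFile(p))
          .filter(p => p.getFileName.toString.endsWith(suffix))
          .findAny()
      } finally paths.close()
    } catch {
      // The start directory itself is missing: Files.walk throws this directly.
      case _: NoSuchFileException => Optional.empty[Path]()
      // A file was deleted while walking (e.g. a ".deleted" index removed by the
      // cleaner): the stream's iterator wraps the NoSuchFileException.
      case e: UncheckedIOException if e.getCause.isInstanceOf[NoSuchFileException] =>
        Optional.empty[Path]()
    }
}
```

Each test could then call, for example, `IndexFileLookup.findFileWithSuffix(cache.cacheDir().toPath, LogFileUtils.INDEX_FILE_SUFFIX)` instead of defining its own local copy.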



##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -525,7 +525,7 @@ class RemoteIndexCacheTest {
   .filter(path => path.getFileName.toString.endsWith(suffix))
   .findAny()
   } catch {
-case _: FileNotFoundException => Optional.empty()
+case _: NoSuchFileException => Optional.empty()

Review Comment:
   Why should we change this exception?



##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)

Review Comment:
   Why can't we use `assertCacheSize(0)`?



##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+

Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-13 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1358013056


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,103 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry

Review Comment:
   updated.
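   The resize sequence in the test comment (put 1 entry, evict to empty, put 3 entries with capacity for 2, shrink to 1) can be modeled with a small byte-bounded LRU cache whose limit changes at runtime. This is a toy, stdlib-only sketch, not Kafka's implementation: `ResizableLruCache` is hypothetical, every entry is assumed to weigh the same (standing in for `estimateOneEntryBytesSize()`), and eviction here is synchronous. The real `RemoteIndexCache` appears to wrap a Caffeine cache (the test calls `estimatedSize()` on it), and the test's `waitUntilTrue` calls suggest that eviction and index-file cleanup happen asynchronously there.

```scala
import java.util.LinkedHashMap

// Toy byte-bounded LRU cache whose size limit can be changed at runtime.
final class ResizableLruCache[K](private var maxBytes: Long, entryBytes: Long) {
  // accessOrder = true: iteration starts at the least recently used key.
  private val entries = new LinkedHashMap[K, Long](16, 0.75f, true)
  private var evicted = List.empty[K]

  // Loads the key on a miss (the real cache would fetch indexes from remote storage).
  def get(key: K): Unit = {
    if (entries.containsKey(key)) {
      entries.get(key) // access-ordered map: marks the key most recently used
    } else {
      entries.put(key, entryBytes)
    }
    evictIfNeeded()
  }

  // Changes the byte limit and evicts immediately if the cache is now over it.
  def resize(newMaxBytes: Long): Unit = {
    maxBytes = newMaxBytes
    evictIfNeeded()
  }

  def size: Int = entries.size()
  def contains(key: K): Boolean = entries.containsKey(key)
  def evictedKeys: List[K] = evicted.reverse

  private def evictIfNeeded(): Unit =
    while (!entries.isEmpty() && entries.size() * entryBytes > maxBytes) {
      val eldest = entries.keySet().iterator().next()
      entries.remove(eldest)
      evicted ::= eldest // the real cache would mark the entry for cleanup here
    }
}
```

Running the same steps as the test leaves `seg2` as the only entry, and the evicted keys come out as `seg0`, `seg0` again, then `seg1`, mirroring `missingEntry = entry0` and `nextMissingEntry = entry1` in the test.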





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-13 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1358012358


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,103 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val entry0 = cache.getIndexEntry(metadataList(0))
+val entry1 = cache.getIndexEntry(metadataList(1))
+cache.getIndexEntry(metadataList(2))
+assertCacheSize(2)
+val missingMetadata = metadataList(0)
+val missingEntry = entry0
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup,
+  "Failed to mark evicted cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted,
+  "Failed to cleanup evicted cache entry after resizing cache.")
+// verify no index files for `missingEntry` on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent,
+  s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent,
+  s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent,
+  s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent,
+  s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
+
+// Reduce cache capacity to only store 1 entries
+cache.resizeCacheSize(1 * estimateEntryBytesSize)
+assertCacheSize(1)
+
+val nextMissingMetadata = metadataList(1)
+val nextMissingEntry = entry1
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup,
+  "Failed to mark 

Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-13 Thread via GitHub


satishd commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1357974980


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,103 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry

Review Comment:
   typo: replace `entrys`  with `entries`.





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-12 Thread via GitHub


satishd commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1357784736


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,103 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  try {
+Files.walk(cache.cacheDir().toPath())
+  .filter(Files.isRegularFile(_))
+  .filter(path => path.getFileName.toString.endsWith(suffix))
+  .findAny()
+  } catch {
+case _: NoSuchFileException => Optional.empty()
+  }
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val entry0 = cache.getIndexEntry(metadataList(0))
+val entry1 = cache.getIndexEntry(metadataList(1))
+cache.getIndexEntry(metadataList(2))
+assertCacheSize(2)
+val missingMetadata = metadataList(0)
+val missingEntry = entry0
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup,
+  "Failed to mark evicted cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted,
+  "Failed to cleanup evicted cache entry after resizing cache.")
+// verify no index files for `missingEntry` on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent,
+  s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent,
+  s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent,
+  s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent,
+  s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
+
+// Reduce cache capacity to only store 1 entries
+cache.resizeCacheSize(1 * estimateEntryBytesSize)
+assertCacheSize(1)
+
+val nextMissingMetadata = metadataList(1)
+val nextMissingEntry = entry1
+// wait until evicted entry is marked for deletion
+TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup,
+  "Failed to mark 

Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-12 Thread via GitHub


divijvaidya commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1356830611


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -137,15 +138,8 @@ public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager,
 public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
 lock.writeLock().lock();
 try {
-// When resizing the cache, we always start with an empty cache. There are two main reasons:
-// 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
-// cache to the new cache in time when resizing inside.
-// 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
-// in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
-// will be inconsistent.
-internalCache.invalidateAll();
-log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
-internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+internalCache.policy().eviction().orElseThrow(() -> new NoSuchElementException("No eviction policy is set for the remote index cache.")

Review Comment:
   Ah! I think the difference is that we use a builder to construct a new cache 
in initCache(), whereas here we want to call a method (policy()) on the 
existing cache.
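For reference, Caffeine exposes the bound of an already-built cache through `Cache.policy().eviction()`, which returns an `Optional<Policy.Eviction>` whose `setMaximum(long)` changes the limit in place, while the builder's `maximumSize`/`maximumWeight` only apply at construction. The sketch below mimics that shape with hypothetical JDK-only types (`LiveCache`, `Eviction`, `resize`) so it runs without the Caffeine dependency; it also shows the `orElseThrow` path taken when a cache was built without a size bound. It is an illustration under those assumptions, not Caffeine's or Kafka's code.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical types that only mimic the shape of Caffeine's
// cache.policy().eviction() -> Optional<Policy.Eviction>; they are not Caffeine's API.
class PolicyAccessSketch {

    interface Eviction {
        long getMaximum();
        void setMaximum(long maximum);
    }

    // An already-built cache whose bound (if any) can be changed afterwards
    static final class LiveCache {
        private final Eviction eviction; // null when built without a size bound

        LiveCache(Long initialMax) {
            if (initialMax == null) {
                eviction = null;
            } else {
                eviction = new Eviction() {
                    private long max = initialMax;
                    @Override public long getMaximum() { return max; }
                    @Override public void setMaximum(long maximum) { max = maximum; }
                };
            }
        }

        Optional<Eviction> eviction() { return Optional.ofNullable(eviction); }
    }

    // Resize the existing instance; fail loudly if it has no size-based eviction policy
    static void resize(LiveCache cache, long newMax) {
        cache.eviction()
             .orElseThrow(() -> new NoSuchElementException("No eviction policy is set for the cache."))
             .setMaximum(newMax);
    }

    public static void main(String[] args) {
        LiveCache bounded = new LiveCache(1024L);
        resize(bounded, 2048L);
        System.out.println(bounded.eviction().get().getMaximum()); // 2048
        try {
            resize(new LiveCache(null), 10L);
        } catch (NoSuchElementException e) {
            System.out.println("unbounded: " + e.getMessage());
        }
    }
}
```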





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-11 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1355219802


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -716,4 +710,8 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
 return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
 }
 
+public static String remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {

Review Comment:
   updated





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-11 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1355175100


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -137,15 +138,8 @@ public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager,
 public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
 lock.writeLock().lock();
 try {
-// When resizing the cache, we always start with an empty cache. There are two main reasons:
-// 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
-// cache to the new cache in time when resizing inside.
-// 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
-// in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
-// will be inconsistent.
-internalCache.invalidateAll();
-log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
-internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+internalCache.policy().eviction().orElseThrow(() -> new NoSuchElementException("No eviction policy is set for the remote index cache.")

Review Comment:
   Isn't the purpose of this PR to set the cache size dynamically? When the new 
size is smaller than the previous size, the excess entries can be evicted 
dynamically. If `initEmptyCache` is still called in `resizeCacheSize`, this 
effect does not seem to be achieved, if I understand correctly. @divijvaidya
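To make the difference concrete, here is a minimal JDK-only sketch under stated assumptions: a count-bounded LRU (`BoundedLru`, hypothetical) stands in for the Caffeine cache. `rebuild` mirrors the old approach of invalidating everything and starting from an empty cache, while `resizeInPlace` keeps the instance and evicts only what no longer fits. None of these names are Kafka or Caffeine APIs.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka code: compares rebuilding an empty cache with
// shrinking the existing one, using a count-bounded JDK LRU as a stand-in.
class ResizeStrategies {

    static final class BoundedLru<K, V> extends LinkedHashMap<K, V> {
        private int maxEntries;

        BoundedLru(int maxEntries) {
            super(16, 0.75f, true); // access order gives LRU iteration order
            this.maxEntries = maxEntries;
        }

        // Called by LinkedHashMap after each insertion
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntries;
        }

        // Shrink in place: evict least-recently-used entries until within the limit
        void setMaxEntries(int newMax) {
            maxEntries = newMax;
            Iterator<Map.Entry<K, V>> it = entrySet().iterator();
            while (size() > maxEntries && it.hasNext()) {
                it.next();
                it.remove();
            }
        }
    }

    // Old approach: drop everything, then start over with the new limit
    static BoundedLru<String, String> rebuild(BoundedLru<String, String> old, int newMax) {
        old.clear(); // analogous to invalidateAll()
        return new BoundedLru<>(newMax);
    }

    // Proposed approach: keep the instance and evict only what no longer fits
    static BoundedLru<String, String> resizeInPlace(BoundedLru<String, String> cache, int newMax) {
        cache.setMaxEntries(newMax);
        return cache;
    }

    static BoundedLru<String, String> filled() {
        BoundedLru<String, String> cache = new BoundedLru<>(3);
        cache.put("seg0", "idx0");
        cache.put("seg1", "idx1");
        cache.put("seg2", "idx2");
        return cache;
    }

    public static void main(String[] args) {
        System.out.println("rebuild:  " + rebuild(filled(), 2).keySet());       // []
        System.out.println("in place: " + resizeInPlace(filled(), 2).keySet()); // [seg1, seg2]
    }
}
```

The rebuild path loses `seg1` and `seg2` even though both fit under the new limit, which is the effect described in the comment above.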





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-10 Thread via GitHub


divijvaidya commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1352082129


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -137,15 +138,8 @@ public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager,
 public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
 lock.writeLock().lock();
 try {
-// When resizing the cache, we always start with an empty cache. There are two main reasons:
-// 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
-// cache to the new cache in time when resizing inside.
-// 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
-// in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
-// will be inconsistent.
-internalCache.invalidateAll();
-log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
-internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+internalCache.policy().eviction().orElseThrow(() -> new NoSuchElementException("No eviction policy is set for the remote index cache.")

Review Comment:
   Why aren't we changing this in initEmptyCache() itself? Right now, setting 
the eviction policy is done separately in two places, in `resizeCacheSize` and 
in `initEmptyCache`. Alternatively, we can call `initEmptyCache` from both 
these places. It's just that in the case of `resizeCacheSize`, we will reset 
the maxSize of the cache to the new value.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -716,4 +710,8 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
 return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
 }
 
+public static String remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {

Review Comment:
   please add a comment:
   `// visible for testing`





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-09 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1349931016


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -554,6 +554,115 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  Files.walk(cache.cacheDir())
+.filter(Files.isRegularFile(_))
+.filter(path => path.getFileName.toString.endsWith(suffix))
+.findAny()
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache limit to 1 to ensure that all are evicted.

Review Comment:
   updated.





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-09 Thread via GitHub


hudeqi commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1349931417


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -554,6 +554,115 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  Files.walk(cache.cacheDir())
+.filter(Files.isRegularFile(_))
+.filter(path => path.getFileName.toString.endsWith(suffix))
+.findAny()
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache limit to 1 to ensure that all are evicted.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val cacheEntrys = new mutable.HashMap[Uuid, RemoteIndexCache.Entry]()
+cacheEntrys.put(metadataList(0).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(0)))
+cacheEntrys.put(metadataList(1).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(1)))
+cacheEntrys.put(metadataList(2).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(2)))
+
+assertCacheSize(2)
+val missingMetadataOpt = {
+  metadataList.find(segmentMetadata => {
+val segmentId = segmentMetadata.remoteLogSegmentId().id()
+!cache.internalCache.asMap().containsKey(segmentId)
+  })
+}
+assertFalse(missingMetadataOpt.isEmpty)
+val missingEntry = cacheEntrys.find(entry => entry._1.equals(missingMetadataOpt.get.remoteLogSegmentId().id())).get._2

Review Comment:
   Great! updated.





Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]

2023-10-09 Thread via GitHub


kamalcph commented on code in PR #14511:
URL: https://github.com/apache/kafka/pull/14511#discussion_r1349882603


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -554,6 +554,115 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  Files.walk(cache.cacheDir())
+.filter(Files.isRegularFile(_))
+.filter(path => path.getFileName.toString.endsWith(suffix))
+.findAny()
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+// Reduce the cache limit to 1 to ensure that all are evicted.
+cache.resizeCacheSize(1L)
+
+// wait until entry is marked for deletion
+TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+  "Failed to mark cache entry for cleanup after resizing cache.")
+TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+  "Failed to cleanup cache entry after resizing cache.")
+
+// verify no index files on remote cache dir
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+  s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+  s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+  s"Time index file should not be present on disk at ${cache.cacheDir()}")
+TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+  s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+assertTrue(cache.internalCache().estimatedSize() == 0)
+
+// Increase cache capacity to only store 2 entries
+cache.resizeCacheSize(2 * estimateEntryBytesSize)
+assertCacheSize(0)
+
+val cacheEntrys = new mutable.HashMap[Uuid, RemoteIndexCache.Entry]()
+cacheEntrys.put(metadataList(0).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(0)))
+cacheEntrys.put(metadataList(1).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(1)))
+cacheEntrys.put(metadataList(2).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(2)))
+
+assertCacheSize(2)
+val missingMetadataOpt = {
+  metadataList.find(segmentMetadata => {
+val segmentId = segmentMetadata.remoteLogSegmentId().id()
+!cache.internalCache.asMap().containsKey(segmentId)
+  })
+}
+assertFalse(missingMetadataOpt.isEmpty)
+val missingEntry = cacheEntrys.find(entry => entry._1.equals(missingMetadataOpt.get.remoteLogSegmentId().id())).get._2

Review Comment:
   Since this is an LRU cache, can we directly check whether the first entry 
`metadataList(0).remoteLogSegmentId().id()` is evicted? 
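A small sketch of the reasoning, assuming strict LRU semantics via a `LinkedHashMap` in access order (`lruOf` is a hypothetical helper, not a Kafka API). Under that assumption the victim is known up front, so no search over the cache contents is needed, but only if nothing reads the older entries in between, as the second map shows. Note that Caffeine's size-based eviction uses Window TinyLFU rather than strict LRU, so relying on the first entry being evicted is an assumption about the real cache rather than a guarantee.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a strict LRU bounded by entry count, built on LinkedHashMap.
class LruEvictionOrder {

    static <K, V> Map<K, V> lruOf(int maxEntries) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) { // access order => LRU
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries; // evict the least-recently-used entry
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = lruOf(2);
        cache.put("seg0", "idx0");
        cache.put("seg1", "idx1");
        cache.put("seg2", "idx2"); // seg0 is the eldest, so it is evicted
        System.out.println(cache.keySet()); // [seg1, seg2]

        Map<String, String> touched = lruOf(2);
        touched.put("seg0", "idx0");
        touched.put("seg1", "idx1");
        touched.get("seg0");          // reading seg0 makes seg1 the eldest
        touched.put("seg2", "idx2");
        System.out.println(touched.keySet()); // [seg0, seg2]
    }
}
```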



##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -554,6 +554,115 @@ class RemoteIndexCacheTest {
 assertTrue(cache.internalCache().estimatedSize() == 0)
   }
 
+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+def getIndexFileFromRemoteCacheDir(suffix: String) = {
+  Files.walk(cache.cacheDir())
+.filter(Files.isRegularFile(_))
+.filter(path => path.getFileName.toString.endsWith(suffix))
+.findAny()
+}
+
+// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry
+val estimateEntryBytesSize = estimateOneEntryBytesSize()
+val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+assertCacheSize(0)
+// getIndex for first time will call rsm#fetchIndex
+val cacheEntry = cache.getIndexEntry(metadataList.head)
+assertCacheSize(1)
+