Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
showuon merged PR #14511: URL: https://github.com/apache/kafka/pull/14511 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
showuon commented on PR #14511: URL: https://github.com/apache/kafka/pull/14511#issuecomment-1763872414

Failed tests are unrelated:
```
Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testSyncTopicConfigs()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 11 and Scala 2.13 / integration.kafka.server.FetchFromFollowerIntegrationTest.testFollowerCompleteDelayedFetchesOnReplication(String).quorum=kraft
```
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
showuon commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1360211831

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
@@ -525,7 +525,7 @@ class RemoteIndexCacheTest {
         .filter(path => path.getFileName.toString.endsWith(suffix))
         .findAny()
     } catch {
-      case _: FileNotFoundException => Optional.empty()
+      case _: NoSuchFileException => Optional.empty()
```

Review Comment: Wait, I checked the history: catching `FileNotFoundException` here was a typo, it should be `NoSuchFileException`. OK, that makes sense. Thanks.
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
showuon commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1360207445

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
@@ -525,7 +525,7 @@ class RemoteIndexCacheTest {
         .filter(path => path.getFileName.toString.endsWith(suffix))
         .findAny()
     } catch {
-      case _: FileNotFoundException => Optional.empty()
+      case _: NoSuchFileException => Optional.empty()
```

Review Comment: In this case, we can directly catch `IOException`, which also covers the case where a `FileNotFoundException` is thrown. WDYT?
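A minimal standalone sketch of the suggestion above (the object and helper name `FindWithSuffix.findFileWithSuffix` are illustrative, not the PR's code): both `FileNotFoundException` and `NoSuchFileException` are subclasses of `IOException`, so one catch case covers both.

```scala
import java.io.IOException
import java.nio.file.{Files, Path}
import java.util.Optional

object FindWithSuffix {
  // Walk a directory and return any regular file ending with `suffix`;
  // any I/O failure (including NoSuchFileException when the directory
  // itself is already gone) is treated as "not found".
  def findFileWithSuffix(dir: Path, suffix: String): Optional[Path] = {
    try {
      Files.walk(dir)
        .filter(Files.isRegularFile(_))
        .filter(path => path.getFileName.toString.endsWith(suffix))
        .findAny()
    } catch {
      case _: IOException => Optional.empty()
    }
  }
}
```

The trade-off is that a broad `IOException` catch also swallows genuine I/O errors (e.g. permission problems), which is acceptable in a test helper that only asserts presence or absence of files.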
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460675

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
@@ -561,6 +561,108 @@ class RemoteIndexCacheTest {
     assertTrue(cache.internalCache().estimatedSize() == 0)
   }

+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+    def getIndexFileFromRemoteCacheDir(suffix: String) = {
+      try {
+        Files.walk(cache.cacheDir().toPath())
+          .filter(Files.isRegularFile(_))
+          .filter(path => path.getFileName.toString.endsWith(suffix))
+          .findAny()
+      } catch {
+        case _: NoSuchFileException => Optional.empty()
+      }
+    }
+
+    // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> evict to 1 entry
+    val estimateEntryBytesSize = estimateOneEntryBytesSize()
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    assertCacheSize(0)
+    // getIndex for first time will call rsm#fetchIndex
+    val cacheEntry = cache.getIndexEntry(metadataList.head)
+    assertCacheSize(1)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+    // Reduce the cache size to 1 byte to ensure that all the entries are evicted from it.
+    cache.resizeCacheSize(1L)
+
+    // wait until entry is marked for deletion
+    TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after resizing cache.")
+    TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
+      "Failed to cleanup cache entry after resizing cache.")
+
+    // verify no index files on remote cache dir
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+      s"Offset index file should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+      s"Txn index file should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+      s"Time index file should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+      s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}")
+
+    assertTrue(cache.internalCache().estimatedSize() == 0)
+
+    // Increase cache capacity to only store 2 entries
+    cache.resizeCacheSize(2 * estimateEntryBytesSize)
+    assertCacheSize(0)
+
+    val entry0 = cache.getIndexEntry(metadataList(0))
+    val entry1 = cache.getIndexEntry(metadataList(1))
+    cache.getIndexEntry(metadataList(2))
+    assertCacheSize(2)
+    val missingMetadata = metadataList(0)
+    val missingEntry = entry0
+    // wait until evicted entry is marked for deletion
+    TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup,
+      "Failed to mark evicted cache entry for cleanup after resizing cache.")
+    TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted,
+      "Failed to cleanup evicted cache entry after resizing cache.")
+    // verify no index files for `missingEntry` on remote cache dir
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent,
+      s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent,
+      s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent,
+      s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
+    TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent,
+      s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
+
+    // Reduce cache capacity to only store 1 entry
+    cache.resizeCacheSize(1 * estimateEntryBytesSize)
+    assertCacheSize(1)
+
+    val nextMissingMetadata = metadataList(1)
+    val nextMissingEntry = entry1
+    // wait until evicted entry is marked for deletion
+    TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup,
+      "Failed to mark
```
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460627
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460606

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
+    assertTrue(cache.internalCache().estimatedSize() == 0)
```

Review Comment: updated
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359460521

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
+    def getIndexFileFromRemoteCacheDir(suffix: String) = {
+      try {
+        Files.walk(cache.cacheDir().toPath())
+          .filter(Files.isRegularFile(_))
+          .filter(path => path.getFileName.toString.endsWith(suffix))
+          .findAny()
+      } catch {
+        case _: NoSuchFileException => Optional.empty()
+      }
+    }
```

Review Comment: updated
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1359435180

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
@@ -525,7 +525,7 @@ class RemoteIndexCacheTest {
         .filter(path => path.getFileName.toString.endsWith(suffix))
         .findAny()
     } catch {
-      case _: FileNotFoundException => Optional.empty()
+      case _: NoSuchFileException => Optional.empty()
```

Review Comment: @showuon Ah, I think the `FileNotFoundException` added here may not be the correct exception thrown. This is what I discovered when merging the latest trunk: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14511/4/tests

Error:
```
java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/kafka-RemoteIndexCacheTest256653951450751431/x1qGOc3yRE6shFMJVKArpw:foo-0/remote-log-index-cache/0_TGK1Vu0nTCCcb0TN379YFA.timeindex.deleted
```

Stack trace:
```
java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/kafka-RemoteIndexCacheTest256653951450751431/x1qGOc3yRE6shFMJVKArpw:foo-0/remote-log-index-cache/0_TGK1Vu0nTCCcb0TN379YFA.timeindex.deleted
    at java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:88)
    at java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:104)
    at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
    at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
    at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
```
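Note a subtlety visible in that stack trace: the `NoSuchFileException` surfaces from `FileTreeIterator` wrapped in an `UncheckedIOException`, i.e. it is raised while the stream is being consumed, not by the initial `Files.walk` call. A hedged standalone sketch (the object name `SafeWalk` is illustrative, not the PR's code) of a helper that handles both failure modes:

```scala
import java.io.{IOException, UncheckedIOException}
import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.Optional

object SafeWalk {
  // Files.walk can fail in two ways:
  //  - the initial walk(dir) call throws a checked IOException, e.g.
  //    NoSuchFileException when dir itself is missing;
  //  - a file deleted while the stream is being traversed surfaces from the
  //    iterator wrapped in UncheckedIOException (a RuntimeException).
  def findFileWithSuffix(dir: Path, suffix: String): Optional[Path] = {
    try {
      Files.walk(dir)
        .filter(Files.isRegularFile(_))
        .filter(path => path.getFileName.toString.endsWith(suffix))
        .findAny()
    } catch {
      case _: IOException => Optional.empty() // walk(dir) failed up front
      case e: UncheckedIOException if e.getCause.isInstanceOf[NoSuchFileException] =>
        Optional.empty() // a file vanished mid-traversal, as in the CI failure
    }
  }
}
```

Catching only `NoSuchFileException` at the call site would therefore miss the wrapped mid-traversal case, which is exactly the race a concurrent cleaner thread can trigger.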
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
showuon commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1358195654
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
showuon commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1358164148

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
+    def getIndexFileFromRemoteCacheDir(suffix: String) = {
+      try {
+        Files.walk(cache.cacheDir().toPath())
+          .filter(Files.isRegularFile(_))
+          .filter(path => path.getFileName.toString.endsWith(suffix))
+          .findAny()
+      } catch {
+        case _: NoSuchFileException => Optional.empty()
+      }
+    }
```

Review Comment: Looks like these methods are duplicated. Could we extract it as a class method?

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
-      case _: FileNotFoundException => Optional.empty()
+      case _: NoSuchFileException => Optional.empty()
```

Review Comment: Why should we change this exception?

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
+    assertTrue(cache.internalCache().estimatedSize() == 0)
```

Review Comment: Why can't we use `assertCacheSize(0)`?
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1358013056

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
```
+    // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry
```

Review Comment: updated.
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1358012358
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
satishd commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1357974980

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -561,6 +561,103 @@ class RemoteIndexCacheTest {
     assertTrue(cache.internalCache().estimatedSize() == 0)
   }

+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+    def getIndexFileFromRemoteCacheDir(suffix: String) = {
+      try {
+        Files.walk(cache.cacheDir().toPath())
+          .filter(Files.isRegularFile(_))
+          .filter(path => path.getFileName.toString.endsWith(suffix))
+          .findAny()
+      } catch {
+        case _: NoSuchFileException => Optional.empty()
+      }
+    }
+
+    // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry

Review Comment: typo: replace `entrys` with `entries`.
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
satishd commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1357784736 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -561,6 +561,103 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } + @Test + def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { + +def getIndexFileFromRemoteCacheDir(suffix: String) = { + try { +Files.walk(cache.cacheDir().toPath()) + .filter(Files.isRegularFile(_)) + .filter(path => path.getFileName.toString.endsWith(suffix)) + .findAny() + } catch { +case _: NoSuchFileException => Optional.empty() + } +} + +// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry +val estimateEntryBytesSize = estimateOneEntryBytesSize() +val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) +val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) + +assertCacheSize(0) +// getIndex for first time will call rsm#fetchIndex +val cacheEntry = cache.getIndexEntry(metadataList.head) +assertCacheSize(1) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent) + +// Reduce the cache size to 1 byte to ensure that all the entries are evicted from it. 
+cache.resizeCacheSize(1L) + +// wait until entry is marked for deletion +TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, + "Failed to cleanup cache entry after resizing cache.") + +// verify no index files on remote cache dir +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + s"Offset index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + s"Txn index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + s"Time index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}") + +assertTrue(cache.internalCache().estimatedSize() == 0) + +// Increase cache capacity to only store 2 entries +cache.resizeCacheSize(2 * estimateEntryBytesSize) +assertCacheSize(0) + +val entry0 = cache.getIndexEntry(metadataList(0)) +val entry1 = cache.getIndexEntry(metadataList(1)) +cache.getIndexEntry(metadataList(2)) +assertCacheSize(2) +val missingMetadata = metadataList(0) +val missingEntry = entry0 +// wait until evicted entry is marked for deletion +TestUtils.waitUntilTrue(() => missingEntry.isMarkedForCleanup, + "Failed to mark evicted cache entry for cleanup after resizing cache.") +TestUtils.waitUntilTrue(() => missingEntry.isCleanStarted, + "Failed to cleanup evicted cache entry after resizing cache.") +// verify no index files for `missingEntry` on remote cache dir +TestUtils.waitUntilTrue(() => 
!getIndexFileFromRemoteCacheDir(remoteOffsetIndexFileName(missingMetadata)).isPresent, + s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTimeIndexFileName(missingMetadata)).isPresent, + s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteTransactionIndexFileName(missingMetadata)).isPresent, + s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(remoteDeletedSuffixIndexFileName(missingMetadata)).isPresent, + s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}") + +// Reduce cache capacity to only store 1 entries +cache.resizeCacheSize(1 * estimateEntryBytesSize) +assertCacheSize(1) + +val nextMissingMetadata = metadataList(1) +val nextMissingEntry = entry1 +// wait until evicted entry is marked for deletion +TestUtils.waitUntilTrue(() => nextMissingEntry.isMarkedForCleanup, + "Failed to mark
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
divijvaidya commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1356830611

## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -137,15 +138,8 @@ public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager,
     public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
         lock.writeLock().lock();
         try {
-            // When resizing the cache, we always start with an empty cache. There are two main reasons:
-            // 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
-            // cache to the new cache in time when resizing inside.
-            // 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
-            // in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
-            // will be inconsistent.
-            internalCache.invalidateAll();
-            log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
-            internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+            internalCache.policy().eviction().orElseThrow(() -> new NoSuchElementException("No eviction policy is set for the remote index cache.")

Review Comment: Ah! I think the difference is that we use a builder to construct a new cache in initCache(), whereas here we want to access a method (policy()) on the existing cache.
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1355219802

## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -716,4 +710,8 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
         return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
     }

+    public static String remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {

Review Comment: updated
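[Editor's note] For context on the helper discussed above: the `remote*IndexFileName` family simply concatenates a per-segment file-name prefix with a suffix constant, and the new helper adds the deleted-marker variant that the test scans for. A hedged stand-alone sketch of that naming scheme (the prefix format and the exact composition of the deleted name are assumptions for illustration; the real values live in `LogFileUtils` and `RemoteIndexCache`):

```java
public class IndexFileNames {
    // Suffix constants mirroring LogFileUtils (values assumed for illustration).
    static final String INDEX_FILE_SUFFIX = ".index";
    static final String DELETED_FILE_SUFFIX = ".deleted";

    // Hypothetical stand-in for generateFileNamePrefixForIndex(metadata):
    // here, start offset joined with the segment id.
    static String fileNamePrefix(long startOffset, String segmentId) {
        return startOffset + "_" + segmentId;
    }

    static String offsetIndexFileName(long startOffset, String segmentId) {
        return fileNamePrefix(startOffset, segmentId) + INDEX_FILE_SUFFIX;
    }

    // An index file that has been marked for cleanup keeps its name and gains
    // the ".deleted" suffix; scanning for this name verifies cleanup completed.
    static String deletedIndexFileName(long startOffset, String segmentId) {
        return offsetIndexFileName(startOffset, segmentId) + DELETED_FILE_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(offsetIndexFileName(123L, "abc"));   // 123_abc.index
        System.out.println(deletedIndexFileName(123L, "abc"));  // 123_abc.index.deleted
    }
}
```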
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1355175100

## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -137,15 +138,8 @@ public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager,
     public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
         lock.writeLock().lock();
         try {
-            // When resizing the cache, we always start with an empty cache. There are two main reasons:
-            // 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
-            // cache to the new cache in time when resizing inside.
-            // 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
-            // in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
-            // will be inconsistent.
-            internalCache.invalidateAll();
-            log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
-            internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+            internalCache.policy().eviction().orElseThrow(() -> new NoSuchElementException("No eviction policy is set for the remote index cache.")

Review Comment: Isn't the purpose of this PR to dynamically set the cache size? When the new size is smaller than the previous one, some entries can be evicted dynamically. If `initEmptyCache` is still called from `resizeCacheSize`, that effect does not seem to be achieved, if I understand correctly. @divijvaidya
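[Editor's note] The point being made above is that resizing should shrink the live cache in place, evicting just enough entries, rather than rebuilding it empty. With Caffeine this is what `cache.policy().eviction()` plus `Eviction.setMaximum(long)` provides. As a dependency-free illustration of the shrink-triggers-eviction behavior (not the PR's implementation), here is an LRU sketch on `LinkedHashMap`:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class ResizableLruCache<K, V> extends LinkedHashMap<K, V> {
    private long maxEntries;

    public ResizableLruCache(long maxEntries) {
        super(16, 0.75f, true); // access-order: iteration order is LRU -> MRU
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict LRU entry on insert past capacity
    }

    // Shrinking the capacity evicts least-recently-used entries in place,
    // instead of throwing the whole cache away and starting empty.
    public synchronized void resize(long newMax) {
        this.maxEntries = newMax;
        while (size() > maxEntries) {
            Iterator<Map.Entry<K, V>> it = entrySet().iterator();
            it.next();   // eldest (least recently used) entry
            it.remove();
        }
    }

    public static void main(String[] args) {
        ResizableLruCache<Integer, String> cache = new ResizableLruCache<>(3);
        cache.put(1, "a"); cache.put(2, "b"); cache.put(3, "c");
        cache.get(1);               // touch 1 so 2 becomes the LRU entry
        cache.resize(2);
        System.out.println(cache.keySet()); // prints [3, 1]: entry 2 was evicted
    }
}
```

The important difference from `invalidateAll()` + rebuild is that surviving entries (and their on-disk index files, in the RemoteIndexCache case) stay usable across the resize.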
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
divijvaidya commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1352082129

## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -137,15 +138,8 @@ public RemoteIndexCache(long maxSize, RemoteStorageManager remoteStorageManager,
     public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
         lock.writeLock().lock();
         try {
-            // When resizing the cache, we always start with an empty cache. There are two main reasons:
-            // 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
-            // cache to the new cache in time when resizing inside.
-            // 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
-            // in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
-            // will be inconsistent.
-            internalCache.invalidateAll();
-            log.info("Invalidated all entries in the cache and triggered the cleaning of all index files in the cache dir.");
-            internalCache = initEmptyCache(remoteLogIndexFileCacheSize);
+            internalCache.policy().eviction().orElseThrow(() -> new NoSuchElementException("No eviction policy is set for the remote index cache.")

Review Comment: Why aren't we changing this in initEmptyCache() itself? Right now, setting the eviction policy is done separately in two places, in `resizeCacheSize` and in `initEmptyCache`. Alternatively, we can call `initEmptyCache` from both these places. It's just that in the case of `resizeCacheSize`, we will reset the maxSize of the cache to the new value.

## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -716,4 +710,8 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
         return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
     }

+    public static String remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {

Review Comment: please add a comment: `// visible for testing`
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1349931016

## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -554,6 +554,115 @@ class RemoteIndexCacheTest {
     assertTrue(cache.internalCache().estimatedSize() == 0)
   }

+  @Test
+  def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
+
+    def getIndexFileFromRemoteCacheDir(suffix: String) = {
+      Files.walk(cache.cacheDir())
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    }
+
+    // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry
+    val estimateEntryBytesSize = estimateOneEntryBytesSize()
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    assertCacheSize(0)
+    // getIndex for first time will call rsm#fetchIndex
+    val cacheEntry = cache.getIndexEntry(metadataList.head)
+    assertCacheSize(1)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent)
+    assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent)
+
+    // Reduce the cache limit to 1 to ensure that all are evicted.

Review Comment: updated.
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
hudeqi commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1349931417 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -554,6 +554,115 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } + @Test + def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { + +def getIndexFileFromRemoteCacheDir(suffix: String) = { + Files.walk(cache.cacheDir()) +.filter(Files.isRegularFile(_)) +.filter(path => path.getFileName.toString.endsWith(suffix)) +.findAny() +} + +// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry +val estimateEntryBytesSize = estimateOneEntryBytesSize() +val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) +val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) + +assertCacheSize(0) +// getIndex for first time will call rsm#fetchIndex +val cacheEntry = cache.getIndexEntry(metadataList.head) +assertCacheSize(1) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent) + +// Reduce the cache limit to 1 to ensure that all are evicted. 
+cache.resizeCacheSize(1L) + +// wait until entry is marked for deletion +TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, + "Failed to cleanup cache entry after resizing cache.") + +// verify no index files on remote cache dir +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + s"Offset index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + s"Txn index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + s"Time index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}") + +assertTrue(cache.internalCache().estimatedSize() == 0) + +// Increase cache capacity to only store 2 entries +cache.resizeCacheSize(2 * estimateEntryBytesSize) +assertCacheSize(0) + +val cacheEntrys = new mutable.HashMap[Uuid, RemoteIndexCache.Entry]() +cacheEntrys.put(metadataList(0).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(0))) +cacheEntrys.put(metadataList(1).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(1))) +cacheEntrys.put(metadataList(2).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(2))) + +assertCacheSize(2) +val missingMetadataOpt = { + metadataList.find(segmentMetadata => { +val segmentId = segmentMetadata.remoteLogSegmentId().id() +!cache.internalCache.asMap().containsKey(segmentId) + }) +} +assertFalse(missingMetadataOpt.isEmpty) +val missingEntry = cacheEntrys.find(entry => 
entry._1.equals(missingMetadataOpt.get.remoteLogSegmentId().id())).get._2

Review Comment: Great! updated.
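[Editor's note] A pattern worth noting in the test quoted above: because Caffeine evicts and cleans asynchronously, every post-resize check goes through `TestUtils.waitUntilTrue` rather than a direct assertion. A minimal stand-alone version of that polling idiom (the real Kafka helper takes more parameters, e.g. a pause interval) might look like:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class WaitUntil {
    // Poll a condition until it holds or the timeout elapses; fail loudly otherwise.
    static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            Thread.sleep(10); // back off briefly between polls
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean cleaned = new AtomicBoolean(false);
        // Simulate an asynchronous cleaner flipping the flag, the way Caffeine's
        // removal listener marks an entry for cleanup some time after eviction.
        new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            cleaned.set(true);
        }).start();
        waitUntilTrue(cleaned::get, "Failed to observe cleanup.", 2_000);
        System.out.println("condition met");
    }
}
```

Asserting immediately after `resizeCacheSize` instead of polling like this would make the test flaky, which is exactly what the wait calls in the quoted test avoid.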
Re: [PR] KAFKA-15536:Dynamically resize remoteIndexCache [kafka]
kamalcph commented on code in PR #14511: URL: https://github.com/apache/kafka/pull/14511#discussion_r1349882603 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -554,6 +554,115 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } + @Test + def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { + +def getIndexFileFromRemoteCacheDir(suffix: String) = { + Files.walk(cache.cacheDir()) +.filter(Files.isRegularFile(_)) +.filter(path => path.getFileName.toString.endsWith(suffix)) +.findAny() +} + +// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry +val estimateEntryBytesSize = estimateOneEntryBytesSize() +val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) +val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) + +assertCacheSize(0) +// getIndex for first time will call rsm#fetchIndex +val cacheEntry = cache.getIndexEntry(metadataList.head) +assertCacheSize(1) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent) + +// Reduce the cache limit to 1 to ensure that all are evicted. 
+cache.resizeCacheSize(1L) + +// wait until entry is marked for deletion +TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup, + "Failed to mark cache entry for cleanup after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, + "Failed to cleanup cache entry after resizing cache.") + +// verify no index files on remote cache dir +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + s"Offset index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + s"Txn index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + s"Time index file should not be present on disk at ${cache.cacheDir()}") +TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}") + +assertTrue(cache.internalCache().estimatedSize() == 0) + +// Increase cache capacity to only store 2 entries +cache.resizeCacheSize(2 * estimateEntryBytesSize) +assertCacheSize(0) + +val cacheEntrys = new mutable.HashMap[Uuid, RemoteIndexCache.Entry]() +cacheEntrys.put(metadataList(0).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(0))) +cacheEntrys.put(metadataList(1).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(1))) +cacheEntrys.put(metadataList(2).remoteLogSegmentId().id(), cache.getIndexEntry(metadataList(2))) + +assertCacheSize(2) +val missingMetadataOpt = { + metadataList.find(segmentMetadata => { +val segmentId = segmentMetadata.remoteLogSegmentId().id() +!cache.internalCache.asMap().containsKey(segmentId) + }) +} +assertFalse(missingMetadataOpt.isEmpty) +val missingEntry = cacheEntrys.find(entry => 
entry._1.equals(missingMetadataOpt.get.remoteLogSegmentId().id())).get._2 Review Comment: Since this is a LRU cache, can we directly check whether the first entry metadaList(0).remoteLogSegmentId().id() is evicted? ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -554,6 +554,115 @@ class RemoteIndexCacheTest { assertTrue(cache.internalCache().estimatedSize() == 0) } + @Test + def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { + +def getIndexFileFromRemoteCacheDir(suffix: String) = { + Files.walk(cache.cacheDir()) +.filter(Files.isRegularFile(_)) +.filter(path => path.getFileName.toString.endsWith(suffix)) +.findAny() +} + +// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entrys with limited capacity of 2 entrys -> evict to 1 entry +val estimateEntryBytesSize = estimateOneEntryBytesSize() +val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) +val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) + +assertCacheSize(0) +// getIndex for first time will call rsm#fetchIndex +val cacheEntry = cache.getIndexEntry(metadataList.head) +assertCacheSize(1) +