Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
satishd merged PR #15820: URL: https://github.com/apache/kafka/pull/15820 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
satishd commented on PR #15820: URL: https://github.com/apache/kafka/pull/15820#issuecomment-2161862623 There are a few unrelated test failures, merging it to trunk and 3.8.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
satishd commented on PR #15820: URL: https://github.com/apache/kafka/pull/15820#issuecomment-2161861876 Thanks @abhijeetk88 for addressing the review comments. LGTM.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1634689117

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {
```

Review Comment: @abhijeetk88, thanks for the response. I agree we can keep it as is and just document it clearly in the monitoring section: https://kafka.apache.org/documentation/#monitoring. We can improve it in the future if necessary.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1633050128

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

```
@@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
 }
+
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+    RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+    if (quotaExceeded) {
+        // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+        assertThrows(AssertionFailedError.class,
+            () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+        // Verify the highest offset in remote storage is updated only once
+        ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+        verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+        // Verify the highest offset in remote storage was -1L before the copy started
+        assertEquals(-1L, capture.getValue());
+    } else {
+        // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+        assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+        // Verify quota check was performed
+        verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+        // Verify bytes to copy was recorded with the quota manager
+        verify(rlmCopyQuotaManager, times(1)).record(10);
+
+        // Verify the highest offset in remote storage is updated
+        ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+        verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+        List<Long> capturedValues = capture.getAllValues();
+        // Verify the highest offset in remote storage was -1L before the copy
+        assertEquals(-1L, capturedValues.get(0).longValue());
+        // Verify it was updated to 149L after the copy
+        assertEquals(149L, capturedValues.get(1).longValue());
+    }
+}
+
+@Test
+public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
+    RemoteLogManager.RLMTask task = setupRLMTask(true);
+
+    Thread t = new Thread(task);
+    t.start();
+    // Sleep for a while to allow the task to start and quota check to be performed once
+    Thread.sleep(100);
```

Review Comment: Changed the test as suggested.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1632218079

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

```
@@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
 }
+
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+    RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+    if (quotaExceeded) {
+        // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+        assertThrows(AssertionFailedError.class,
+            () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+        // Verify the highest offset in remote storage is updated only once
+        ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+        verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+        // Verify the highest offset in remote storage was -1L before the copy started
+        assertEquals(-1L, capture.getValue());
+    } else {
+        // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+        assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+        // Verify quota check was performed
+        verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+        // Verify bytes to copy was recorded with the quota manager
+        verify(rlmCopyQuotaManager, times(1)).record(10);
+
+        // Verify the highest offset in remote storage is updated
+        ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+        verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+        List<Long> capturedValues = capture.getAllValues();
+        // Verify the highest offset in remote storage was -1L before the copy
+        assertEquals(-1L, capturedValues.get(0).longValue());
+        // Verify it was updated to 149L after the copy
+        assertEquals(149L, capturedValues.get(1).longValue());
+    }
+}
+
+@Test
+public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
+    RemoteLogManager.RLMTask task = setupRLMTask(true);
+
+    Thread t = new Thread(task);
+    t.start();
+    // Sleep for a while to allow the task to start and quota check to be performed once
+    Thread.sleep(100);
```

Review Comment: `sleep` can lead to flaky tests in CI, and the shutdown/close behavior is not tested end-to-end. Can we rewrite the test? Also, we have to give some buffer time in the close timeout (quotaTimeout + 50 ms) to avoid test flakiness.

```
@Test
public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
    remoteLogManager.startup();
    setupRLMTask(true);
    remoteLogManager.onLeadershipChange(
        Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);

    TestUtils.waitForCondition(() -> {
        verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
        return true;
    }, "Quota exceeded check did not happen");

    assertTimeoutPreemptively(Duration.ofMillis(150), () -> remoteLogManager.close());
}
```
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1632125146

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {
+            logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+            // If the thread gets interrupted while waiting, the InterruptedException is thrown
+            // back to the caller. It's important to note that the task being executed is already
+            // cancelled before the executing thread is interrupted. The caller is responsible
+            // for handling the exception gracefully by checking if the task is already cancelled.
```

Review Comment: done
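For readers following the thread, the wait-on-quota pattern discussed in the diff above can be sketched outside Kafka with a `ReentrantLock` and a `Condition`. This is a minimal stand-alone sketch, not the PR's actual code: `FakeQuotaManager`, `waitForQuota`, and the 50 ms re-check interval are invented for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy stand-in for the copy quota manager: reports the quota as exceeded
// for the first two checks, then as available.
class FakeQuotaManager {
    private final AtomicInteger checks = new AtomicInteger();
    boolean isQuotaExceeded() { return checks.incrementAndGet() <= 2; }
}

public class QuotaWaitSketch {
    private static final ReentrantLock copyQuotaManagerLock = new ReentrantLock();
    private static final Condition copyQuotaAvailable = copyQuotaManagerLock.newCondition();

    // Block until the quota frees up, re-checking under the lock. A timed
    // await doubles as a poll, so progress does not depend on a signal.
    // InterruptedException propagates to the caller, which (as the PR comment
    // notes) is expected to check whether its task was already cancelled.
    static void waitForQuota(FakeQuotaManager quota) throws InterruptedException {
        copyQuotaManagerLock.lock();
        try {
            while (quota.isQuotaExceeded()) {
                copyQuotaAvailable.await(50, TimeUnit.MILLISECONDS);
            }
        } finally {
            copyQuotaManagerLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        waitForQuota(new FakeQuotaManager());
        System.out.println("quota available, proceeding with copy");
    }
}
```

The `while` loop (rather than an `if`) matters: `Condition.await` may return spuriously or on timeout, so the quota must be re-checked before proceeding.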
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on PR #15820: URL: https://github.com/apache/kafka/pull/15820#issuecomment-2156212270 @kamalcph I have addressed your comments, please take a look.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on PR #15820: URL: https://github.com/apache/kafka/pull/15820#issuecomment-2151797298 Hi @showuon. I have responded to your comment [here](https://github.com/apache/kafka/pull/15820#discussion_r1625465435). Please take a look.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1628876545

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {
+            logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+            // If the thread gets interrupted while waiting, the InterruptedException is thrown
+            // back to the caller. It's important to note that the task being executed is already
+            // cancelled before the executing thread is interrupted. The caller is responsible
+            // for handling the exception gracefully by checking if the task is already cancelled.
```

Review Comment: Will add.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1628819130

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {
```

Review Comment: I agree that the thread pool will look busy when the threads are waiting for the quota to become available. However, we will also have another metric for throttling, which can be used to verify whether the copy operations are being throttled. If we see the **thread pool busy and copies being throttled -> the copy quota is fully utilized and there is nothing to do**. However, if the **thread pool is busy and no throttling is happening -> there are not enough threads in the pool and the thread pool size needs to be increased**.

Skipping the current run (as Kamal pointed out) will result in ineffective use of the quota, because the next run of the task happens after 30 seconds. Also, this may cause starvation for some topic partitions; there is no fairness. For example, the copy for one topic partition gets skipped, while the copy for another is picked up immediately if the quota is available. These problems will continue to exist if we schedule the next run with a 1-second delay.

It is also not simple to schedule a new RLMTask with a 1-second delay (if we skip the current run). The schedule of the tasks is already fixed, because we are using a ScheduledThreadPoolExecutor with `scheduleWithFixedDelay` to schedule the tasks. If we made the above change, each task would need to own its own scheduling, i.e., after each run it would schedule its next run. This would be a larger change.
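The "task owns its own scheduling" alternative mentioned above can be illustrated with a toy self-rescheduling task: instead of `scheduleWithFixedDelay` (which fixes the cadence up front), each run submits its own next run and can shorten the delay after a throttled attempt. Everything here (class name, delays, run count) is hypothetical, scaled down to milliseconds for demonstration only.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReschedulingTaskSketch {
    static final AtomicInteger runs = new AtomicInteger();
    private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);

    // Each run picks its own next delay: a short retry when throttled,
    // the normal interval otherwise, then re-submits itself.
    static void runOnce(int remainingRuns, boolean throttled) {
        runs.incrementAndGet();
        long nextDelayMs = throttled ? 10 : 50; // stand-ins for 1 s vs 30 s in the thread
        if (remainingRuns > 1) {
            pool.schedule(() -> runOnce(remainingRuns - 1, !throttled),
                          nextDelayMs, TimeUnit.MILLISECONDS);
        } else {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        runOnce(3, true);
        pool.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("completed runs: " + runs.get());
    }
}
```

As the comment argues, moving from `scheduleWithFixedDelay` to this shape touches the task lifecycle, which is why it was deemed a larger change.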
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on PR #15820: URL: https://github.com/apache/kafka/pull/15820#issuecomment-2151343142 @abhijeetk88, any update to this PR?
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625465435

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {
```

Review Comment: Good point! [KAFKA-16887](https://issues.apache.org/jira/browse/KAFKA-16887) is created for this issue. I'm also thinking: maybe we could still skip this copy when the quota is exceeded and wait for the next RLMTask run. And if it's throttled, we can scheduleOnce another RLMTask with a 1-second delay. That will mitigate the issue @kamalcph mentioned above and achieve the same result. WDYT?
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625304008

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {
```

Review Comment: One drawback of the current approach is that the thread pool will be shown as all "busy" in the metrics while the threads are waiting for the quota to become available. It can raise false alarms if a user configures a 100% thread-pool usage alert. We can mention this behavior in the docs section.
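The false-alarm concern above can be reproduced in isolation: worker threads blocked inside a task still count as "actively executing", so a utilization figure derived from `ThreadPoolExecutor.getActiveCount()` reads 100% even though no copying is happening. A minimal sketch, with all names invented and a latch standing in for the quota wait:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class PoolBusySketch {
    // Fill a 2-thread pool with tasks that block on a latch (standing in for
    // copy tasks parked in await on the quota condition), then report how
    // busy the pool looks to a metrics reader.
    static double measureBusyPercent() throws Exception {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 2; i++) {
            pool.submit(() -> { release.await(); return null; });
        }
        Thread.sleep(200); // give the workers time to pick the tasks up
        // Blocked workers are still counted as active by the executor.
        double busy = 100.0 * pool.getActiveCount() / pool.getMaximumPoolSize();
        release.countDown();
        pool.shutdown();
        return busy;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("busy=" + measureBusyPercent() + "%");
    }
}
```

This is why the reviewers suggest pairing the busy metric with a separate throttling metric to distinguish "quota exhausted" from "pool undersized".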
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625290704

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

```
@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {
+            logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+            // If the thread gets interrupted while waiting, the InterruptedException is thrown
+            // back to the caller. It's important to note that the task being executed is already
+            // cancelled before the executing thread is interrupted. The caller is responsible
+            // for handling the exception gracefully by checking if the task is already cancelled.
```

Review Comment: Could you cover shutdown when the quota gets breached as a unit test?
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1625271623

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

```
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
     assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
 }
+
+@ParameterizedTest
+@ValueSource(booleans = {false, true})
+public void testCopyQuota(boolean quotaExceeded) throws Exception {
+    long oldSegmentStartOffset = 0L;
+    long nextSegmentStartOffset = 150L;
+
+    when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+    // leader epoch preparation
+    checkpoint.write(totalEpochEntries);
+    LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+    when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+    when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+    // create 2 log segments, with 0 and 150 as log start offset
+    LogSegment oldSegment = mock(LogSegment.class);
+    LogSegment activeSegment = mock(LogSegment.class);
+
+    File tempFile = TestUtils.tempFile();
+    FileRecords fileRecords = mock(FileRecords.class);
+    when(fileRecords.file()).thenReturn(tempFile);
+    when(fileRecords.sizeInBytes()).thenReturn(10);
+
+    // Set up the segment that is eligible for copy
+    when(oldSegment.log()).thenReturn(fileRecords);
+    when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+    when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+    // set up the active segment
+    when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+    when(mockLog.activeSegment()).thenReturn(activeSegment);
+    when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+    when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+    File mockProducerSnapshotIndex = TestUtils.tempFile();
+    ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+    when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+    when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+    when(mockLog.lastStableOffset()).thenReturn(250L);
+
+    File tempDir = TestUtils.tempDirectory();
+    OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+    TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+    File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+    txnFile.createNewFile();
+    TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+    when(oldSegment.timeIndex()).thenReturn(timeIdx);
+    when(oldSegment.offsetIndex()).thenReturn(idx);
+    when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+    CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+    dummyFuture.complete(null);
+    when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+    when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+    when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+    when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+    doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+    RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+    task.convertToLeader(2);
+
+    if (quotaExceeded) {
+        // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+        try {
+            assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog));
+            fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+        } catch (AssertionFailedError e) {
+            // Fail the test if the operation completed within the timeout
+            if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) {
+                fail(e.getMessage());
+            }
+        }
+
+        // Verify the highest offset in remote storage is updated only once
+        ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+        verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
```
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1624109218

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig()

```java
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testCopyQuota(boolean quotaExceeded) throws Exception {
    long oldSegmentStartOffset = 0L;
    long nextSegmentStartOffset = 150L;

    when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());

    // leader epoch preparation
    checkpoint.write(totalEpochEntries);
    LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
    when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
    when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));

    // create 2 log segments, with 0 and 150 as log start offset
    LogSegment oldSegment = mock(LogSegment.class);
    LogSegment activeSegment = mock(LogSegment.class);

    File tempFile = TestUtils.tempFile();
    FileRecords fileRecords = mock(FileRecords.class);
    when(fileRecords.file()).thenReturn(tempFile);
    when(fileRecords.sizeInBytes()).thenReturn(10);

    // Set up the segment that is eligible for copy
    when(oldSegment.log()).thenReturn(fileRecords);
    when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
    when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

    // set up the active segment
    when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);

    when(mockLog.activeSegment()).thenReturn(activeSegment);
    when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
    when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

    File mockProducerSnapshotIndex = TestUtils.tempFile();
    ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
    when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));

    when(mockLog.producerStateManager()).thenReturn(mockStateManager);
    when(mockLog.lastStableOffset()).thenReturn(250L);

    File tempDir = TestUtils.tempDirectory();
    OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
    TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
    File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
    txnFile.createNewFile();
    TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
    when(oldSegment.timeIndex()).thenReturn(timeIdx);
    when(oldSegment.offsetIndex()).thenReturn(idx);
    when(oldSegment.txnIndex()).thenReturn(txnIndex);

    CompletableFuture dummyFuture = new CompletableFuture<>();
    dummyFuture.complete(null);
    when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
    when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
    when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());

    when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
    doNothing().when(rlmCopyQuotaManager).record(anyInt());

    RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
    task.convertToLeader(2);

    if (quotaExceeded) {
        // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
        try {
            assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog));
            fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
        } catch (AssertionFailedError e) {
            // Fail the test if the operation completed within the timeout
            if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) {
                fail(e.getMessage());
            }
        }
```

Review Comment: done
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1624108812

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException

```java
            isCancelled(), isLeader());
        return;
    }

    copyQuotaManagerLock.lock();
    try {
        while (rlmCopyQuotaManager.isQuotaExceeded()) {
            logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
            try {
                copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
```

Review Comment: Addressed this. Also added comments.
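The bounded-wait pattern discussed in this thread can be sketched as a standalone example. This is an illustrative reduction, not the PR's actual code: `quotaExceeded`, `waitForQuota`, and the class name are stand-ins for the RemoteLogManager's `rlmCopyQuotaManager.isQuotaExceeded()` check and lock/condition fields, and the interrupt is propagated rather than swallowed, as agreed later in the thread.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class QuotaWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition quotaAvailable = lock.newCondition();
    private final AtomicBoolean quotaExceeded = new AtomicBoolean(true);

    // Block until the quota frees up, re-checking at most every `timeout`.
    // The timed await bounds each sleep; the while loop guards against
    // spurious wakeups. InterruptedException is declared, not caught,
    // so a shutdown interrupt reaches the caller.
    void waitForQuota(Duration timeout) throws InterruptedException {
        lock.lock();
        try {
            while (quotaExceeded.get()) {
                quotaAvailable.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
        } finally {
            lock.unlock();
        }
    }

    // Called when quota is available again; wakes all waiting copy tasks.
    void markQuotaAvailable() {
        lock.lock();
        try {
            quotaExceeded.set(false);
            quotaAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        QuotaWaitSketch sketch = new QuotaWaitSketch();
        Thread waiter = new Thread(() -> {
            try {
                sketch.waitForQuota(Duration.ofMillis(50));
                System.out.println("quota became available");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        Thread.sleep(100);
        sketch.markQuotaAvailable();
        waiter.join();
    }
}
```

Because the await is timed, a waiter also rechecks the quota periodically even without a signal, which is what allows the production code to poll `isQuotaExceeded()` on a cadence controlled by `quotaTimeout()`.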
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1624108252

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: (quotes the same `testCopyQuota` hunk shown earlier)

Review Comment: done
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1624107462

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -239,6 +244,13 @@ private void removeMetrics() { remoteStorageReaderThreadPool.removeMetrics(); }

```java
/**
 * Returns the timeout for the RLM Tasks to wait for the quota to be available
 */
Duration quotaTimeout() {
    return Duration.ofSeconds(1);
```

Review Comment: I want to override this method to have a smaller timeout for tests. That is the reason I did not use a constant value.
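The override pattern described above can be sketched as follows; the class names here are simplified stand-ins for RemoteLogManager and its test double, not the real API.

```java
import java.time.Duration;

public class QuotaTimeoutSketch {
    static class Manager {
        // Production default. Kept behind a package-private method rather than
        // a constant precisely so that a test subclass can override it.
        Duration quotaTimeout() {
            return Duration.ofSeconds(1);
        }
    }

    static class TestManager extends Manager {
        @Override
        Duration quotaTimeout() {
            return Duration.ofMillis(100); // keeps test polling loops fast
        }
    }

    public static void main(String[] args) {
        System.out.println(new Manager().quotaTimeout().toMillis());     // 1000
        System.out.println(new TestManager().quotaTimeout().toMillis()); // 100
    }
}
```

A `static final` constant could not be swapped out per-test this way, which is the trade-off behind the review exchange below.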
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
satishd commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1623765428

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -239,6 +244,13 @@ private void removeMetrics() { remoteStorageReaderThreadPool.removeMetrics(); }

```java
/**
 * Returns the timeout for the RLM Tasks to wait for the quota to be available
 */
Duration quotaTimeout() {
    return Duration.ofSeconds(1);
```

Review Comment: Can you use the declared constant value?
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622486629

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: (quotes the same `testCopyQuota` hunk shown earlier)

Review Comment: ok. Got it now. Will simplify this. Thanks for the suggestion.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622440767

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: (quotes the same `testCopyQuota` hunk shown earlier)
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622434675

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: (quotes the same `testCopyQuota` hunk shown earlier)

Review Comment: Will change it here and in the other test.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622418590

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: (quotes the same `testCopyQuota` hunk shown earlier)
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622418254

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: (quotes the same `testCopyQuota` hunk shown earlier)

Review Comment: The suggested approach will not work. The assertTimeoutPreemptively method throws an assertion failed error if the executable does not complete within the timeout. I want to verify the
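The semantics at issue — a preemptive timeout that *throws* when the task overruns, so a test expecting a blocked copy treats the thrown error as the passing outcome — can be emulated with plain `java.util.concurrent`. This is a hedged sketch of what JUnit 5's `assertTimeoutPreemptively` does, not its implementation; the plain `AssertionError` below stands in for `org.opentest4j.AssertionFailedError`.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PreemptiveTimeoutSketch {
    // Runs `task` on a separate thread. If it overruns `timeout`, the task
    // thread is interrupted and an error is thrown to the caller -- mirroring
    // assertTimeoutPreemptively, which throws AssertionFailedError on overrun.
    static void runPreemptively(Duration timeout, Runnable task) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(task);
            try {
                future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the overrunning task
                throw new AssertionError("execution exceeded timeout of " + timeout);
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

This is why the quota-exceeded branch of the test wraps the call in try/catch and inspects the error message: the timeout error is expected, and only a "completed in time" outcome should fail the test.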
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622404030

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                 isCancelled(), isLeader());
             return;
         }
+
+        copyQuotaManagerLock.lock();
+        try {
+            while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+                try {
+                    copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {

Review Comment: Thanks for catching this. You are right. We should not be swallowing the exception, as it may prevent application shutdown. Let me change this.
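For context, the fix described here — propagating InterruptedException out of the quota wait loop instead of swallowing it — can be sketched in isolation. This is a hedged illustration with hypothetical names (`QuotaChecker`, `awaitQuota`), not the code the PR merged:

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the wait-for-quota pattern under discussion. QuotaChecker is a
// hypothetical stand-in for the quota manager's isQuotaExceeded() check.
public class QuotaWaitSketch {
    interface QuotaChecker { boolean isQuotaExceeded(); }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition quotaAvailable = lock.newCondition();

    // Blocks until the quota is no longer exceeded, re-checking at most every
    // quotaTimeout. InterruptedException is declared and propagated, not
    // swallowed, so a shutdown can interrupt the waiting thread promptly.
    void awaitQuota(QuotaChecker quota, Duration quotaTimeout) throws InterruptedException {
        lock.lock();
        try {
            while (quota.isQuotaExceeded()) {
                // await() releases the lock while waiting and reacquires it on wakeup
                quotaAvailable.await(quotaTimeout.toMillis(), TimeUnit.MILLISECONDS);
            }
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QuotaWaitSketch s = new QuotaWaitSketch();
        int[] checks = {0};
        // Quota reads as "exceeded" for the first two checks, then clears.
        s.awaitQuota(() -> ++checks[0] <= 2, Duration.ofMillis(10));
        System.out.println("checks=" + checks[0]);
    }
}
```

Because `awaitQuota` declares `throws InterruptedException`, an interrupt during shutdown unwinds the call immediately rather than silently looping again.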
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622254618

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                 isCancelled(), isLeader());
             return;
         }
+
+        copyQuotaManagerLock.lock();
+        try {
+            while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment: Fair enough. Thanks.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622187413

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                 isCancelled(), isLeader());
             return;
         }
+
+        copyQuotaManagerLock.lock();
+        try {
+            while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment:
> we could skip this copy when quota exceeds, and waiting for next RLMTask run?

The RLMTask schedule interval is configurable and defaults to 30 seconds. The next run will happen after 30 seconds, but the quota might become available before the next run. If we go with the suggested approach, then we may not be utilizing the quota effectively.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622181661

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
         assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
     }

+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that is eligible for copy
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            try {
+                assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog));
+                fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+            } catch (AssertionFailedError e) {
+                // Fail the test if the operation completed within the timeout
+                if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) {
+                    fail(e.getMessage());
+                }
+            }
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemo
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622178816

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
         assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
     }

+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that is eligible for copy
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            try {
+                assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog));
+                fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+            } catch (AssertionFailedError e) {
+                // Fail the test if the operation completed within the timeout
+                if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) {
+                    fail(e.getMessage());
+                }
+            }

Review Comment:
```java
assertThrows(AssertionFailedError.class,
    () -> assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog)));
```
Can we rewrite it to be concise?
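For background on why the timeout surfaces as a thrown error at all: JUnit 5's assertTimeoutPreemptively runs the executable on a separate thread and throws AssertionFailedError when it overruns, which is why wrapping it in assertThrows expresses "the timeout is the expected outcome". Below is a minimal JDK-only model of that behavior; the `assertTimeoutPreemptively` here is a hypothetical re-implementation for illustration, not the JUnit API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Minimal model of preemptive timeout assertion: run the task on a separate
// thread and throw if it does not finish in time.
public class PreemptiveTimeoutSketch {
    static void assertTimeoutPreemptively(long timeoutMs, Runnable task) throws Exception {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = ex.submit(task);
            try {
                f.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                f.cancel(true); // interrupt the still-running task
                throw new AssertionError("execution exceeded timeout of " + timeoutMs + " ms");
            }
        } finally {
            ex.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // A task that blocks "forever", like copyLogSegmentsToRemote when the
        // quota never becomes available.
        Runnable blocked = () -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) { }
        };
        boolean timedOut = false;
        try {
            assertTimeoutPreemptively(100, blocked);
        } catch (AssertionError expected) {
            timedOut = true; // the timeout itself is the outcome the test wants
        }
        System.out.println("timedOut=" + timedOut);
    }
}
```

The assertThrows form suggested in the comment relies on exactly this: catching the AssertionFailedError that the preemptive timeout raises.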
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622159510

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -750,6 +762,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                 isCancelled(), isLeader());
             return;
         }
+
+        copyQuotaManagerLock.lock();
+        try {
+            while (rlmCopyQuotaManager.isQuotaExceeded()) {
+                logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
+                try {
+                    copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {

Review Comment: We are catching the InterruptedException:
1. Does it delay the shutdown? If yes, do we have to add a check for whether the task is cancelled before copying the log segment?
2. Why is the logger level set to WARN? Can it be reduced to INFO?
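The cancellation concern in point 1 — making sure a timed wait does not hold up shutdown — could look roughly like the sketch below. All names here are hypothetical illustrations; the PR's actual handling may differ:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Sketch: re-check whether the task was cancelled each time the timed wait
// wakes up, so a broker shutdown is not blocked for the full quota wait.
public class CancellableQuotaWait {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition quotaFreed = lock.newCondition();
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void cancel() {
        cancelled.set(true);
    }

    // Returns true if the quota became available, false if the task was
    // cancelled while waiting for it.
    boolean awaitQuotaOrCancel(BooleanSupplier quotaExceeded, long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            while (quotaExceeded.getAsBoolean()) {
                if (cancelled.get()) {
                    return false; // bail out instead of copying after cancellation
                }
                // Timed wait bounds how long a cancellation can go unnoticed.
                quotaFreed.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
            return !cancelled.get();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CancellableQuotaWait w = new CancellableQuotaWait();
        w.cancel();
        // Quota permanently exceeded, but cancellation lets the wait return.
        boolean proceed = w.awaitQuotaOrCancel(() -> true, 10);
        System.out.println("proceed=" + proceed);
    }
}
```

With this shape, the worst-case shutdown delay is one `timeoutMs` interval rather than an unbounded wait.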
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622134355

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:

@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                 isCancelled(), isLeader());
             return;
         }
+
+        copyQuotaManagerLock.lock();
+        try {
+            while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment: @abhijeetk88 @kamalcph , I'm wondering if we could skip this copy when the quota is exceeded and wait for the next RLMTask run? That way, the implementation would be much simpler, without the lock and wait. WDYT?
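For comparison, the skip-and-retry alternative proposed here needs no lock or condition at all. A rough sketch with hypothetical names (`maybeCopySegments` is an illustration, not RemoteLogManager code):

```java
import java.util.function.BooleanSupplier;

// Sketch of the "skip and let the next scheduled RLMTask run retry" idea.
public class SkipOnQuotaSketch {
    // Returns true if segments were copied, false if the run was skipped
    // because the quota was exceeded (the next scheduled run retries).
    static boolean maybeCopySegments(BooleanSupplier quotaExceeded, Runnable copySegments) {
        if (quotaExceeded.getAsBoolean()) {
            // No blocking: just give up this run and rely on the scheduler.
            return false;
        }
        copySegments.run();
        return true;
    }

    public static void main(String[] args) {
        int[] copies = {0};
        boolean ran = maybeCopySegments(() -> false, () -> copies[0]++);
        boolean skipped = !maybeCopySegments(() -> true, () -> copies[0]++);
        System.out.println(ran + " " + skipped + " copies=" + copies[0]);
    }
}
```

The cost of this simplicity is that quota freed between scheduled runs goes unused until the next run, which is the utilization concern raised against it in this thread.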
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1622091326

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
         assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
     }

+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that is eligible for copy
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            try {
+                assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog));

Review Comment: I still see this is 1 second. I think adding some buffer to give it 2x the waiting time is enough, which is 200ms. Same as below. Otherwise, these tests need to run for at least 1 second each.
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
satishd commented on PR #15820: URL: https://github.com/apache/kafka/pull/15820#issuecomment-2141069773 @abhijeetk88 Can you resolve the conflicts?
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1619996598

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:

@@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }

+    @Test
+    public void testCopyQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(61, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testFetchQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(11, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testCopyQuotaNotExceeded() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that will be copied
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producer
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1618199858

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
@@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
+
+    @Test
+    public void testCopyQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(61, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testFetchQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(11, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testCopyQuotaNotExceeded() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that will be copied
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producer
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1618195673

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
@@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
+
+    @Test
+    public void testCopyQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(61, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testFetchQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(11, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testCopyQuotaNotExceeded() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that will be copied
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producer
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1618191025

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
@@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
+
+    @Test
+    public void testCopyQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(61, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testFetchQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(11, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testCopyQuotaNotExceeded() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that will be copied
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producer
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1618190777

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -153,6 +156,8 @@ public class RemoteLogManager implements Closeable {
     private final RemoteLogMetadataManager remoteLogMetadataManager;
+
+    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);

Review Comment: We want the threads that were blocked first to be unblocked first. This prevents starvation of some threads.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
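[Editor's note: a minimal, hypothetical sketch of the `ReentrantLock` fairness flag discussed above; `FairLockDemo` is an illustrative name and not part of RemoteLogManager.]

```java
import java.util.concurrent.locks.ReentrantLock;

public class FairLockDemo {
    public static void main(String[] args) {
        // Fair mode (constructor argument true): waiting threads acquire the
        // lock in roughly FIFO order, so an RLM task that blocked on the copy
        // quota first is unblocked first.
        ReentrantLock fair = new ReentrantLock(true);

        // Default (non-fair) mode: "barging" is allowed, which gives higher
        // throughput but can starve a long-waiting thread under contention.
        ReentrantLock unfair = new ReentrantLock();

        System.out.println("fair=" + fair.isFair() + " unfair=" + unfair.isFair());
    }
}
```

The fairness trade-off is throughput for predictability: fair locks are slower to hand off, which is acceptable here since the waiting threads are already throttled.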
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
abhijeetk88 commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1618189739

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment: Yes, this was planned as part of the KIP-950 implementation. We will have a separate change for it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
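[Editor's note: a minimal, self-contained sketch of the lock-and-timed-wait throttling pattern this thread is reviewing. `waitForQuota` and the fake quota check are illustrative stand-ins, not the actual RemoteLogManager/RLMQuotaManager API.]

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

public class CopyThrottleSketch {
    // Block until the quota check passes, re-checking after each timed wait.
    // Returns the number of timed waits performed before proceeding.
    static int waitForQuota(BooleanSupplier quotaExceeded, long timeoutMs) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(true); // fair: FIFO wakeups
        Condition quotaAvailable = lock.newCondition();
        int waits = 0;
        lock.lock();
        try {
            // The while loop guards against spurious wakeups and re-checks the
            // quota; the timeout bounds the wait if no signal ever arrives.
            while (quotaExceeded.getAsBoolean()) {
                waits++;
                quotaAvailable.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
        } finally {
            lock.unlock();
        }
        return waits;
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake quota: reported exceeded for the first two checks, then available.
        AtomicInteger checks = new AtomicInteger();
        int waits = waitForQuota(() -> checks.incrementAndGet() <= 2, 10);
        System.out.println("timed waits before copying: " + waits);
    }
}
```

Because the wait happens on the shared RLM thread pool, every throttled copy task occupies a pool thread, which is the starvation concern raised elsewhere in this thread.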
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1617086269

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -237,6 +242,13 @@ private void removeMetrics() {
     remoteStorageReaderThreadPool.removeMetrics();
 }
+
+/**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be available
+ */
+Duration quotaTimeout() {

Review Comment: shall we rename `quotaTimeout` to `throttleTimeMs`?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1617519570

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment: We can take this up in [KAFKA-16853](https://issues.apache.org/jira/browse/KAFKA-16853)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1617105411

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -738,6 +750,23 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
             isCancelled(), isLeader());
         return;
     }
+
+    copyQuotaManagerLock.lock();
+    try {
+        while (rlmCopyQuotaManager.isQuotaExceeded()) {

Review Comment: If the copy quota gets breached, all the `kafka-rlm-thread-pool` threads will wait for the quota to become available, which might delay the deletion of remote log segments since the same thread does both copy and delete. Do you plan to split the copy/delete operations into separate thread pools?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -237,6 +242,13 @@ private void removeMetrics() {
     remoteStorageReaderThreadPool.removeMetrics();
 }
+
+/**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be available
+ */
+Duration quotaTimeout() {

Review Comment: shall we rename `quotaTimeout` to `throttleTimeMs`?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -153,6 +156,8 @@ public class RemoteLogManager implements Closeable {
     private final RemoteLogMetadataManager remoteLogMetadataManager;
+
+    private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);

Review Comment: Why is `fairness` set to true?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage [kafka]
showuon commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1615786577

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
@@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         }
     }
+
+    @Test
+    public void testCopyQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(61, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testFetchQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(11, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP, 31);
+        customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP, 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testCopyQuotaNotExceeded() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that will be copied
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStat