kamalcph commented on code in PR #15820: URL: https://github.com/apache/kafka/pull/15820#discussion_r1632218079
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2730,6 +2742,206 @@ public void testEpochEntriesAsByteBuffer() throws Exception {
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(quotaExceeded);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofMillis(200), () -> task.copyLogSegmentsToRemote(mockLog)));
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
+        RemoteLogManager.RLMTask task = setupRLMTask(true);
+
+        Thread t = new Thread(task);
+        t.start();
+        // Sleep for a while to allow the task to start and quota check to be performed once
+        Thread.sleep(100);

Review Comment:
`sleep` can lead to flaky tests in CI and the shutdown/close behavior is not tested E2E. Can we rewrite the test?

```
@Test
public void testRLMTaskShutdownDuringQuotaExceededScenario() throws Exception {
    remoteLogManager.startup();
    setupRLMTask(true);
    remoteLogManager.onLeadershipChange(
        Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
    TestUtils.waitForCondition(() -> {
        verify(rlmCopyQuotaManager, atLeast(1)).isQuotaExceeded();
        return true;
    }, "Quota exceeded check did not happen");
    assertTimeoutPreemptively(Duration.ofMillis(100), () -> remoteLogManager.close());
}
```
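For context, here is a minimal, self-contained sketch of the pattern the suggestion relies on: poll for an observable condition with a deadline instead of sleeping for a fixed interval, then assert that shutdown completes. `PollingSketch`, `waitForCondition(condition, timeoutMs, pollMs)`, and `demo()` are hypothetical names made up for illustration. This is not Kafka's `TestUtils` and not the `RemoteLogManager` code; the worker thread merely stands in for a task that keeps re-checking a quota.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only; not part of Kafka.
final class PollingSketch {
    private PollingSketch() { }

    // Polls `condition` every `pollMs` milliseconds until it holds or `timeoutMs` elapses.
    // Returns true if the condition was observed, false on timeout or interruption.
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Stand-in scenario: a worker loops (as if blocked on quota) and records that it ran.
    // The caller waits for that signal rather than sleeping, then verifies a prompt shutdown.
    static boolean demo() {
        AtomicBoolean checked = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                checked.set(true);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees the interrupt and exits.
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();
        boolean observed = waitForCondition(checked::get, 5_000, 5);
        worker.interrupt();
        try {
            worker.join(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed && !worker.isAlive();
    }
}
```

The generous timeout only bounds the failure case. On the success path the wait returns as soon as the condition is observed, which is why this tends to be both faster and less flaky than a fixed `Thread.sleep(100)`.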