kamalcph commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1622181661


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2716,6 +2728,198 @@ public void testFetchQuotaManagerConfig() {
         assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testCopyQuota(boolean quotaExceeded) throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that is eligible for copy
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(quotaExceeded);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+
+        if (quotaExceeded) {
+            // Verify that the copy operation times out, since no segments can be copied due to quota being exceeded
+            try {
+                assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog));
+                fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+            } catch (AssertionFailedError e) {
+                // Fail the test if the operation completed within the timeout
+                if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) {
+                    fail(e.getMessage());
+                }
+            }
+
+            // Verify the highest offset in remote storage is updated only once
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(capture.capture());
+            // Verify the highest offset in remote storage was -1L before the copy started
+            assertEquals(-1L, capture.getValue());
+        } else {
+            // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+            assertTimeoutPreemptively(Duration.ofMillis(100), () -> task.copyLogSegmentsToRemote(mockLog));
+
+            // Verify quota check was performed
+            verify(rlmCopyQuotaManager, times(1)).isQuotaExceeded();
+            // Verify bytes to copy was recorded with the quota manager
+            verify(rlmCopyQuotaManager, times(1)).record(10);
+
+            // Verify the highest offset in remote storage is updated
+            ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);
+            verify(mockLog, times(2)).updateHighestOffsetInRemoteStorage(capture.capture());
+            List<Long> capturedValues = capture.getAllValues();
+            // Verify the highest offset in remote storage was -1L before the copy
+            assertEquals(-1L, capturedValues.get(0).longValue());
+            // Verify it was updated to 149L after the copy
+            assertEquals(149L, capturedValues.get(1).longValue());
+        }
+    }
+
+    @Test
+    public void testCopyThrottling() throws Exception {
+        long oldestSegmentStartOffset = 0L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        // create 3 log segments
+        LogSegment segmentToCopy = mock(LogSegment.class);
+        LogSegment segmentToThrottle = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // set up the segment that will be copied
+        when(segmentToCopy.log()).thenReturn(fileRecords);
+        when(segmentToCopy.baseOffset()).thenReturn(oldestSegmentStartOffset);
+        when(segmentToCopy.readNextOffset()).thenReturn(100L);
+
+        // set up the segment that will not be copied because of hitting quota
+        when(segmentToThrottle.log()).thenReturn(fileRecords);
+        when(segmentToThrottle.baseOffset()).thenReturn(100L);
+        when(segmentToThrottle.readNextOffset()).thenReturn(150L);
+
+        // set up the active segment
+        when(activeSegment.log()).thenReturn(fileRecords);
+        when(activeSegment.baseOffset()).thenReturn(150L);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldestSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(segmentToCopy, segmentToThrottle, activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldestSegmentStartOffset, ""), oldestSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldestSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldestSegmentStartOffset, txnFile);
+        when(segmentToCopy.timeIndex()).thenReturn(timeIdx);
+        when(segmentToCopy.offsetIndex()).thenReturn(idx);
+        when(segmentToCopy.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        // After the first call, isQuotaExceeded should return true
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false, true);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+
+        // Verify that the copy operation times out, since the second segment cannot be copied due to quota being exceeded
+        try {
+            assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog));
+            fail(EXPECTED_THE_OPERATION_TO_TIME_OUT);
+        } catch (AssertionFailedError e) {
+            // Fail the test if the operation completed within the timeout
+            if (e.getMessage().equals(EXPECTED_THE_OPERATION_TO_TIME_OUT)) {
+                fail(e.getMessage());
+            }
+        }

Review Comment:
   ```java
   assertThrows(AssertionFailedError.class, () -> assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog)));
   ```
   Can we rewrite it to be more concise, as shown above?
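   For illustration, each try/catch above could then collapse to a single assertion. This is only a sketch relying on JUnit 5's `assertThrows` and `assertTimeoutPreemptively` together with the test's existing fields; the failure message string is hypothetical:
   ```java
   // assertTimeoutPreemptively throws an AssertionFailedError when the lambda does not finish
   // within the duration, so asserting that error is thrown verifies the copy stalls on the quota check
   assertThrows(AssertionFailedError.class,
       () -> assertTimeoutPreemptively(Duration.ofSeconds(1), () -> task.copyLogSegmentsToRemote(mockLog)),
       "Expected copyLogSegmentsToRemote to block while the copy quota is exceeded");
   ```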
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
