abhijeetk88 commented on code in PR #15820:
URL: https://github.com/apache/kafka/pull/15820#discussion_r1618199858


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -2530,6 +2549,269 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, l
 
         }
     }
+    
+    @Test
+    public void testCopyQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = 
createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.copyQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(61, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
 100);
+        
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP,
 31);
+        
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP,
 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmCopyQuotaManagerConfig = 
RemoteLogManager.copyQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmCopyQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmCopyQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmCopyQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+    
+    @Test
+    public void testFetchQuotaManagerConfig() {
+        Properties defaultProps = new Properties();
+        RemoteLogManagerConfig defaultRlmConfig = 
createRLMConfig(defaultProps);
+        RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
+        assertEquals(Long.MAX_VALUE, defaultConfig.quotaBytesPerSecond());
+        assertEquals(11, defaultConfig.numQuotaSamples());
+        assertEquals(1, defaultConfig.quotaWindowSizeSeconds());
+
+        Properties customProps = new Properties();
+        
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
 100);
+        
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP,
 31);
+        
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS_PROP,
 1);
+        RemoteLogManagerConfig rlmConfig = createRLMConfig(customProps);
+        RLMQuotaManagerConfig rlmFetchQuotaManagerConfig = 
RemoteLogManager.fetchQuotaManagerConfig(rlmConfig);
+        assertEquals(100L, rlmFetchQuotaManagerConfig.quotaBytesPerSecond());
+        assertEquals(31, rlmFetchQuotaManagerConfig.numQuotaSamples());
+        assertEquals(1, rlmFetchQuotaManagerConfig.quotaWindowSizeSeconds());
+    }
+
+    @Test
+    public void testCopyQuotaNotExceeded() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new 
LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
 anyInt())).thenReturn(Optional.of(0L));
+
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        File tempFile = TestUtils.tempFile();
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+
+        // Set up the segment that will be copied
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        // set up the active segment
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), 
anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment,
 activeSegment)));
+
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        ProducerStateManager mockStateManager = 
mock(ProducerStateManager.class);
+        
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        File tempDir = TestUtils.tempDirectory();
+        OffsetIndex idx = 
LazyIndex.forOffset(LogFileUtils.offsetIndexFile(tempDir, 
oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000).get();
+        TimeIndex timeIdx = 
LazyIndex.forTime(LogFileUtils.timeIndexFile(tempDir, oldSegmentStartOffset, 
""), oldSegmentStartOffset, 1500).get();
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, 
oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new 
TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.timeIndex()).thenReturn(timeIdx);
+        when(oldSegment.offsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        
when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
+        
when(remoteStorageManager.copyLogSegmentData(any(RemoteLogSegmentMetadata.class),
 any(LogSegmentData.class))).thenReturn(Optional.empty());
+
+        // QuotaManager returns false to indicate quota has not been exceeded
+        when(rlmCopyQuotaManager.isQuotaExceeded()).thenReturn(false);
+        doNothing().when(rlmCopyQuotaManager).record(anyInt());
+
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(2);
+        // Verify the copy operation completes within the timeout, since it does not need to wait for quota availability
+        assertTimeoutPreemptively(Duration.ofSeconds(1), () -> 
task.copyLogSegmentsToRemote(mockLog));

Review Comment:
   You are right. 100 ms should be enough: since the quota has not been exceeded, 
   the task should proceed immediately to copy the segment and complete the copy. I 
   will change this.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to