clolov commented on code in PR #14349:
URL: https://github.com/apache/kafka/pull/14349#discussion_r1319891533


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1508,16 +1511,153 @@ public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
         }
     }
 
+    @Test
+    public void testDeleteRetentionSizeBreachingSegments() throws 
RemoteStorageException {
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+        List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = 
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, 
epochEntries);
+
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(remoteLogSegmentMetadatas.iterator());
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenReturn(remoteLogSegmentMetadatas.iterator())
+                .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", 0L);
+        logProps.put("retention.ms", -1L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenAnswer(answer -> {
+                    // assert that log-start-offset has been moved accordingly
+                    // we skip the first entry as it is the local replica 
ensuring it has the correct log start offset
+                    assertEquals(200, 
events.get(1).get(leaderTopicIdPartition.topicPartition()));
+                    return CompletableFuture.runAsync(() -> { });
+                });
+
+        task.run();
+
+        
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+        
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+    }
+
+    @Test
+    public void testDeleteRetentionMsBreachingSegments() throws 
RemoteStorageException {
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+        List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = 
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, 
epochEntries);
+
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(remoteLogSegmentMetadatas.iterator());
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenReturn(remoteLogSegmentMetadatas.iterator())
+                .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", -1L);
+        logProps.put("retention.ms", 0L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenAnswer(answer -> {
+                    // assert that log-start-offset has been moved accordingly
+                    // we skip the first entry as it is the local replica 
ensuring it has the correct log start offset
+                    assertEquals(200, 
events.get(1).get(leaderTopicIdPartition.topicPartition()));
+                    return CompletableFuture.runAsync(() -> { });
+                });
+
+        task.run();
+
+        
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
+        
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
+    }
+
+    @Test
+    public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws 
RemoteStorageException {
+        RemoteLogManager.RLMTask task = remoteLogManager.new 
RLMTask(leaderTopicIdPartition, 128);
+        task.convertToLeader(0);
+
+        
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+        when(mockLog.logEndOffset()).thenReturn(200L);
+
+        List<EpochEntry> epochEntries = Collections.singletonList(epochEntry0);
+
+        List<RemoteLogSegmentMetadata> remoteLogSegmentMetadatas = 
listRemoteLogSegmentMetadata(leaderTopicIdPartition, 2, 100, 1024, 
epochEntries);
+
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))
+                .thenReturn(remoteLogSegmentMetadatas.iterator());
+        
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
+                .thenReturn(remoteLogSegmentMetadatas.iterator())
+                .thenReturn(remoteLogSegmentMetadatas.iterator());
+
+        checkpoint.write(epochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        Map<String, Long> logProps = new HashMap<>();
+        logProps.put("retention.bytes", -1L);
+        logProps.put("retention.ms", 0L);
+        LogConfig mockLogConfig = new LogConfig(logProps);
+        when(mockLog.config()).thenReturn(mockLogConfig);
+
+        
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
+                .thenAnswer(answer -> {
+                    // assert that log-start-offset has been moved accordingly
+                    // we skip the first entry as it is the local replica 
ensuring it has the correct log start offset
+                    assertEquals(200, 
events.get(1).get(leaderTopicIdPartition.topicPartition()));

Review Comment:
   I am happy to change it to this. The reason I implemented it with the 
`events` list is that it allowed me to carry out the assertion before the 
deletes were initiated. With your approach, we assert that the log start offset 
has been updated, but we do not assert that it was updated before the deletes 
were carried out. Would you still like me to change it to your proposal?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to