kamalcph commented on code in PR #13561:
URL: https://github.com/apache/kafka/pull/13561#discussion_r1293158752


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
         }
     }
 
+    private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+        return new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+                        new TopicPartition("topic", 0)), Uuid.randomUuid()),
+                startOffset, endOffset,
+                100000L,
+                1,
+                100000L,
+                1000,
+                Optional.empty(),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+    }
+
+    @Test
+    public void testRemoteSegmentWithinLeaderEpochs() {
+        // Test whether a remote segment is within the leader epochs
+        final long logEndOffset = 90L;
+
+        TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<Integer, Long>() {{
+                put(0, 0L);
+                put(1, 10L);
+                put(2, 20L);
+                put(3, 30L);
+                put(4, 40L);
+                put(5, 50L);
+                put(7, 70L);
+            }};
+
+        // Test whether a remote segment's epochs/offsets(multiple) are within the range of leader epochs
+        assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                35,
+                new TreeMap<Integer, Long>() {{
+                    put(1, 15L);
+                    put(2, 20L);
+                    put(3, 30L);
+                }}), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether a remote segment's epochs/offsets(single) are within the range of leader epochs
+        assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(
+                15,
+                19,
+                new TreeMap<Integer, Long>() {{
+                    put(1, 15L);
+                }}), logEndOffset, leaderEpochToStartOffset));
+
+        // Test whether a remote segment's epochs/offsets(single) are within the range of leader epochs
+        assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(

Review Comment:
   The statements at L1065 and L1057 are the same. Is this a typo?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -698,11 +707,329 @@ public void run() {
             }
         }
 
+        public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
+            if (isLeader()) {
+                logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
+                updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
+            }
+        }
+
+        class RemoteLogRetentionHandler {
+
+            private final Optional<RetentionSizeData> retentionSizeData;
+            private final Optional<RetentionTimeData> retentionTimeData;
+
+            private long remainingBreachedSize;
+
+            private OptionalLong logStartOffset = OptionalLong.empty();
+
+            public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData, Optional<RetentionTimeData> retentionTimeData) {

Review Comment:
   Using `Optional` as a method or constructor parameter is not recommended in Java:
   
   https://stackoverflow.com/a/31923105
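
   For illustration only, one possible shape that avoids `Optional` parameters: accept plain nullable values and expose `Optional` only from accessors. This is a rough sketch, not the PR's code; `RetentionHandlerSketch` and its `Long` fields are hypothetical stand-ins for `RemoteLogRetentionHandler`, `RetentionSizeData`, and `RetentionTimeData`.

```java
import java.util.Optional;

// Hypothetical stand-in for the handler in this PR; all names here are illustrative.
final class RetentionHandlerSketch {
    // null means "this kind of retention is not configured".
    private final Long retentionSizeBytes;
    private final Long retentionTimeMs;

    // Callers pass plain (possibly null) values rather than wrapping them in Optional.
    RetentionHandlerSketch(Long retentionSizeBytes, Long retentionTimeMs) {
        this.retentionSizeBytes = retentionSizeBytes;
        this.retentionTimeMs = retentionTimeMs;
    }

    // Optional is used only as a return type, so absence stays explicit for readers.
    Optional<Long> retentionSizeBytes() {
        return Optional.ofNullable(retentionSizeBytes);
    }

    Optional<Long> retentionTimeMs() {
        return Optional.ofNullable(retentionTimeMs);
    }
}
```

   Whether nullable parameters or separate overloads fit better here probably depends on how the call sites build these values.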



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1003,6 +1015,134 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
         }
     }
 
+    private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
+        return new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(new TopicIdPartition(Uuid.randomUuid(),
+                        new TopicPartition("topic", 0)), Uuid.randomUuid()),
+                startOffset, endOffset,
+                100000L,
+                1,
+                100000L,
+                1000,
+                Optional.empty(),
+                RemoteLogSegmentState.COPY_SEGMENT_FINISHED, segmentEpochs);
+    }
+
+    @Test
+    public void testRemoteSegmentWithinLeaderEpochs() {
+        // Test whether a remote segment is within the leader epochs
+        final long logEndOffset = 90L;
+
+        TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<Integer, Long>() {{
+                put(0, 0L);
+                put(1, 10L);
+                put(2, 20L);
+                put(3, 30L);
+                put(4, 40L);
+                put(5, 50L);
+                put(7, 70L);
+            }};

Review Comment:
   nit:
   
   ```suggestion
           TreeMap<Integer, Long> leaderEpochToStartOffset = new TreeMap<>();
           leaderEpochToStartOffset.put(0, 0L);
           leaderEpochToStartOffset.put(1, 10L);
           leaderEpochToStartOffset.put(2, 20L);
           leaderEpochToStartOffset.put(3, 30L);
           leaderEpochToStartOffset.put(4, 40L);
           leaderEpochToStartOffset.put(5, 50L);
           leaderEpochToStartOffset.put(7, 70L);
   ```



