divijvaidya commented on code in PR #14311:
URL: https://github.com/apache/kafka/pull/14311#discussion_r1312736254


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1330,7 +1330,7 @@ private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegm
                 : Optional.empty();
     }
 
-    private RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+    protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {

Review Comment:
   default access? Also, in the Kafka code base we add a `// Visible for testing` comment whenever we change access modifiers for tests.
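   
   For illustration, a minimal sketch of that convention (the class and method body here are hypothetical, not the actual RemoteLogManager code):
   
   ```java
   public class Example {
       // Visible for testing
       RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
           return remoteLogInputStream.nextBatch(); // placeholder body for the sketch
       }
   }
   ```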



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
         return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
     }
 
+    @Test
+    public void testReadPasses() throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), 0, 0, 0, Optional.empty()
+        );
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
+                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false
+        );
+
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(
+                remoteLogManagerConfig,
+                brokerId,
+                logDir,
+                clusterId,
+                time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return rsmManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+
+            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
+                                                                                    int epochForOffset, long offset) {
+                return Optional.of(segmentMetadata);
+            }
+
+            protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 1;
+            }
+
+            protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+                return null;
+            }
+        }) {
+            FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
+            assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+            assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records);
+            assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty());

Review Comment:
   Please first assert that `fetchDataInfo.abortedTransactions` is present, otherwise we will get a NullPointerException here instead of a properly failing test.
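   
   For example, something along these lines (a sketch; the assertion message is illustrative):
   
   ```java
   // Fail with a clear assertion error, not an NPE, when abortedTransactions was never populated.
   assertNotNull(fetchDataInfo.abortedTransactions, "abortedTransactions should be present for TXN_COMMITTED reads");
   assertTrue(fetchDataInfo.abortedTransactions.isPresent());
   assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty());
   ```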



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
         return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
     }
 
+    @Test
+    public void testReadPasses() throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), 0, 0, 0, Optional.empty()
+        );
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
+                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false
+        );
+
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(
+                remoteLogManagerConfig,
+                brokerId,
+                logDir,
+                clusterId,
+                time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return rsmManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+
+            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
+                                                                                    int epochForOffset, long offset) {
+                return Optional.of(segmentMetadata);
+            }
+
+            protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 1;
+            }
+
+            protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+                return null;
+            }
+        }) {
+            FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
+            assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset);

Review Comment:
   Please assert that `firstEntryIncomplete` is false as well.
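   
   For example (a sketch, assuming the field is directly readable like the other fields asserted above):
   
   ```java
   // No batch was found, so the (empty) result should not be flagged as a partially read entry.
   assertFalse(fetchDataInfo.firstEntryIncomplete);
   ```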



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
         return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
     }
 
+    @Test
+    public void testReadPasses() throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), 0, 0, 0, Optional.empty()
+        );
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
+                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false

Review Comment:
   Please move "fetchOffset" into a variable and then use the same variable in the assertion. It helps the readability of the test, since it makes it easy to see that we expect the same fetchOffset value that we initially provided to the read() function.
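   
   A sketch of the suggested refactor (the variable name is illustrative):
   
   ```java
   // Bind the offset to one named variable and reuse it in both the request and the assertion.
   long fetchOffset = 0L;
   FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
           Uuid.randomUuid(), fetchOffset, 0, 0, Optional.empty());
   // ... build fetchInfo and call read() as above ...
   assertEquals(fetchOffset, fetchDataInfo.fetchOffsetMetadata.messageOffset);
   ```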



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
         return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
     }
 
+    @Test
+    public void testReadPasses() throws RemoteStorageException, IOException {

Review Comment:
   The test name should signify what specific scenario we are testing. In this case, we are testing that if we don't find any batch in the input stream, then we should return an empty result.
   
   Perhaps rename this to "testReadForMissingFirstBatchInRemote"



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
         return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
     }
 
+    @Test
+    public void testReadPasses() throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), 0, 0, 0, Optional.empty()
+        );
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
+                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false
+        );
+
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(
+                remoteLogManagerConfig,
+                brokerId,
+                logDir,
+                clusterId,
+                time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return rsmManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+
+            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
+                                                                                    int epochForOffset, long offset) {
+                return Optional.of(segmentMetadata);
+            }
+
+            protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 1;
+            }
+
+            protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+                return null;

Review Comment:
   Please add a comment here noting that this is the key scenario we are testing.
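   
   Something along these lines (the comment wording is just a suggestion):
   
   ```java
   protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
       // Key scenario under test: no batch is found in the remote segment,
       // so read() must fall through to returning an empty FetchDataInfo.
       return null;
   }
   ```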



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
         return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
     }
 
+    @Test
+    public void testReadPasses() throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), 0, 0, 0, Optional.empty()
+        );
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
+                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false
+        );
+
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(
+                remoteLogManagerConfig,
+                brokerId,
+                logDir,
+                clusterId,
+                time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return rsmManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+
+            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
+                                                                                    int epochForOffset, long offset) {
+                return Optional.of(segmentMetadata);
+            }
+
+            protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 1;
+            }
+
+            protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+                return null;
+            }
+        }) {
+            FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
+            assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+            assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records);
+            assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty());
+        }
+    }
+
+    @Test
+    public void testReadPassesWithEmptyRecordsWhenMinOneMessageIsFalse() throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), 0, 0, 0, Optional.empty()
+        );
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
+                0, false, tp, partitionData, FetchIsolation.HIGH_WATERMARK, false
+        );
+
+        int recordBatchSizeInBytes = 10;

Review Comment:
   Please mention here that this is greater than `fetchMaxBytes`. Even better, create a variable for `fetchMaxBytes` and set this var to `fetchMaxBytes + 1`, which will make it explicit that this is greater.
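   
   A sketch of that setup (reusing the fetchMaxBytes value of 0 already passed to the fetchInfo above):
   
   ```java
   // Make the size relationship explicit: the first batch is one byte larger than the fetch budget.
   int fetchMaxBytes = 0;
   int recordBatchSizeInBytes = fetchMaxBytes + 1;
   RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
           fetchMaxBytes, false, tp, partitionData, FetchIsolation.HIGH_WATERMARK, false
   );
   ```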



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries,
         return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset));
     }
 
+    @Test
+    public void testReadPasses() throws RemoteStorageException, IOException {
+        FileInputStream fileInputStream = mock(FileInputStream.class);
+        ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
+        RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
+        LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
+        when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));
+
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+                .thenAnswer(a -> fileInputStream);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                Uuid.randomUuid(), 0, 0, 0, Optional.empty()
+        );
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
+                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false
+        );
+
+        try (RemoteLogManager remoteLogManager = new RemoteLogManager(
+                remoteLogManagerConfig,
+                brokerId,
+                logDir,
+                clusterId,
+                time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> { },
+                brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return rsmManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+
+            public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition,
+                                                                                    int epochForOffset, long offset) {
+                return Optional.of(segmentMetadata);
+            }
+
+            protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 1;
+            }
+
+            protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+                return null;
+            }
+        }) {
+            FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
+            assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+            assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records);
+            assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty());
+        }
+    }
+
+    @Test
+    public void testReadPassesWithEmptyRecordsWhenMinOneMessageIsFalse() throws RemoteStorageException, IOException {

Review Comment:
   I would probably rephrase this as:
   
   `testReadForFirstBatchMoreThanMaxFetchBytes`
   
   and then parameterize it using `@ParameterizedTest`, with one run where minOneMessage is true and one where it is false.
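   
   A minimal sketch of that shape (JUnit 5 `@ParameterizedTest`/`@ValueSource` imports assumed; the body is elided):
   
   ```java
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testReadForFirstBatchMoreThanMaxFetchBytes(boolean minOneMessage) throws RemoteStorageException, IOException {
       RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
               0, minOneMessage, tp, partitionData, FetchIsolation.HIGH_WATERMARK, false
       );
       // ... same setup as above; expectations differ based on minOneMessage ...
   }
   ```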



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
