divijvaidya commented on code in PR #14311: URL: https://github.com/apache/kafka/pull/14311#discussion_r1312736254
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1330,7 +1330,7 @@ private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegm : Optional.empty(); } - private RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { Review Comment: default access? Also, in Kafka code base we add a `// Visible for testing` comment whenever we change access due to tests ########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries, return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadPasses() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + Uuid.randomUuid(), 0, 0, 0, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false + ); + + try (RemoteLogManager remoteLogManager = new RemoteLogManager( + remoteLogManagerConfig, + brokerId, + logDir, + clusterId, + time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> { 
}, + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return rsmManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, + int epochForOffset, long offset) { + return Optional.of(segmentMetadata); + } + + protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return 1; + } + + protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + return null; + } + }) { + FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo); + assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset); + assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records); + assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty()); Review Comment: Please first assert that `fetchDataInfo.abortedTransactions` is present otherwise we will get a NullPointerException here instead of a properly failing test. 
########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries, return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadPasses() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + Uuid.randomUuid(), 0, 0, 0, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false + ); + + try (RemoteLogManager remoteLogManager = new RemoteLogManager( + remoteLogManagerConfig, + brokerId, + logDir, + clusterId, + time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> { }, + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return rsmManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, + int epochForOffset, long offset) { + return Optional.of(segmentMetadata); + } + + protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return 1; + } + + protected RecordBatch 
findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + return null; + } + }) { + FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo); + assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset); Review Comment: please assert that firstEntryIncomplete is false as well. ########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries, return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadPasses() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + Uuid.randomUuid(), 0, 0, 0, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false Review Comment: please move "fetchOffset" into a var and then use the same var in the assertion. It helps readability for the test since it is easy to make out that we are expecting the same value of fetchOffset that we initially provided to the read() function. 
########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries, return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadPasses() throws RemoteStorageException, IOException { Review Comment: the test name should signify what specific scenario we are testing. In this case we are testing that if we don't find any batch in the input stream, then we should return an empty result. Perhaps rename this to "testReadForMissingFirstBatchInRemote" ########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries, return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadPasses() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + Uuid.randomUuid(), 0, 0, 0, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false + ); + + try (RemoteLogManager remoteLogManager = new RemoteLogManager( + remoteLogManagerConfig, + brokerId, + logDir, + clusterId, + 
time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> { }, + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return rsmManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, + int epochForOffset, long offset) { + return Optional.of(segmentMetadata); + } + + protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return 1; + } + + protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + return null; Review Comment: please add a comment here that this is the key scenario that we are testing here. ########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries, return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadPasses() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + Uuid.randomUuid(), 0, 0, 0, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, false, tp, 
partitionData, FetchIsolation.TXN_COMMITTED, false + ); + + try (RemoteLogManager remoteLogManager = new RemoteLogManager( + remoteLogManagerConfig, + brokerId, + logDir, + clusterId, + time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> { }, + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return rsmManager; + } + public RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, + int epochForOffset, long offset) { + return Optional.of(segmentMetadata); + } + + protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return 1; + } + + protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + return null; + } + }) { + FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo); + assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset); + assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records); + assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty()); + } + } + + @Test + public void testReadPassesWithEmptyRecordsWhenMinOneMessageIsFalse() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + 
Uuid.randomUuid(), 0, 0, 0, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, false, tp, partitionData, FetchIsolation.HIGH_WATERMARK, false + ); + + int recordBatchSizeInBytes = 10; Review Comment: please mention here that this is greater than `fetchMaxBytes`. Even better, create a variable for `fetchMaxBytes` and set this var as `fetchMaxBytes` + 1 which will make it explicit that this is greater. ########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1361,6 +1371,188 @@ private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries, return myCheckpoint.read().stream().collect(Collectors.toMap(e -> e.epoch, e -> e.startOffset)); } + @Test + public void testReadPasses() throws RemoteStorageException, IOException { + FileInputStream fileInputStream = mock(FileInputStream.class); + ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class); + RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class); + LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class); + when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1)); + + when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt())) + .thenAnswer(a -> fileInputStream); + when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache)); + + FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData( + Uuid.randomUuid(), 0, 0, 0, Optional.empty() + ); + + RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo( + 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED, false + ); + + try (RemoteLogManager remoteLogManager = new RemoteLogManager( + remoteLogManagerConfig, + brokerId, + logDir, + clusterId, + time, + tp -> Optional.of(mockLog), + (topicPartition, offset) -> { }, + brokerTopicStats) { + public RemoteStorageManager createRemoteStorageManager() { + return rsmManager; + } + public 
RemoteLogMetadataManager createRemoteLogMetadataManager() { + return remoteLogMetadataManager; + } + + public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPartition topicPartition, + int epochForOffset, long offset) { + return Optional.of(segmentMetadata); + } + + protected int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { + return 1; + } + + protected RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException { + return null; + } + }) { + FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo); + assertEquals(0, fetchDataInfo.fetchOffsetMetadata.messageOffset); + assertEquals(MemoryRecords.EMPTY, fetchDataInfo.records); + assertTrue(fetchDataInfo.abortedTransactions.get().isEmpty()); + } + } + + @Test + public void testReadPassesWithEmptyRecordsWhenMinOneMessageIsFalse() throws RemoteStorageException, IOException { Review Comment: I would probably rephrase this as: `testReadForFirstBatchMoreThanMaxFetchBytes` and then parameterize it using `@ParameterizedTest` with one case for minOneMessage and one without. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org