kirktrue commented on code in PR #19214:
URL: https://github.com/apache/kafka/pull/19214#discussion_r1999903645
##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -298,11 +298,28 @@ public int writeTo(TransferableChannel destChannel, int
offset, int length) thro
* @return the batch's base offset, its physical position, and its size
(including log overhead)
*/
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int
startingPosition) {
+ FileChannelRecordBatch prevBatch = null;
+
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
- long offset = batch.lastOffset();
- if (offset >= targetOffset)
- return new LogOffsetPosition(batch.baseOffset(),
batch.position(), batch.sizeInBytes());
+ // This indicates that either the current batch or the previous
batch
+ // contains the target we are looking for.
+ if (batch.baseOffset() >= targetOffset) {
Review Comment:
What about combining both versions? Something like:
```java
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
// if baseOffset exactly equals targetOffset, return immediately
if (batch.baseOffset() == targetOffset) {
return LogOffsetPosition.fromBatch(batch);
}
// If we find the first batch with baseOffset greater than targetOffset
if (batch.baseOffset() > targetOffset) {
// Check if the previous batch contains the target
if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
return LogOffsetPosition.fromBatch(prevBatch);
else {
// If there's no previous batch or the previous batch doesn't contain the
// target, return the current batch
return LogOffsetPosition.fromBatch(batch);
}
}
prevBatch = batch;
}
// After the loop, still check whether the last batch (prevBatch) contains the target
```
##########
clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java:
##########
@@ -518,6 +523,176 @@ public void testBytesLengthOfWriteTo() throws IOException
{
verify(channel).transferFrom(any(), anyLong(), eq((long) size -
firstWritten));
}
+ /**
+ * Test two condition
+ * 1. If the target offset equals to the base offset of the first batch
+ * 2. If the target offset < the base offset of the first batch
+ */
+ @ParameterizedTest
+ @ValueSource(longs = {5, 10})
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchMatch(long baseOffset)
throws IOException {
+ File mockFile = mock(File.class);
+ FileChannel mockChannel = mock(FileChannel.class);
+ FileLogInputStream.FileChannelRecordBatch batch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch.baseOffset()).thenReturn(baseOffset);
+
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ mockFileRecordBatches(fileRecords, batch);
+
+ FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetWithSize(5L, 0);
+
+ assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+ verify(batch, never()).lastOffset();
+ }
+
+ @Test
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchLastOffsetMatch()
throws IOException {
+ File mockFile = mock(File.class);
+ FileChannel mockChannel = mock(FileChannel.class);
+ FileLogInputStream.FileChannelRecordBatch batch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch.baseOffset()).thenReturn(3L);
+ when(batch.lastOffset()).thenReturn(5L);
+
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ mockFileRecordBatches(fileRecords, batch);
+
+ FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetWithSize(5L, 0);
+
+ assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+ // target is equals to the last offset of the batch, we should call
lastOffset
+ verify(batch, times(1)).lastOffset();
+ }
+
+ @Test
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountLastBatchLastOffsetMatch() throws
IOException {
+ File mockFile = mock(File.class);
+ FileChannel mockChannel = mock(FileChannel.class);
+ FileLogInputStream.FileChannelRecordBatch prevBatch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(prevBatch.baseOffset()).thenReturn(5L);
+ when(prevBatch.lastOffset()).thenReturn(12L);
+ FileLogInputStream.FileChannelRecordBatch currentBatch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(currentBatch.baseOffset()).thenReturn(15L);
+ when(currentBatch.lastOffset()).thenReturn(20L);
+
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
+
+ FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetWithSize(20L, 0);
+
+ assertEquals(FileRecords.LogOffsetPosition.fromBatch(currentBatch),
result);
+ // Because the target offset is in the current batch, we should not
call lastOffset in the previous batch
+ verify(prevBatch, never()).lastOffset();
+ verify(currentBatch, times(1)).lastOffset();
+ }
+
+ @Test
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountPrevBatchMatches() throws
IOException {
+ File mockFile = mock(File.class);
+ FileChannel mockChannel = mock(FileChannel.class);
+ FileLogInputStream.FileChannelRecordBatch prevBatch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(prevBatch.baseOffset()).thenReturn(5L);
+ when(prevBatch.lastOffset()).thenReturn(12L); // > targetOffset
+ FileLogInputStream.FileChannelRecordBatch currentBatch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset
+
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
+
+ FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetWithSize(10L, 0);
+
+ assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch),
result);
+ // Because the target offset is in the current batch, we should call
lastOffset
+ // on the previous batch
+ verify(prevBatch, times(1)).lastOffset();
+ }
+
+ @Test
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountAllBatchesSmallerLastBatchDoesntMatch()
throws IOException {
+ File mockFile = mock(File.class);
+ FileChannel mockChannel = mock(FileChannel.class);
+ FileLogInputStream.FileChannelRecordBatch batch1 =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch1.baseOffset()).thenReturn(5L); // < targetOffset
+ FileLogInputStream.FileChannelRecordBatch batch2 =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch2.baseOffset()).thenReturn(8L); // < targetOffset
+ when(batch2.lastOffset()).thenReturn(9L); // < targetOffset
+
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ mockFileRecordBatches(fileRecords, batch1, batch2);
+
+ FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetWithSize(10L, 0);
+
+ assertNull(result);
+ // Because the target offset is exceeded by the last offset of the
batch2,
+ // we should call lastOffset on the batch2
+ verify(batch1, never()).lastOffset();
+ verify(batch2, times(1)).lastOffset();
+ }
+
+ /**
+ * Test two condition
+ * 1. If the target offset < the base offset of the last batch
+ * 2. If the target offset equals to the base offset of the last batch
+ */
+ @ParameterizedTest
+ @ValueSource(longs = {8, 10})
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountAllBatchesSmallerLastBatchMatches(long
baseOffset) throws IOException {
+ File mockFile = mock(File.class);
+ FileChannel mockChannel = mock(FileChannel.class);
+ FileLogInputStream.FileChannelRecordBatch batch1 =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch1.baseOffset()).thenReturn(5L); // < targetOffset
+ FileLogInputStream.FileChannelRecordBatch batch2 =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch2.baseOffset()).thenReturn(baseOffset); // < targetOffset
or == targetOffset
+ when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset
+
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ mockFileRecordBatches(fileRecords, batch1, batch2);
+
+ long targetOffset = 10L;
+ FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetWithSize(targetOffset, 0);
+
+ assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result);
+ if (targetOffset == baseOffset) {
+ // Because the target offset is equal to the base offset of the
batch2, we should not call
+ // lastOffset on batch2
+ verify(batch1, times(1)).lastOffset();
+ verify(batch2, never()).lastOffset();
+ } else {
+ // Because the target offset is in the batch2, we should not call
+ // lastOffset on batch1
+ verify(batch1, never()).lastOffset();
+ verify(batch2, times(1)).lastOffset();
+ }
+ }
+
+ @Test
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountTargetBetweenTwoBatches() throws
IOException {
+ File mockFile = mock(File.class);
+ FileChannel mockChannel = mock(FileChannel.class);
+ FileLogInputStream.FileChannelRecordBatch batch1 =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch1.baseOffset()).thenReturn(5L);
+ when(batch1.lastOffset()).thenReturn(10L);
+ FileLogInputStream.FileChannelRecordBatch batch2 =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch2.baseOffset()).thenReturn(15L);
+ when(batch2.lastOffset()).thenReturn(20L);
+
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ mockFileRecordBatches(fileRecords, batch1, batch2);
+
+ FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetWithSize(13L, 0);
+
+ assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result);
+ // Because the target offset is between the two batches, we should
call lastOffset on the batch1
+ verify(batch1, times(1)).lastOffset();
+ verify(batch2, never()).lastOffset();
+ }
+
+ private void mockFileRecordBatches(FileRecords fileRecords,
FileLogInputStream.FileChannelRecordBatch... batch) {
+ List<FileLogInputStream.FileChannelRecordBatch> batches = new
ArrayList<>(asList(batch));
Review Comment:
Out of curiosity, is there a reason not to use the list returned by
`asList()` directly, instead of copying it into a new `ArrayList`?
##########
clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java:
##########
@@ -518,6 +523,176 @@ public void testBytesLengthOfWriteTo() throws IOException
{
verify(channel).transferFrom(any(), anyLong(), eq((long) size -
firstWritten));
}
+ /**
+ * Test two condition
+ * 1. If the target offset equals to the base offset of the first batch
+ * 2. If the target offset < the base offset of the first batch
+ */
+ @ParameterizedTest
+ @ValueSource(longs = {5, 10})
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchMatch(long baseOffset)
throws IOException {
+ File mockFile = mock(File.class);
+ FileChannel mockChannel = mock(FileChannel.class);
+ FileLogInputStream.FileChannelRecordBatch batch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
+ when(batch.baseOffset()).thenReturn(baseOffset);
+
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ mockFileRecordBatches(fileRecords, batch);
+
+ FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetWithSize(5L, 0);
+
+ assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+ verify(batch, never()).lastOffset();
+ }
+
+ @Test
+ public void
testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchLastOffsetMatch()
throws IOException {
Review Comment:
I empathize with the challenge of coming up with meaningful yet succinct
test method names. I can't get more than maybe two-thirds of the way through
that method name before my brain shuts down. But I think that says more about
my brain than the method name 😆
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]