apoorvmittal10 commented on code in PR #18804:
URL: https://github.com/apache/kafka/pull/18804#discussion_r1946811181
##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -449,4 +468,202 @@ tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE,
0L, 0L,
Mockito.verify(exceptionHandler, times(1)).accept(new
SharePartitionKey("grp", tp0), exception);
Mockito.verify(sp0, times(0)).updateCacheAndOffsets(any(Long.class));
}
+
+ @Test
+ public void testMaybeSliceFetchRecordsSingleBatch() throws IOException {
+ // Create 1 batch of records with 10 records.
+ FileRecords records = createFileRecords(Map.of(5L, 10));
+
+ // Acquire all offsets, should return all batches.
+ List<AcquiredRecords> acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(5).setLastOffset(14).setDeliveryCount((short)
1));
+ Records slicedRecords =
ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 10, true));
+ assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+ // Acquire offsets out of first offset bound should return the records
for the matching batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(2).setLastOffset(14).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 10, true));
+ assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+ // Acquire offsets out of last offset bound should return the records
for the matching batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(5).setLastOffset(20).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 5, true));
+ assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+ // Acquire only subset of batch offsets, starting from the first
offset.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+ // Acquire only subset of batch offsets, ending at the last offset.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(8).setLastOffset(14).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+ // Acquire only subset of batch offsets, in between the batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(8).setLastOffset(10).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+ // Acquire subsets out of base offset bounds should return empty
records.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(2).setLastOffset(4).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertEquals(0, slicedRecords.sizeInBytes());
+
+ // Acquire subsets out of last offset bounds should return empty
records.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(15).setLastOffset(18).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertEquals(0, slicedRecords.sizeInBytes());
+ }
+
+ @Test
+ public void testMaybeSliceFetchRecordsMultipleBatches() throws IOException
{
+ // Create 3 batches of records with 3, 2 and 4 records respectively.
+ LinkedHashMap<Long, Integer> offsetValues = new LinkedHashMap<>();
+ offsetValues.put(0L, 3);
+ offsetValues.put(3L, 2);
+ offsetValues.put(7L, 4); // Gap of 2 offsets between batches.
+ FileRecords records = createFileRecords(offsetValues);
+
+ // Acquire all offsets, should return all batches.
+ List<AcquiredRecords> acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(0).setLastOffset(10).setDeliveryCount((short)
1));
+ Records slicedRecords =
ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 11, true));
+ assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+ // Acquire offsets from all batches, but only first record from last
batch. Should return
+ // all batches.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 5, true));
+ assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+ // Acquire only first batch offsets, should return only first batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(0).setLastOffset(2).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 5, true));
+ assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+ List<RecordBatch> recordBatches =
TestUtils.toList(slicedRecords.batches());
+ assertEquals(1, recordBatches.size());
+ assertEquals(0, recordBatches.get(0).baseOffset());
+ assertEquals(2, recordBatches.get(0).lastOffset());
+
+ // Acquire only second batch offsets, should return only second batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(3).setLastOffset(4).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 5, true));
+ assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+ recordBatches = TestUtils.toList(slicedRecords.batches());
+ assertEquals(1, recordBatches.size());
+ assertEquals(3, recordBatches.get(0).baseOffset());
+ assertEquals(4, recordBatches.get(0).lastOffset());
+
+ // Acquire only last batch offsets, should return only last batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(7).setLastOffset(10).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+ recordBatches = TestUtils.toList(slicedRecords.batches());
+ assertEquals(1, recordBatches.size());
+ assertEquals(7, recordBatches.get(0).baseOffset());
+ assertEquals(10, recordBatches.get(0).lastOffset());
+
+ // Acquire only subset of first batch offsets, should return only
first batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+ recordBatches = TestUtils.toList(slicedRecords.batches());
+ assertEquals(1, recordBatches.size());
+ assertEquals(0, recordBatches.get(0).baseOffset());
+ assertEquals(2, recordBatches.get(0).lastOffset());
+
+ // Acquire only subset of second batch offsets, should return only
second batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+ recordBatches = TestUtils.toList(slicedRecords.batches());
+ assertEquals(1, recordBatches.size());
+ assertEquals(3, recordBatches.get(0).baseOffset());
+ assertEquals(4, recordBatches.get(0).lastOffset());
+
+ // Acquire only subset of last batch offsets, should return only last
batch.
+ acquiredRecords = List.of(new
AcquiredRecords().setFirstOffset(8).setLastOffset(8).setDeliveryCount((short)
1));
+ slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new
ShareAcquiredRecords(acquiredRecords, 1, true));
+ assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+ recordBatches = TestUtils.toList(slicedRecords.batches());
+ assertEquals(1, recordBatches.size());
+ assertEquals(7, recordBatches.get(0).baseOffset());
+ assertEquals(10, recordBatches.get(0).lastOffset());
+
+ // Acquire including gaps between batches, should return 2 batches.
Review Comment:
The gap is at offsets 5 and 6, hence the check just validates that no issue
occurs when records are acquired near the gap boundaries.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]