apoorvmittal10 commented on code in PR #18804:
URL: https://github.com/apache/kafka/pull/18804#discussion_r1946811181


##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -449,4 +468,202 @@ tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 
0L, 0L,
         Mockito.verify(exceptionHandler, times(1)).accept(new 
SharePartitionKey("grp", tp0), exception);
         Mockito.verify(sp0, times(0)).updateCacheAndOffsets(any(Long.class));
     }
+
+    @Test
+    public void testMaybeSliceFetchRecordsSingleBatch() throws IOException {
+        // Create 1 batch of records with 10 records.
+        FileRecords records = createFileRecords(Map.of(5L, 10));
+
+        // Acquire all offsets, should return all batches.
+        List<AcquiredRecords> acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(5).setLastOffset(14).setDeliveryCount((short) 
1));
+        Records slicedRecords = 
ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 10, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets out of first offset bound should return the records 
for the matching batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(2).setLastOffset(14).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 10, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets out of last offset bound should return the records 
for the matching batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(5).setLastOffset(20).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, starting from the first 
offset.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, ending at the last offset.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(8).setLastOffset(14).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only subset of batch offsets, in between the batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(8).setLastOffset(10).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire subsets out of base offset bounds should return empty 
records.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(2).setLastOffset(4).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(0, slicedRecords.sizeInBytes());
+
+        // Acquire subsets out of last offset bounds should return empty 
records.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(15).setLastOffset(18).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertEquals(0, slicedRecords.sizeInBytes());
+    }
+
+    @Test
+    public void testMaybeSliceFetchRecordsMultipleBatches() throws IOException 
{
+        // Create 3 batches of records with 3, 2 and 4 records respectively.
+        LinkedHashMap<Long, Integer> offsetValues = new LinkedHashMap<>();
+        offsetValues.put(0L, 3);
+        offsetValues.put(3L, 2);
+        offsetValues.put(7L, 4); // Gap of 2 offsets between batches.
+        FileRecords records = createFileRecords(offsetValues);
+
+        // Acquire all offsets, should return all batches.
+        List<AcquiredRecords> acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(0).setLastOffset(10).setDeliveryCount((short) 
1));
+        Records slicedRecords = 
ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 11, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire offsets from all batches, but only first record from last 
batch. Should return
+        // all batches.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertEquals(records.sizeInBytes(), slicedRecords.sizeInBytes());
+
+        // Acquire only first batch offsets, should return only first batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(0).setLastOffset(2).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        List<RecordBatch> recordBatches = 
TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+
+        // Acquire only second batch offsets, should return only second batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(3).setLastOffset(4).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 5, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+
+        // Acquire only last batch offsets, should return only last batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(7).setLastOffset(10).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of first batch offsets, should return only 
first batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(0, recordBatches.get(0).baseOffset());
+        assertEquals(2, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of second batch offsets, should return only 
second batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(3, recordBatches.get(0).baseOffset());
+        assertEquals(4, recordBatches.get(0).lastOffset());
+
+        // Acquire only subset of last batch offsets, should return only last 
batch.
+        acquiredRecords = List.of(new 
AcquiredRecords().setFirstOffset(8).setLastOffset(8).setDeliveryCount((short) 
1));
+        slicedRecords = ShareFetchUtils.maybeSliceFetchRecords(records, new 
ShareAcquiredRecords(acquiredRecords, 1, true));
+        assertTrue(records.sizeInBytes() > slicedRecords.sizeInBytes());
+        recordBatches = TestUtils.toList(slicedRecords.batches());
+        assertEquals(1, recordBatches.size());
+        assertEquals(7, recordBatches.get(0).baseOffset());
+        assertEquals(10, recordBatches.get(0).lastOffset());
+
+        // Acquire including gaps between batches, should return 2 batches.

Review Comment:
   The gap is at offsets 5 and 6, hence this check just validates that no issue 
occurs when records are acquired near the gap boundaries.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to