adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031745262
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map<Long, L
return errorMessage.toString();
}
+ @Test
+ public void testFilterRecordBatchesFromAcquiredRecords() {
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+        List<AcquiredRecords> acquiredRecords1 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)
+        );
+ List<RecordBatch> recordBatches1 = List.of(
+ memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+ memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+ );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)),
+            sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1));
+
+        List<AcquiredRecords> acquiredRecords2 = List.of(
+            new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 3),
+            new AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 2),
+            new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3)
+        );
+ List<RecordBatch> recordBatches2 = List.of(
+ memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+ memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+ );
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+                new AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 2),
+                new AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 3)
+            ),
+            sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, recordBatches2)
+        );
+
+        // Record batches is empty.
+        assertEquals(acquiredRecords2, sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, List.of()));
+
+        List<AcquiredRecords> acquiredRecords3 = List.of(
+            new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1)
+        );
+ List<RecordBatch> recordBatches3 = List.of(
+ memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+ memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+ );
+
+        assertEquals(
+            List.of(
+                new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 1),
+                new AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 1)
+            ),
+            sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, recordBatches3)
+        );
+ }
+
+ @Test
+ public void testAcquireWithReadCommittedIsolationLevel() {
+        SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .build());
+
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 5, 10).close();
+ memoryRecordsBuilder(buffer, 5, 15).close();
+ memoryRecordsBuilder(buffer, 15, 20).close();
+ memoryRecordsBuilder(buffer, 8, 50).close();
+ memoryRecordsBuilder(buffer, 10, 58).close();
+ memoryRecordsBuilder(buffer, 5, 70).close();
+
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        FetchPartitionData fetchPartitionData = fetchPartitionData(records, newAbortedTransactions());
+
+        // We are mocking the result of function fetchAbortedTransactionRecordBatches. The records present at these offsets need to be archived.
+        // We won't be utilizing the aborted transactions passed in fetchPartitionData.
+        when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn(
+            List.of(
+                memoryRecordsBuilder(5, 10).build().batches().iterator().next(),
+                memoryRecordsBuilder(10, 58).build().batches().iterator().next(),
+                memoryRecordsBuilder(5, 70).build().batches().iterator().next()
+            )
+        );
+
+        List<AcquiredRecords> acquiredRecordsList = fetchAcquiredRecords(
+            sharePartition.acquire(
+                MEMBER_ID,
+                10 /* Batch size */,
+                100,
+                DEFAULT_FETCH_OFFSET,
+                fetchPartitionData,
+                FetchIsolation.TXN_COMMITTED),
+            45 /* Gap of 15 records will be added to second batch, gap of 2 records will also be added to fourth batch */);
+
+        assertEquals(List.of(
+            new AcquiredRecords().setFirstOffset(15).setLastOffset(19).setDeliveryCount((short) 1),
+            new AcquiredRecords().setFirstOffset(20).setLastOffset(49).setDeliveryCount((short) 1),
Review Comment:
This is because there is a gap of records from 35-49 in the original records
to be acquired. This gap is added to this batch since we rely on the client to
inform the broker about these natural gaps in the partition log.
```
memoryRecordsBuilder(buffer, 5, 10).close(); // batch from 10-14
memoryRecordsBuilder(buffer, 5, 15).close(); // batch from 15-19
memoryRecordsBuilder(buffer, 15, 20).close(); // batch from 20-34
memoryRecordsBuilder(buffer, 8, 50).close(); // batch from 50-57 (offsets 35-49 are the natural gap)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]