junrao commented on code in PR #19581:
URL: https://github.com/apache/kafka/pull/19581#discussion_r2076340154
##########
clients/src/main/java/org/apache/kafka/common/record/Records.java:
##########
@@ -90,4 +91,18 @@ public interface Records extends TransferableRecords {
* @return The record iterator
*/
Iterable<Record> records();
+
+ /**
+ * Return a slice of records from this instance, which is a view into this set starting from the given position
+ * and with the given size limit.
+ *
+ * If the size is beyond the end of the records, the end will be based on the size of the records at the time of the read.
+ *
+ * If this records set is already sliced, the position will be taken relative to that slicing.
+ *
+ * @param position The start position to begin the read from
+ * @param size The number of bytes after the start position to include
+ * @return A sliced wrapper on this message set limited based on the given position and size
+ */
+ Records slice(int position, int size) throws IOException;
Review Comment:
Thanks for the explanation. We can just keep what's in the PR then.
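For anyone skimming this thread, here is a minimal sketch (not the PR's actual implementation) of the clamping and relative-position semantics the javadoc describes, expressed with a plain ByteBuffer view:

```java
import java.nio.ByteBuffer;

final class SliceSketch {
    // Hypothetical helper illustrating the documented semantics; the real
    // MemoryRecords/FileRecords slicing differs in detail.
    static ByteBuffer sliceView(ByteBuffer buffer, int position, int size) {
        if (position < 0 || position > buffer.remaining())
            throw new IllegalArgumentException("Invalid position: " + position);
        if (size < 0)
            throw new IllegalArgumentException("Invalid size: " + size);
        ByteBuffer view = buffer.duplicate();
        view.position(buffer.position() + position);
        // Clamp the end to the bytes available at read time, guarding against
        // int overflow of position + size (e.g. size = Integer.MAX_VALUE).
        long end = Math.min((long) position + size, buffer.remaining());
        view.limit(buffer.position() + (int) end);
        // slice() rebases the view at zero, so a nested sliceView() call is
        // naturally relative to this slice, as the javadoc requires.
        return view.slice();
    }
}
```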
##########
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java:
##########
@@ -1068,6 +1073,146 @@ public void testUnsupportedCompress() {
});
}
+ @ParameterizedTest
+ @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+ public void testSlice(Args args) throws IOException {
+ // Create a MemoryRecords instance with multiple batches. Prior to RecordBatch.MAGIC_VALUE_V2,
+ // every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple
+ // batches in a single MemoryRecords instance. Though with compression, we can have multiple
+ // appends resulting in a single batch prior to RecordBatch.MAGIC_VALUE_V2 as well.
+ LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
+ recordsPerOffset.put(args.firstOffset, 3);
+ recordsPerOffset.put(args.firstOffset + 6L, 8);
+ recordsPerOffset.put(args.firstOffset + 15L, 4);
+ MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
+
+ // Test slicing from start
+ Records sliced = records.slice(0, records.sizeInBytes());
+ assertEquals(records.sizeInBytes(), sliced.sizeInBytes());
+ assertEquals(records.validBytes(), ((MemoryRecords) sliced).validBytes());
+ TestUtils.checkEquals(records.batches(), ((MemoryRecords) sliced).batches());
+
+ List<RecordBatch> items = batches(records);
+ // Test slicing first message.
+ RecordBatch first = items.get(0);
+ sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes());
+ assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+ assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+ assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+
+ // Read from second message and size is past the end of the file.
+ sliced = records.slice(first.sizeInBytes(), records.sizeInBytes());
+ assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+ assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+ assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+
+ // Read from second message and position + size overflows.
+ sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE);
+ assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+ assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+ assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+
+ // Read a single message starting from second message.
+ RecordBatch second = items.get(1);
+ sliced = records.slice(first.sizeInBytes(), second.sizeInBytes());
+ assertEquals(second.sizeInBytes(), sliced.sizeInBytes());
+ assertEquals(Collections.singletonList(second), batches(sliced), "Read a single message starting from the second message");
+
+ // Read from already sliced view.
+ List<RecordBatch> remainingItems = IntStream.range(0, items.size()).filter(i -> i != 0 && i != 1).mapToObj(items::get).collect(Collectors.toList());
+ int remainingSize = remainingItems.stream().mapToInt(RecordBatch::sizeInBytes).sum();
+ sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes())
+     .slice(second.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes() - second.sizeInBytes());
+ assertEquals(remainingSize, sliced.sizeInBytes());
+ assertEquals(remainingItems, batches(sliced), "Read starting from the third message");
+
+ // Read from second message and size is past the end of the file on the already sliced view.
+ sliced = records.slice(1, records.sizeInBytes() - 1)
+     .slice(first.sizeInBytes() - 1, records.sizeInBytes());
+ assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+ assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+ assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+
+ // Read from second message and position + size overflows on the already sliced view.
+ sliced = records.slice(1, records.sizeInBytes() - 1)
+     .slice(first.sizeInBytes() - 1, Integer.MAX_VALUE);
+ assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+ assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+ assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+ public void testSliceInvalidPosition(Args args) {
+ MemoryRecords records = createMemoryRecords(args, Map.of(args.firstOffset, 1));
+ assertThrows(IllegalArgumentException.class, () -> records.slice(-1, records.sizeInBytes()));
+ assertThrows(IllegalArgumentException.class, () -> records.slice(records.sizeInBytes() + 1, records.sizeInBytes()));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+ public void testSliceInvalidSize(Args args) {
+ MemoryRecords records = createMemoryRecords(args, Map.of(args.firstOffset, 1));
+ assertThrows(IllegalArgumentException.class, () -> records.slice(0, -1));
+ }
+
+ @Test
+ public void testSliceEmptyRecords() {
+ MemoryRecords empty = MemoryRecords.EMPTY;
+ Records sliced = empty.slice(0, 0);
+ assertEquals(0, sliced.sizeInBytes());
+ assertEquals(0, batches(sliced).size());
+ }
+
+ /**
+ * Test slice when already sliced memory records have start position greater than available bytes
+ * in the memory records.
+ */
+ @ParameterizedTest
+ @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+ public void testSliceForAlreadySlicedMemoryRecords(Args args) throws IOException {
+ LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
+ recordsPerOffset.put(args.firstOffset, 5);
+ recordsPerOffset.put(args.firstOffset + 5L, 10);
+ recordsPerOffset.put(args.firstOffset + 15L, 12);
+ recordsPerOffset.put(args.firstOffset + 27L, 4);
+
+ MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
+ List<RecordBatch> items = batches(records.slice(0, records.sizeInBytes()));
+
+ // Slice from third message until the end.
+ int position = IntStream.range(0, 2).map(i -> items.get(i).sizeInBytes()).sum();
+ Records sliced = records.slice(position, records.sizeInBytes() - position);
+ assertEquals(records.sizeInBytes() - position, sliced.sizeInBytes());
+ assertEquals(items.subList(2, items.size()), batches(sliced), "Read starting from the third message");
+
+ // Further slice the already sliced memory records, from fourth message until the end. Now the
+ // bytes available in the sliced records are less than the start position.
Review Comment:
I still don't quite understand this comment. Why are we comparing available bytes with the start position? Ditto in FileRecordsTest.
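To make the question concrete, with hypothetical batch sizes (not the actual sizes produced by this test):

```java
// Hypothetical batch sizes, just to frame the question:
int[] batchSizes = {300, 300, 300, 100};          // four batches, 1000 bytes total
int position = batchSizes[0] + batchSizes[1];      // 600: start of the third batch
int available = 1000 - position;                   // 400 bytes left in the sliced view
int furtherPosition = batchSizes[2];               // 300: relative start of the fourth batch
// The further slice's position is relative to the 400-byte view, and here
// 300 < 400, so where does the start position exceed the available bytes?
```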