This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a619c6bb952 MINOR; Remove cast for Records' slice (#19661)
a619c6bb952 is described below

commit a619c6bb95200108cbeabd041242b8ff01a0c3dc
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue May 13 20:06:38 2025 -0400

    MINOR; Remove cast for Records' slice (#19661)
    
    In Java, return types are covariant. This means that an overriding
    method can declare a return type that is a subclass of the one declared
    by the method it overrides.
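
    A minimal sketch of the idea (the names below are illustrative, not part
    of this patch): an implementation overrides an interface method and
    narrows the declared return type, so callers of the concrete class no
    longer need a cast.

        interface Copyable {
            // The interface declares the general return type.
            Copyable copy();
        }

        final class Document implements Copyable {
            // Covariant override: the return type is narrowed to Document.
            @Override
            public Document copy() {
                return new Document();
            }
        }

        class Demo {
            public static void main(String[] args) {
                // Callers holding a Document get a Document back, no cast needed.
                Document d = new Document().copy();
                System.out.println(d);
            }
        }

    This is the same pattern the patch applies: FileRecords.slice and
    MemoryRecords.slice now declare their own types while still overriding
    Records.slice.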
    
    Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
     <[email protected]>, Apoorv Mittal <[email protected]>
---
 .../apache/kafka/common/record/FileRecords.java    | 78 ++++++++++++++--------
 .../apache/kafka/common/record/MemoryRecords.java  |  2 +-
 .../org/apache/kafka/common/record/Records.java    |  3 +-
 .../kafka/common/record/FileRecordsTest.java       | 44 ++++++------
 .../kafka/common/record/MemoryRecordsTest.java     | 51 +++++++++-----
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  6 +-
 6 files changed, 109 insertions(+), 75 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 6a42a52d2e0..4ec7db604bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -54,33 +54,52 @@ public class FileRecords extends AbstractRecords implements Closeable {
      * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible.
      * The constructor is visible for tests.
      */
-    FileRecords(File file,
-                FileChannel channel,
-                int start,
-                int end,
-                boolean isSlice) throws IOException {
+    FileRecords(
+        File file,
+        FileChannel channel,
+        int end
+    ) throws IOException {
         this.file = file;
         this.channel = channel;
-        this.start = start;
+        this.start = 0;
         this.end = end;
-        this.isSlice = isSlice;
-        this.size = new AtomicInteger();
+        this.isSlice = false;
 
-        if (isSlice) {
-            // don't check the file size if this is just a slice view
-            size.set(end - start);
-        } else {
-            if (channel.size() > Integer.MAX_VALUE)
-                throw new KafkaException("The size of segment " + file + " (" + channel.size() +
-                        ") is larger than the maximum allowed segment size of " + Integer.MAX_VALUE);
+        if (channel.size() > Integer.MAX_VALUE) {
+            throw new KafkaException(
+                "The size of segment " + file + " (" + channel.size() +
+                ") is larger than the maximum allowed segment size of " + 
Integer.MAX_VALUE
+            );
+        }
 
-            int limit = Math.min((int) channel.size(), end);
-            size.set(limit - start);
+        int limit = Math.min((int) channel.size(), end);
+        this.size = new AtomicInteger(limit - start);
 
-            // if this is not a slice, update the file pointer to the end of the file
-            // set the file position to the last byte in the file
-            channel.position(limit);
-        }
+        // update the file position to the end of the file
+        channel.position(limit);
+
+        batches = batchesFrom(start);
+    }
+
+    /**
+     * Constructor for creating a slice.
+     *
+     * This overloaded constructor avoids having to declare a checked IO exception.
+     */
+    private FileRecords(
+        File file,
+        FileChannel channel,
+        int start,
+        int end
+    ) {
+        this.file = file;
+        this.channel = channel;
+        this.start = start;
+        this.end = end;
+        this.isSlice = true;
+
+        // don't check the file size since this is just a slice view
+        this.size = new AtomicInteger(end - start);
 
         batches = batchesFrom(start);
     }
@@ -121,10 +140,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     @Override
-    public Records slice(int position, int size) throws IOException {
+    public FileRecords slice(int position, int size) {
         int availableBytes = availableBytes(position, size);
         int startPosition = this.start + position;
-        return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true);
+
+        return new FileRecords(file, channel, startPosition, startPosition + availableBytes);
     }
 
     /**
@@ -288,7 +308,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
      */
     public LogOffsetPosition searchForOffsetFromPosition(long targetOffset, int startingPosition) {
         FileChannelRecordBatch prevBatch = null;
-        // The following logic is intentionally designed to minimize memory usage by avoiding 
+        // The following logic is intentionally designed to minimize memory usage by avoiding
         // unnecessary calls to lastOffset() for every batch.
         // Instead, we use baseOffset() comparisons when possible, and only check lastOffset() when absolutely necessary.
         for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
@@ -296,14 +316,14 @@ public class FileRecords extends AbstractRecords implements Closeable {
             if (batch.baseOffset() == targetOffset) {
                 return LogOffsetPosition.fromBatch(batch);
             }
-            
+
             // If we find the first batch with baseOffset greater than targetOffset
             if (batch.baseOffset() > targetOffset) {
                 // If the previous batch contains the target
                 if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
                     return LogOffsetPosition.fromBatch(prevBatch);
                 else {
-                    // If there's no previous batch or the previous batch doesn't contain the 
+                    // If there's no previous batch or the previous batch doesn't contain the
                     // target, return the current batch
                     return LogOffsetPosition.fromBatch(batch);
                 }
@@ -312,7 +332,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
         }
         // Only one case would reach here: all batches have baseOffset less than targetOffset
         // Check if the last batch contains the target
-        if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) 
+        if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
             return LogOffsetPosition.fromBatch(prevBatch);
 
         return null;
@@ -424,7 +444,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
                                    boolean preallocate) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, end);
     }
 
     public static FileRecords open(File file,
@@ -475,7 +495,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
         public final long offset;
         public final int position;
         public final int size;
-        
+
         public static LogOffsetPosition fromBatch(FileChannelRecordBatch batch) {
             return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes());
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 1786f61d187..2e2b97dfe37 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -301,7 +301,7 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
-    public Records slice(int position, int size) {
+    public MemoryRecords slice(int position, int size) {
         if (position < 0)
             throw new IllegalArgumentException("Invalid position: " + position + " in read from " + this);
         if (position > buffer.limit())
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 3d45762e815..017c49ba94c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.utils.AbstractIterator;
 
-import java.io.IOException;
 import java.util.Iterator;
 import java.util.Optional;
 
@@ -105,5 +104,5 @@ public interface Records extends TransferableRecords {
      * @param size The number of bytes after the start position to include
      * @return A sliced wrapper on this message set limited based on the given position and size
      */
-    Records slice(int position, int size) throws IOException;
+    Records slice(int position, int size);
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 75babcf95b0..04590ccbe7b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -89,7 +89,7 @@ public class FileRecordsTest {
         FileChannel fileChannelMock = mock(FileChannel.class);
         when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE);
 
-        FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false);
+        FileRecords records = new FileRecords(fileMock, fileChannelMock, Integer.MAX_VALUE);
         assertThrows(IllegalArgumentException.class, () -> append(records, values));
     }
 
@@ -99,7 +99,7 @@ public class FileRecordsTest {
         FileChannel fileChannelMock = mock(FileChannel.class);
         when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L);
 
-        assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false));
+        assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, Integer.MAX_VALUE));
     }
 
     @Test
@@ -198,9 +198,9 @@ public class FileRecordsTest {
      */
     @Test
     public void testRead() throws IOException {
-        Records read = fileRecords.slice(0, fileRecords.sizeInBytes());
+        FileRecords read = fileRecords.slice(0, fileRecords.sizeInBytes());
         assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes());
-        TestUtils.checkEquals(fileRecords.batches(), ((FileRecords) read).batches());
+        TestUtils.checkEquals(fileRecords.batches(), read.batches());
 
         List<RecordBatch> items = batches(read);
         RecordBatch first = items.get(0);
@@ -313,7 +313,7 @@ public class FileRecordsTest {
         when(channelMock.size()).thenReturn(42L);
         when(channelMock.position(42L)).thenReturn(null);
 
-        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE);
         fileRecords.truncateTo(42);
 
         verify(channelMock, atLeastOnce()).size();
@@ -330,7 +330,7 @@ public class FileRecordsTest {
 
         when(channelMock.size()).thenReturn(42L);
 
-        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE);
 
         try {
             fileRecords.truncateTo(43);
@@ -352,7 +352,7 @@ public class FileRecordsTest {
         when(channelMock.size()).thenReturn(42L);
         when(channelMock.truncate(anyLong())).thenReturn(channelMock);
 
-        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
+        FileRecords fileRecords = new FileRecords(tempFile(), channelMock, Integer.MAX_VALUE);
         fileRecords.truncateTo(23);
 
         verify(channelMock, atLeastOnce()).size();
@@ -526,7 +526,7 @@ public class FileRecordsTest {
      * 1. If the target offset equals the base offset of the first batch
      * 2. If the target offset is less than the base offset of the first batch
      * <p>
-     * If the base offset of the first batch is equal to or greater than the target offset, it should return the 
+     * If the base offset of the first batch is equal to or greater than the target offset, it should return the
      * position of the first batch and the lastOffset method should not be called.
      */
     @ParameterizedTest
@@ -537,7 +537,7 @@ public class FileRecordsTest {
         FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
         when(batch.baseOffset()).thenReturn(baseOffset);
 
-        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
         mockFileRecordBatches(fileRecords, batch);
 
         FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0);
@@ -557,7 +557,7 @@ public class FileRecordsTest {
         when(batch.baseOffset()).thenReturn(3L);
         when(batch.lastOffset()).thenReturn(5L);
 
-        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
         mockFileRecordBatches(fileRecords, batch);
 
         FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0);
@@ -581,7 +581,7 @@ public class FileRecordsTest {
         when(currentBatch.baseOffset()).thenReturn(15L);
         when(currentBatch.lastOffset()).thenReturn(20L);
 
-        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
         mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
 
         FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(20L, 0);
@@ -605,13 +605,13 @@ public class FileRecordsTest {
         FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
         when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset
 
-        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
         mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
 
         FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0);
 
         assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result);
-        // Because the target offset is in the current batch, we should call lastOffset 
+        // Because the target offset is in the current batch, we should call lastOffset
         // on the previous batch
         verify(prevBatch, times(1)).lastOffset();
     }
@@ -629,13 +629,13 @@ public class FileRecordsTest {
         when(batch2.baseOffset()).thenReturn(8L);  // < targetOffset
         when(batch2.lastOffset()).thenReturn(9L);  // < targetOffset
 
-        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
         mockFileRecordBatches(fileRecords, batch1, batch2);
 
         FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0);
 
         assertNull(result);
-        // Because the target offset is exceeded by the last offset of the batch2, 
+        // Because the target offset is exceeded by the last offset of the batch2,
         // we should call lastOffset on the batch2
         verify(batch1, never()).lastOffset();
         verify(batch2, times(1)).lastOffset();
@@ -657,7 +657,7 @@ public class FileRecordsTest {
         when(batch2.baseOffset()).thenReturn(baseOffset);  // < targetOffset or == targetOffset
         when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset
 
-        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
         mockFileRecordBatches(fileRecords, batch1, batch2);
 
         long targetOffset = 10L;
@@ -670,7 +670,7 @@ public class FileRecordsTest {
             verify(batch1, never()).lastOffset();
             verify(batch2, never()).lastOffset();
         } else {
-            // Because the target offset is in the batch2, we should not call 
+            // Because the target offset is in the batch2, we should not call
             // lastOffset on batch1
             verify(batch1, never()).lastOffset();
             verify(batch2, times(1)).lastOffset();
@@ -685,13 +685,13 @@ public class FileRecordsTest {
         File mockFile = mock(File.class);
         FileChannel mockChannel = mock(FileChannel.class);
         FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
-        when(batch1.baseOffset()).thenReturn(5L);  
-        when(batch1.lastOffset()).thenReturn(10L); 
+        when(batch1.baseOffset()).thenReturn(5L);
+        when(batch1.lastOffset()).thenReturn(10L);
         FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
-        when(batch2.baseOffset()).thenReturn(15L);  
-        when(batch2.lastOffset()).thenReturn(20L);  
+        when(batch2.baseOffset()).thenReturn(15L);
+        when(batch2.lastOffset()).thenReturn(20L);
 
-        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 100));
         mockFileRecordBatches(fileRecords, batch1, batch2);
 
         FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(13L, 0);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 6f98da21eed..7092928010b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -35,7 +35,6 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.ArgumentsProvider;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -136,10 +135,6 @@ public class MemoryRecordsTest {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
 
         int partitionLeaderEpoch = 998;
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression,
-                TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false, false,
-                partitionLeaderEpoch, buffer.limit());
-
         SimpleRecord[] records = new SimpleRecord[] {
             new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
             new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
@@ -149,10 +144,30 @@ public class MemoryRecordsTest {
             new SimpleRecord(6L, (byte[]) null, null)
         };
 
-        for (SimpleRecord record : records)
-            builder.append(record);
+        final MemoryRecords memoryRecords;
+        try (var builder = new MemoryRecordsBuilder(
+                buffer,
+                magic,
+                compression,
+                TimestampType.CREATE_TIME,
+                firstOffset,
+                logAppendTime,
+                pid,
+                epoch,
+                firstSequence,
+                false,
+                false,
+                partitionLeaderEpoch,
+                buffer.limit()
+            )
+        ) {
+            for (SimpleRecord record : records) {
+                builder.append(record);
+            }
+
+            memoryRecords = builder.build();
+        }
 
-        MemoryRecords memoryRecords = builder.build();
         for (int iteration = 0; iteration < 2; iteration++) {
             int total = 0;
             for (RecordBatch batch : memoryRecords.batches()) {
@@ -1075,7 +1090,7 @@ public class MemoryRecordsTest {
 
     @ParameterizedTest
     @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
-    public void testSlice(Args args) throws IOException {
+    public void testSlice(Args args) {
         // Create a MemoryRecords instance with multiple batches. Prior RecordBatch.MAGIC_VALUE_V2,
         // every append in a batch is a new batch. After RecordBatch.MAGIC_VALUE_V2, we can have multiple
         // batches in a single MemoryRecords instance. Though with compression, we can have multiple
@@ -1087,10 +1102,10 @@ public class MemoryRecordsTest {
         MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
 
         // Test slicing from start
-        Records sliced = records.slice(0, records.sizeInBytes());
+        MemoryRecords sliced = records.slice(0, records.sizeInBytes());
         assertEquals(records.sizeInBytes(), sliced.sizeInBytes());
-        assertEquals(records.validBytes(), ((MemoryRecords) sliced).validBytes());
-        TestUtils.checkEquals(records.batches(), ((MemoryRecords) sliced).batches());
+        assertEquals(records.validBytes(), sliced.validBytes());
+        TestUtils.checkEquals(records.batches(), sliced.batches());
 
         List<RecordBatch> items = batches(records);
         // Test slicing first message.
@@ -1098,19 +1113,19 @@ public class MemoryRecordsTest {
         sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes());
         assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
         assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
-        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
 
         // Read from second message and size is past the end of the file.
         sliced = records.slice(first.sizeInBytes(), records.sizeInBytes());
         assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
         assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
-        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
 
         // Read from second message and position + size overflows.
         sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE);
         assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
         assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
-        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
 
         // Read a single message starting from second message.
         RecordBatch second = items.get(1);
@@ -1131,14 +1146,14 @@ public class MemoryRecordsTest {
             .slice(first.sizeInBytes() - 1, records.sizeInBytes());
         assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
         assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
-        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
 
         // Read from second message and position + size overflows on the 
already sliced view.
         sliced = records.slice(1, records.sizeInBytes() - 1)
             .slice(first.sizeInBytes() - 1, Integer.MAX_VALUE);
         assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
         assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
-        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+        assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
     }
 
     @ParameterizedTest
@@ -1170,7 +1185,7 @@ public class MemoryRecordsTest {
      */
     @ParameterizedTest
     @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
-    public void testSliceForAlreadySlicedMemoryRecords(Args args) throws IOException {
+    public void testSliceForAlreadySlicedMemoryRecords(Args args) {
         LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
         recordsPerOffset.put(args.firstOffset, 5);
         recordsPerOffset.put(args.firstOffset + 5L, 10);
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 85bc4e1a269..7967d6f4f49 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -277,7 +277,7 @@ object DumpLogSegments {
         println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch: 
${path.snapshotId.epoch}")
       }
     }
-    val fileRecords = FileRecords.open(file, false).slice(0, maxBytes).asInstanceOf[FileRecords]
+    val fileRecords = FileRecords.open(file, false).slice(0, maxBytes)
     try {
       var validBytes = 0L
       var lastOffset = -1L
@@ -567,7 +567,7 @@ object DumpLogSegments {
 
   private class RemoteMetadataLogMessageParser extends MessageParser[String, String] {
     private val metadataRecordSerde = new RemoteLogMetadataSerde
-    
+
     override def parse(record: Record): (Option[String], Option[String]) = {
       val output = try {
         val data = new Array[Byte](record.value.remaining)
@@ -626,7 +626,7 @@ object DumpLogSegments {
     private val transactionLogOpt = parser.accepts("transaction-log-decoder", "If set, log data will be parsed as " +
       "transaction metadata from the __transaction_state topic.")
     private val clusterMetadataOpt = parser.accepts("cluster-metadata-decoder", "If set, log data will be parsed as cluster metadata records.")
-    private val remoteMetadataOpt = parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." + 
+    private val remoteMetadataOpt = parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." +
       " Instead, the value-decoder-class option can be used if a custom RLMM implementation is configured.")
     private val shareStateOpt = parser.accepts("share-group-state-decoder", "If set, log data will be parsed as share group state data from the " +
       "__share_group_state topic.")
