This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a619c6bb952 MINOR; Remove cast for Records' slice (#19661)
a619c6bb952 is described below
commit a619c6bb95200108cbeabd041242b8ff01a0c3dc
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue May 13 20:06:38 2025 -0400
MINOR; Remove cast for Records' slice (#19661)
In Java return types are covariant. This means that method override can
override the return type with a subclass.
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
<[email protected]>, Apoorv Mittal <[email protected]>
---
.../apache/kafka/common/record/FileRecords.java | 78 ++++++++++++++--------
.../apache/kafka/common/record/MemoryRecords.java | 2 +-
.../org/apache/kafka/common/record/Records.java | 3 +-
.../kafka/common/record/FileRecordsTest.java | 44 ++++++------
.../kafka/common/record/MemoryRecordsTest.java | 51 +++++++++-----
.../main/scala/kafka/tools/DumpLogSegments.scala | 6 +-
6 files changed, 109 insertions(+), 75 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 6a42a52d2e0..4ec7db604bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -54,33 +54,52 @@ public class FileRecords extends AbstractRecords implements
Closeable {
* The {@code FileRecords.open} methods should be used instead of this
constructor whenever possible.
* The constructor is visible for tests.
*/
- FileRecords(File file,
- FileChannel channel,
- int start,
- int end,
- boolean isSlice) throws IOException {
+ FileRecords(
+ File file,
+ FileChannel channel,
+ int end
+ ) throws IOException {
this.file = file;
this.channel = channel;
- this.start = start;
+ this.start = 0;
this.end = end;
- this.isSlice = isSlice;
- this.size = new AtomicInteger();
+ this.isSlice = false;
- if (isSlice) {
- // don't check the file size if this is just a slice view
- size.set(end - start);
- } else {
- if (channel.size() > Integer.MAX_VALUE)
- throw new KafkaException("The size of segment " + file + " ("
+ channel.size() +
- ") is larger than the maximum allowed segment size of
" + Integer.MAX_VALUE);
+ if (channel.size() > Integer.MAX_VALUE) {
+ throw new KafkaException(
+ "The size of segment " + file + " (" + channel.size() +
+ ") is larger than the maximum allowed segment size of " +
Integer.MAX_VALUE
+ );
+ }
- int limit = Math.min((int) channel.size(), end);
- size.set(limit - start);
+ int limit = Math.min((int) channel.size(), end);
+ this.size = new AtomicInteger(limit - start);
- // if this is not a slice, update the file pointer to the end of
the file
- // set the file position to the last byte in the file
- channel.position(limit);
- }
+ // update the file position to the end of the file
+ channel.position(limit);
+
+ batches = batchesFrom(start);
+ }
+
+ /**
+ * Constructor for creating a slice.
+ *
+ * This overloaded constructor avoids having to declare a checked IO
exception.
+ */
+ private FileRecords(
+ File file,
+ FileChannel channel,
+ int start,
+ int end
+ ) {
+ this.file = file;
+ this.channel = channel;
+ this.start = start;
+ this.end = end;
+ this.isSlice = true;
+
+ // don't check the file size since this is just a slice view
+ this.size = new AtomicInteger(end - start);
batches = batchesFrom(start);
}
@@ -121,10 +140,11 @@ public class FileRecords extends AbstractRecords
implements Closeable {
}
@Override
- public Records slice(int position, int size) throws IOException {
+ public FileRecords slice(int position, int size) {
int availableBytes = availableBytes(position, size);
int startPosition = this.start + position;
- return new FileRecords(file, channel, startPosition, startPosition +
availableBytes, true);
+
+ return new FileRecords(file, channel, startPosition, startPosition +
availableBytes);
}
/**
@@ -288,7 +308,7 @@ public class FileRecords extends AbstractRecords implements
Closeable {
*/
public LogOffsetPosition searchForOffsetFromPosition(long targetOffset,
int startingPosition) {
FileChannelRecordBatch prevBatch = null;
- // The following logic is intentionally designed to minimize memory
usage by avoiding
+ // The following logic is intentionally designed to minimize memory
usage by avoiding
// unnecessary calls to lastOffset() for every batch.
// Instead, we use baseOffset() comparisons when possible, and only
check lastOffset() when absolutely necessary.
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
@@ -296,14 +316,14 @@ public class FileRecords extends AbstractRecords
implements Closeable {
if (batch.baseOffset() == targetOffset) {
return LogOffsetPosition.fromBatch(batch);
}
-
+
// If we find the first batch with baseOffset greater than
targetOffset
if (batch.baseOffset() > targetOffset) {
// If the previous batch contains the target
if (prevBatch != null && prevBatch.lastOffset() >=
targetOffset)
return LogOffsetPosition.fromBatch(prevBatch);
else {
- // If there's no previous batch or the previous batch
doesn't contain the
+ // If there's no previous batch or the previous batch
doesn't contain the
// target, return the current batch
return LogOffsetPosition.fromBatch(batch);
}
@@ -312,7 +332,7 @@ public class FileRecords extends AbstractRecords implements
Closeable {
}
// Only one case would reach here: all batches have baseOffset less
than targetOffset
// Check if the last batch contains the target
- if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
+ if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
return LogOffsetPosition.fromBatch(prevBatch);
return null;
@@ -424,7 +444,7 @@ public class FileRecords extends AbstractRecords implements
Closeable {
boolean preallocate) throws IOException {
FileChannel channel = openChannel(file, mutable, fileAlreadyExists,
initFileSize, preallocate);
int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
- return new FileRecords(file, channel, 0, end, false);
+ return new FileRecords(file, channel, end);
}
public static FileRecords open(File file,
@@ -475,7 +495,7 @@ public class FileRecords extends AbstractRecords implements
Closeable {
public final long offset;
public final int position;
public final int size;
-
+
public static LogOffsetPosition fromBatch(FileChannelRecordBatch
batch) {
return new LogOffsetPosition(batch.baseOffset(), batch.position(),
batch.sizeInBytes());
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 1786f61d187..2e2b97dfe37 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -301,7 +301,7 @@ public class MemoryRecords extends AbstractRecords {
}
@Override
- public Records slice(int position, int size) {
+ public MemoryRecords slice(int position, int size) {
if (position < 0)
throw new IllegalArgumentException("Invalid position: " + position
+ " in read from " + this);
if (position > buffer.limit())
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java
b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 3d45762e815..017c49ba94c 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.utils.AbstractIterator;
-import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
@@ -105,5 +104,5 @@ public interface Records extends TransferableRecords {
* @param size The number of bytes after the start position to include
* @return A sliced wrapper on this message set limited based on the given
position and size
*/
- Records slice(int position, int size) throws IOException;
+ Records slice(int position, int size);
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 75babcf95b0..04590ccbe7b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -89,7 +89,7 @@ public class FileRecordsTest {
FileChannel fileChannelMock = mock(FileChannel.class);
when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE);
- FileRecords records = new FileRecords(fileMock, fileChannelMock, 0,
Integer.MAX_VALUE, false);
+ FileRecords records = new FileRecords(fileMock, fileChannelMock,
Integer.MAX_VALUE);
assertThrows(IllegalArgumentException.class, () -> append(records,
values));
}
@@ -99,7 +99,7 @@ public class FileRecordsTest {
FileChannel fileChannelMock = mock(FileChannel.class);
when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L);
- assertThrows(KafkaException.class, () -> new FileRecords(fileMock,
fileChannelMock, 0, Integer.MAX_VALUE, false));
+ assertThrows(KafkaException.class, () -> new FileRecords(fileMock,
fileChannelMock, Integer.MAX_VALUE));
}
@Test
@@ -198,9 +198,9 @@ public class FileRecordsTest {
*/
@Test
public void testRead() throws IOException {
- Records read = fileRecords.slice(0, fileRecords.sizeInBytes());
+ FileRecords read = fileRecords.slice(0, fileRecords.sizeInBytes());
assertEquals(fileRecords.sizeInBytes(), read.sizeInBytes());
- TestUtils.checkEquals(fileRecords.batches(), ((FileRecords)
read).batches());
+ TestUtils.checkEquals(fileRecords.batches(), read.batches());
List<RecordBatch> items = batches(read);
RecordBatch first = items.get(0);
@@ -313,7 +313,7 @@ public class FileRecordsTest {
when(channelMock.size()).thenReturn(42L);
when(channelMock.position(42L)).thenReturn(null);
- FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0,
Integer.MAX_VALUE, false);
+ FileRecords fileRecords = new FileRecords(tempFile(), channelMock,
Integer.MAX_VALUE);
fileRecords.truncateTo(42);
verify(channelMock, atLeastOnce()).size();
@@ -330,7 +330,7 @@ public class FileRecordsTest {
when(channelMock.size()).thenReturn(42L);
- FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0,
Integer.MAX_VALUE, false);
+ FileRecords fileRecords = new FileRecords(tempFile(), channelMock,
Integer.MAX_VALUE);
try {
fileRecords.truncateTo(43);
@@ -352,7 +352,7 @@ public class FileRecordsTest {
when(channelMock.size()).thenReturn(42L);
when(channelMock.truncate(anyLong())).thenReturn(channelMock);
- FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0,
Integer.MAX_VALUE, false);
+ FileRecords fileRecords = new FileRecords(tempFile(), channelMock,
Integer.MAX_VALUE);
fileRecords.truncateTo(23);
verify(channelMock, atLeastOnce()).size();
@@ -526,7 +526,7 @@ public class FileRecordsTest {
* 1. If the target offset equals the base offset of the first batch
* 2. If the target offset is less than the base offset of the first batch
* <p>
- * If the base offset of the first batch is equal to or greater than the
target offset, it should return the
+ * If the base offset of the first batch is equal to or greater than the
target offset, it should return the
* position of the first batch and the lastOffset method should not be
called.
*/
@ParameterizedTest
@@ -537,7 +537,7 @@ public class FileRecordsTest {
FileLogInputStream.FileChannelRecordBatch batch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
when(batch.baseOffset()).thenReturn(baseOffset);
- FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 100));
mockFileRecordBatches(fileRecords, batch);
FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetFromPosition(5L, 0);
@@ -557,7 +557,7 @@ public class FileRecordsTest {
when(batch.baseOffset()).thenReturn(3L);
when(batch.lastOffset()).thenReturn(5L);
- FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 100));
mockFileRecordBatches(fileRecords, batch);
FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetFromPosition(5L, 0);
@@ -581,7 +581,7 @@ public class FileRecordsTest {
when(currentBatch.baseOffset()).thenReturn(15L);
when(currentBatch.lastOffset()).thenReturn(20L);
- FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 100));
mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetFromPosition(20L, 0);
@@ -605,13 +605,13 @@ public class FileRecordsTest {
FileLogInputStream.FileChannelRecordBatch currentBatch =
mock(FileLogInputStream.FileChannelRecordBatch.class);
when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset
- FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 100));
mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetFromPosition(10L, 0);
assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch),
result);
- // Because the target offset is in the current batch, we should call
lastOffset
+ // Because the target offset is in the current batch, we should call
lastOffset
// on the previous batch
verify(prevBatch, times(1)).lastOffset();
}
@@ -629,13 +629,13 @@ public class FileRecordsTest {
when(batch2.baseOffset()).thenReturn(8L); // < targetOffset
when(batch2.lastOffset()).thenReturn(9L); // < targetOffset
- FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 100));
mockFileRecordBatches(fileRecords, batch1, batch2);
FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetFromPosition(10L, 0);
assertNull(result);
- // Because the target offset is exceeded by the last offset of the
batch2,
+ // Because the target offset is exceeded by the last offset of the
batch2,
// we should call lastOffset on the batch2
verify(batch1, never()).lastOffset();
verify(batch2, times(1)).lastOffset();
@@ -657,7 +657,7 @@ public class FileRecordsTest {
when(batch2.baseOffset()).thenReturn(baseOffset); // < targetOffset
or == targetOffset
when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset
- FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 100));
mockFileRecordBatches(fileRecords, batch1, batch2);
long targetOffset = 10L;
@@ -670,7 +670,7 @@ public class FileRecordsTest {
verify(batch1, never()).lastOffset();
verify(batch2, never()).lastOffset();
} else {
- // Because the target offset is in the batch2, we should not call
+ // Because the target offset is in the batch2, we should not call
// lastOffset on batch1
verify(batch1, never()).lastOffset();
verify(batch2, times(1)).lastOffset();
@@ -685,13 +685,13 @@ public class FileRecordsTest {
File mockFile = mock(File.class);
FileChannel mockChannel = mock(FileChannel.class);
FileLogInputStream.FileChannelRecordBatch batch1 =
mock(FileLogInputStream.FileChannelRecordBatch.class);
- when(batch1.baseOffset()).thenReturn(5L);
- when(batch1.lastOffset()).thenReturn(10L);
+ when(batch1.baseOffset()).thenReturn(5L);
+ when(batch1.lastOffset()).thenReturn(10L);
FileLogInputStream.FileChannelRecordBatch batch2 =
mock(FileLogInputStream.FileChannelRecordBatch.class);
- when(batch2.baseOffset()).thenReturn(15L);
- when(batch2.lastOffset()).thenReturn(20L);
+ when(batch2.baseOffset()).thenReturn(15L);
+ when(batch2.lastOffset()).thenReturn(20L);
- FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 0, 100, false));
+ FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile,
mockChannel, 100));
mockFileRecordBatches(fileRecords, batch1, batch2);
FileRecords.LogOffsetPosition result =
fileRecords.searchForOffsetFromPosition(13L, 0);
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 6f98da21eed..7092928010b 100644
---
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -35,7 +35,6 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -136,10 +135,6 @@ public class MemoryRecordsTest {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int partitionLeaderEpoch = 998;
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic,
compression,
- TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid,
epoch, firstSequence, false, false,
- partitionLeaderEpoch, buffer.limit());
-
SimpleRecord[] records = new SimpleRecord[] {
new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
@@ -149,10 +144,30 @@ public class MemoryRecordsTest {
new SimpleRecord(6L, (byte[]) null, null)
};
- for (SimpleRecord record : records)
- builder.append(record);
+ final MemoryRecords memoryRecords;
+ try (var builder = new MemoryRecordsBuilder(
+ buffer,
+ magic,
+ compression,
+ TimestampType.CREATE_TIME,
+ firstOffset,
+ logAppendTime,
+ pid,
+ epoch,
+ firstSequence,
+ false,
+ false,
+ partitionLeaderEpoch,
+ buffer.limit()
+ )
+ ) {
+ for (SimpleRecord record : records) {
+ builder.append(record);
+ }
+
+ memoryRecords = builder.build();
+ }
- MemoryRecords memoryRecords = builder.build();
for (int iteration = 0; iteration < 2; iteration++) {
int total = 0;
for (RecordBatch batch : memoryRecords.batches()) {
@@ -1075,7 +1090,7 @@ public class MemoryRecordsTest {
@ParameterizedTest
@ArgumentsSource(MemoryRecordsArgumentsProvider.class)
- public void testSlice(Args args) throws IOException {
+ public void testSlice(Args args) {
// Create a MemoryRecords instance with multiple batches. Prior
RecordBatch.MAGIC_VALUE_V2,
// every append in a batch is a new batch. After
RecordBatch.MAGIC_VALUE_V2, we can have multiple
// batches in a single MemoryRecords instance. Though with
compression, we can have multiple
@@ -1087,10 +1102,10 @@ public class MemoryRecordsTest {
MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
// Test slicing from start
- Records sliced = records.slice(0, records.sizeInBytes());
+ MemoryRecords sliced = records.slice(0, records.sizeInBytes());
assertEquals(records.sizeInBytes(), sliced.sizeInBytes());
- assertEquals(records.validBytes(), ((MemoryRecords)
sliced).validBytes());
- TestUtils.checkEquals(records.batches(), ((MemoryRecords)
sliced).batches());
+ assertEquals(records.validBytes(), sliced.validBytes());
+ TestUtils.checkEquals(records.batches(), sliced.batches());
List<RecordBatch> items = batches(records);
// Test slicing first message.
@@ -1098,19 +1113,19 @@ public class MemoryRecordsTest {
sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() -
first.sizeInBytes());
assertEquals(records.sizeInBytes() - first.sizeInBytes(),
sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read
starting from the second message");
- assertTrue(((MemoryRecords) sliced).validBytes() <=
sliced.sizeInBytes());
+ assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
// Read from second message and size is past the end of the file.
sliced = records.slice(first.sizeInBytes(), records.sizeInBytes());
assertEquals(records.sizeInBytes() - first.sizeInBytes(),
sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read
starting from the second message");
- assertTrue(((MemoryRecords) sliced).validBytes() <=
sliced.sizeInBytes());
+ assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
// Read from second message and position + size overflows.
sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE);
assertEquals(records.sizeInBytes() - first.sizeInBytes(),
sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read
starting from the second message");
- assertTrue(((MemoryRecords) sliced).validBytes() <=
sliced.sizeInBytes());
+ assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
// Read a single message starting from second message.
RecordBatch second = items.get(1);
@@ -1131,14 +1146,14 @@ public class MemoryRecordsTest {
.slice(first.sizeInBytes() - 1, records.sizeInBytes());
assertEquals(records.sizeInBytes() - first.sizeInBytes(),
sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read
starting from the second message");
- assertTrue(((MemoryRecords) sliced).validBytes() <=
sliced.sizeInBytes());
+ assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
// Read from second message and position + size overflows on the
already sliced view.
sliced = records.slice(1, records.sizeInBytes() - 1)
.slice(first.sizeInBytes() - 1, Integer.MAX_VALUE);
assertEquals(records.sizeInBytes() - first.sizeInBytes(),
sliced.sizeInBytes());
assertEquals(items.subList(1, items.size()), batches(sliced), "Read
starting from the second message");
- assertTrue(((MemoryRecords) sliced).validBytes() <=
sliced.sizeInBytes());
+ assertTrue(sliced.validBytes() <= sliced.sizeInBytes());
}
@ParameterizedTest
@@ -1170,7 +1185,7 @@ public class MemoryRecordsTest {
*/
@ParameterizedTest
@ArgumentsSource(MemoryRecordsArgumentsProvider.class)
- public void testSliceForAlreadySlicedMemoryRecords(Args args) throws
IOException {
+ public void testSliceForAlreadySlicedMemoryRecords(Args args) {
LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
recordsPerOffset.put(args.firstOffset, 5);
recordsPerOffset.put(args.firstOffset + 5L, 10);
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 85bc4e1a269..7967d6f4f49 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -277,7 +277,7 @@ object DumpLogSegments {
println(s"Snapshot end offset: ${path.snapshotId.offset}, epoch:
${path.snapshotId.epoch}")
}
}
- val fileRecords = FileRecords.open(file, false).slice(0,
maxBytes).asInstanceOf[FileRecords]
+ val fileRecords = FileRecords.open(file, false).slice(0, maxBytes)
try {
var validBytes = 0L
var lastOffset = -1L
@@ -567,7 +567,7 @@ object DumpLogSegments {
private class RemoteMetadataLogMessageParser extends MessageParser[String,
String] {
private val metadataRecordSerde = new RemoteLogMetadataSerde
-
+
override def parse(record: Record): (Option[String], Option[String]) = {
val output = try {
val data = new Array[Byte](record.value.remaining)
@@ -626,7 +626,7 @@ object DumpLogSegments {
private val transactionLogOpt = parser.accepts("transaction-log-decoder",
"If set, log data will be parsed as " +
"transaction metadata from the __transaction_state topic.")
private val clusterMetadataOpt =
parser.accepts("cluster-metadata-decoder", "If set, log data will be parsed as
cluster metadata records.")
- private val remoteMetadataOpt =
parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed
as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." +
+ private val remoteMetadataOpt =
parser.accepts("remote-log-metadata-decoder", "If set, log data will be parsed
as TopicBasedRemoteLogMetadataManager (RLMM) metadata records." +
" Instead, the value-decoder-class option can be used if a custom RLMM
implementation is configured.")
private val shareStateOpt = parser.accepts("share-group-state-decoder",
"If set, log data will be parsed as share group state data from the " +
"__share_group_state topic.")