This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 119d043c493 KAFKA-18700 Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo to record classes (#19062)
119d043c493 is described below
commit 119d043c4933b6665716a9196a6620686338a962
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Sun Mar 9 09:52:02 2025 +0800
KAFKA-18700 Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo to record classes (#19062)
Migrate the following data carrier classes to records to eliminate boilerplate constructors, `equals`, `hashCode`, and `toString` implementations:
* `Entry` in `LogHistory`
* `SnapshotPath`
Additionally, migrate the following classes as discussed:
* OffsetAndEpoch
* LogFetchInfo
* LogAppendInfo
In Java, accessing a record component requires parentheses, because records expose generated accessor methods rather than public fields. In Scala, parentheses are not needed, since Scala allows omitting them when calling parameterless methods; hence, no Scala code needs to change. A minimal sketch of the change at a call site follows.
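The sketch below is illustrative only: it uses a simplified stand-in for the migrated carriers rather than the actual Kafka classes, and the class name `RecordMigrationExample` is made up for the example.

```java
// Simplified stand-in for the migrated data carriers; the real definitions
// are in the diff below. The compiler generates the canonical constructor,
// the accessors offset() and epoch(), and equals/hashCode/toString.
record OffsetAndEpoch(long offset, int epoch) { }

public class RecordMigrationExample {
    public static void main(String[] args) {
        OffsetAndEpoch id = new OffsetAndEpoch(4, 2);

        // Java call sites now go through the generated accessor method,
        // so the parentheses are required:
        long offset = id.offset(); // before the migration: id.offset (a public final field)

        // The generated toString uses square brackets, e.g.
        // "OffsetAndEpoch[offset=4, epoch=2]", which is why the expected
        // messages in KafkaRaftClientSnapshotTest below change from the
        // old hand-written "OffsetAndEpoch(offset=4, epoch=2)" format.
        System.out.println(offset + " -> " + id);
    }
}
```

Records can also implement interfaces and keep hand-written members, which is how `OffsetAndEpoch` retains its `Comparable` ordering. The `compareTo` body shown here (epoch first, then offset) is an assumption for illustration, not a copy of the real implementation:

```java
record ComparableOffsetAndEpoch(long offset, int epoch)
        implements Comparable<ComparableOffsetAndEpoch> {
    @Override
    public int compareTo(ComparableOffsetAndEpoch that) {
        // Assumed ordering for illustration: epochs first, then offsets.
        int byEpoch = Integer.compare(epoch, that.epoch());
        return byEpoch != 0 ? byEpoch : Long.compare(offset, that.offset());
    }
}
```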
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 14 +++----
.../java/org/apache/kafka/raft/LogAppendInfo.java | 11 +-----
.../java/org/apache/kafka/raft/LogFetchInfo.java | 11 +-----
.../java/org/apache/kafka/raft/OffsetAndEpoch.java | 43 +---------------------
.../internals/KRaftControlRecordStateMachine.java | 2 +-
.../apache/kafka/raft/internals/LogHistory.java | 40 +-------------------
.../org/apache/kafka/snapshot/SnapshotPath.java | 18 +--------
.../kafka/raft/KafkaRaftClientReconfigTest.java | 4 +-
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 16 ++++----
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 4 +-
.../test/java/org/apache/kafka/raft/MockLog.java | 2 +-
.../java/org/apache/kafka/raft/MockLogTest.java | 28 +++++++-------
.../org/apache/kafka/snapshot/SnapshotsTest.java | 18 ++++-----
13 files changed, 49 insertions(+), 162 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 34b5770cf70..cbea9251d84 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -430,7 +430,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
             if (nextExpectedOffset < highWatermark) {
                 LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
-                listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
+                listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records());
             }
         });
     }
@@ -1625,11 +1625,11 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
             LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
-            if (state.updateReplicaState(replicaKey, currentTimeMs, info.startOffsetMetadata)) {
+            if (state.updateReplicaState(replicaKey, currentTimeMs, info.startOffsetMetadata())) {
                 onUpdateLeaderHighWatermark(state, currentTimeMs);
             }
-            records = info.records;
+            records = info.records();
         } else {
             records = MemoryRecords.EMPTY;
         }
@@ -1820,7 +1820,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         try {
             var info = log.appendAsFollower(records, quorum.epoch());
-            kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - info.firstOffset + 1);
+            kafkaRaftMetrics.updateFetchedRecords(info.lastOffset() - info.firstOffset() + 1);
         } catch (CorruptRecordException | InvalidRecordException e) {
             logger.info(
                 "Failed to append the records with the batch header '{}' to the log",
@@ -1850,9 +1850,9 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         partitionState.updateState();
         OffsetAndEpoch endOffset = endOffset();
-        kafkaRaftMetrics.updateAppendRecords(info.lastOffset - info.firstOffset + 1);
+        kafkaRaftMetrics.updateAppendRecords(info.lastOffset() - info.firstOffset() + 1);
         kafkaRaftMetrics.updateLogEnd(endOffset);
-        logger.trace("Leader appended records at base offset {}, new end offset is {}", info.firstOffset, endOffset);
+        logger.trace("Leader appended records at base offset {}, new end offset is {}", info.firstOffset(), endOffset);
         return info;
     }
@@ -2938,7 +2938,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         try {
             int epoch = state.epoch();
             LogAppendInfo info = appendAsLeader(batch.data);
-            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch);
+            OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset(), epoch);
             CompletableFuture<Long> future = appendPurgatory.await(
                 offsetAndEpoch.offset() + 1, Integer.MAX_VALUE);
diff --git a/raft/src/main/java/org/apache/kafka/raft/LogAppendInfo.java b/raft/src/main/java/org/apache/kafka/raft/LogAppendInfo.java
index 6dc036ff9ab..79f93ea010c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LogAppendInfo.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LogAppendInfo.java
@@ -19,13 +19,4 @@ package org.apache.kafka.raft;
 /**
  * Metadata for the record batch appended to log
  */
-public class LogAppendInfo {
-
-    public final long firstOffset;
-    public final long lastOffset;
-
-    public LogAppendInfo(long firstOffset, long lastOffset) {
-        this.firstOffset = firstOffset;
-        this.lastOffset = lastOffset;
-    }
-}
+public record LogAppendInfo(long firstOffset, long lastOffset) { }
diff --git a/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java b/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
index 7aca7ea5dbc..7862fc2ab29 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java
@@ -21,13 +21,4 @@ import org.apache.kafka.common.record.Records;
 /**
  * Metadata for the records fetched from log, including the records itself
  */
-public class LogFetchInfo {
-
-    public final Records records;
-    public final LogOffsetMetadata startOffsetMetadata;
-
-    public LogFetchInfo(Records records, LogOffsetMetadata startOffsetMetadata) {
-        this.records = records;
-        this.startOffsetMetadata = startOffsetMetadata;
-    }
-}
+public record LogFetchInfo(Records records, LogOffsetMetadata startOffsetMetadata) { }
diff --git a/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java b/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java
index 32e3daee8cf..cba6108fd5d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/OffsetAndEpoch.java
@@ -16,48 +16,7 @@
  */
 package org.apache.kafka.raft;

-public class OffsetAndEpoch implements Comparable<OffsetAndEpoch> {
-    private final long offset;
-    private final int epoch;
-
-    public OffsetAndEpoch(long offset, int epoch) {
-        this.offset = offset;
-        this.epoch = epoch;
-    }
-
-    public long offset() {
-        return offset;
-    }
-
-    public int epoch() {
-        return epoch;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        OffsetAndEpoch that = (OffsetAndEpoch) o;
-
-        if (offset != that.offset) return false;
-        return epoch == that.epoch;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = (int) (offset ^ (offset >>> 32));
-        result = 31 * result + epoch;
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "OffsetAndEpoch(" +
-            "offset=" + offset +
-            ", epoch=" + epoch +
-            ')';
-    }
+public record OffsetAndEpoch(long offset, int epoch) implements Comparable<OffsetAndEpoch> {
     @Override
     public int compareTo(OffsetAndEpoch o) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index a8768f902a8..032f8230233 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -233,7 +233,7 @@ public final class KRaftControlRecordStateMachine {
         while (log.endOffset().offset() > nextOffset) {
             LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
             try (RecordsIterator<?> iterator = new RecordsIterator<>(
-                info.records,
+                info.records(),
                 serde,
                 bufferSupplier,
                 maxBatchSizeBytes,
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java
index a3a37daf80c..62dd87ec601 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/LogHistory.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.raft.internals;

-import java.util.Objects;
 import java.util.Optional;

 /**
@@ -76,42 +75,5 @@ public interface LogHistory<T> {
      */
     void clear();
-    final class Entry<T> {
-        private final long offset;
-        private final T value;
-
-        public Entry(long offset, T value) {
-            this.offset = offset;
-            this.value = value;
-        }
-
-        public long offset() {
-            return offset;
-        }
-
-        public T value() {
-            return value;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            Entry<?> that = (Entry<?>) o;
-
-            if (offset != that.offset) return false;
-            return Objects.equals(value, that.value);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(offset, value);
-        }
-
-        @Override
-        public String toString() {
-            return String.format("Entry(offset=%d, value=%s)", offset, value);
-        }
-    }
+    record Entry<T>(long offset, T value) { }
}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotPath.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotPath.java
index 9a7d95e45e9..58ca000595b 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotPath.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotPath.java
@@ -20,21 +20,5 @@ import org.apache.kafka.raft.OffsetAndEpoch;

 import java.nio.file.Path;

-public final class SnapshotPath {
-    public final Path path;
-    public final OffsetAndEpoch snapshotId;
-    public final boolean partial;
-    public final boolean deleted;
-    public SnapshotPath(Path path, OffsetAndEpoch snapshotId, boolean partial, boolean deleted) {
-        this.path = path;
-        this.snapshotId = snapshotId;
-        this.partial = partial;
-        this.deleted = deleted;
-    }
-
-    @Override
-    public String toString() {
-        return String.format("SnapshotPath(path=%s, snapshotId=%s, partial=%s)", path, snapshotId, partial);
-    }
-}
+public record SnapshotPath(Path path, OffsetAndEpoch snapshotId, boolean partial, boolean deleted) { }
\ No newline at end of file
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index a1c3392a064..9cde33d46f8 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -129,7 +129,7 @@ public class KafkaRaftClientReconfigTest {
         context.unattachedToLeader();
         // check if leader writes 3 bootstrap records to the log
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
         Iterator<Record> recordIterator = batch.iterator();
@@ -210,7 +210,7 @@ public class KafkaRaftClientReconfigTest {
         // check leader does not write bootstrap records to log
         context.unattachedToLeader();
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
         Iterator<Record> recordIterator = batch.iterator();
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 1f3307f9ada..c2ecf660471 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -1920,7 +1920,7 @@ public final class KafkaRaftClientSnapshotTest {
             () -> context.client.createSnapshot(invalidSnapshotId1, 0)
         );
         assertEquals(
-            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=4, epoch=2)) greater than the high-watermark (0)",
+            "Cannot create a snapshot with an id (OffsetAndEpoch[offset=4, epoch=2]) greater than the high-watermark (0)",
             exception.getMessage()
         );
@@ -1940,7 +1940,7 @@ public final class KafkaRaftClientSnapshotTest {
             () -> context.client.createSnapshot(invalidSnapshotId2, 0)
         );
         assertEquals(
-            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=7, epoch=3)) greater than the high-watermark (4)",
+            "Cannot create a snapshot with an id (OffsetAndEpoch[offset=7, epoch=3]) greater than the high-watermark (4)",
             exception.getMessage()
         );
@@ -1951,7 +1951,7 @@ public final class KafkaRaftClientSnapshotTest {
             () -> context.client.createSnapshot(invalidSnapshotId3, 0)
         );
         assertEquals(
-            "Snapshot id (OffsetAndEpoch(offset=4, epoch=4)) is not valid according to the log: ValidOffsetAndEpoch(kind=DIVERGING, offsetAndEpoch=OffsetAndEpoch(offset=7, epoch=3))",
+            "Snapshot id (OffsetAndEpoch[offset=4, epoch=4]) is not valid according to the log: ValidOffsetAndEpoch(kind=DIVERGING, offsetAndEpoch=OffsetAndEpoch[offset=7, epoch=3])",
             exception.getMessage()
         );
@@ -1964,7 +1964,7 @@ public final class KafkaRaftClientSnapshotTest {
             () -> context.client.createSnapshot(invalidSnapshotId4, 0)
         );
         assertEquals(
-            "Snapshot id (OffsetAndEpoch(offset=4, epoch=2)) is not valid according to the log: ValidOffsetAndEpoch(kind=DIVERGING, offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=2))",
+            "Snapshot id (OffsetAndEpoch[offset=4, epoch=2]) is not valid according to the log: ValidOffsetAndEpoch(kind=DIVERGING, offsetAndEpoch=OffsetAndEpoch[offset=3, epoch=2])",
             exception.getMessage()
         );
@@ -2007,7 +2007,7 @@ public final class KafkaRaftClientSnapshotTest {
             () -> context.client.createSnapshot(invalidSnapshotId1, 0)
         );
         assertEquals(
-            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=1, epoch=1)) greater than the high-watermark (0)",
+            "Cannot create a snapshot with an id (OffsetAndEpoch[offset=1, epoch=1]) greater than the high-watermark (0)",
             exception.getMessage()
         );
@@ -2035,7 +2035,7 @@ public final class KafkaRaftClientSnapshotTest {
             () -> context.client.createSnapshot(invalidSnapshotId2, 0)
         );
         assertEquals(
-            "Cannot create a snapshot with an id (OffsetAndEpoch(offset=1, epoch=5)) greater than the high-watermark (0)",
+            "Cannot create a snapshot with an id (OffsetAndEpoch[offset=1, epoch=5]) greater than the high-watermark (0)",
             exception.getMessage()
         );
@@ -2063,7 +2063,7 @@ public final class KafkaRaftClientSnapshotTest {
             () -> context.client.createSnapshot(invalidSnapshotId3, 0)
         );
         assertEquals(
-            "Snapshot id (OffsetAndEpoch(offset=6, epoch=6)) is not valid according to the log: ValidOffsetAndEpoch(kind=DIVERGING, offsetAndEpoch=OffsetAndEpoch(offset=6, epoch=4))",
+            "Snapshot id (OffsetAndEpoch[offset=6, epoch=6]) is not valid according to the log: ValidOffsetAndEpoch(kind=DIVERGING, offsetAndEpoch=OffsetAndEpoch[offset=6, epoch=4])",
             exception.getMessage()
         );
@@ -2076,7 +2076,7 @@ public final class KafkaRaftClientSnapshotTest {
            () -> context.client.createSnapshot(invalidSnapshotId4, 0)
        );
        assertEquals(
-            "Snapshot id (OffsetAndEpoch(offset=6, epoch=3)) is not valid according to the log: ValidOffsetAndEpoch(kind=DIVERGING, offsetAndEpoch=OffsetAndEpoch(offset=3, epoch=3))",
+            "Snapshot id (OffsetAndEpoch[offset=6, epoch=3]) is not valid according to the log: ValidOffsetAndEpoch(kind=DIVERGING, offsetAndEpoch=OffsetAndEpoch[offset=3, epoch=3])",
            exception.getMessage()
        );
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index fdd20c9cd39..e8b042adb40 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -916,7 +916,7 @@ public class KafkaRaftClientTest {
         context.client.poll();
         context.assertSentBeginQuorumEpochRequest(1, Set.of(otherNodeId));
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
@@ -965,7 +965,7 @@ public class KafkaRaftClientTest {
         context.client.poll();
         context.assertSentBeginQuorumEpochRequest(2, Set.of(firstNodeId, secondNodeId));
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0, Isolation.UNCOMMITTED).records();
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 9fb4724cc0c..2d0bf538ce2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -514,7 +514,7 @@ public class MockLog implements ReplicatedLog {
             );
         }
-        long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata.offset();
+        long baseOffset = read(snapshotId.offset(), Isolation.COMMITTED).startOffsetMetadata().offset();
         if (snapshotId.offset() != baseOffset) {
             throw new IllegalArgumentException(
                 String.format(
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index eca0fe5d3de..7b2602c9e6e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -215,7 +215,7 @@ public class MockLogTest {
         assertEquals(1, log.endOffset().offset());
         assertEquals(currentEpoch, log.lastFetchedEpoch());
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = log.read(0, Isolation.UNCOMMITTED).records();
         for (RecordBatch batch : records.batches()) {
             assertTrue(batch.isControlBatch());
         }
@@ -250,7 +250,7 @@ public class MockLogTest {
         assertEquals(initialOffset + 1, log.endOffset().offset());
         assertEquals(3, log.lastFetchedEpoch());
-        Records records = log.read(5L, Isolation.UNCOMMITTED).records;
+        Records records = log.read(5L, Isolation.UNCOMMITTED).records();
         List<ByteBuffer> extractRecords = new ArrayList<>();
         for (Record record : records.records()) {
             extractRecords.add(record.value());
@@ -276,7 +276,7 @@ public class MockLogTest {
         appendAsLeader(Arrays.asList(recordOne, recordTwo), epoch);
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = log.read(0, Isolation.UNCOMMITTED).records();
         List<ByteBuffer> extractRecords = new ArrayList<>();
         for (Record record : records.records()) {
@@ -347,12 +347,12 @@ public class MockLogTest {
         appendBatch(5, 1);
         LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
-        assertEquals(5L, readInfo.startOffsetMetadata.offset());
-        assertTrue(readInfo.startOffsetMetadata.metadata().isPresent());
+        assertEquals(5L, readInfo.startOffsetMetadata().offset());
+        assertTrue(readInfo.startOffsetMetadata().metadata().isPresent());
         // Update to a high watermark with valid offset metadata
-        log.updateHighWatermark(readInfo.startOffsetMetadata);
-        assertEquals(readInfo.startOffsetMetadata.offset(), log.highWatermark().offset());
+        log.updateHighWatermark(readInfo.startOffsetMetadata());
+        assertEquals(readInfo.startOffsetMetadata().offset(), log.highWatermark().offset());
         // Now update to a high watermark with invalid metadata
         assertThrows(IllegalArgumentException.class, () ->
@@ -361,17 +361,17 @@ public class MockLogTest {
         // Ensure we can update the high watermark to the end offset
         LogFetchInfo readFromEndInfo = log.read(15L, Isolation.UNCOMMITTED);
-        assertEquals(15, readFromEndInfo.startOffsetMetadata.offset());
-        assertTrue(readFromEndInfo.startOffsetMetadata.metadata().isPresent());
-        log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
+        assertEquals(15, readFromEndInfo.startOffsetMetadata().offset());
+        assertTrue(readFromEndInfo.startOffsetMetadata().metadata().isPresent());
+        log.updateHighWatermark(readFromEndInfo.startOffsetMetadata());
         // Ensure that the end offset metadata is valid after new entries are appended
         appendBatch(5, 1);
-        log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
+        log.updateHighWatermark(readFromEndInfo.startOffsetMetadata());
         // Check handling of a fetch from the middle of a batch
         LogFetchInfo readFromMiddleInfo = log.read(16L, Isolation.UNCOMMITTED);
-        assertEquals(readFromEndInfo.startOffsetMetadata, readFromMiddleInfo.startOffsetMetadata);
+        assertEquals(readFromEndInfo.startOffsetMetadata(), readFromMiddleInfo.startOffsetMetadata());
     }
@Test
@@ -1003,7 +1003,7 @@ public class MockLogTest {
while (foundRecord) {
foundRecord = false;
- Records records = log.read(currentStart, isolation).records;
+ Records records = log.read(currentStart, isolation).records();
for (Record record : records.records()) {
foundRecord = true;
@@ -1082,7 +1082,7 @@ public class MockLogTest {
         int currentOffset = 0;
         while (currentOffset < log.endOffset().offset()) {
-            Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records;
+            Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records();
             List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
             assertFalse(batches.isEmpty());
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
index 5f7e25b5dde..5c96b044b42 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
@@ -45,10 +45,10 @@ public final class SnapshotsTest {
         Path path = Snapshots.snapshotPath(TestUtils.tempDirectory().toPath(), snapshotId);
         SnapshotPath snapshotPath = Snapshots.parse(path).get();
-        assertEquals(path, snapshotPath.path);
-        assertEquals(snapshotId, snapshotPath.snapshotId);
-        assertFalse(snapshotPath.partial);
-        assertFalse(snapshotPath.deleted);
+        assertEquals(path, snapshotPath.path());
+        assertEquals(snapshotId, snapshotPath.snapshotId());
+        assertFalse(snapshotPath.partial());
+        assertFalse(snapshotPath.deleted());
     }
@Test
@@ -64,9 +64,9 @@
         SnapshotPath snapshotPath = Snapshots.parse(path).get();
-        assertEquals(path, snapshotPath.path);
-        assertEquals(snapshotId, snapshotPath.snapshotId);
-        assertTrue(snapshotPath.partial);
+        assertEquals(path, snapshotPath.path());
+        assertEquals(snapshotId, snapshotPath.snapshotId());
+        assertTrue(snapshotPath.partial());
     }
@Test
@@ -79,8 +79,8 @@
         Path deletedPath = Snapshots.deleteRenamePath(path, snapshotId);
         SnapshotPath snapshotPath = Snapshots.parse(deletedPath).get();
-        assertEquals(snapshotId, snapshotPath.snapshotId);
-        assertTrue(snapshotPath.deleted);
+        assertEquals(snapshotId, snapshotPath.snapshotId());
+        assertTrue(snapshotPath.deleted());
     }
@Test