This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 376365d9da8 MINOR; Add property methods to LogOffsetMetadata (#16521)
376365d9da8 is described below

commit 376365d9da8099e3eb9175090cc456f55985fcb0
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Thu Jul 4 15:03:32 2024 -0400

    MINOR; Add property methods to LogOffsetMetadata (#16521)
    
    This change simply adds property methods to LogOffsetMetadata. It
    changes all of the callers to use the new property methods instead of
    using the fields directly.
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>
---
 core/src/test/java/kafka/test/ClusterInstance.java |  2 +-
 .../java/org/apache/kafka/raft/FollowerState.java  |  2 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 28 +++----
 .../java/org/apache/kafka/raft/LeaderState.java    | 32 ++++----
 .../org/apache/kafka/raft/LogOffsetMetadata.java   | 13 +++-
 .../java/org/apache/kafka/raft/ReplicatedLog.java  |  2 +-
 .../internals/KRaftControlRecordStateMachine.java  |  2 +-
 .../kafka/raft/internals/KafkaRaftMetrics.java     |  6 +-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 16 ++--
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 88 +++++++++++-----------
 .../test/java/org/apache/kafka/raft/MockLog.java   | 48 ++++++------
 .../java/org/apache/kafka/raft/MockLogTest.java    | 60 +++++++--------
 .../apache/kafka/raft/RaftClientTestContext.java   |  8 +-
 .../apache/kafka/raft/RaftEventSimulationTest.java |  6 +-
 .../KRaftControlRecordStateMachineTest.java        | 38 +++++-----
 15 files changed, 181 insertions(+), 170 deletions(-)

diff --git a/core/src/test/java/kafka/test/ClusterInstance.java 
b/core/src/test/java/kafka/test/ClusterInstance.java
index d11d0764c8c..41cc8b485f1 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -194,7 +194,7 @@ public interface ClusterInstance {
         ), 60000L, topic + " metadata not propagated after 60000 ms");
 
         for (ControllerServer controller : controllers().values()) {
-            long controllerOffset = 
controller.raftManager().replicatedLog().endOffset().offset - 1;
+            long controllerOffset = 
controller.raftManager().replicatedLog().endOffset().offset() - 1;
             TestUtils.waitForCondition(
                 () -> brokers().values().stream().allMatch(broker -> 
((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= 
controllerOffset),
                 60000L, "Timeout waiting for controller metadata propagating 
to brokers");
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java 
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index c43d74d10ca..a4d635e0388 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -134,7 +134,7 @@ public class FollowerState implements EpochState {
         }
 
         if (highWatermark.isPresent()) {
-            long previousHighWatermark = highWatermark.get().offset;
+            long previousHighWatermark = highWatermark.get().offset();
             long updatedHighWatermark = newHighWatermark.getAsLong();
 
             if (updatedHighWatermark < 0) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 721472a5729..b9f9f9bef5d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -320,7 +320,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset(), 
currentTimeMs);
     }
 
     private void onUpdateLeaderHighWatermark(
@@ -334,12 +334,12 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             // After updating the high watermark, we first clear the append
             // purgatory so that we have an opportunity to route the pending
             // records still held in memory directly to the listener
-            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset(), 
currentTimeMs);
 
             // It is also possible that the high watermark is being updated
             // for the first time following the leader election, so we need
             // to give lagging listeners an opportunity to catch up as well
-            updateListenersProgress(highWatermark.offset);
+            updateListenersProgress(highWatermark.offset());
         });
     }
 
@@ -469,7 +469,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         // so there are no unknown voter connections. Report this metric as 0.
         kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
 
-        quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+        quorum.initialize(new OffsetAndEpoch(log.endOffset().offset(), 
log.lastFetchedEpoch()));
 
         long currentTimeMs = time.milliseconds();
         if (quorum.isLeader()) {
@@ -514,7 +514,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     }
 
     private OffsetAndEpoch endOffset() {
-        return new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch());
+        return new OffsetAndEpoch(log.endOffset().offset(), 
log.lastFetchedEpoch());
     }
 
     private void resetConnections() {
@@ -522,7 +522,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     }
 
     private void onBecomeLeader(long currentTimeMs) {
-        long endOffset = log.endOffset().offset;
+        long endOffset = log.endOffset().offset();
 
         BatchAccumulator<T> accumulator = new BatchAccumulator<>(
             quorum.epoch(),
@@ -1201,7 +1201,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                     .setErrorCode(error.code())
                     .setLogStartOffset(log.startOffset())
                     .setHighWatermark(
-                        highWatermark.map(offsetMetadata -> 
offsetMetadata.offset).orElse(-1L)
+                        highWatermark.map(offsetMetadata -> 
offsetMetadata.offset()).orElse(-1L)
                     );
 
                 partitionData.currentLeader()
@@ -1515,7 +1515,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                     divergingEpoch.endOffset(), divergingEpoch.epoch());
 
                 state.highWatermark().ifPresent(highWatermark -> {
-                    if (divergingOffsetAndEpoch.offset() < 
highWatermark.offset) {
+                    if (divergingOffsetAndEpoch.offset() < 
highWatermark.offset()) {
                         throw new KafkaException("The leader requested 
truncation to offset " +
                             divergingOffsetAndEpoch.offset() + ", which is 
below the current high watermark" +
                             " " + highWatermark);
@@ -1694,7 +1694,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
         FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = 
partitionSnapshotOpt.get();
         Optional<Errors> leaderValidation = validateLeaderOnlyRequest(
-                partitionSnapshot.currentLeaderEpoch()
+            partitionSnapshot.currentLeaderEpoch()
         );
         if (leaderValidation.isPresent()) {
             return RaftUtil.singletonFetchSnapshotResponse(
@@ -1935,7 +1935,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 // is always less than the snapshot id just downloaded.
                 partitionState.updateState();
 
-                updateFollowerHighWatermark(state, 
OptionalLong.of(log.highWatermark().offset));
+                updateFollowerHighWatermark(state, 
OptionalLong.of(log.highWatermark().offset()));
             } else {
                 throw new IllegalStateException(
                     String.format(
@@ -2365,7 +2365,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             fetchPartition -> fetchPartition
                 .setCurrentLeaderEpoch(quorum.epoch())
                 .setLastFetchedEpoch(log.lastFetchedEpoch())
-                .setFetchOffset(log.endOffset().offset)
+                .setFetchOffset(log.endOffset().offset())
                 .setReplicaDirectoryId(quorum.localDirectoryId())
         );
 
@@ -2766,7 +2766,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
         // Check listener progress to see if reads are expected
         quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
-            updateListenersProgress(highWatermarkMetadata.offset);
+            updateListenersProgress(highWatermarkMetadata.offset());
         });
 
         // Notify the new listeners of the latest leader and epoch
@@ -3024,7 +3024,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
     @Override
     public long logEndOffset() {
-        return log.endOffset().offset;
+        return log.endOffset().offset();
     }
 
     @Override
@@ -3042,7 +3042,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     @Override
     public OptionalLong highWatermark() {
         if (isInitialized() && quorum.highWatermark().isPresent()) {
-            return OptionalLong.of(quorum.highWatermark().get().offset);
+            return OptionalLong.of(quorum.highWatermark().get().offset());
         } else {
             return OptionalLong.empty();
         }
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index eb1b86983a7..5e6dd8de6b5 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -266,14 +266,14 @@ public class LeaderState<T> implements EpochState {
             // as all records from previous epochs that the current leader has 
committed.
 
             LogOffsetMetadata highWatermarkUpdateMetadata = 
highWatermarkUpdateOpt.get();
-            long highWatermarkUpdateOffset = 
highWatermarkUpdateMetadata.offset;
+            long highWatermarkUpdateOffset = 
highWatermarkUpdateMetadata.offset();
 
             if (highWatermarkUpdateOffset > epochStartOffset) {
                 if (highWatermark.isPresent()) {
                     LogOffsetMetadata currentHighWatermarkMetadata = 
highWatermark.get();
-                    if (highWatermarkUpdateOffset > 
currentHighWatermarkMetadata.offset
-                        || (highWatermarkUpdateOffset == 
currentHighWatermarkMetadata.offset &&
-                            
!highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata)))
 {
+                    if (highWatermarkUpdateOffset > 
currentHighWatermarkMetadata.offset()
+                        || (highWatermarkUpdateOffset == 
currentHighWatermarkMetadata.offset() &&
+                            
!highWatermarkUpdateMetadata.metadata().equals(currentHighWatermarkMetadata.metadata())))
 {
                         Optional<LogOffsetMetadata> oldHighWatermark = 
highWatermark;
                         highWatermark = highWatermarkUpdateOpt;
                         logHighWatermarkUpdate(
@@ -283,12 +283,12 @@ public class LeaderState<T> implements EpochState {
                             followersByDescendingFetchOffset
                         );
                         return true;
-                    } else if (highWatermarkUpdateOffset < 
currentHighWatermarkMetadata.offset) {
+                    } else if (highWatermarkUpdateOffset < 
currentHighWatermarkMetadata.offset()) {
                         log.info("The latest computed high watermark {} is 
smaller than the current " +
                                 "value {}, which should only happen when voter 
set membership changes. If the voter " +
                                 "set has not changed this suggests that one of 
the voters has lost committed data. " +
                                 "Full voter replication state: {}", 
highWatermarkUpdateOffset,
-                            currentHighWatermarkMetadata.offset, 
voterStates.values());
+                            currentHighWatermarkMetadata.offset(), 
voterStates.values());
                         return false;
                     } else {
                         return false;
@@ -347,9 +347,9 @@ public class LeaderState<T> implements EpochState {
     ) {
         ReplicaState state = getOrCreateReplicaState(localReplicaKey);
         state.endOffset.ifPresent(currentEndOffset -> {
-            if (currentEndOffset.offset > endOffsetMetadata.offset) {
+            if (currentEndOffset.offset() > endOffsetMetadata.offset()) {
                 throw new IllegalStateException("Detected non-monotonic update 
of local " +
-                    "end offset: " + currentEndOffset.offset + " -> " + 
endOffsetMetadata.offset);
+                    "end offset: " + currentEndOffset.offset() + " -> " + 
endOffsetMetadata.offset());
             }
         });
 
@@ -386,9 +386,9 @@ public class LeaderState<T> implements EpochState {
         ReplicaState state = getOrCreateReplicaState(replicaKey);
 
         state.endOffset.ifPresent(currentEndOffset -> {
-            if (currentEndOffset.offset > fetchOffsetMetadata.offset) {
+            if (currentEndOffset.offset() > fetchOffsetMetadata.offset()) {
                 log.warn("Detected non-monotonic update of fetch offset from 
nodeId {}: {} -> {}",
-                    state.replicaKey, currentEndOffset.offset, 
fetchOffsetMetadata.offset);
+                    state.replicaKey, currentEndOffset.offset(), 
fetchOffsetMetadata.offset());
             }
         });
 
@@ -460,7 +460,7 @@ public class LeaderState<T> implements EpochState {
             .setErrorCode(Errors.NONE.code())
             .setLeaderId(localReplicaKey.id())
             .setLeaderEpoch(epoch)
-            .setHighWatermark(highWatermark.map(offsetMetadata -> 
offsetMetadata.offset).orElse(-1L))
+            
.setHighWatermark(highWatermark.map(LogOffsetMetadata::offset).orElse(-1L))
             .setCurrentVoters(describeReplicaStates(voterStates.values(), 
currentTimeMs))
             .setObservers(describeReplicaStates(observerStates.values(), 
currentTimeMs));
     }
@@ -499,7 +499,7 @@ public class LeaderState<T> implements EpochState {
         // KAFKA-16953 will add support for the replica directory id
         return new DescribeQuorumResponseData.ReplicaState()
             .setReplicaId(replicaState.replicaKey.id())
-            .setLogEndOffset(replicaState.endOffset.map(md -> 
md.offset).orElse(-1L))
+            
.setLogEndOffset(replicaState.endOffset.map(LogOffsetMetadata::offset).orElse(-1L))
             .setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
             .setLastFetchTimestamp(lastFetchTimestamp);
 
@@ -614,13 +614,13 @@ public class LeaderState<T> implements EpochState {
             // This allows us to use the previous value for 
`lastFetchTimestamp` if the
             // follower was able to catch up to `lastFetchLeaderLogEndOffset` 
on this fetch.
             leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
-                if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) {
+                if (fetchOffsetMetadata.offset() >= leaderEndOffset.offset()) {
                     lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, 
currentTimeMs);
                 } else if (lastFetchLeaderLogEndOffset > 0
-                    && fetchOffsetMetadata.offset >= 
lastFetchLeaderLogEndOffset) {
+                    && fetchOffsetMetadata.offset() >= 
lastFetchLeaderLogEndOffset) {
                     lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, 
lastFetchTimestamp);
                 }
-                lastFetchLeaderLogEndOffset = leaderEndOffset.offset;
+                lastFetchLeaderLogEndOffset = leaderEndOffset.offset();
             });
 
             lastFetchTimestamp = Math.max(lastFetchTimestamp, currentTimeMs);
@@ -637,7 +637,7 @@ public class LeaderState<T> implements EpochState {
             else if (!that.endOffset.isPresent())
                 return -1;
             else
-                return Long.compare(that.endOffset.get().offset, 
this.endOffset.get().offset);
+                return Long.compare(that.endOffset.get().offset(), 
this.endOffset.get().offset());
         }
 
         @Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/LogOffsetMetadata.java 
b/raft/src/main/java/org/apache/kafka/raft/LogOffsetMetadata.java
index 6a966192cb0..38a90c84a4e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LogOffsetMetadata.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LogOffsetMetadata.java
@@ -23,9 +23,8 @@ import java.util.Optional;
  * Metadata for specific local log offset
  */
 public class LogOffsetMetadata {
-
-    public final long offset;
-    public final Optional<OffsetMetadata> metadata;
+    private final long offset;
+    private final Optional<OffsetMetadata> metadata;
 
     public LogOffsetMetadata(long offset) {
         this(offset, Optional.empty());
@@ -36,6 +35,14 @@ public class LogOffsetMetadata {
         this.metadata = metadata;
     }
 
+    public long offset() {
+        return offset;
+    }
+
+    public Optional<OffsetMetadata> metadata() {
+        return metadata;
+    }
+
     @Override
     public String toString() {
         return "LogOffsetMetadata(offset=" + offset +
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java 
b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index dbf5133f35b..a22f7fd73cd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -214,7 +214,7 @@ public interface ReplicatedLog extends AutoCloseable {
         final long truncationOffset;
         int leaderEpoch = endOffset.epoch();
         if (leaderEpoch == 0) {
-            truncationOffset = Math.min(endOffset.offset(), 
endOffset().offset);
+            truncationOffset = Math.min(endOffset.offset(), 
endOffset().offset());
         } else {
             OffsetAndEpoch localEndOffset = endOffsetForEpoch(leaderEpoch);
             if (localEndOffset.epoch() == leaderEpoch) {
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index c4c7b274fab..b57f5e104f9 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -189,7 +189,7 @@ public final class KRaftControlRecordStateMachine {
     }
 
     private void maybeLoadLog() {
-        while (log.endOffset().offset > nextOffset) {
+        while (log.endOffset().offset() > nextOffset) {
             LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
             try (RecordsIterator<?> iterator = new RecordsIterator<>(
                     info.records,
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
index 3bdac5ffc77..2b7617df359 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.WindowedSum;
+import org.apache.kafka.raft.LogOffsetMetadata;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.QuorumState;
 
@@ -121,7 +122,10 @@ public class KafkaRaftMetrics implements AutoCloseable {
         metrics.addMetric(this.currentEpochMetricName, (mConfig, 
currentTimeMs) -> state.epoch());
 
         this.highWatermarkMetricName = metrics.metricName("high-watermark", 
metricGroupName, "The high watermark maintained on this member; -1 if it is 
unknown");
-        metrics.addMetric(this.highWatermarkMetricName, (mConfig, 
currentTimeMs) -> state.highWatermark().map(hw -> hw.offset).orElse(-1L));
+        metrics.addMetric(
+            this.highWatermarkMetricName,
+            (mConfig, currentTimeMs) -> 
state.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L)
+        );
 
         this.logEndOffsetMetricName = metrics.metricName("log-end-offset", 
metricGroupName, "The current raft log end offset.");
         metrics.addMetric(this.logEndOffsetMetricName, (mConfig, 
currentTimeMs) -> logEndOffset.offset());
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 2eb2f65ca0e..02a92e8303e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -117,7 +117,7 @@ public final class KafkaRaftClientSnapshotTest {
         int epoch = context.currentEpoch();
 
         // Advance the highWatermark
-        long localLogEndOffset = context.log.endOffset().offset;
+        long localLogEndOffset = context.log.endOffset().offset();
         context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 
localLogEndOffset, epoch, 0));
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
@@ -152,7 +152,7 @@ public final class KafkaRaftClientSnapshotTest {
         RaftClientTestContext context = contextBuilder.build();
 
         // Advance the highWatermark
-        long localLogEndOffset = context.log.endOffset().offset;
+        long localLogEndOffset = context.log.endOffset().offset();
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, 
snapshotId.epoch());
@@ -194,7 +194,7 @@ public final class KafkaRaftClientSnapshotTest {
         RaftClientTestContext context = contextBuilder.build();
 
         // Advance the highWatermark
-        long localLogEndOffset = context.log.endOffset().offset;
+        long localLogEndOffset = context.log.endOffset().offset();
         context.pollUntilRequest();
         RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
         context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, 
snapshotId.epoch());
@@ -242,7 +242,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.listener.updateReadCommit(false);
 
         // Advance the highWatermark
-        long localLogEndOffset = context.log.endOffset().offset;
+        long localLogEndOffset = context.log.endOffset().offset();
         context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 
localLogEndOffset, epoch, 0));
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
@@ -328,7 +328,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.time.sleep(context.appendLingerMs());
         context.client.poll();
 
-        long localLogEndOffset = context.log.endOffset().offset;
+        long localLogEndOffset = context.log.endOffset().offset();
         assertTrue(
             appendRecords.size() <= localLogEndOffset,
             String.format("Record length = %s, log end offset = %s", 
appendRecords.size(), localLogEndOffset)
@@ -377,7 +377,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.time.sleep(context.appendLingerMs());
         context.client.poll();
 
-        long localLogEndOffset = context.log.endOffset().offset;
+        long localLogEndOffset = context.log.endOffset().offset();
         assertTrue(
             appendRecords.size() <= localLogEndOffset,
             String.format("Record length = %s, log end offset = %s", 
appendRecords.size(), localLogEndOffset)
@@ -631,7 +631,7 @@ public final class KafkaRaftClientSnapshotTest {
             context.fetchRequest(
                 epoch,
                 otherNodeKey,
-                context.log.endOffset().offset,
+                context.log.endOffset().offset(),
                 oldestSnapshotId.epoch() - 1,
                 0
             )
@@ -1887,7 +1887,7 @@ public final class KafkaRaftClientSnapshotTest {
         context.client.scheduleAppend(currentEpoch, newRecords);
         context.time.sleep(context.appendLingerMs());
         context.client.poll();
-        assertEquals(context.log.endOffset().offset, 
context.client.highWatermark().getAsLong() + newRecords.size());
+        assertEquals(context.log.endOffset().offset(), 
context.client.highWatermark().getAsLong() + newRecords.size());
 
         OffsetAndEpoch invalidSnapshotId2 = new 
OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2, currentEpoch);
         assertThrows(IllegalArgumentException.class, () -> 
context.client.createSnapshot(invalidSnapshotId2, 0));
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index a7cf801bf3d..a8c108bb2d9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -87,7 +87,7 @@ public class KafkaRaftClientTest {
             .withKip853Rpc(withKip853Rpc)
             .build();
         context.assertElectedLeader(1, localId);
-        assertEquals(context.log.endOffset().offset, 
context.client.logEndOffset());
+        assertEquals(context.log.endOffset().offset(), 
context.client.logEndOffset());
     }
 
     @ParameterizedTest
@@ -103,8 +103,8 @@ public class KafkaRaftClientTest {
             .withElectedLeader(initialEpoch, localId)
             .build();
 
-        context.pollUntil(() -> context.log.endOffset().offset == 1L);
-        assertEquals(1L, context.log.endOffset().offset);
+        context.pollUntil(() -> context.log.endOffset().offset() == 1L);
+        assertEquals(1L, context.log.endOffset().offset());
         assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch());
         assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch 
+ 1),
             context.currentLeaderAndEpoch());
@@ -125,7 +125,7 @@ public class KafkaRaftClientTest {
             .withKip853Rpc(withKip853Rpc)
             .build();
 
-        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset());
         context.assertElectedLeader(epoch, localId);
 
         // Since we were the leader in epoch 2, we should ensure that we will 
not vote for any
@@ -135,7 +135,7 @@ public class KafkaRaftClientTest {
                 epoch,
                 remoteKey,
                 context.log.lastFetchedEpoch(),
-                context.log.endOffset().offset
+                context.log.endOffset().offset()
             )
         );
         context.pollUntilResponse();
@@ -156,7 +156,7 @@ public class KafkaRaftClientTest {
             .withKip853Rpc(withKip853Rpc)
             .build();
 
-        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset());
         context.assertVotedCandidate(epoch, localId);
 
         // Since we were the leader in epoch 2, we should ensure that we will 
not vote for any
@@ -166,7 +166,7 @@ public class KafkaRaftClientTest {
                 epoch,
                 remoteKey,
                 context.log.lastFetchedEpoch(),
-                context.log.endOffset().offset
+                context.log.endOffset().offset()
             )
         );
         context.pollUntilResponse();
@@ -189,7 +189,7 @@ public class KafkaRaftClientTest {
 
         // Resign from leader, will restart in resigned state
         assertTrue(context.client.quorum().isResigned());
-        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset());
         context.assertElectedLeader(epoch, localId);
 
         // Send vote request with higher epoch
@@ -198,7 +198,7 @@ public class KafkaRaftClientTest {
                 epoch + 1,
                 remoteKey,
                 context.log.lastFetchedEpoch(),
-                context.log.endOffset().offset
+                context.log.endOffset().offset()
             )
         );
         context.client.poll();
@@ -225,7 +225,7 @@ public class KafkaRaftClientTest {
 
         // Resign from candidate, will restart in candidate state
         assertTrue(context.client.quorum().isCandidate());
-        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset());
         context.assertVotedCandidate(epoch, localId);
 
         // Send vote request with higher epoch
@@ -234,7 +234,7 @@ public class KafkaRaftClientTest {
                 epoch + 1,
                 remoteKey,
                 context.log.lastFetchedEpoch(),
-                context.log.endOffset().offset
+                context.log.endOffset().offset()
             )
         );
         context.client.poll();
@@ -269,7 +269,7 @@ public class KafkaRaftClientTest {
                 epoch + 1,
                 remoteKey,
                 context.log.lastFetchedEpoch(),
-                context.log.endOffset().offset
+                context.log.endOffset().offset()
             )
         );
         context.client.poll();
@@ -296,7 +296,7 @@ public class KafkaRaftClientTest {
 
         // Resign from leader, will restart in resigned state
         assertTrue(context.client.quorum().isResigned());
-        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset());
         context.assertElectedLeader(epoch, localId);
 
         // Election timeout
@@ -323,7 +323,7 @@ public class KafkaRaftClientTest {
 
         // The node will remain elected, but start up in a resigned state
         // in which no additional writes are accepted.
-        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset());
         context.assertElectedLeader(epoch, localId);
         context.client.poll();
         assertThrows(NotLeaderException.class, () -> 
context.client.scheduleAppend(epoch, Arrays.asList("a", "b")));
@@ -492,12 +492,12 @@ public class KafkaRaftClientTest {
 
         // send fetch request when become leader
         int epoch = context.currentEpoch();
-        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 
context.log.endOffset().offset, epoch, 1000));
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 
context.log.endOffset().offset(), epoch, 1000));
         context.client.poll();
 
         // append some record, but the fetch in purgatory will still fail
         context.log.appendAsLeader(
-            context.buildBatch(context.log.endOffset().offset, epoch, 
singletonList("raft")),
+            context.buildBatch(context.log.endOffset().offset(), epoch, 
singletonList("raft")),
             epoch
         );
 
@@ -800,7 +800,7 @@ public class KafkaRaftClientTest {
             .withKip853Rpc(withKip853Rpc)
             .build();
         context.assertVotedCandidate(2, localId);
-        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset());
 
         // The candidate will resume the election after reinitialization
         context.pollUntilRequest();
@@ -832,12 +832,12 @@ public class KafkaRaftClientTest {
         );
 
         // Become leader after receiving the vote
-        context.pollUntil(() -> context.log.endOffset().offset == 1L);
+        context.pollUntil(() -> context.log.endOffset().offset() == 1L);
         context.assertElectedLeader(1, localId);
         long electionTimestamp = context.time.milliseconds();
 
         // Leader change record appended
-        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.endOffset().offset());
         assertEquals(1L, context.log.firstUnflushedOffset());
 
         // Send BeginQuorumEpoch to voters
@@ -879,12 +879,12 @@ public class KafkaRaftClientTest {
         );
 
         // Become leader after receiving the vote
-        context.pollUntil(() -> context.log.endOffset().offset == 1L);
+        context.pollUntil(() -> context.log.endOffset().offset() == 1L);
         context.assertElectedLeader(1, localId);
         long electionTimestamp = context.time.milliseconds();
 
         // Leader change record appended
-        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.endOffset().offset());
         assertEquals(1L, context.log.firstUnflushedOffset());
 
         // Send BeginQuorumEpoch to voters
@@ -1110,7 +1110,7 @@ public class KafkaRaftClientTest {
 
         assertEquals(1L, context.client.scheduleAppend(epoch, 
singletonList("a")));
         context.deliverRequest(
-            context.voteRequest(epoch + 1, otherNodeKey, epoch, 
context.log.endOffset().offset)
+            context.voteRequest(epoch + 1, otherNodeKey, epoch, 
context.log.endOffset().offset())
         );
         context.pollUntilResponse();
 
@@ -1168,7 +1168,7 @@ public class KafkaRaftClientTest {
 
         context.becomeLeader();
         assertEquals(OptionalInt.of(localId), context.currentLeader());
-        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.endOffset().offset());
 
         int epoch = context.currentEpoch();
         assertEquals(1L, context.client.scheduleAppend(epoch, 
singletonList("a")));
@@ -1183,7 +1183,7 @@ public class KafkaRaftClientTest {
 
         context.time.sleep(30);
         context.client.poll();
-        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.endOffset().offset());
     }
 
     @ParameterizedTest
@@ -1204,7 +1204,7 @@ public class KafkaRaftClientTest {
 
         context.becomeLeader();
         assertEquals(OptionalInt.of(localId), context.currentLeader());
-        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.endOffset().offset());
 
         int epoch = context.currentEpoch();
         assertEquals(1L, context.client.scheduleAppend(epoch, 
singletonList("a")));
@@ -1219,7 +1219,7 @@ public class KafkaRaftClientTest {
         assertTrue(context.messageQueue.wakeupRequested());
 
         context.client.poll();
-        assertEquals(3L, context.log.endOffset().offset);
+        assertEquals(3L, context.log.endOffset().offset());
     }
 
     @ParameterizedTest
@@ -1481,7 +1481,7 @@ public class KafkaRaftClientTest {
         // First poll has no high watermark advance
         context.client.poll();
         assertEquals(OptionalLong.empty(), context.client.highWatermark());
-        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.endOffset().offset());
 
         // Let follower send a fetch to initialize the high watermark,
         // note the offset 0 would be a control message for becoming the leader
@@ -1984,7 +1984,7 @@ public class KafkaRaftClientTest {
         // First poll has no high watermark advance.
         context.client.poll();
         assertEquals(OptionalLong.empty(), context.client.highWatermark());
-        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.endOffset().offset());
 
         // Now we will advance the high watermark with a follower fetch 
request.
         FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeKey, 1L, epoch, 0);
@@ -2433,7 +2433,7 @@ public class KafkaRaftClientTest {
         );
 
         context.client.poll();
-        assertEquals(0, context.log.endOffset().offset);
+        assertEquals(0, context.log.endOffset().offset());
         context.assertVotedCandidate(epoch + 1, localId);
     }
 
@@ -2475,7 +2475,7 @@ public class KafkaRaftClientTest {
         );
 
         context.client.poll();
-        assertEquals(0, context.log.endOffset().offset);
+        assertEquals(0, context.log.endOffset().offset());
         context.assertElectedLeader(epoch + 1, voter3);
     }
 
@@ -2956,7 +2956,7 @@ public class KafkaRaftClientTest {
         );
 
         context.client.poll();
-        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.endOffset().offset());
         assertEquals(2L, context.log.firstUnflushedOffset());
     }
 
@@ -2990,7 +2990,7 @@ public class KafkaRaftClientTest {
             fetchResponse
         );
         context.client.poll();
-        assertEquals(0L, context.log.endOffset().offset);
+        assertEquals(0L, context.log.endOffset().offset());
         assertEquals(OptionalLong.of(0L), context.client.highWatermark());
 
         // Receive some records in the next poll, but do not advance high 
watermark
@@ -3004,7 +3004,7 @@ public class KafkaRaftClientTest {
             fetchResponse
         );
         context.client.poll();
-        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.endOffset().offset());
         assertEquals(OptionalLong.of(0L), context.client.highWatermark());
 
         // The next fetch response is empty, but should still advance the high 
watermark
@@ -3023,7 +3023,7 @@ public class KafkaRaftClientTest {
             fetchResponse
         );
         context.client.poll();
-        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.endOffset().offset());
         assertEquals(OptionalLong.of(2L), context.client.highWatermark());
     }
 
@@ -3074,7 +3074,7 @@ public class KafkaRaftClientTest {
             .build();
         long now = context.time.milliseconds();
 
-        context.pollUntil(() -> context.log.endOffset().offset == 1L);
+        context.pollUntil(() -> context.log.endOffset().offset() == 1L);
         context.assertElectedLeader(1, localId);
 
         // We still write the leader change message
@@ -3156,7 +3156,7 @@ public class KafkaRaftClientTest {
             .build();
 
         context.assertElectedLeader(epoch, otherNodeId);
-        assertEquals(3L, context.log.endOffset().offset);
+        assertEquals(3L, context.log.endOffset().offset());
 
         context.pollUntilRequest();
 
@@ -3168,7 +3168,7 @@ public class KafkaRaftClientTest {
 
         // Poll again to complete truncation
         context.client.poll();
-        assertEquals(2L, context.log.endOffset().offset);
+        assertEquals(2L, context.log.endOffset().offset());
         assertEquals(2L, context.log.firstUnflushedOffset());
 
         // Now we should be fetching
@@ -3184,7 +3184,7 @@ public class KafkaRaftClientTest {
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, Collections.singleton(localId))
             .withKip853Rpc(withKip853Rpc)
             .build();
-        context.pollUntil(() -> context.log.endOffset().offset == 1L);
+        context.pollUntil(() -> context.log.endOffset().offset() == 1L);
 
         assertNotNull(getMetric(context.metrics, "current-state"));
         assertNotNull(getMetric(context.metrics, "current-leader"));
@@ -3350,7 +3350,7 @@ public class KafkaRaftClientTest {
         int epoch = context.currentEpoch();
 
         // After becoming leader, we expect the `LeaderChange` record to be 
appended.
-        assertEquals(1L, context.log.endOffset().offset);
+        assertEquals(1L, context.log.endOffset().offset());
 
         // The high watermark is not known to the leader until the followers
         // begin fetching, so we should not have fired the 
`handleLeaderChange` callback.
@@ -3407,7 +3407,7 @@ public class KafkaRaftClientTest {
 
         // After becoming leader, we expect the `LeaderChange` record to be 
appended
         // in addition to the initial 9 records in the log.
-        assertEquals(10L, context.log.endOffset().offset);
+        assertEquals(10L, context.log.endOffset().offset());
 
         // The high watermark is not known to the leader until the followers
         // begin fetching, so we should not have fired the 
`handleLeaderChange` callback.
@@ -3470,7 +3470,7 @@ public class KafkaRaftClientTest {
 
         context.becomeLeader();
         context.client.poll();
-        assertEquals(10L, context.log.endOffset().offset);
+        assertEquals(10L, context.log.endOffset().offset());
 
         // Let the initial listener catch up
         context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 10L, 
epoch, 0));
@@ -3513,7 +3513,7 @@ public class KafkaRaftClientTest {
 
         context.becomeLeader();
         context.client.poll();
-        assertEquals(10L, context.log.endOffset().offset);
+        assertEquals(10L, context.log.endOffset().offset());
 
         // Let the initial listener catch up
         context.advanceLocalLeaderHighWatermarkToLogEndOffset();
@@ -3662,7 +3662,7 @@ public class KafkaRaftClientTest {
 
         // Start off as the leader and receive a fetch to initialize the high 
watermark
         context.becomeLeader();
-        assertEquals(10L, context.log.endOffset().offset);
+        assertEquals(10L, context.log.endOffset().offset());
 
         context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 10L, 
epoch, 0));
         context.pollUntilResponse();
@@ -3813,7 +3813,7 @@ public class KafkaRaftClientTest {
             context.fetchResponse(leaderEpoch, leaderId, batch1, 0L, 
Errors.NONE)
         );
         context.client.poll();
-        assertEquals(3L, context.log.endOffset().offset);
+        assertEquals(3L, context.log.endOffset().offset());
         assertEquals(3, context.log.lastFetchedEpoch());
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 8247e137086..b8c2089caf2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -81,7 +81,7 @@ public class MockLog implements ReplicatedLog {
 
     @Override
     public void truncateTo(long offset) {
-        if (offset < highWatermark.offset) {
+        if (offset < highWatermark.offset()) {
             throw new IllegalArgumentException("Illegal attempt to truncate to 
offset " + offset +
                 " which is below the current high watermark " + highWatermark);
         }
@@ -89,7 +89,7 @@ public class MockLog implements ReplicatedLog {
         logger.debug("Truncating log to end offset {}", offset);
         batches.removeIf(entry -> entry.lastOffset() >= offset);
         epochStartOffsets.removeIf(epochStartOffset -> 
epochStartOffset.startOffset >= offset);
-        firstUnflushedOffset = Math.min(firstUnflushedOffset, 
endOffset().offset);
+        firstUnflushedOffset = Math.min(firstUnflushedOffset, 
endOffset().offset());
     }
 
     @Override
@@ -98,7 +98,7 @@ public class MockLog implements ReplicatedLog {
         latestSnapshotId().ifPresent(snapshotId -> {
             if (snapshotId.epoch() > logLastFetchedEpoch().orElse(0) ||
                 (snapshotId.epoch() == logLastFetchedEpoch().orElse(0) &&
-                 snapshotId.offset() > endOffset().offset)) {
+                 snapshotId.offset() > endOffset().offset())) {
 
                 logger.debug("Truncating to the latest snapshot at {}", 
snapshotId);
 
@@ -117,13 +117,13 @@ public class MockLog implements ReplicatedLog {
 
     @Override
     public void updateHighWatermark(LogOffsetMetadata offsetMetadata) {
-        if (this.highWatermark.offset > offsetMetadata.offset) {
+        if (this.highWatermark.offset() > offsetMetadata.offset()) {
             throw new IllegalArgumentException("Non-monotonic update of 
current high watermark " +
                 highWatermark + " to new value " + offsetMetadata);
-        } else if (offsetMetadata.offset > endOffset().offset) {
+        } else if (offsetMetadata.offset() > endOffset().offset()) {
             throw new IllegalArgumentException("Attempt to update high 
watermark to " + offsetMetadata +
                 " which is larger than the current end offset " + endOffset());
-        } else if (offsetMetadata.offset < startOffset()) {
+        } else if (offsetMetadata.offset() < startOffset()) {
             throw new IllegalArgumentException("Attempt to update high 
watermark to " + offsetMetadata +
                 " which is smaller than the current start offset " + 
startOffset());
         }
@@ -148,8 +148,8 @@ public class MockLog implements ReplicatedLog {
     }
 
     private Optional<OffsetMetadata> metadataForOffset(long offset) {
-        if (offset == endOffset().offset) {
-            return endOffset().metadata;
+        if (offset == endOffset().offset()) {
+            return endOffset().metadata();
         }
 
         for (LogBatch batch : batches) {
@@ -167,12 +167,12 @@ public class MockLog implements ReplicatedLog {
     }
 
     private void assertValidHighWatermarkMetadata(LogOffsetMetadata 
offsetMetadata) {
-        if (!offsetMetadata.metadata.isPresent()) {
+        if (!offsetMetadata.metadata().isPresent()) {
             return;
         }
 
-        long id = ((MockOffsetMetadata) offsetMetadata.metadata.get()).id;
-        long offset = offsetMetadata.offset;
+        long id = ((MockOffsetMetadata) offsetMetadata.metadata().get()).id;
+        long offset = offsetMetadata.offset();
 
         metadataForOffset(offset).ifPresent(metadata -> {
             long entryId = ((MockOffsetMetadata) metadata).id;
@@ -211,7 +211,7 @@ public class MockLog implements ReplicatedLog {
             epochLowerBound = epochStartOffset.epoch;
         }
 
-        return new OffsetAndEpoch(endOffset().offset, lastFetchedEpoch());
+        return new OffsetAndEpoch(endOffset().offset(), lastFetchedEpoch());
     }
 
     private Optional<LogEntry> lastEntry() {
@@ -300,10 +300,10 @@ public class MockLog implements ReplicatedLog {
         if (records.sizeInBytes() == 0)
             throw new IllegalArgumentException("Attempt to append an empty 
record set");
 
-        long baseOffset = endOffset().offset;
+        long baseOffset = endOffset().offset();
         long lastOffset = baseOffset;
         for (RecordBatch batch : records.batches()) {
-            if (batch.baseOffset() != endOffset().offset) {
+            if (batch.baseOffset() != endOffset().offset()) {
                 /* KafkaMetadataLog throws an 
kafka.common.UnexpectedAppendOffsetException this is the
                  * best we can do from this module.
                  */
@@ -311,7 +311,7 @@ public class MockLog implements ReplicatedLog {
                     String.format(
                         "Illegal append at offset %s with current end offset 
of %s",
                         batch.baseOffset(),
-                        endOffset().offset
+                        endOffset().offset()
                     )
                 );
             }
@@ -340,7 +340,7 @@ public class MockLog implements ReplicatedLog {
     @Override
     public void flush(boolean forceFlushActiveSegment) {
         flushedSinceLastChecked = true;
-        firstUnflushedOffset = endOffset().offset;
+        firstUnflushedOffset = endOffset().offset();
     }
 
     @Override
@@ -373,7 +373,7 @@ public class MockLog implements ReplicatedLog {
     public List<LogBatch> readBatches(long startOffset, OptionalLong 
maxOffsetOpt) {
         verifyOffsetInRange(startOffset);
 
-        long maxOffset = maxOffsetOpt.orElse(endOffset().offset);
+        long maxOffset = maxOffsetOpt.orElse(endOffset().offset());
         if (startOffset == maxOffset) {
             return Collections.emptyList();
         }
@@ -384,9 +384,9 @@ public class MockLog implements ReplicatedLog {
     }
 
     private void verifyOffsetInRange(long offset) {
-        if (offset > endOffset().offset) {
+        if (offset > endOffset().offset()) {
             throw new OffsetOutOfRangeException("Requested offset " + offset + 
" is larger than " +
-                "then log end offset " + endOffset().offset);
+                "then log end offset " + endOffset().offset());
         }
 
         if (offset < this.startOffset()) {
@@ -399,7 +399,7 @@ public class MockLog implements ReplicatedLog {
     public LogFetchInfo read(long startOffset, Isolation isolation) {
         verifyOffsetInRange(startOffset);
 
-        long maxOffset = isolation == Isolation.COMMITTED ? 
highWatermark.offset : endOffset().offset;
+        long maxOffset = isolation == Isolation.COMMITTED ? 
highWatermark.offset() : endOffset().offset();
         if (startOffset >= maxOffset) {
             return new LogFetchInfo(MemoryRecords.EMPTY, new LogOffsetMetadata(
                 startOffset, metadataForOffset(startOffset)));
@@ -450,7 +450,7 @@ public class MockLog implements ReplicatedLog {
 
     @Override
     public void initializeLeaderEpoch(int epoch) {
-        long startOffset = endOffset().offset;
+        long startOffset = endOffset().offset();
         epochStartOffsets.removeIf(epochStartOffset ->
             epochStartOffset.startOffset >= startOffset || 
epochStartOffset.epoch >= epoch);
         epochStartOffsets.add(new EpochStartOffset(epoch, startOffset));
@@ -468,7 +468,7 @@ public class MockLog implements ReplicatedLog {
             return Optional.empty();
         }
 
-        long highWatermarkOffset = highWatermark().offset;
+        long highWatermarkOffset = highWatermark().offset();
         if (snapshotId.offset() > highWatermarkOffset) {
             throw new IllegalArgumentException(
                 String.format(
@@ -542,12 +542,12 @@ public class MockLog implements ReplicatedLog {
                 )
             );
         }
-        if (highWatermark.offset < snapshotId.offset()) {
+        if (highWatermark.offset() < snapshotId.offset()) {
             throw new OffsetOutOfRangeException(
                 String.format(
                     "New log start (%s) is greater than the high watermark 
(%s)",
                     snapshotId,
-                    highWatermark.offset
+                    highWatermark.offset()
                 )
             );
         }
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index c56bbe01adb..0c389a41522 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -87,15 +87,15 @@ public class MockLogTest {
         appendAsLeader(Collections.singleton(recordThree), epoch);
 
         assertEquals(0L, log.startOffset());
-        assertEquals(3L, log.endOffset().offset);
+        assertEquals(3L, log.endOffset().offset());
 
         log.truncateTo(2);
         assertEquals(0L, log.startOffset());
-        assertEquals(2L, log.endOffset().offset);
+        assertEquals(2L, log.endOffset().offset());
 
         log.truncateTo(1);
         assertEquals(0L, log.startOffset());
-        assertEquals(0L, log.endOffset().offset);
+        assertEquals(0L, log.endOffset().offset());
     }
 
     @Test
@@ -113,7 +113,7 @@ public class MockLogTest {
         appendBatch(5, 1);
         LogOffsetMetadata newOffset = new LogOffsetMetadata(5L);
         log.updateHighWatermark(newOffset);
-        assertEquals(newOffset.offset, log.highWatermark().offset);
+        assertEquals(newOffset.offset(), log.highWatermark().offset());
     }
 
     @Test
@@ -158,7 +158,7 @@ public class MockLogTest {
     public void testUnexpectedAppendOffset() {
         SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
         final int currentEpoch = 3;
-        final long initialOffset = log.endOffset().offset;
+        final long initialOffset = log.endOffset().offset();
 
         log.appendAsLeader(
             MemoryRecords.withRecords(initialOffset, Compression.NONE, 
currentEpoch, recordFoo),
@@ -198,7 +198,7 @@ public class MockLogTest {
         );
 
         assertEquals(0, log.startOffset());
-        assertEquals(1, log.endOffset().offset);
+        assertEquals(1, log.endOffset().offset());
         assertEquals(currentEpoch, log.lastFetchedEpoch());
 
         Records records = log.read(0, Isolation.UNCOMMITTED).records;
@@ -230,7 +230,7 @@ public class MockLogTest {
         log.appendAsFollower(MemoryRecords.withRecords(initialOffset, 
Compression.NONE, epoch, recordFoo));
 
         assertEquals(initialOffset, log.startOffset());
-        assertEquals(initialOffset + 1, log.endOffset().offset);
+        assertEquals(initialOffset + 1, log.endOffset().offset());
         assertEquals(3, log.lastFetchedEpoch());
 
         Records records = log.read(5L, Isolation.UNCOMMITTED).records;
@@ -242,7 +242,7 @@ public class MockLogTest {
         assertEquals(1, extractRecords.size());
         assertEquals(recordFoo.value(), extractRecords.get(0));
         assertEquals(new OffsetAndEpoch(5, 0), log.endOffsetForEpoch(0));
-        assertEquals(new OffsetAndEpoch(log.endOffset().offset, epoch), 
log.endOffsetForEpoch(epoch));
+        assertEquals(new OffsetAndEpoch(log.endOffset().offset(), epoch), 
log.endOffsetForEpoch(epoch));
     }
 
     @Test
@@ -330,12 +330,12 @@ public class MockLogTest {
         appendBatch(5, 1);
 
         LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
-        assertEquals(5L, readInfo.startOffsetMetadata.offset);
-        assertTrue(readInfo.startOffsetMetadata.metadata.isPresent());
+        assertEquals(5L, readInfo.startOffsetMetadata.offset());
+        assertTrue(readInfo.startOffsetMetadata.metadata().isPresent());
 
         // Update to a high watermark with valid offset metadata
         log.updateHighWatermark(readInfo.startOffsetMetadata);
-        assertEquals(readInfo.startOffsetMetadata.offset, 
log.highWatermark().offset);
+        assertEquals(readInfo.startOffsetMetadata.offset(), 
log.highWatermark().offset());
 
         // Now update to a high watermark with invalid metadata
         assertThrows(IllegalArgumentException.class, () ->
@@ -344,8 +344,8 @@ public class MockLogTest {
 
         // Ensure we can update the high watermark to the end offset
         LogFetchInfo readFromEndInfo = log.read(15L, Isolation.UNCOMMITTED);
-        assertEquals(15, readFromEndInfo.startOffsetMetadata.offset);
-        assertTrue(readFromEndInfo.startOffsetMetadata.metadata.isPresent());
+        assertEquals(15, readFromEndInfo.startOffsetMetadata.offset());
+        assertTrue(readFromEndInfo.startOffsetMetadata.metadata().isPresent());
         log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
 
         // Ensure that the end offset metadata is valid after new entries are 
appended
@@ -393,14 +393,14 @@ public class MockLogTest {
 
         assertThrows(OffsetOutOfRangeException.class, () -> 
log.read(log.startOffset() - 1,
             Isolation.UNCOMMITTED));
-        assertThrows(OffsetOutOfRangeException.class, () -> 
log.read(log.endOffset().offset + 1,
+        assertThrows(OffsetOutOfRangeException.class, () -> 
log.read(log.endOffset().offset() + 1,
             Isolation.UNCOMMITTED));
     }
 
     @Test
     public void testMonotonicEpochStartOffset() {
         appendBatch(5, 1);
-        assertEquals(5L, log.endOffset().offset);
+        assertEquals(5L, log.endOffset().offset());
 
         log.initializeLeaderEpoch(2);
         assertEquals(new OffsetAndEpoch(5L, 1), log.endOffsetForEpoch(1));
@@ -424,7 +424,7 @@ public class MockLogTest {
         appendBatch(10, 4);
         log.reopen();
 
-        assertEquals(15L, log.endOffset().offset);
+        assertEquals(15L, log.endOffset().offset());
         assertEquals(2, log.lastFetchedEpoch());
     }
 
@@ -601,7 +601,7 @@ public class MockLogTest {
         assertTrue(log.deleteBeforeSnapshot(snapshotId));
         assertEquals(offset, log.startOffset());
         assertEquals(epoch, log.lastFetchedEpoch());
-        assertEquals(offset, log.endOffset().offset);
+        assertEquals(offset, log.endOffset().offset());
 
         int newRecords = 10;
         appendBatch(newRecords, epoch + 1);
@@ -612,8 +612,8 @@ public class MockLogTest {
         assertEquals(offset, log.startOffset());
 
         assertEquals(epoch + 1, log.lastFetchedEpoch());
-        assertEquals(offset + newRecords, log.endOffset().offset);
-        assertEquals(offset + newRecords, log.highWatermark().offset);
+        assertEquals(offset + newRecords, log.endOffset().offset());
+        assertEquals(offset + newRecords, log.highWatermark().offset());
     }
 
     @Test
@@ -627,8 +627,8 @@ public class MockLogTest {
         assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(1, epoch)));
         assertEquals(0, log.startOffset());
         assertEquals(epoch, log.lastFetchedEpoch());
-        assertEquals(offset, log.endOffset().offset);
-        assertEquals(offset, log.highWatermark().offset);
+        assertEquals(offset, log.endOffset().offset());
+        assertEquals(offset, log.highWatermark().offset());
     }
 
     @Test
@@ -665,8 +665,8 @@ public class MockLogTest {
         assertTrue(log.truncateToLatestSnapshot());
         assertEquals(sameEpochSnapshotId.offset(), log.startOffset());
         assertEquals(sameEpochSnapshotId.epoch(), log.lastFetchedEpoch());
-        assertEquals(sameEpochSnapshotId.offset(), log.endOffset().offset);
-        assertEquals(sameEpochSnapshotId.offset(), log.highWatermark().offset);
+        assertEquals(sameEpochSnapshotId.offset(), log.endOffset().offset());
+        assertEquals(sameEpochSnapshotId.offset(), 
log.highWatermark().offset());
 
         OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch(3 * 
numberOfRecords, epoch + 1);
 
@@ -679,8 +679,8 @@ public class MockLogTest {
         assertTrue(log.truncateToLatestSnapshot());
         assertEquals(greaterEpochSnapshotId.offset(), log.startOffset());
         assertEquals(greaterEpochSnapshotId.epoch(), log.lastFetchedEpoch());
-        assertEquals(greaterEpochSnapshotId.offset(), log.endOffset().offset);
-        assertEquals(greaterEpochSnapshotId.offset(), 
log.highWatermark().offset);
+        assertEquals(greaterEpochSnapshotId.offset(), 
log.endOffset().offset());
+        assertEquals(greaterEpochSnapshotId.offset(), 
log.highWatermark().offset());
     }
 
     @Test
@@ -764,7 +764,7 @@ public class MockLogTest {
         appendBatch(numberOfRecords, epoch);
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(numberOfRecords, epoch + 1);
-        assertEquals(ValidOffsetAndEpoch.diverging(new 
OffsetAndEpoch(log.endOffset().offset, epoch)),
+        assertEquals(ValidOffsetAndEpoch.diverging(new 
OffsetAndEpoch(log.endOffset().offset(), epoch)),
             resultOffsetAndEpoch);
     }
 
@@ -859,7 +859,7 @@ public class MockLogTest {
         appendBatch(numberOfRecords, epoch);
 
         ValidOffsetAndEpoch resultOffsetAndEpoch = 
log.validateOffsetAndEpoch(numberOfRecords + 1, epoch);
-        assertEquals(ValidOffsetAndEpoch.diverging(new 
OffsetAndEpoch(log.endOffset().offset, epoch)),
+        assertEquals(ValidOffsetAndEpoch.diverging(new 
OffsetAndEpoch(log.endOffset().offset(), epoch)),
             resultOffsetAndEpoch);
     }
 
@@ -952,7 +952,7 @@ public class MockLogTest {
     private void appendAsLeader(Collection<SimpleRecord> records, int epoch) {
         log.appendAsLeader(
             MemoryRecords.withRecords(
-                log.endOffset().offset,
+                log.endOffset().offset(),
                 Compression.NONE,
                 records.toArray(new SimpleRecord[records.size()])
             ),
@@ -971,10 +971,10 @@ public class MockLogTest {
 
     private static void validateReadRecords(List<SimpleRecord> 
expectedRecords, MockLog log) {
         assertEquals(0L, log.startOffset());
-        assertEquals(expectedRecords.size(), log.endOffset().offset);
+        assertEquals(expectedRecords.size(), log.endOffset().offset());
 
         int currentOffset = 0;
-        while (currentOffset < log.endOffset().offset) {
+        while (currentOffset < log.endOffset().offset()) {
             Records records = log.read(currentOffset, 
Isolation.UNCOMMITTED).records;
             List<? extends RecordBatch> batches = 
Utils.toList(records.batches().iterator());
 
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 04374e57a39..573c92b1120 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -209,7 +209,7 @@ public final class RaftClientTestContext {
         public Builder appendToLog(int epoch, List<String> records) {
             MemoryRecords batch = buildBatch(
                 time.milliseconds(),
-                log.endOffset().offset,
+                log.endOffset().offset(),
                 epoch,
                 records
             );
@@ -233,7 +233,7 @@ public final class RaftClientTestContext {
         }
 
         Builder deleteBeforeSnapshot(OffsetAndEpoch snapshotId) {
-            if (snapshotId.offset() > log.highWatermark().offset) {
+            if (snapshotId.offset() > log.highWatermark().offset()) {
                 log.updateHighWatermark(new 
LogOffsetMetadata(snapshotId.offset()));
             }
             log.deleteBeforeSnapshot(snapshotId);
@@ -462,7 +462,7 @@ public final class RaftClientTestContext {
         pollUntilRequest();
 
         List<RaftRequest.Outbound> voteRequests = collectVoteRequests(epoch,
-            log.lastFetchedEpoch(), log.endOffset().offset);
+            log.lastFetchedEpoch(), log.endOffset().offset());
 
         for (RaftRequest.Outbound request : voteRequests) {
             VoteResponseData voteResponse = voteResponse(true, 
OptionalInt.empty(), epoch);
@@ -1511,7 +1511,7 @@ public final class RaftClientTestContext {
 
     public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws 
InterruptedException {
         assertEquals(localId, currentLeader());
-        long localLogEndOffset = log.endOffset().offset;
+        long localLogEndOffset = log.endOffset().offset();
 
         Iterable<ReplicaKey> followers = () -> voters
             .voterKeys()
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 30c2bbf39aa..6e9f364c8e3 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -866,12 +866,12 @@ public class RaftEventSimulationTest {
 
         long highWatermark() {
             return client.quorum().highWatermark()
-                .map(hw -> hw.offset)
+                .map(LogOffsetMetadata::offset)
                 .orElse(0L);
         }
 
         long logEndOffset() {
-            return log.endOffset().offset;
+            return log.endOffset().offset();
         }
 
         @Override
@@ -1016,7 +1016,7 @@ public class RaftEventSimulationTest {
             cluster.leaderHighWatermark().ifPresent(highWatermark -> {
                 long numReachedHighWatermark = cluster.nodes.entrySet().stream()
                     .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset >= highWatermark)
+                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
                     .count();
                 assertTrue(
                     numReachedHighWatermark >= cluster.majoritySize(),
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java
index fc7c9724048..39b5ce3e589 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java
@@ -78,7 +78,7 @@ final class KRaftControlRecordStateMachineTest {
         short kraftVersion = 1;
         log.appendAsLeader(
             MemoryRecords.withKRaftVersionRecord(
-                log.endOffset().offset,
+                log.endOffset().offset(),
                 0,
                 epoch,
                 bufferSupplier.get(300),
@@ -91,7 +91,7 @@ final class KRaftControlRecordStateMachineTest {
         VoterSet voterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true));
         log.appendAsLeader(
             MemoryRecords.withVotersRecord(
-                log.endOffset().offset,
+                log.endOffset().offset(),
                 0,
                 epoch,
                 bufferSupplier.get(300),
@@ -104,8 +104,8 @@ final class KRaftControlRecordStateMachineTest {
         partitionState.updateState();
 
         assertEquals(voterSet, partitionState.lastVoterSet());
-        assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset - 1));
-        assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset - 1));
+        assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1));
+        assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1));
     }
 
     @Test
@@ -129,7 +129,7 @@ final class KRaftControlRecordStateMachineTest {
         short kraftVersion = 1;
         log.appendAsLeader(
             MemoryRecords.withKRaftVersionRecord(
-                log.endOffset().offset,
+                log.endOffset().offset(),
                 0,
                 epoch,
                 bufferSupplier.get(300),
@@ -142,7 +142,7 @@ final class KRaftControlRecordStateMachineTest {
         VoterSet voterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true));
         log.appendAsLeader(
             MemoryRecords.withVotersRecord(
-                log.endOffset().offset,
+                log.endOffset().offset(),
                 0,
                 epoch,
                 bufferSupplier.get(300),
@@ -155,8 +155,8 @@ final class KRaftControlRecordStateMachineTest {
         partitionState.updateState();
 
         assertEquals(voterSet, partitionState.lastVoterSet());
-        assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset - 1));
-        assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset - 1));
+        assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1));
+        assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1));
     }
 
     @Test
@@ -184,8 +184,8 @@ final class KRaftControlRecordStateMachineTest {
         partitionState.updateState();
 
         assertEquals(voterSet, partitionState.lastVoterSet());
-        assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset - 1));
-        assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset - 1));
+        assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1));
+        assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1));
     }
 
     @Test
@@ -215,7 +215,7 @@ final class KRaftControlRecordStateMachineTest {
         VoterSet voterSet = snapshotVoterSet.addVoter(VoterSetTest.voterNode(7, true)).get();
         log.appendAsLeader(
             MemoryRecords.withVotersRecord(
-                log.endOffset().offset,
+                log.endOffset().offset(),
                 0,
                 epoch,
                 bufferSupplier.get(300),
@@ -228,8 +228,8 @@ final class KRaftControlRecordStateMachineTest {
         partitionState.updateState();
 
         assertEquals(voterSet, partitionState.lastVoterSet());
-        assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset - 1));
-        assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset - 1));
+        assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1));
+        assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1));
 
         // Check the voter set at the snapshot
         assertEquals(Optional.of(snapshotVoterSet), partitionState.voterSetAtOffset(snapshotId.offset() - 1));
@@ -248,7 +248,7 @@ final class KRaftControlRecordStateMachineTest {
         short kraftVersion = 1;
         log.appendAsLeader(
             MemoryRecords.withKRaftVersionRecord(
-                log.endOffset().offset,
+                log.endOffset().offset(),
                 0,
                 epoch,
                 bufferSupplier.get(300),
@@ -258,7 +258,7 @@ final class KRaftControlRecordStateMachineTest {
         );
 
         // Append the voter set control record
-        long firstVoterSetOffset = log.endOffset().offset;
+        long firstVoterSetOffset = log.endOffset().offset();
         VoterSet firstVoterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true));
         log.appendAsLeader(
             MemoryRecords.withVotersRecord(
@@ -272,7 +272,7 @@ final class KRaftControlRecordStateMachineTest {
         );
 
         // Append another voter set control record
-        long voterSetOffset = log.endOffset().offset;
+        long voterSetOffset = log.endOffset().offset();
         VoterSet voterSet = firstVoterSet.addVoter(VoterSetTest.voterNode(7, true)).get();
         log.appendAsLeader(
             MemoryRecords.withVotersRecord(
@@ -313,7 +313,7 @@ final class KRaftControlRecordStateMachineTest {
         KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet));
 
         // Append the kraft.version control record
-        long kraftVersionOffset = log.endOffset().offset;
+        long kraftVersionOffset = log.endOffset().offset();
         short kraftVersion = 1;
         log.appendAsLeader(
             MemoryRecords.withKRaftVersionRecord(
@@ -327,7 +327,7 @@ final class KRaftControlRecordStateMachineTest {
         );
 
         // Append the voter set control record
-        long firstVoterSetOffset = log.endOffset().offset;
+        long firstVoterSetOffset = log.endOffset().offset();
         VoterSet firstVoterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true));
         log.appendAsLeader(
             MemoryRecords.withVotersRecord(
@@ -341,7 +341,7 @@ final class KRaftControlRecordStateMachineTest {
         );
 
         // Append another voter set control record
-        long voterSetOffset = log.endOffset().offset;
+        long voterSetOffset = log.endOffset().offset();
         VoterSet voterSet = firstVoterSet.addVoter(VoterSetTest.voterNode(7, true)).get();
         log.appendAsLeader(
             MemoryRecords.withVotersRecord(

Reply via email to