This is an automated email from the ASF dual-hosted git repository. jsancio pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 376365d9da8 MINOR; Add property methods to LogOffsetMetadata (#16521) 376365d9da8 is described below commit 376365d9da8099e3eb9175090cc456f55985fcb0 Author: José Armando García Sancio <jsan...@users.noreply.github.com> AuthorDate: Thu Jul 4 15:03:32 2024 -0400 MINOR; Add property methods to LogOffsetMetadata (#16521) This change simply adds property methods to LogOffsetMetadata. It changes all of the callers to use the new property methods instead of using the fields directly. Reviewers: Colin P. McCabe <cmcc...@apache.org> --- core/src/test/java/kafka/test/ClusterInstance.java | 2 +- .../java/org/apache/kafka/raft/FollowerState.java | 2 +- .../org/apache/kafka/raft/KafkaRaftClient.java | 28 +++---- .../java/org/apache/kafka/raft/LeaderState.java | 32 ++++---- .../org/apache/kafka/raft/LogOffsetMetadata.java | 13 +++- .../java/org/apache/kafka/raft/ReplicatedLog.java | 2 +- .../internals/KRaftControlRecordStateMachine.java | 2 +- .../kafka/raft/internals/KafkaRaftMetrics.java | 6 +- .../kafka/raft/KafkaRaftClientSnapshotTest.java | 16 ++-- .../org/apache/kafka/raft/KafkaRaftClientTest.java | 88 +++++++++++----------- .../test/java/org/apache/kafka/raft/MockLog.java | 48 ++++++------ .../java/org/apache/kafka/raft/MockLogTest.java | 60 +++++++-------- .../apache/kafka/raft/RaftClientTestContext.java | 8 +- .../apache/kafka/raft/RaftEventSimulationTest.java | 6 +- .../KRaftControlRecordStateMachineTest.java | 38 +++++----- 15 files changed, 181 insertions(+), 170 deletions(-) diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index d11d0764c8c..41cc8b485f1 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -194,7 +194,7 @@ public interface ClusterInstance { ), 60000L, topic + " metadata not propagated after 60000 ms"); for (ControllerServer controller : controllers().values()) { - long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset - 1; + long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset() - 1; TestUtils.waitForCondition( () -> brokers().values().stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset), 60000L, "Timeout waiting for controller metadata propagating to brokers"); diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java index c43d74d10ca..a4d635e0388 100644 --- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java +++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java @@ -134,7 +134,7 @@ public class FollowerState implements EpochState { } if (highWatermark.isPresent()) { - long previousHighWatermark = highWatermark.get().offset; + long previousHighWatermark = highWatermark.get().offset(); long updatedHighWatermark = newHighWatermark.getAsLong(); if (updatedHighWatermark < 0) { diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 721472a5729..b9f9f9bef5d 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -320,7 +320,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { onUpdateLeaderHighWatermark(state, currentTimeMs); } - fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); + fetchPurgatory.maybeComplete(endOffsetMetadata.offset(), currentTimeMs); } private void onUpdateLeaderHighWatermark( @@ -334,12 +334,12 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { // After updating the high watermark, we first clear the append // purgatory so that we have an opportunity to route the pending // records still held in memory directly to the listener - appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); + appendPurgatory.maybeComplete(highWatermark.offset(), currentTimeMs); // It is also possible that the high watermark is being updated // for the first time following the leader election, so we need // to give lagging listeners an opportunity to catch up as well - updateListenersProgress(highWatermark.offset); + updateListenersProgress(highWatermark.offset()); }); } @@ -469,7 +469,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { // so there are no unknown voter connections. Report this metric as 0. kafkaRaftMetrics.updateNumUnknownVoterConnections(0); - quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); + quorum.initialize(new OffsetAndEpoch(log.endOffset().offset(), log.lastFetchedEpoch())); long currentTimeMs = time.milliseconds(); if (quorum.isLeader()) { @@ -514,7 +514,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { } private OffsetAndEpoch endOffset() { - return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch()); + return new OffsetAndEpoch(log.endOffset().offset(), log.lastFetchedEpoch()); } private void resetConnections() { @@ -522,7 +522,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { } private void onBecomeLeader(long currentTimeMs) { - long endOffset = log.endOffset().offset; + long endOffset = log.endOffset().offset(); BatchAccumulator<T> accumulator = new BatchAccumulator<>( quorum.epoch(), @@ -1201,7 +1201,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { .setErrorCode(error.code()) .setLogStartOffset(log.startOffset()) .setHighWatermark( - highWatermark.map(offsetMetadata -> offsetMetadata.offset).orElse(-1L) + highWatermark.map(offsetMetadata -> offsetMetadata.offset()).orElse(-1L) ); partitionData.currentLeader() @@ -1515,7 +1515,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { divergingEpoch.endOffset(), divergingEpoch.epoch()); state.highWatermark().ifPresent(highWatermark -> { - if (divergingOffsetAndEpoch.offset() < highWatermark.offset) { + if (divergingOffsetAndEpoch.offset() < highWatermark.offset()) { throw new KafkaException("The leader requested truncation to offset " + divergingOffsetAndEpoch.offset() + ", which is below the current high watermark" + " " + highWatermark); @@ -1694,7 +1694,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { FetchSnapshotRequestData.PartitionSnapshot partitionSnapshot = partitionSnapshotOpt.get(); Optional<Errors> leaderValidation = validateLeaderOnlyRequest( - partitionSnapshot.currentLeaderEpoch() + partitionSnapshot.currentLeaderEpoch() ); if (leaderValidation.isPresent()) { return RaftUtil.singletonFetchSnapshotResponse( @@ -1935,7 +1935,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { // is always less than the snapshot id just downloaded. partitionState.updateState(); - updateFollowerHighWatermark(state, OptionalLong.of(log.highWatermark().offset)); + updateFollowerHighWatermark(state, OptionalLong.of(log.highWatermark().offset())); } else { throw new IllegalStateException( String.format( @@ -2365,7 +2365,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { fetchPartition -> fetchPartition .setCurrentLeaderEpoch(quorum.epoch()) .setLastFetchedEpoch(log.lastFetchedEpoch()) - .setFetchOffset(log.endOffset().offset) + .setFetchOffset(log.endOffset().offset()) .setReplicaDirectoryId(quorum.localDirectoryId()) ); @@ -2766,7 +2766,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { // Check listener progress to see if reads are expected quorum.highWatermark().ifPresent(highWatermarkMetadata -> { - updateListenersProgress(highWatermarkMetadata.offset); + updateListenersProgress(highWatermarkMetadata.offset()); }); // Notify the new listeners of the latest leader and epoch @@ -3024,7 +3024,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { @Override public long logEndOffset() { - return log.endOffset().offset; + return log.endOffset().offset(); } @Override @@ -3042,7 +3042,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { @Override public OptionalLong highWatermark() { if (isInitialized() && quorum.highWatermark().isPresent()) { - return OptionalLong.of(quorum.highWatermark().get().offset); + return OptionalLong.of(quorum.highWatermark().get().offset()); } else { return OptionalLong.empty(); } diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index eb1b86983a7..5e6dd8de6b5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -266,14 +266,14 @@ public class LeaderState<T> implements EpochState { // as all records from previous epochs that the current leader has committed. LogOffsetMetadata highWatermarkUpdateMetadata = highWatermarkUpdateOpt.get(); - long highWatermarkUpdateOffset = highWatermarkUpdateMetadata.offset; + long highWatermarkUpdateOffset = highWatermarkUpdateMetadata.offset(); if (highWatermarkUpdateOffset > epochStartOffset) { if (highWatermark.isPresent()) { LogOffsetMetadata currentHighWatermarkMetadata = highWatermark.get(); - if (highWatermarkUpdateOffset > currentHighWatermarkMetadata.offset - || (highWatermarkUpdateOffset == currentHighWatermarkMetadata.offset && - !highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata))) { + if (highWatermarkUpdateOffset > currentHighWatermarkMetadata.offset() + || (highWatermarkUpdateOffset == currentHighWatermarkMetadata.offset() && + !highWatermarkUpdateMetadata.metadata().equals(currentHighWatermarkMetadata.metadata()))) { Optional<LogOffsetMetadata> oldHighWatermark = highWatermark; highWatermark = highWatermarkUpdateOpt; logHighWatermarkUpdate( @@ -283,12 +283,12 @@ public class LeaderState<T> implements EpochState { followersByDescendingFetchOffset ); return true; - } else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset) { + } else if (highWatermarkUpdateOffset < currentHighWatermarkMetadata.offset()) { log.info("The latest computed high watermark {} is smaller than the current " + "value {}, which should only happen when voter set membership changes. If the voter " + "set has not changed this suggests that one of the voters has lost committed data. " + "Full voter replication state: {}", highWatermarkUpdateOffset, - currentHighWatermarkMetadata.offset, voterStates.values()); + currentHighWatermarkMetadata.offset(), voterStates.values()); return false; } else { return false; @@ -347,9 +347,9 @@ public class LeaderState<T> implements EpochState { ) { ReplicaState state = getOrCreateReplicaState(localReplicaKey); state.endOffset.ifPresent(currentEndOffset -> { - if (currentEndOffset.offset > endOffsetMetadata.offset) { + if (currentEndOffset.offset() > endOffsetMetadata.offset()) { throw new IllegalStateException("Detected non-monotonic update of local " + - "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); + "end offset: " + currentEndOffset.offset() + " -> " + endOffsetMetadata.offset()); } }); @@ -386,9 +386,9 @@ public class LeaderState<T> implements EpochState { ReplicaState state = getOrCreateReplicaState(replicaKey); state.endOffset.ifPresent(currentEndOffset -> { - if (currentEndOffset.offset > fetchOffsetMetadata.offset) { + if (currentEndOffset.offset() > fetchOffsetMetadata.offset()) { log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}", - state.replicaKey, currentEndOffset.offset, fetchOffsetMetadata.offset); + state.replicaKey, currentEndOffset.offset(), fetchOffsetMetadata.offset()); } }); @@ -460,7 +460,7 @@ public class LeaderState<T> implements EpochState { .setErrorCode(Errors.NONE.code()) .setLeaderId(localReplicaKey.id()) .setLeaderEpoch(epoch) - .setHighWatermark(highWatermark.map(offsetMetadata -> offsetMetadata.offset).orElse(-1L)) + .setHighWatermark(highWatermark.map(LogOffsetMetadata::offset).orElse(-1L)) .setCurrentVoters(describeReplicaStates(voterStates.values(), currentTimeMs)) .setObservers(describeReplicaStates(observerStates.values(), currentTimeMs)); } @@ -499,7 +499,7 @@ public class LeaderState<T> implements EpochState { // KAFKA-16953 will add support for the replica directory id return new DescribeQuorumResponseData.ReplicaState() .setReplicaId(replicaState.replicaKey.id()) - .setLogEndOffset(replicaState.endOffset.map(md -> md.offset).orElse(-1L)) + .setLogEndOffset(replicaState.endOffset.map(LogOffsetMetadata::offset).orElse(-1L)) .setLastCaughtUpTimestamp(lastCaughtUpTimestamp) .setLastFetchTimestamp(lastFetchTimestamp); @@ -614,13 +614,13 @@ public class LeaderState<T> implements EpochState { // This allows us to use the previous value for `lastFetchTimestamp` if the // follower was able to catch up to `lastFetchLeaderLogEndOffset` on this fetch. leaderEndOffsetOpt.ifPresent(leaderEndOffset -> { - if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) { + if (fetchOffsetMetadata.offset() >= leaderEndOffset.offset()) { lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs); } else if (lastFetchLeaderLogEndOffset > 0 - && fetchOffsetMetadata.offset >= lastFetchLeaderLogEndOffset) { + && fetchOffsetMetadata.offset() >= lastFetchLeaderLogEndOffset) { lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp); } - lastFetchLeaderLogEndOffset = leaderEndOffset.offset; + lastFetchLeaderLogEndOffset = leaderEndOffset.offset(); }); lastFetchTimestamp = Math.max(lastFetchTimestamp, currentTimeMs); @@ -637,7 +637,7 @@ public class LeaderState<T> implements EpochState { else if (!that.endOffset.isPresent()) return -1; else - return Long.compare(that.endOffset.get().offset, this.endOffset.get().offset); + return Long.compare(that.endOffset.get().offset(), this.endOffset.get().offset()); } @Override diff --git a/raft/src/main/java/org/apache/kafka/raft/LogOffsetMetadata.java b/raft/src/main/java/org/apache/kafka/raft/LogOffsetMetadata.java index 6a966192cb0..38a90c84a4e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LogOffsetMetadata.java +++ b/raft/src/main/java/org/apache/kafka/raft/LogOffsetMetadata.java @@ -23,9 +23,8 @@ import java.util.Optional; * Metadata for specific local log offset */ public class LogOffsetMetadata { - - public final long offset; - public final Optional<OffsetMetadata> metadata; + private final long offset; + private final Optional<OffsetMetadata> metadata; public LogOffsetMetadata(long offset) { this(offset, Optional.empty()); @@ -36,6 +35,14 @@ public class LogOffsetMetadata { this.metadata = metadata; } + public long offset() { + return offset; + } + + public Optional<OffsetMetadata> metadata() { + return metadata; + } + @Override public String toString() { return "LogOffsetMetadata(offset=" + offset + diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java index dbf5133f35b..a22f7fd73cd 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java +++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java @@ -214,7 +214,7 @@ public interface ReplicatedLog extends AutoCloseable { final long truncationOffset; int leaderEpoch = endOffset.epoch(); if (leaderEpoch == 0) { - truncationOffset = Math.min(endOffset.offset(), endOffset().offset); + truncationOffset = Math.min(endOffset.offset(), endOffset().offset()); } else { OffsetAndEpoch localEndOffset = endOffsetForEpoch(leaderEpoch); if (localEndOffset.epoch() == leaderEpoch) { diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index c4c7b274fab..b57f5e104f9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -189,7 +189,7 @@ public final class KRaftControlRecordStateMachine { } private void maybeLoadLog() { - while (log.endOffset().offset > nextOffset) { + while (log.endOffset().offset() > nextOffset) { LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED); try (RecordsIterator<?> iterator = new RecordsIterator<>( info.records, diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java index 3bdac5ffc77..2b7617df359 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.WindowedSum; +import org.apache.kafka.raft.LogOffsetMetadata; import org.apache.kafka.raft.OffsetAndEpoch; import org.apache.kafka.raft.QuorumState; @@ -121,7 +122,10 @@ public class KafkaRaftMetrics implements AutoCloseable { metrics.addMetric(this.currentEpochMetricName, (mConfig, currentTimeMs) -> state.epoch()); this.highWatermarkMetricName = metrics.metricName("high-watermark", metricGroupName, "The high watermark maintained on this member; -1 if it is unknown"); - metrics.addMetric(this.highWatermarkMetricName, (mConfig, currentTimeMs) -> state.highWatermark().map(hw -> hw.offset).orElse(-1L)); + metrics.addMetric( + this.highWatermarkMetricName, + (mConfig, currentTimeMs) -> state.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L) + ); this.logEndOffsetMetricName = metrics.metricName("log-end-offset", metricGroupName, "The current raft log end offset."); metrics.addMetric(this.logEndOffsetMetricName, (mConfig, currentTimeMs) -> logEndOffset.offset()); diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 2eb2f65ca0e..02a92e8303e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -117,7 +117,7 @@ public final class KafkaRaftClientSnapshotTest { int epoch = context.currentEpoch(); // Advance the highWatermark - long localLogEndOffset = context.log.endOffset().offset; + long localLogEndOffset = context.log.endOffset().offset(); context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, localLogEndOffset, epoch, 0)); context.pollUntilResponse(); context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); @@ -152,7 +152,7 @@ public final class KafkaRaftClientSnapshotTest { RaftClientTestContext context = contextBuilder.build(); // Advance the highWatermark - long localLogEndOffset = context.log.endOffset().offset; + long localLogEndOffset = context.log.endOffset().offset(); context.pollUntilRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch()); @@ -194,7 +194,7 @@ public final class KafkaRaftClientSnapshotTest { RaftClientTestContext context = contextBuilder.build(); // Advance the highWatermark - long localLogEndOffset = context.log.endOffset().offset; + long localLogEndOffset = context.log.endOffset().offset(); context.pollUntilRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); context.assertFetchRequestData(fetchRequest, epoch, localLogEndOffset, snapshotId.epoch()); @@ -242,7 +242,7 @@ public final class KafkaRaftClientSnapshotTest { context.listener.updateReadCommit(false); // Advance the highWatermark - long localLogEndOffset = context.log.endOffset().offset; + long localLogEndOffset = context.log.endOffset().offset(); context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, localLogEndOffset, epoch, 0)); context.pollUntilResponse(); context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(localId)); @@ -328,7 +328,7 @@ public final class KafkaRaftClientSnapshotTest { context.time.sleep(context.appendLingerMs()); context.client.poll(); - long localLogEndOffset = context.log.endOffset().offset; + long localLogEndOffset = context.log.endOffset().offset(); assertTrue( appendRecords.size() <= localLogEndOffset, String.format("Record length = %s, log end offset = %s", appendRecords.size(), localLogEndOffset) @@ -377,7 +377,7 @@ public final class KafkaRaftClientSnapshotTest { context.time.sleep(context.appendLingerMs()); context.client.poll(); - long localLogEndOffset = context.log.endOffset().offset; + long localLogEndOffset = context.log.endOffset().offset(); assertTrue( appendRecords.size() <= localLogEndOffset, String.format("Record length = %s, log end offset = %s", appendRecords.size(), localLogEndOffset) @@ -631,7 +631,7 @@ public final class KafkaRaftClientSnapshotTest { context.fetchRequest( epoch, otherNodeKey, - context.log.endOffset().offset, + context.log.endOffset().offset(), oldestSnapshotId.epoch() - 1, 0 ) @@ -1887,7 +1887,7 @@ public final class KafkaRaftClientSnapshotTest { context.client.scheduleAppend(currentEpoch, newRecords); context.time.sleep(context.appendLingerMs()); context.client.poll(); - assertEquals(context.log.endOffset().offset, context.client.highWatermark().getAsLong() + newRecords.size()); + assertEquals(context.log.endOffset().offset(), context.client.highWatermark().getAsLong() + newRecords.size()); OffsetAndEpoch invalidSnapshotId2 = new OffsetAndEpoch(context.client.highWatermark().getAsLong() + 2, currentEpoch); assertThrows(IllegalArgumentException.class, () -> context.client.createSnapshot(invalidSnapshotId2, 0)); diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index a7cf801bf3d..a8c108bb2d9 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -87,7 +87,7 @@ public class KafkaRaftClientTest { .withKip853Rpc(withKip853Rpc) .build(); context.assertElectedLeader(1, localId); - assertEquals(context.log.endOffset().offset, context.client.logEndOffset()); + assertEquals(context.log.endOffset().offset(), context.client.logEndOffset()); } @ParameterizedTest @@ -103,8 +103,8 @@ public class KafkaRaftClientTest { .withElectedLeader(initialEpoch, localId) .build(); - context.pollUntil(() -> context.log.endOffset().offset == 1L); - assertEquals(1L, context.log.endOffset().offset); + context.pollUntil(() -> context.log.endOffset().offset() == 1L); + assertEquals(1L, context.log.endOffset().offset()); assertEquals(initialEpoch + 1, context.log.lastFetchedEpoch()); assertEquals(new LeaderAndEpoch(OptionalInt.of(localId), initialEpoch + 1), context.currentLeaderAndEpoch()); @@ -125,7 +125,7 @@ public class KafkaRaftClientTest { .withKip853Rpc(withKip853Rpc) .build(); - assertEquals(0L, context.log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset()); context.assertElectedLeader(epoch, localId); // Since we were the leader in epoch 2, we should ensure that we will not vote for any @@ -135,7 +135,7 @@ public class KafkaRaftClientTest { epoch, remoteKey, context.log.lastFetchedEpoch(), - context.log.endOffset().offset + context.log.endOffset().offset() ) ); context.pollUntilResponse(); @@ -156,7 +156,7 @@ public class KafkaRaftClientTest { .withKip853Rpc(withKip853Rpc) .build(); - assertEquals(0L, context.log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset()); context.assertVotedCandidate(epoch, localId); // Since we were the leader in epoch 2, we should ensure that we will not vote for any @@ -166,7 +166,7 @@ public class KafkaRaftClientTest { epoch, remoteKey, context.log.lastFetchedEpoch(), - context.log.endOffset().offset + context.log.endOffset().offset() ) ); context.pollUntilResponse(); @@ -189,7 +189,7 @@ public class KafkaRaftClientTest { // Resign from leader, will restart in resigned state assertTrue(context.client.quorum().isResigned()); - assertEquals(0L, context.log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset()); context.assertElectedLeader(epoch, localId); // Send vote request with higher epoch @@ -198,7 +198,7 @@ public class KafkaRaftClientTest { epoch + 1, remoteKey, context.log.lastFetchedEpoch(), - context.log.endOffset().offset + context.log.endOffset().offset() ) ); context.client.poll(); @@ -225,7 +225,7 @@ public class KafkaRaftClientTest { // Resign from candidate, will restart in candidate state assertTrue(context.client.quorum().isCandidate()); - assertEquals(0L, context.log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset()); context.assertVotedCandidate(epoch, localId); // Send vote request with higher epoch @@ -234,7 +234,7 @@ public class KafkaRaftClientTest { epoch + 1, remoteKey, context.log.lastFetchedEpoch(), - context.log.endOffset().offset + context.log.endOffset().offset() ) ); context.client.poll(); @@ -269,7 +269,7 @@ public class KafkaRaftClientTest { epoch + 1, remoteKey, context.log.lastFetchedEpoch(), - context.log.endOffset().offset + context.log.endOffset().offset() ) ); context.client.poll(); @@ -296,7 +296,7 @@ public class KafkaRaftClientTest { // Resign from leader, will restart in resigned state assertTrue(context.client.quorum().isResigned()); - assertEquals(0L, context.log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset()); context.assertElectedLeader(epoch, localId); // Election timeout @@ -323,7 +323,7 @@ public class KafkaRaftClientTest { // The node will remain elected, but start up in a resigned state // in which no additional writes are accepted. - assertEquals(0L, context.log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset()); context.assertElectedLeader(epoch, localId); context.client.poll(); assertThrows(NotLeaderException.class, () -> context.client.scheduleAppend(epoch, Arrays.asList("a", "b"))); @@ -492,12 +492,12 @@ public class KafkaRaftClientTest { // send fetch request when become leader int epoch = context.currentEpoch(); - context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, context.log.endOffset().offset, epoch, 1000)); + context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, context.log.endOffset().offset(), epoch, 1000)); context.client.poll(); // append some record, but the fetch in purgatory will still fail context.log.appendAsLeader( - context.buildBatch(context.log.endOffset().offset, epoch, singletonList("raft")), + context.buildBatch(context.log.endOffset().offset(), epoch, singletonList("raft")), epoch ); @@ -800,7 +800,7 @@ public class KafkaRaftClientTest { .withKip853Rpc(withKip853Rpc) .build(); context.assertVotedCandidate(2, localId); - assertEquals(0L, context.log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset()); // The candidate will resume the election after reinitialization context.pollUntilRequest(); @@ -832,12 +832,12 @@ public class KafkaRaftClientTest { ); // Become leader after receiving the vote - context.pollUntil(() -> context.log.endOffset().offset == 1L); + context.pollUntil(() -> context.log.endOffset().offset() == 1L); context.assertElectedLeader(1, localId); long electionTimestamp = context.time.milliseconds(); // Leader change record appended - assertEquals(1L, context.log.endOffset().offset); + assertEquals(1L, context.log.endOffset().offset()); assertEquals(1L, context.log.firstUnflushedOffset()); // Send BeginQuorumEpoch to voters @@ -879,12 +879,12 @@ public class KafkaRaftClientTest { ); // Become leader after receiving the vote - context.pollUntil(() -> context.log.endOffset().offset == 1L); + context.pollUntil(() -> context.log.endOffset().offset() == 1L); context.assertElectedLeader(1, localId); long electionTimestamp = context.time.milliseconds(); // Leader change record appended - assertEquals(1L, context.log.endOffset().offset); + assertEquals(1L, context.log.endOffset().offset()); assertEquals(1L, context.log.firstUnflushedOffset()); // Send BeginQuorumEpoch to voters @@ -1110,7 +1110,7 @@ public class KafkaRaftClientTest { assertEquals(1L, context.client.scheduleAppend(epoch, singletonList("a"))); context.deliverRequest( - context.voteRequest(epoch + 1, otherNodeKey, epoch, context.log.endOffset().offset) + context.voteRequest(epoch + 1, otherNodeKey, epoch, context.log.endOffset().offset()) ); context.pollUntilResponse(); @@ -1168,7 +1168,7 @@ public class KafkaRaftClientTest { context.becomeLeader(); assertEquals(OptionalInt.of(localId), context.currentLeader()); - assertEquals(1L, context.log.endOffset().offset); + assertEquals(1L, context.log.endOffset().offset()); int epoch = context.currentEpoch(); assertEquals(1L, context.client.scheduleAppend(epoch, singletonList("a"))); @@ -1183,7 +1183,7 @@ public class KafkaRaftClientTest { context.time.sleep(30); context.client.poll(); - assertEquals(2L, context.log.endOffset().offset); + assertEquals(2L, context.log.endOffset().offset()); } @ParameterizedTest @@ -1204,7 +1204,7 @@ public class KafkaRaftClientTest { context.becomeLeader(); assertEquals(OptionalInt.of(localId), context.currentLeader()); - assertEquals(1L, context.log.endOffset().offset); + assertEquals(1L, context.log.endOffset().offset()); int epoch = context.currentEpoch(); assertEquals(1L, context.client.scheduleAppend(epoch, singletonList("a"))); @@ -1219,7 +1219,7 @@ public class KafkaRaftClientTest { assertTrue(context.messageQueue.wakeupRequested()); context.client.poll(); - assertEquals(3L, context.log.endOffset().offset); + assertEquals(3L, context.log.endOffset().offset()); } @ParameterizedTest @@ -1481,7 +1481,7 @@ public class KafkaRaftClientTest { // First poll has no high watermark advance context.client.poll(); assertEquals(OptionalLong.empty(), context.client.highWatermark()); - assertEquals(1L, context.log.endOffset().offset); + assertEquals(1L, context.log.endOffset().offset()); // Let follower send a fetch to initialize the high watermark, // note the offset 0 would be a control message for becoming the leader @@ -1984,7 +1984,7 @@ public class KafkaRaftClientTest { // First poll has no high watermark advance. context.client.poll(); assertEquals(OptionalLong.empty(), context.client.highWatermark()); - assertEquals(1L, context.log.endOffset().offset); + assertEquals(1L, context.log.endOffset().offset()); // Now we will advance the high watermark with a follower fetch request. FetchRequestData fetchRequestData = context.fetchRequest(epoch, otherNodeKey, 1L, epoch, 0); @@ -2433,7 +2433,7 @@ public class KafkaRaftClientTest { ); context.client.poll(); - assertEquals(0, context.log.endOffset().offset); + assertEquals(0, context.log.endOffset().offset()); context.assertVotedCandidate(epoch + 1, localId); } @@ -2475,7 +2475,7 @@ public class KafkaRaftClientTest { ); context.client.poll(); - assertEquals(0, context.log.endOffset().offset); + assertEquals(0, context.log.endOffset().offset()); context.assertElectedLeader(epoch + 1, voter3); } @@ -2956,7 +2956,7 @@ public class KafkaRaftClientTest { ); context.client.poll(); - assertEquals(2L, context.log.endOffset().offset); + assertEquals(2L, context.log.endOffset().offset()); assertEquals(2L, context.log.firstUnflushedOffset()); } @@ -2990,7 +2990,7 @@ public class KafkaRaftClientTest { fetchResponse ); context.client.poll(); - assertEquals(0L, context.log.endOffset().offset); + assertEquals(0L, context.log.endOffset().offset()); assertEquals(OptionalLong.of(0L), context.client.highWatermark()); // Receive some records in the next poll, but do not advance high watermark @@ -3004,7 +3004,7 @@ public class KafkaRaftClientTest { fetchResponse ); context.client.poll(); - assertEquals(2L, context.log.endOffset().offset); + assertEquals(2L, context.log.endOffset().offset()); assertEquals(OptionalLong.of(0L), context.client.highWatermark()); // The next fetch response is empty, but should still advance the high watermark @@ -3023,7 +3023,7 @@ public class KafkaRaftClientTest { fetchResponse ); context.client.poll(); - assertEquals(2L, context.log.endOffset().offset); + assertEquals(2L, context.log.endOffset().offset()); assertEquals(OptionalLong.of(2L), context.client.highWatermark()); } @@ -3074,7 +3074,7 @@ public class KafkaRaftClientTest { .build(); long now = context.time.milliseconds(); - context.pollUntil(() -> context.log.endOffset().offset == 1L); + context.pollUntil(() -> context.log.endOffset().offset() == 1L); context.assertElectedLeader(1, localId); // We still write the leader change message @@ -3156,7 +3156,7 @@ public class KafkaRaftClientTest { .build(); context.assertElectedLeader(epoch, otherNodeId); - assertEquals(3L, context.log.endOffset().offset); + assertEquals(3L, context.log.endOffset().offset()); context.pollUntilRequest(); @@ -3168,7 +3168,7 @@ public class KafkaRaftClientTest { // Poll again to complete truncation context.client.poll(); - assertEquals(2L, context.log.endOffset().offset); + assertEquals(2L, context.log.endOffset().offset()); assertEquals(2L, context.log.firstUnflushedOffset()); // Now we should be fetching @@ -3184,7 +3184,7 @@ public class KafkaRaftClientTest { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, Collections.singleton(localId)) .withKip853Rpc(withKip853Rpc) .build(); - context.pollUntil(() -> context.log.endOffset().offset == 1L); + context.pollUntil(() -> context.log.endOffset().offset() == 1L); assertNotNull(getMetric(context.metrics, "current-state")); assertNotNull(getMetric(context.metrics, "current-leader")); @@ -3350,7 +3350,7 @@ public class KafkaRaftClientTest { int epoch = context.currentEpoch(); // After becoming leader, we expect the `LeaderChange` record to be appended. - assertEquals(1L, context.log.endOffset().offset); + assertEquals(1L, context.log.endOffset().offset()); // The high watermark is not known to the leader until the followers // begin fetching, so we should not have fired the `handleLeaderChange` callback. @@ -3407,7 +3407,7 @@ public class KafkaRaftClientTest { // After becoming leader, we expect the `LeaderChange` record to be appended // in addition to the initial 9 records in the log. - assertEquals(10L, context.log.endOffset().offset); + assertEquals(10L, context.log.endOffset().offset()); // The high watermark is not known to the leader until the followers // begin fetching, so we should not have fired the `handleLeaderChange` callback. @@ -3470,7 +3470,7 @@ public class KafkaRaftClientTest { context.becomeLeader(); context.client.poll(); - assertEquals(10L, context.log.endOffset().offset); + assertEquals(10L, context.log.endOffset().offset()); // Let the initial listener catch up context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 10L, epoch, 0)); @@ -3513,7 +3513,7 @@ public class KafkaRaftClientTest { context.becomeLeader(); context.client.poll(); - assertEquals(10L, context.log.endOffset().offset); + assertEquals(10L, context.log.endOffset().offset()); // Let the initial listener catch up context.advanceLocalLeaderHighWatermarkToLogEndOffset(); @@ -3662,7 +3662,7 @@ public class KafkaRaftClientTest { // Start off as the leader and receive a fetch to initialize the high watermark context.becomeLeader(); - assertEquals(10L, context.log.endOffset().offset); + assertEquals(10L, context.log.endOffset().offset()); context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 10L, epoch, 0)); context.pollUntilResponse(); @@ -3813,7 +3813,7 @@ public class KafkaRaftClientTest { context.fetchResponse(leaderEpoch, leaderId, batch1, 0L, Errors.NONE) ); context.client.poll(); - assertEquals(3L, context.log.endOffset().offset); + assertEquals(3L, context.log.endOffset().offset()); assertEquals(3, context.log.lastFetchedEpoch()); } diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java index 8247e137086..b8c2089caf2 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java @@ -81,7 +81,7 @@ public class MockLog implements ReplicatedLog { @Override public void truncateTo(long offset) { - if (offset < highWatermark.offset) { + if (offset < highWatermark.offset()) { throw new IllegalArgumentException("Illegal attempt to truncate to offset " + offset + " which is below the current high watermark " + highWatermark); } @@ -89,7 +89,7 @@ public class MockLog implements ReplicatedLog { logger.debug("Truncating log to end offset {}", offset); batches.removeIf(entry -> entry.lastOffset() >= offset); epochStartOffsets.removeIf(epochStartOffset -> epochStartOffset.startOffset >= offset); - firstUnflushedOffset = Math.min(firstUnflushedOffset, endOffset().offset); + firstUnflushedOffset = Math.min(firstUnflushedOffset, endOffset().offset()); } @Override @@ -98,7 +98,7 @@ public class MockLog implements ReplicatedLog { latestSnapshotId().ifPresent(snapshotId -> { if (snapshotId.epoch() > logLastFetchedEpoch().orElse(0) || (snapshotId.epoch() == logLastFetchedEpoch().orElse(0) && - snapshotId.offset() > endOffset().offset)) { + snapshotId.offset() > endOffset().offset())) { logger.debug("Truncating to the latest snapshot at {}", snapshotId); @@ -117,13 +117,13 @@ public class MockLog implements ReplicatedLog { @Override public void updateHighWatermark(LogOffsetMetadata offsetMetadata) { - if (this.highWatermark.offset > offsetMetadata.offset) { + if (this.highWatermark.offset() > offsetMetadata.offset()) { throw new IllegalArgumentException("Non-monotonic update of current high watermark " + highWatermark + " to new value " + offsetMetadata); - } else if (offsetMetadata.offset > endOffset().offset) { + } else if (offsetMetadata.offset() > endOffset().offset()) { throw new IllegalArgumentException("Attempt to update high watermark to " + offsetMetadata + " which is larger than the current end offset " + endOffset()); - } else if (offsetMetadata.offset < startOffset()) { + } else if (offsetMetadata.offset() < startOffset()) { throw new IllegalArgumentException("Attempt to update high watermark to " + offsetMetadata + " which is smaller than the current start offset " + startOffset()); } @@ -148,8 +148,8 @@ public class MockLog implements ReplicatedLog { } private Optional<OffsetMetadata> metadataForOffset(long offset) { - if (offset == endOffset().offset) { - return endOffset().metadata; + if (offset == endOffset().offset()) { + return endOffset().metadata(); } for (LogBatch batch : batches) { @@ -167,12 +167,12 @@ public class MockLog implements ReplicatedLog { } private void assertValidHighWatermarkMetadata(LogOffsetMetadata offsetMetadata) { - if (!offsetMetadata.metadata.isPresent()) { + if (!offsetMetadata.metadata().isPresent()) { return; } - long id = ((MockOffsetMetadata) offsetMetadata.metadata.get()).id; - long offset = offsetMetadata.offset; + long id = ((MockOffsetMetadata) offsetMetadata.metadata().get()).id; + long offset = offsetMetadata.offset(); metadataForOffset(offset).ifPresent(metadata -> { long entryId = ((MockOffsetMetadata) metadata).id; @@ -211,7 +211,7 @@ public class MockLog implements ReplicatedLog { epochLowerBound = epochStartOffset.epoch; } - return new OffsetAndEpoch(endOffset().offset, lastFetchedEpoch()); + return new OffsetAndEpoch(endOffset().offset(), lastFetchedEpoch()); } private Optional<LogEntry> lastEntry() { @@ -300,10 +300,10 @@ public class MockLog implements ReplicatedLog { if (records.sizeInBytes() == 0) throw new IllegalArgumentException("Attempt to append an empty record set"); - long baseOffset = endOffset().offset; + long baseOffset = endOffset().offset(); long lastOffset = baseOffset; for (RecordBatch batch : records.batches()) { - if (batch.baseOffset() != endOffset().offset) { + if (batch.baseOffset() != endOffset().offset()) { /* KafkaMetadataLog throws an kafka.common.UnexpectedAppendOffsetException this is the * best we can do from this module. */ @@ -311,7 +311,7 @@ public class MockLog implements ReplicatedLog { String.format( "Illegal append at offset %s with current end offset of %s", batch.baseOffset(), - endOffset().offset + endOffset().offset() ) ); } @@ -340,7 +340,7 @@ public class MockLog implements ReplicatedLog { @Override public void flush(boolean forceFlushActiveSegment) { flushedSinceLastChecked = true; - firstUnflushedOffset = endOffset().offset; + firstUnflushedOffset = endOffset().offset(); } @Override @@ -373,7 +373,7 @@ public class MockLog implements ReplicatedLog { public List<LogBatch> readBatches(long startOffset, OptionalLong maxOffsetOpt) { verifyOffsetInRange(startOffset); - long maxOffset = maxOffsetOpt.orElse(endOffset().offset); + long maxOffset = maxOffsetOpt.orElse(endOffset().offset()); if (startOffset == maxOffset) { return Collections.emptyList(); } @@ -384,9 +384,9 @@ public class MockLog implements ReplicatedLog { } private void verifyOffsetInRange(long offset) { - if (offset > endOffset().offset) { + if (offset > endOffset().offset()) { throw new OffsetOutOfRangeException("Requested offset " + offset + " is larger than " + - "then log end offset " + endOffset().offset); + "then log end offset " + endOffset().offset()); } if (offset < this.startOffset()) { @@ -399,7 +399,7 @@ public class MockLog implements ReplicatedLog { public LogFetchInfo read(long startOffset, Isolation isolation) { verifyOffsetInRange(startOffset); - long maxOffset = isolation == Isolation.COMMITTED ? highWatermark.offset : endOffset().offset; + long maxOffset = isolation == Isolation.COMMITTED ? highWatermark.offset() : endOffset().offset(); if (startOffset >= maxOffset) { return new LogFetchInfo(MemoryRecords.EMPTY, new LogOffsetMetadata( startOffset, metadataForOffset(startOffset))); @@ -450,7 +450,7 @@ public class MockLog implements ReplicatedLog { @Override public void initializeLeaderEpoch(int epoch) { - long startOffset = endOffset().offset; + long startOffset = endOffset().offset(); epochStartOffsets.removeIf(epochStartOffset -> epochStartOffset.startOffset >= startOffset || epochStartOffset.epoch >= epoch); epochStartOffsets.add(new EpochStartOffset(epoch, startOffset)); @@ -468,7 +468,7 @@ public class MockLog implements ReplicatedLog { return Optional.empty(); } - long highWatermarkOffset = highWatermark().offset; + long highWatermarkOffset = highWatermark().offset(); if (snapshotId.offset() > highWatermarkOffset) { throw new IllegalArgumentException( String.format( @@ -542,12 +542,12 @@ public class MockLog implements ReplicatedLog { ) ); } - if (highWatermark.offset < snapshotId.offset()) { + if (highWatermark.offset() < snapshotId.offset()) { throw new OffsetOutOfRangeException( String.format( "New log start (%s) is greater than the high watermark (%s)", snapshotId, - highWatermark.offset + highWatermark.offset() ) ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java index c56bbe01adb..0c389a41522 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java @@ -87,15 +87,15 @@ public class MockLogTest { appendAsLeader(Collections.singleton(recordThree), epoch); assertEquals(0L, log.startOffset()); - assertEquals(3L, log.endOffset().offset); + assertEquals(3L, log.endOffset().offset()); log.truncateTo(2); assertEquals(0L, log.startOffset()); - assertEquals(2L, log.endOffset().offset); + assertEquals(2L, log.endOffset().offset()); log.truncateTo(1); assertEquals(0L, log.startOffset()); - assertEquals(0L, log.endOffset().offset); + assertEquals(0L, log.endOffset().offset()); } @Test @@ -113,7 +113,7 @@ public class MockLogTest { appendBatch(5, 1); LogOffsetMetadata newOffset = new LogOffsetMetadata(5L); log.updateHighWatermark(newOffset); - assertEquals(newOffset.offset, log.highWatermark().offset); + assertEquals(newOffset.offset(), log.highWatermark().offset()); } @Test @@ -158,7 +158,7 @@ public class MockLogTest { public void testUnexpectedAppendOffset() { SimpleRecord recordFoo = new SimpleRecord("foo".getBytes()); final int currentEpoch = 3; - final long initialOffset = log.endOffset().offset; + final long initialOffset = log.endOffset().offset(); log.appendAsLeader( MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo), @@ -198,7 +198,7 @@ public class MockLogTest { ); assertEquals(0, log.startOffset()); - assertEquals(1, log.endOffset().offset); + assertEquals(1, log.endOffset().offset()); assertEquals(currentEpoch, log.lastFetchedEpoch()); Records records = log.read(0, Isolation.UNCOMMITTED).records; @@ -230,7 +230,7 @@ public class MockLogTest { log.appendAsFollower(MemoryRecords.withRecords(initialOffset, Compression.NONE, epoch, recordFoo)); assertEquals(initialOffset, log.startOffset()); - assertEquals(initialOffset + 1, log.endOffset().offset); + assertEquals(initialOffset + 1, log.endOffset().offset()); assertEquals(3, log.lastFetchedEpoch()); Records records = log.read(5L, Isolation.UNCOMMITTED).records; @@ -242,7 +242,7 @@ public class MockLogTest { assertEquals(1, extractRecords.size()); assertEquals(recordFoo.value(), extractRecords.get(0)); assertEquals(new OffsetAndEpoch(5, 0), log.endOffsetForEpoch(0)); - assertEquals(new OffsetAndEpoch(log.endOffset().offset, epoch), log.endOffsetForEpoch(epoch)); + assertEquals(new OffsetAndEpoch(log.endOffset().offset(), epoch), log.endOffsetForEpoch(epoch)); } @Test @@ -330,12 +330,12 @@ public class MockLogTest { appendBatch(5, 1); LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED); - assertEquals(5L, readInfo.startOffsetMetadata.offset); - assertTrue(readInfo.startOffsetMetadata.metadata.isPresent()); + assertEquals(5L, readInfo.startOffsetMetadata.offset()); + assertTrue(readInfo.startOffsetMetadata.metadata().isPresent()); // Update to a high watermark with valid offset metadata log.updateHighWatermark(readInfo.startOffsetMetadata); - assertEquals(readInfo.startOffsetMetadata.offset, log.highWatermark().offset); + assertEquals(readInfo.startOffsetMetadata.offset(), log.highWatermark().offset()); // Now update to a high watermark with invalid metadata assertThrows(IllegalArgumentException.class, () -> @@ -344,8 +344,8 @@ public class MockLogTest { // Ensure we can update the high watermark to the end offset LogFetchInfo readFromEndInfo = log.read(15L, Isolation.UNCOMMITTED); - assertEquals(15, readFromEndInfo.startOffsetMetadata.offset); - assertTrue(readFromEndInfo.startOffsetMetadata.metadata.isPresent()); + assertEquals(15, readFromEndInfo.startOffsetMetadata.offset()); + assertTrue(readFromEndInfo.startOffsetMetadata.metadata().isPresent()); log.updateHighWatermark(readFromEndInfo.startOffsetMetadata); // Ensure that the end offset metadata is valid after new entries are appended @@ -393,14 +393,14 @@ public class MockLogTest { assertThrows(OffsetOutOfRangeException.class, () -> log.read(log.startOffset() - 1, Isolation.UNCOMMITTED)); - assertThrows(OffsetOutOfRangeException.class, () -> log.read(log.endOffset().offset + 1, + assertThrows(OffsetOutOfRangeException.class, () -> log.read(log.endOffset().offset() + 1, Isolation.UNCOMMITTED)); } @Test public void testMonotonicEpochStartOffset() { appendBatch(5, 1); - assertEquals(5L, log.endOffset().offset); + assertEquals(5L, log.endOffset().offset()); log.initializeLeaderEpoch(2); assertEquals(new OffsetAndEpoch(5L, 1), log.endOffsetForEpoch(1)); @@ -424,7 +424,7 @@ public class MockLogTest { appendBatch(10, 4); log.reopen(); - assertEquals(15L, log.endOffset().offset); + assertEquals(15L, log.endOffset().offset()); assertEquals(2, log.lastFetchedEpoch()); } @@ -601,7 +601,7 @@ public class MockLogTest { assertTrue(log.deleteBeforeSnapshot(snapshotId)); assertEquals(offset, log.startOffset()); assertEquals(epoch, log.lastFetchedEpoch()); - assertEquals(offset, log.endOffset().offset); + assertEquals(offset, log.endOffset().offset()); int newRecords = 10; appendBatch(newRecords, epoch + 1); @@ -612,8 +612,8 @@ public class MockLogTest { assertEquals(offset, log.startOffset()); assertEquals(epoch + 1, log.lastFetchedEpoch()); - assertEquals(offset + newRecords, log.endOffset().offset); - assertEquals(offset + newRecords, log.highWatermark().offset); + assertEquals(offset + newRecords, log.endOffset().offset()); + assertEquals(offset + newRecords, log.highWatermark().offset()); } @Test @@ -627,8 +627,8 @@ public class MockLogTest { assertFalse(log.deleteBeforeSnapshot(new OffsetAndEpoch(1, epoch))); assertEquals(0, log.startOffset()); assertEquals(epoch, log.lastFetchedEpoch()); - assertEquals(offset, log.endOffset().offset); - assertEquals(offset, log.highWatermark().offset); + assertEquals(offset, log.endOffset().offset()); + assertEquals(offset, log.highWatermark().offset()); } @Test @@ -665,8 +665,8 @@ public class MockLogTest { assertTrue(log.truncateToLatestSnapshot()); assertEquals(sameEpochSnapshotId.offset(), log.startOffset()); assertEquals(sameEpochSnapshotId.epoch(), log.lastFetchedEpoch()); - assertEquals(sameEpochSnapshotId.offset(), log.endOffset().offset); - assertEquals(sameEpochSnapshotId.offset(), log.highWatermark().offset); + assertEquals(sameEpochSnapshotId.offset(), log.endOffset().offset()); + assertEquals(sameEpochSnapshotId.offset(), log.highWatermark().offset()); OffsetAndEpoch greaterEpochSnapshotId = new OffsetAndEpoch(3 * numberOfRecords, epoch + 1); @@ -679,8 +679,8 @@ public class MockLogTest { assertTrue(log.truncateToLatestSnapshot()); assertEquals(greaterEpochSnapshotId.offset(), log.startOffset()); assertEquals(greaterEpochSnapshotId.epoch(), log.lastFetchedEpoch()); - assertEquals(greaterEpochSnapshotId.offset(), log.endOffset().offset); - assertEquals(greaterEpochSnapshotId.offset(), log.highWatermark().offset); + assertEquals(greaterEpochSnapshotId.offset(), log.endOffset().offset()); + assertEquals(greaterEpochSnapshotId.offset(), log.highWatermark().offset()); } @Test @@ -764,7 +764,7 @@ public class MockLogTest { appendBatch(numberOfRecords, epoch); ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1); - assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(log.endOffset().offset, epoch)), + assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(log.endOffset().offset(), epoch)), resultOffsetAndEpoch); } @@ -859,7 +859,7 @@ public class MockLogTest { appendBatch(numberOfRecords, epoch); ValidOffsetAndEpoch resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords + 1, epoch); - assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(log.endOffset().offset, epoch)), + assertEquals(ValidOffsetAndEpoch.diverging(new OffsetAndEpoch(log.endOffset().offset(), epoch)), resultOffsetAndEpoch); } @@ -952,7 +952,7 @@ public class MockLogTest { private void appendAsLeader(Collection<SimpleRecord> records, int epoch) { log.appendAsLeader( MemoryRecords.withRecords( - log.endOffset().offset, + log.endOffset().offset(), Compression.NONE, records.toArray(new SimpleRecord[records.size()]) ), @@ -971,10 +971,10 @@ public class MockLogTest { private static void validateReadRecords(List<SimpleRecord> expectedRecords, MockLog log) { assertEquals(0L, log.startOffset()); - assertEquals(expectedRecords.size(), log.endOffset().offset); + assertEquals(expectedRecords.size(), log.endOffset().offset()); int currentOffset = 0; - while (currentOffset < log.endOffset().offset) { + while (currentOffset < log.endOffset().offset()) { Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records; List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator()); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 04374e57a39..573c92b1120 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -209,7 +209,7 @@ public final class RaftClientTestContext { public Builder appendToLog(int epoch, List<String> records) { MemoryRecords batch = buildBatch( time.milliseconds(), - log.endOffset().offset, + log.endOffset().offset(), epoch, records ); @@ -233,7 +233,7 @@ public final class RaftClientTestContext { } Builder deleteBeforeSnapshot(OffsetAndEpoch snapshotId) { - if (snapshotId.offset() > log.highWatermark().offset) { + if (snapshotId.offset() > log.highWatermark().offset()) { log.updateHighWatermark(new LogOffsetMetadata(snapshotId.offset())); } log.deleteBeforeSnapshot(snapshotId); @@ -462,7 +462,7 @@ public final class RaftClientTestContext { pollUntilRequest(); List<RaftRequest.Outbound> voteRequests = collectVoteRequests(epoch, - log.lastFetchedEpoch(), log.endOffset().offset); + log.lastFetchedEpoch(), log.endOffset().offset()); for (RaftRequest.Outbound request : voteRequests) { VoteResponseData voteResponse = voteResponse(true, OptionalInt.empty(), epoch); @@ -1511,7 +1511,7 @@ public final class RaftClientTestContext { public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws InterruptedException { assertEquals(localId, currentLeader()); - long localLogEndOffset = log.endOffset().offset; + long localLogEndOffset = log.endOffset().offset(); Iterable<ReplicaKey> followers = () -> voters .voterKeys() diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 30c2bbf39aa..6e9f364c8e3 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -866,12 +866,12 @@ public class RaftEventSimulationTest { long highWatermark() { return client.quorum().highWatermark() - .map(hw -> hw.offset) + .map(LogOffsetMetadata::offset) .orElse(0L); } long logEndOffset() { - return log.endOffset().offset; + return log.endOffset().offset(); } @Override @@ -1016,7 +1016,7 @@ public class RaftEventSimulationTest { cluster.leaderHighWatermark().ifPresent(highWatermark -> { long numReachedHighWatermark = cluster.nodes.entrySet().stream() .filter(entry -> cluster.voters.containsKey(entry.getKey())) - .filter(entry -> entry.getValue().log.endOffset().offset >= highWatermark) + .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark) .count(); assertTrue( numReachedHighWatermark >= cluster.majoritySize(), diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java index fc7c9724048..39b5ce3e589 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java @@ -78,7 +78,7 @@ final class KRaftControlRecordStateMachineTest { short kraftVersion = 1; log.appendAsLeader( MemoryRecords.withKRaftVersionRecord( - log.endOffset().offset, + log.endOffset().offset(), 0, epoch, bufferSupplier.get(300), @@ -91,7 +91,7 @@ final class KRaftControlRecordStateMachineTest { VoterSet voterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true)); log.appendAsLeader( MemoryRecords.withVotersRecord( - log.endOffset().offset, + log.endOffset().offset(), 0, epoch, bufferSupplier.get(300), @@ -104,8 +104,8 @@ final class KRaftControlRecordStateMachineTest { partitionState.updateState(); assertEquals(voterSet, partitionState.lastVoterSet()); - assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset - 1)); - assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset - 1)); + assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1)); + assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1)); } @Test @@ -129,7 +129,7 @@ final class KRaftControlRecordStateMachineTest { short kraftVersion = 1; log.appendAsLeader( MemoryRecords.withKRaftVersionRecord( - log.endOffset().offset, + log.endOffset().offset(), 0, epoch, bufferSupplier.get(300), @@ -142,7 +142,7 @@ final class KRaftControlRecordStateMachineTest { VoterSet voterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true)); log.appendAsLeader( MemoryRecords.withVotersRecord( - log.endOffset().offset, + log.endOffset().offset(), 0, epoch, bufferSupplier.get(300), @@ -155,8 +155,8 @@ final class KRaftControlRecordStateMachineTest { partitionState.updateState(); assertEquals(voterSet, partitionState.lastVoterSet()); - assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset - 1)); - assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset - 1)); + assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1)); + assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1)); } @Test @@ -184,8 +184,8 @@ final class KRaftControlRecordStateMachineTest { partitionState.updateState(); assertEquals(voterSet, partitionState.lastVoterSet()); - assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset - 1)); - assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset - 1)); + assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1)); + assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1)); } @Test @@ -215,7 +215,7 @@ final class KRaftControlRecordStateMachineTest { VoterSet voterSet = snapshotVoterSet.addVoter(VoterSetTest.voterNode(7, true)).get(); log.appendAsLeader( MemoryRecords.withVotersRecord( - log.endOffset().offset, + log.endOffset().offset(), 0, epoch, bufferSupplier.get(300), @@ -228,8 +228,8 @@ final class KRaftControlRecordStateMachineTest { partitionState.updateState(); assertEquals(voterSet, partitionState.lastVoterSet()); - assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset - 1)); - assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset - 1)); + assertEquals(Optional.of(voterSet), partitionState.voterSetAtOffset(log.endOffset().offset() - 1)); + assertEquals(kraftVersion, partitionState.kraftVersionAtOffset(log.endOffset().offset() - 1)); // Check the voter set at the snapshot assertEquals(Optional.of(snapshotVoterSet), partitionState.voterSetAtOffset(snapshotId.offset() - 1)); @@ -248,7 +248,7 @@ final class KRaftControlRecordStateMachineTest { short kraftVersion = 1; log.appendAsLeader( MemoryRecords.withKRaftVersionRecord( - log.endOffset().offset, + log.endOffset().offset(), 0, epoch, bufferSupplier.get(300), @@ -258,7 +258,7 @@ final class KRaftControlRecordStateMachineTest { ); // Append the voter set control record - long firstVoterSetOffset = log.endOffset().offset; + long firstVoterSetOffset = log.endOffset().offset(); VoterSet firstVoterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true)); log.appendAsLeader( MemoryRecords.withVotersRecord( @@ -272,7 +272,7 @@ final class KRaftControlRecordStateMachineTest { ); // Append another voter set control record - long voterSetOffset = log.endOffset().offset; + long voterSetOffset = log.endOffset().offset(); VoterSet voterSet = firstVoterSet.addVoter(VoterSetTest.voterNode(7, true)).get(); log.appendAsLeader( MemoryRecords.withVotersRecord( @@ -313,7 +313,7 @@ final class KRaftControlRecordStateMachineTest { KRaftControlRecordStateMachine partitionState = buildPartitionListener(log, Optional.of(staticVoterSet)); // Append the kraft.version control record - long kraftVersionOffset = log.endOffset().offset; + long kraftVersionOffset = log.endOffset().offset(); short kraftVersion = 1; log.appendAsLeader( MemoryRecords.withKRaftVersionRecord( @@ -327,7 +327,7 @@ final class KRaftControlRecordStateMachineTest { ); // Append the voter set control record - long firstVoterSetOffset = log.endOffset().offset; + long firstVoterSetOffset = log.endOffset().offset(); VoterSet firstVoterSet = VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(4, 5, 6), true)); log.appendAsLeader( MemoryRecords.withVotersRecord( @@ -341,7 +341,7 @@ final class KRaftControlRecordStateMachineTest { ); // Append another voter set control record - long voterSetOffset = log.endOffset().offset; + long voterSetOffset = log.endOffset().offset(); VoterSet voterSet = firstVoterSet.addVoter(VoterSetTest.voterNode(7, true)).get(); log.appendAsLeader( MemoryRecords.withVotersRecord(