This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c7f051914e KAFKA-13888; Implement `LastFetchTimestamp` and `LastCaughtUpTimestamp` for DescribeQuorumResponse [KIP-836] (#12508) c7f051914e is described below commit c7f051914e5d20e137901b6d687d6e7cf8775df1 Author: Niket <niket-g...@users.noreply.github.com> AuthorDate: Fri Aug 19 15:09:09 2022 -0700 KAFKA-13888; Implement `LastFetchTimestamp` and `LastCaughtUpTimestamp` for DescribeQuorumResponse [KIP-836] (#12508) This commit implements the newly added fields `LastFetchTimestamp` and `LastCaughtUpTimestamp` for KIP-836: https://cwiki.apache.org/confluence/display/KAFKA/KIP-836:+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag. Reviewers: Jason Gustafson <ja...@confluent.io> --- .../main/scala/kafka/network/RequestChannel.scala | 9 +- .../kafka/server/KRaftClusterTest.scala | 55 +++++++- .../kafka/server/DescribeQuorumRequestTest.scala | 9 +- .../org/apache/kafka/raft/KafkaRaftClient.java | 16 +-- .../java/org/apache/kafka/raft/LeaderState.java | 94 ++++++++++---- .../org/apache/kafka/raft/KafkaRaftClientTest.java | 22 +++- .../org/apache/kafka/raft/LeaderStateTest.java | 140 +++++++++++++++++---- .../kafka/raft/internals/KafkaRaftMetricsTest.java | 2 +- 8 files changed, 281 insertions(+), 66 deletions(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 4fa611206a..2200757c70 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -121,10 +121,17 @@ object RequestChannel extends Logging { def isForwarded: Boolean = envelope.isDefined + private def shouldReturnNotController(response: AbstractResponse): Boolean = { + response match { + case describeQuorumResponse: DescribeQuorumResponse => response.errorCounts.containsKey(Errors.NOT_LEADER_OR_FOLLOWER) + case _ => 
response.errorCounts.containsKey(Errors.NOT_CONTROLLER) + } + } + def buildResponseSend(abstractResponse: AbstractResponse): Send = { envelope match { case Some(request) => - val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) { + val envelopeResponse = if (shouldReturnNotController(abstractResponse)) { // Since it's a NOT_CONTROLLER error response, we need to make envelope response with NOT_CONTROLLER error // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller new EnvelopeResponse(new EnvelopeResponseData() diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 509facf921..a16cf821d4 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -21,7 +21,7 @@ import kafka.network.SocketServer import kafka.server.IntegrationTestUtils.connectAndReceive import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes} import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config, ConfigEntry, NewPartitionReassignment, NewTopic} +import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, Config, ConfigEntry, DescribeMetadataQuorumOptions, NewPartitionReassignment, NewTopic} import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo} import org.apache.kafka.common.message.DescribeClusterRequestData import org.apache.kafka.common.network.ListenerName @@ -32,7 +32,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{Tag, Test, Timeout} import java.util -import java.util.{Arrays, Collections, Optional} +import java.util.{Arrays, Collections, Optional, OptionalLong, Properties} import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.common.config.ConfigResource import 
org.apache.kafka.common.config.ConfigResource.Type @@ -778,4 +778,55 @@ class KRaftClusterTest { cluster.close() } } + def createAdminClient(cluster: KafkaClusterTestKit): Admin = { + var props: Properties = null + props = cluster.clientProperties() + props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName) + Admin.create(props) + } + + @Test + def testDescribeQuorumRequestToBrokers() : Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4). + setNumControllerNodes(3).build()).build() + try { + cluster.format + cluster.startup + for (i <- 0 to 3) { + TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING, + "Broker Never started up") + } + val admin = createAdminClient(cluster) + try { + val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions) + val quorumInfo = quorumState.quorumInfo.get() + + assertEquals(cluster.controllers.asScala.keySet, quorumInfo.voters.asScala.map(_.replicaId).toSet) + assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId), + s"Leader ID ${quorumInfo.leaderId} was not a controller ID.") + + quorumInfo.voters.forEach { voter => + assertTrue(0 < voter.logEndOffset, + s"logEndOffset for voter with ID ${voter.replicaId} was ${voter.logEndOffset}") + assertNotEquals(OptionalLong.empty(), voter.lastFetchTimeMs) + assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimeMs) + } + + assertEquals(cluster.brokers.asScala.keySet, quorumInfo.observers.asScala.map(_.replicaId).toSet) + quorumInfo.observers.forEach { observer => + assertTrue(0 < observer.logEndOffset, + s"logEndOffset for observer with ID ${observer.replicaId} was ${observer.logEndOffset}") + assertNotEquals(OptionalLong.empty(), observer.lastFetchTimeMs) + assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimeMs) + } + } finally { + admin.close() + } + } finally { + cluster.close() + } + } + } diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala index b53004b2ea..eed58961e4 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala @@ -87,8 +87,13 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { (voterData ++ observerData).foreach { state => assertTrue(0 < state.logEndOffset) - assertEquals(-1, state.lastFetchTimestamp) - assertEquals(-1, state.lastCaughtUpTimestamp) + if (version == 0) { + assertEquals(-1, state.lastFetchTimestamp) + assertEquals(-1, state.lastCaughtUpTimestamp) + } else { + assertNotEquals(-1, state.lastFetchTimestamp) + assertNotEquals(-1, state.lastCaughtUpTimestamp) + } } } } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 042a141a76..69d2025b6b 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.message.BeginQuorumEpochRequestData; import org.apache.kafka.common.message.BeginQuorumEpochResponseData; import org.apache.kafka.common.message.DescribeQuorumRequestData; import org.apache.kafka.common.message.DescribeQuorumResponseData; -import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState; import org.apache.kafka.common.message.EndQuorumEpochRequestData; import org.apache.kafka.common.message.EndQuorumEpochResponseData; import org.apache.kafka.common.message.FetchRequestData; @@ -95,7 +94,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import java.util.stream.Collectors; import static java.util.concurrent.CompletableFuture.completedFuture; 
import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition; @@ -1016,7 +1014,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> { if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) { LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); - if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { + if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata, log.endOffset().offset)) { onUpdateLeaderHighWatermark(state, currentTimeMs); } @@ -1182,8 +1180,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> { leaderState.localId(), leaderState.epoch(), leaderState.highWatermark().isPresent() ? leaderState.highWatermark().get().offset : -1, - convertToReplicaStates(leaderState.getVoterEndOffsets()), - convertToReplicaStates(leaderState.getObserverStates(currentTimeMs)) + leaderState.quorumResponseVoterStates(currentTimeMs), + leaderState.quorumResponseObserverStates(currentTimeMs) ); } @@ -1421,14 +1419,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> { return true; } - List<ReplicaState> convertToReplicaStates(Map<Integer, Long> replicaEndOffsets) { - return replicaEndOffsets.entrySet().stream() - .map(entry -> new ReplicaState() - .setReplicaId(entry.getKey()) - .setLogEndOffset(entry.getValue())) - .collect(Collectors.toList()); - } - private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) { // Only elected leaders are sent in the request/response header, so if we have an elected // leaderId, it should be consistent with what is in the message. 
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 8717d4e4d2..0b8ebad8bd 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.common.message.DescribeQuorumResponseData; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.raft.internals.BatchAccumulator; import org.slf4j.Logger; @@ -25,6 +26,7 @@ import org.apache.kafka.common.message.LeaderChangeMessage.Voter; import org.apache.kafka.common.record.ControlRecordUtils; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -205,10 +207,10 @@ public class LeaderState<T> implements EpochState { /** * Update the local replica state. * - * See {@link #updateReplicaState(int, long, LogOffsetMetadata)} + * See {@link #updateReplicaState(int, long, LogOffsetMetadata, long)} */ public boolean updateLocalState(long fetchTimestamp, LogOffsetMetadata logOffsetMetadata) { - return updateReplicaState(localId, fetchTimestamp, logOffsetMetadata); + return updateReplicaState(localId, fetchTimestamp, logOffsetMetadata, logOffsetMetadata.offset); } /** @@ -217,11 +219,15 @@ public class LeaderState<T> implements EpochState { * @param replicaId replica id * @param fetchTimestamp fetch timestamp * @param logOffsetMetadata new log offset and metadata + * @param leaderLogEndOffset current log end offset of the leader * @return true if the high watermark is updated too */ - public boolean updateReplicaState(int replicaId, - long fetchTimestamp, - LogOffsetMetadata logOffsetMetadata) { + public boolean updateReplicaState( + int replicaId, + long fetchTimestamp, + LogOffsetMetadata logOffsetMetadata, + long leaderLogEndOffset + ) { // Ignore fetches from negative replica id, as it indicates // the 
fetch is from non-replica. For example, a consumer. if (replicaId < 0) { @@ -229,7 +235,18 @@ public class LeaderState<T> implements EpochState { } ReplicaState state = getReplicaState(replicaId); - state.updateFetchTimestamp(fetchTimestamp); + + // Only proceed with updating the states if the offset update is valid + verifyEndOffsetUpdate(state, logOffsetMetadata); + + // Update the Last CaughtUp Time + if (logOffsetMetadata.offset >= leaderLogEndOffset) { + state.updateLastCaughtUpTimestamp(fetchTimestamp); + } else if (logOffsetMetadata.offset >= state.lastFetchLeaderLogEndOffset.orElse(-1L)) { + state.updateLastCaughtUpTimestamp(state.lastFetchTimestamp.orElse(-1L)); + } + + state.updateFetchTimestamp(fetchTimestamp, leaderLogEndOffset); return updateEndOffset(state, logOffsetMetadata); } @@ -246,8 +263,10 @@ public class LeaderState<T> implements EpochState { .collect(Collectors.toList()); } - private boolean updateEndOffset(ReplicaState state, - LogOffsetMetadata endOffsetMetadata) { + private void verifyEndOffsetUpdate( + ReplicaState state, + LogOffsetMetadata endOffsetMetadata + ) { state.endOffset.ifPresent(currentEndOffset -> { if (currentEndOffset.offset > endOffsetMetadata.offset) { if (state.nodeId == localId) { @@ -259,7 +278,11 @@ public class LeaderState<T> implements EpochState { } } }); - + } + private boolean updateEndOffset( + ReplicaState state, + LogOffsetMetadata endOffsetMetadata + ) { state.endOffset = Optional.of(endOffsetMetadata); state.hasAcknowledgedLeader = true; return isVoter(state.nodeId) && updateHighWatermark(); @@ -290,22 +313,36 @@ public class LeaderState<T> implements EpochState { return state; } - Map<Integer, Long> getVoterEndOffsets() { - return getReplicaEndOffsets(voterStates); + List<DescribeQuorumResponseData.ReplicaState> quorumResponseVoterStates(long currentTimeMs) { + return quorumResponseReplicaStates(voterStates.values(), localId, currentTimeMs); } - Map<Integer, Long> getObserverStates(final long currentTimeMs) 
{ + List<DescribeQuorumResponseData.ReplicaState> quorumResponseObserverStates(long currentTimeMs) { clearInactiveObservers(currentTimeMs); - return getReplicaEndOffsets(observerStates); + return quorumResponseReplicaStates(observerStates.values(), localId, currentTimeMs); } - private static <R extends ReplicaState> Map<Integer, Long> getReplicaEndOffsets( - Map<Integer, R> replicaStates) { - return replicaStates.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> e.getValue().endOffset.map( - logOffsetMetadata -> logOffsetMetadata.offset).orElse(-1L)) - ); + private static List<DescribeQuorumResponseData.ReplicaState> quorumResponseReplicaStates( + Collection<ReplicaState> state, + int leaderId, + long currentTimeMs + ) { + return state.stream().map(s -> { + final long lastCaughtUpTimestamp; + final long lastFetchTimestamp; + if (s.nodeId == leaderId) { + lastCaughtUpTimestamp = currentTimeMs; + lastFetchTimestamp = currentTimeMs; + } else { + lastCaughtUpTimestamp = s.lastCaughtUpTimestamp.orElse(-1); + lastFetchTimestamp = s.lastFetchTimestamp.orElse(-1); + } + return new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(s.nodeId) + .setLogEndOffset(s.endOffset.map(md -> md.offset).orElse(-1L)) + .setLastCaughtUpTimestamp(lastCaughtUpTimestamp) + .setLastFetchTimestamp(lastFetchTimestamp); + }).collect(Collectors.toList()); } private void clearInactiveObservers(final long currentTimeMs) { @@ -323,19 +360,30 @@ public class LeaderState<T> implements EpochState { final int nodeId; Optional<LogOffsetMetadata> endOffset; OptionalLong lastFetchTimestamp; + OptionalLong lastFetchLeaderLogEndOffset; + OptionalLong lastCaughtUpTimestamp; boolean hasAcknowledgedLeader; public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) { this.nodeId = nodeId; this.endOffset = Optional.empty(); this.lastFetchTimestamp = OptionalLong.empty(); + this.lastFetchLeaderLogEndOffset = OptionalLong.empty(); + this.lastCaughtUpTimestamp = 
OptionalLong.empty(); this.hasAcknowledgedLeader = hasAcknowledgedLeader; } - void updateFetchTimestamp(long currentFetchTimeMs) { + void updateFetchTimestamp(long currentFetchTimeMs, long leaderLogEndOffset) { // To be resilient to system time shifts we do not strictly // require the timestamp be monotonically increasing. lastFetchTimestamp = OptionalLong.of(Math.max(lastFetchTimestamp.orElse(-1L), currentFetchTimeMs)); + lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset); + } + + void updateLastCaughtUpTimestamp(long lastCaughtUpTime) { + // This value relies on the fetch timestamp which does not + // require monotonicity + lastCaughtUpTimestamp = OptionalLong.of(Math.max(lastCaughtUpTimestamp.orElse(-1L), lastCaughtUpTime)); } @Override @@ -353,10 +401,12 @@ public class LeaderState<T> implements EpochState { @Override public String toString() { return String.format( - "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, hasAcknowledgedLeader=%s)", + "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, " + + "lastCaughtUpTimestamp=%s, hasAcknowledgedLeader=%s)", nodeId, endOffset, lastFetchTimestamp, + lastCaughtUpTimestamp, hasAcknowledgedLeader ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index a8a346e6db..678648505b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -1985,6 +1985,7 @@ public class KafkaRaftClientTest { RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch); + long laggingFollowerFetchTime = context.time.milliseconds(); context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0)); context.pollUntilResponse(); context.assertSentFetchPartitionResponse(1L, epoch); @@ -1992,16 +1993,21 @@ public class KafkaRaftClientTest { 
context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar")); context.client.poll(); + context.time.sleep(100); + long closeFollowerFetchTime = context.time.milliseconds(); context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0)); context.pollUntilResponse(); context.assertSentFetchPartitionResponse(3L, epoch); // Create observer int observerId = 3; + context.time.sleep(100); + long observerFetchTime = context.time.milliseconds(); context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 0)); context.pollUntilResponse(); context.assertSentFetchPartitionResponse(3L, epoch); + context.time.sleep(100); context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition)); context.pollUntilResponse(); @@ -2011,17 +2017,25 @@ public class KafkaRaftClientTest { .setReplicaId(localId) // As we are appending the records directly to the log, // the leader end offset hasn't been updated yet. - .setLogEndOffset(3L), + .setLogEndOffset(3L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), new ReplicaState() .setReplicaId(laggingFollower) - .setLogEndOffset(1L), + .setLogEndOffset(1L) + .setLastFetchTimestamp(laggingFollowerFetchTime) + .setLastCaughtUpTimestamp(laggingFollowerFetchTime), new ReplicaState() .setReplicaId(closeFollower) - .setLogEndOffset(3)), + .setLogEndOffset(3L) + .setLastFetchTimestamp(closeFollowerFetchTime) + .setLastCaughtUpTimestamp(closeFollowerFetchTime)), singletonList( new ReplicaState() .setReplicaId(observerId) - .setLogEndOffset(0L))); + .setLogEndOffset(0L) + .setLastFetchTimestamp(observerFetchTime) + .setLastCaughtUpTimestamp(-1L))); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 5f9989d55e..fa54d5cbc6 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ 
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.common.message.DescribeQuorumResponseData; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; @@ -31,7 +32,9 @@ import java.util.Collections; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -119,6 +122,93 @@ public class LeaderStateTest { () -> state.updateLocalState(0, new LogOffsetMetadata(15L))); } + @Test + public void testLastCaughtUpTimeVoters() { + int node1 = 1; + int node2 = 2; + int currentTime = 1000; + int fetchTime = 0; + int caughtupTime = -1; + LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 10L); + assertEquals(Optional.empty(), state.highWatermark()); + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); + assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); + assertEquals(Optional.empty(), state.highWatermark()); + + // Node 1 falls behind + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L), 11L)); + assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp()); + + // Node 1 catches up to leader + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L), 11L)); + caughtupTime = fetchTime; + assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + 
assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp()); + + // Node 1 falls behind + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L), 100L)); + assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp()); + + // Node 1 catches up to the last fetch offset + int prevFetchTime = fetchTime; + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L), 200L)); + caughtupTime = prevFetchTime; + assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp()); + + // Node2 has never caught up to leader + assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp()); + assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(202L), 300L)); + assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp()); + assertFalse(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L), 300L)); + assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp()); + } + + @Test + public void testLastCaughtUpTimeObserver() { + int node1Index = 0; + int node1Id = 1; + int currentTime = 1000; + int fetchTime = 0; + int caughtUpTime = -1; + LeaderState<?> state = newLeaderState(singleton(localId), 5L); + assertEquals(Optional.empty(), state.highWatermark()); + assertEquals(emptySet(), state.nonAcknowledgingVoters()); + + // Node 1 falls behind + assertTrue(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); + assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(10L), 11L)); + 
assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + + // Node 1 catches up to leader + assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(11L), 11L)); + caughtUpTime = fetchTime; + assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + + // Node 1 falls behind + assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(50L), 100L)); + assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + + // Node 1 catches up to the last fetch offset + int prevFetchTime = fetchTime; + assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(102L), 200L)); + caughtUpTime = prevFetchTime; + assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + + // Node 1 catches up to leader + assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(202L), 200L)); + caughtUpTime = fetchTime; + assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + } + @Test public void testIdempotentEndOffsetUpdate() { LeaderState<?> state = newLeaderState(singleton(localId), 15L); @@ -149,12 +239,12 @@ public class LeaderStateTest { 
assertFalse(state.updateLocalState(0, new LogOffsetMetadata(13L))); assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L))); + assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L), 11L)); assertEquals(emptySet(), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L))); + assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L), 12L)); assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark()); - assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L))); + assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L), 14L)); assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark()); } @@ -166,19 +256,19 @@ public class LeaderStateTest { assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L))); assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L))); + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L), 11L)); assertEquals(singleton(node2), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L), 11L)); assertEquals(emptySet(), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L), 16L)); assertEquals(Optional.of(new 
LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateLocalState(0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L))); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L), 21L)); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); - assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L))); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L), 21L)); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } @@ -188,13 +278,13 @@ public class LeaderStateTest { int node1 = 1; LeaderState<?> state = newLeaderState(mkSet(localId, node1), 0L); state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(10L)); - state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L)); + state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L), 11L); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); // Follower crashes and disk is lost. It fetches an earlier offset to rebuild state. 
// The leader will report an error in the logs, but will not let the high watermark rewind - assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L))); - assertEquals(5L, state.getVoterEndOffsets().get(node1)); + assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L), 11L)); + assertEquals(5L, state.quorumResponseVoterStates(time.milliseconds()).get(node1).logEndOffset()); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); } @@ -224,7 +314,9 @@ public class LeaderStateTest { mkEntry(localId, leaderEndOffset), mkEntry(node1, leaderStartOffset), mkEntry(node2, leaderEndOffset) - ), state.getVoterEndOffsets()); + ), state.quorumResponseVoterStates(0) + .stream() + .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset))); } private LeaderState<?> setUpLeaderAndFollowers(int follower1, @@ -234,8 +326,8 @@ public class LeaderStateTest { LeaderState<?> state = newLeaderState(mkSet(localId, follower1, follower2), leaderStartOffset); state.updateLocalState(0, new LogOffsetMetadata(leaderEndOffset)); assertEquals(Optional.empty(), state.highWatermark()); - state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset)); - state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset)); + state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset), leaderEndOffset); + state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset), leaderEndOffset); return state; } @@ -246,9 +338,12 @@ public class LeaderStateTest { LeaderState<?> state = newLeaderState(mkSet(localId), epochStartOffset); long timestamp = 20L; - assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset))); + assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10)); 
- assertEquals(Collections.singletonMap(observerId, epochStartOffset), state.getObserverStates(timestamp)); + assertEquals(Collections.singletonMap(observerId, epochStartOffset), + state.quorumResponseObserverStates(timestamp) + .stream() + .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset))); } @Test @@ -257,9 +352,9 @@ public class LeaderStateTest { long epochStartOffset = 10L; LeaderState<?> state = newLeaderState(mkSet(localId), epochStartOffset); - assertFalse(state.updateReplicaState(observerId, 0, new LogOffsetMetadata(epochStartOffset))); + assertFalse(state.updateReplicaState(observerId, 0, new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10)); - assertEquals(Collections.emptyMap(), state.getObserverStates(10)); + assertEquals(emptyList(), state.quorumResponseObserverStates(10)); } @Test @@ -269,11 +364,14 @@ public class LeaderStateTest { long epochStartOffset = 10L; LeaderState<?> state = newLeaderState(mkSet(localId), epochStartOffset); - state.updateReplicaState(observerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset)); - assertEquals(singleton(observerId), state.getObserverStates(time.milliseconds()).keySet()); + state.updateReplicaState(observerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10); + assertEquals(singleton(observerId), + state.quorumResponseObserverStates(time.milliseconds()) + .stream().map(o -> o.replicaId()) + .collect(Collectors.toSet())); time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS); - assertEquals(emptySet(), state.getObserverStates(time.milliseconds()).keySet()); + assertEquals(emptyList(), state.quorumResponseObserverStates(time.milliseconds())); } @ParameterizedTest diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index 0d64eac1cc..cc2700bb17 100644 --- 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -103,7 +103,7 @@ public class KafkaRaftMetricsTest { assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); state.leaderStateOrThrow().updateLocalState(0, new LogOffsetMetadata(5L)); - state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L)); + state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L), 6L); assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); state.transitionToFollower(2, 1);