This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new fd9a20e  KAFKA-8429; Handle offset change when OffsetForLeaderEpoch inflight (#6811)
fd9a20e is described below

commit fd9a20e4167c51c6645c55ed98800b768518c863
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu May 30 08:50:45 2019 -0700

    KAFKA-8429; Handle offset change when OffsetForLeaderEpoch inflight (#6811)

    It is possible for the offset of a partition to be changed while we are in
    the middle of validation. If the OffsetForLeaderEpoch request is in-flight
    and the offset changes, we need to redo the validation after it returns.
    We had a check for this situation previously, but it only checked whether
    the current leader epoch had changed. This patch fixes that and moves the
    validation into `SubscriptionState`, where it can be protected with a lock.
    Additionally, this patch adds test cases for the SubscriptionState
    validation API. We also fix a small bug in handling broker downgrades: we
    should skip validation if the latest metadata does not include leader
    epoch information.

    Reviewers: David Arthur <mum...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |   4 +-
 .../consumer/internals/ConsumerCoordinator.java    |   4 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  55 ++---
 .../consumer/internals/SubscriptionState.java      |  98 ++++++---
 .../clients/consumer/internals/FetcherTest.java    |  56 ++++-
 .../consumer/internals/SubscriptionStateTest.java  | 240 ++++++++++++++++++++-
 6 files changed, 368 insertions(+), 89 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9c1a45c..01b8989 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1547,7 +1547,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { offset, Optional.empty(), // This will ensure we skip validation this.metadata.leaderAndEpoch(partition)); - this.subscriptions.seek(partition, newPosition); + this.subscriptions.seekUnvalidated(partition, newPosition); } finally { release(); } @@ -1583,7 +1583,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { offsetAndMetadata.leaderEpoch(), currentLeaderAndEpoch); this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata); - this.subscriptions.seekAndValidate(partition, newPosition); + this.subscriptions.seekUnvalidated(partition, newPosition); } finally { release(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index f965571..5d39da5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -511,11 +511,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator { final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(tp); final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), - new ConsumerMetadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.empty())); + leaderAndEpoch); log.info("Setting offset for partition {} to the committed offset {}", tp, position); entry.getValue().leaderEpoch().ifPresent(epoch ->
this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch)); - this.subscriptions.seekAndValidate(tp, position); + this.subscriptions.seekUnvalidated(tp, position); } return true; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d839791..e638963 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.FetchSessionHandler; -import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MetadataCache; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.StaleMetadataException; @@ -466,7 +465,7 @@ public class Fetcher<K, V> implements Closeable { // Validate each partition against the current leader and epoch subscriptions.assignedPartitions().forEach(topicPartition -> { ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.leaderAndEpoch(topicPartition); - subscriptions.maybeValidatePosition(topicPartition, leaderAndEpoch); + subscriptions.maybeValidatePositionForCurrentLeader(topicPartition, leaderAndEpoch); }); // Collect positions needing validation, with backoff @@ -677,7 +676,7 @@ public class Fetcher<K, V> implements Closeable { SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition( offsetData.offset, offsetData.leaderEpoch, metadata.leaderAndEpoch(partition)); offsetData.leaderEpoch.ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(partition, epoch)); - subscriptions.maybeSeek(partition, position.offset, requestedResetStrategy); + subscriptions.maybeSeekUnvalidated(partition, position.offset, requestedResetStrategy); } private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) { @@ -735,16 +734,12 @@ public class Fetcher<K, V> implements Closeable { final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped = regroupFetchPositionsByLeader(partitionsToValidate); - regrouped.forEach((node, dataMap) -> { + regrouped.forEach((node, fetchPostitions) -> { if (node.isEmpty()) { metadata.requestUpdate(); return; } - final Map<TopicPartition, Metadata.LeaderAndEpoch> cachedLeaderAndEpochs = partitionsToValidate.entrySet() - .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().currentLeader)); - NodeApiVersions nodeApiVersions = apiVersions.get(node.idString()); if (nodeApiVersions == null) { client.tryConnect(node); @@ -754,14 +749,14 @@ public class Fetcher<K, V> implements Closeable { if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) { log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " + "support the required protocol version (introduced in Kafka 2.3)", - cachedLeaderAndEpochs.keySet()); - for (TopicPartition partition : cachedLeaderAndEpochs.keySet()) { + fetchPostitions.keySet()); + for (TopicPartition partition : fetchPostitions.keySet()) { subscriptions.completeValidation(partition); } return; } - subscriptions.setNextAllowedRetry(dataMap.keySet(), time.milliseconds() + requestTimeoutMs); + subscriptions.setNextAllowedRetry(fetchPostitions.keySet(), time.milliseconds() + requestTimeoutMs); 
RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate); future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() { @@ -777,34 +772,12 @@ public class Fetcher<K, V> implements Closeable { // for the partition. If so, it means we have experienced log truncation and need to reposition // that partition's offset. offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> { - if (!subscriptions.isAssigned(respTopicPartition)) { - log.debug("Ignoring OffsetsForLeader response for partition {} which is not currently assigned.", respTopicPartition); - return; - } - - if (subscriptions.awaitingValidation(respTopicPartition)) { - SubscriptionState.FetchPosition currentPosition = subscriptions.position(respTopicPartition); - Metadata.LeaderAndEpoch currentLeader = currentPosition.currentLeader; - if (!currentLeader.equals(cachedLeaderAndEpochs.get(respTopicPartition))) { - return; - } - - if (respEndOffset.endOffset() < currentPosition.offset) { - if (subscriptions.hasDefaultOffsetResetPolicy()) { - SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( - respEndOffset.endOffset(), Optional.of(respEndOffset.leaderEpoch()), currentLeader); - log.info("Truncation detected for partition {}, resetting offset to {}", respTopicPartition, newPosition); - subscriptions.seek(respTopicPartition, newPosition); - } else { - log.warn("Truncation detected for partition {}, but no reset policy is set", respTopicPartition); - truncationWithoutResetPolicy.put(respTopicPartition, new OffsetAndMetadata( - respEndOffset.endOffset(), Optional.of(respEndOffset.leaderEpoch()), null)); - } - } else { - // Offset is fine, clear the validation state - subscriptions.completeValidation(respTopicPartition); - } - } + SubscriptionState.FetchPosition requestPosition = fetchPostitions.get(respTopicPartition); + Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation( + respTopicPartition, requestPosition, respEndOffset); + divergentOffsetOpt.ifPresent(divergentOffset -> { + truncationWithoutResetPolicy.put(respTopicPartition, divergentOffset); + }); }); if (!truncationWithoutResetPolicy.isEmpty()) { @@ -814,7 +787,7 @@ public class Fetcher<K, V> implements Closeable { @Override public void onFailure(RuntimeException e) { - subscriptions.requestFailed(dataMap.keySet(), time.milliseconds() + retryBackoffMs); + subscriptions.requestFailed(fetchPostitions.keySet(), time.milliseconds() + retryBackoffMs); metadata.requestUpdate(); if (!(e instanceof RetriableException) && !cachedOffsetForLeaderException.compareAndSet(null, e)) { @@ -1084,7 +1057,7 @@ public class Fetcher<K, V> implements Closeable { // Ensure the position has an up-to-date leader subscriptions.assignedPartitions().forEach( - tp -> subscriptions.maybeValidatePosition(tp, metadata.leaderAndEpoch(tp))); + tp -> subscriptions.maybeValidatePositionForCurrentLeader(tp, metadata.leaderAndEpoch(tp))); long currentTimeMs = time.milliseconds(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 640ed55..4a29ffc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -24,6 +24,7 @@ import 
org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.PartitionStates; +import org.apache.kafka.common.requests.EpochEndOffset; import org.apache.kafka.common.requests.IsolationLevel; import org.apache.kafka.common.utils.LogContext; import org.slf4j.Logger; @@ -50,7 +51,7 @@ import java.util.stream.Collectors; * or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription). * * Once assigned, the partition is not considered "fetchable" until its initial position has - * been set with {@link #seek(TopicPartition, FetchPosition)}. Fetchable partitions track a fetch + * been set with {@link #seekValidated(TopicPartition, FetchPosition)}. Fetchable partitions track a fetch * position which is used to set the offset of the next fetch, and a consumed position * which is the last offset that has been returned to the user. You can suspend fetching * from a partition through {@link #pause(TopicPartition)} without affecting the fetched/consumed @@ -315,7 +316,7 @@ public class SubscriptionState { return Collections.emptySet(); } - public Set<TopicPartition> pausedPartitions() { + public synchronized Set<TopicPartition> pausedPartitions() { return collectPartitions(TopicPartitionState::isPaused, Collectors.toSet()); } @@ -349,19 +350,19 @@ public class SubscriptionState { return this.assignment.stateValue(tp); } - public synchronized void seek(TopicPartition tp, FetchPosition position) { - assignedState(tp).seek(position); + public synchronized void seekValidated(TopicPartition tp, FetchPosition position) { + assignedState(tp).seekValidated(position); } - public synchronized void seekAndValidate(TopicPartition tp, FetchPosition position) { - assignedState(tp).seekAndValidate(position); + public void seek(TopicPartition tp, long offset) { + seekValidated(tp, new FetchPosition(offset)); } - public void seek(TopicPartition tp, long offset) { - seek(tp, new FetchPosition(offset)); + public void seekUnvalidated(TopicPartition tp, FetchPosition position) { + assignedState(tp).seekUnvalidated(position); } - synchronized void maybeSeek(TopicPartition tp, long offset, OffsetResetStrategy requestedResetStrategy) { + synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetResetStrategy requestedResetStrategy) { TopicPartitionState state = assignedStateOrNull(tp); if (state == null) { log.debug("Skipping reset of partition {} since it is no longer assigned", tp); @@ -371,7 +372,7 @@ public class SubscriptionState { log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp); } else { log.info("Resetting offset for partition {} to offset {}.", tp, offset); - state.seek(new FetchPosition(offset)); + state.seekUnvalidated(new FetchPosition(offset)); } } @@ -405,23 +406,64 @@ public class SubscriptionState { assignedState(tp).position(position); } - synchronized boolean maybeValidatePosition(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) { + public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) { return assignedState(tp).maybeValidatePosition(leaderAndEpoch); } - synchronized boolean awaitingValidation(TopicPartition tp) { + /** + * Attempt to complete validation with the end offset returned from the OffsetForLeaderEpoch request. + * @return The diverging offset if truncation was detected and no reset policy is defined. 
+ */ + public synchronized Optional<OffsetAndMetadata> maybeCompleteValidation(TopicPartition tp, + FetchPosition requestPosition, + EpochEndOffset epochEndOffset) { + TopicPartitionState state = assignedStateOrNull(tp); + if (state == null) { + log.debug("Skipping completed validation for partition {} which is not currently assigned.", tp); + } else if (!state.awaitingValidation()) { + log.debug("Skipping completed validation for partition {} which is no longer expecting validation.", tp); + } else { + SubscriptionState.FetchPosition currentPosition = state.position; + if (!currentPosition.equals(requestPosition)) { + log.debug("Skipping completed validation for partition {} since the current position {} " + + "no longer matches the position {} when the request was sent", + tp, currentPosition, requestPosition); + } else if (epochEndOffset.endOffset() < currentPosition.offset) { + if (hasDefaultOffsetResetPolicy()) { + SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition( + epochEndOffset.endOffset(), Optional.of(epochEndOffset.leaderEpoch()), + currentPosition.currentLeader); + log.info("Truncation detected for partition {} at offset {}, resetting offset to " + + "the first offset known to diverge {}", tp, currentPosition, newPosition); + state.seekValidated(newPosition); + } else { + log.warn("Truncation detected for partition {} at offset {} (the end offset from the " + + "broker is {}), but no reset policy is set", + tp, currentPosition, epochEndOffset); + return Optional.of(new OffsetAndMetadata(epochEndOffset.endOffset(), + Optional.of(epochEndOffset.leaderEpoch()), null)); + } + } else { + state.completeValidation(); + } + } + + return Optional.empty(); + } + + public synchronized boolean awaitingValidation(TopicPartition tp) { return assignedState(tp).awaitingValidation(); } public synchronized void completeValidation(TopicPartition tp) { - assignedState(tp).validate(); + assignedState(tp).completeValidation(); } public synchronized FetchPosition validPosition(TopicPartition tp) { return assignedState(tp).validPosition(); } - synchronized public FetchPosition position(TopicPartition tp) { + public synchronized FetchPosition position(TopicPartition tp) { return assignedState(tp).position; } @@ -531,11 +573,11 @@ public class SubscriptionState { return assignment.stream().allMatch(state -> state.value().hasValidPosition()); } - Set<TopicPartition> missingFetchPositions() { + public synchronized Set<TopicPartition> missingFetchPositions() { return collectPartitions(state -> !state.hasPosition(), Collectors.toSet()); } - private synchronized <T extends Collection<TopicPartition>> T collectPartitions(Predicate<TopicPartitionState> filter, Collector<TopicPartition, ?, T> collector) { + private <T extends Collection<TopicPartition>> T collectPartitions(Predicate<TopicPartitionState> filter, Collector<TopicPartition, ?, T> collector) { return assignment.stream() .filter(state -> filter.test(state.value())) .map(PartitionStates.PartitionState::topicPartition) @@ -560,12 +602,12 @@ public class SubscriptionState { throw new NoOffsetForPartitionException(partitionsWithNoOffsets); } - Set<TopicPartition> partitionsNeedingReset(long nowMs) { + public synchronized Set<TopicPartition> partitionsNeedingReset(long nowMs) { return collectPartitions(state -> state.awaitingReset() && !state.awaitingRetryBackoff(nowMs), Collectors.toSet()); } - Set<TopicPartition> partitionsNeedingValidation(long nowMs) { + public synchronized Set<TopicPartition> 
partitionsNeedingValidation(long nowMs) { return collectPartitions(state -> state.awaitingValidation() && !state.awaitingRetryBackoff(nowMs), Collectors.toSet()); } @@ -695,7 +737,7 @@ public class SubscriptionState { return false; } - if (position != null && !position.safeToFetchFrom(currentLeaderAndEpoch)) { + if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) { FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch); validatePosition(newPosition); preferredReadReplica = null; @@ -704,7 +746,7 @@ public class SubscriptionState { } private void validatePosition(FetchPosition position) { - if (position.offsetEpoch.isPresent()) { + if (position.offsetEpoch.isPresent() && position.currentLeader.epoch.isPresent()) { transitionState(FetchStates.AWAIT_VALIDATION, () -> { this.position = position; this.nextRetryTimeMs = null; @@ -721,7 +763,7 @@ public class SubscriptionState { /** * Clear the awaiting validation state and enter fetching. */ - private void validate() { + private void completeValidation() { if (hasPosition()) { transitionState(FetchStates.FETCHING, () -> { this.nextRetryTimeMs = null; @@ -761,7 +803,7 @@ public class SubscriptionState { return paused; } - private void seek(FetchPosition position) { + private void seekValidated(FetchPosition position) { transitionState(FetchStates.FETCHING, () -> { this.position = position; this.resetStrategy = null; @@ -769,8 +811,8 @@ public class SubscriptionState { }); } - private void seekAndValidate(FetchPosition fetchPosition) { - seek(fetchPosition); + private void seekUnvalidated(FetchPosition fetchPosition) { + seekValidated(fetchPosition); validatePosition(fetchPosition); } @@ -934,14 +976,6 @@ public class SubscriptionState { this.currentLeader = Objects.requireNonNull(currentLeader); } - /** - * Test if it is "safe" to fetch from a given leader and epoch. This effectively is testing if - * {@link Metadata.LeaderAndEpoch} known to the subscription is equal to the one supplied by the caller. 
- */ - boolean safeToFetchFrom(Metadata.LeaderAndEpoch leaderAndEpoch) { - return !currentLeader.leader.isEmpty() && currentLeader.equals(leaderAndEpoch); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 1e64e3b..44c00c4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -75,6 +75,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -1504,7 +1505,7 @@ public class FetcherTest { Object result = invocation.callRealMethod(); latchEarliestDone.countDown(); return result; - }).when(subscriptions).maybeSeek(tp0, 0L, OffsetResetStrategy.EARLIEST); + }).when(subscriptions).maybeSeekUnvalidated(tp0, 0L, OffsetResetStrategy.EARLIEST); es.submit(() -> { subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); @@ -2795,8 +2796,8 @@ public class FetcherTest { List<ConsumerRecord<byte[], byte[]>> records; assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); - subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.leaderAndEpoch(tp0))); - subscriptions.seek(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); + subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.empty(), metadata.leaderAndEpoch(tp0))); + subscriptions.seekValidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.leaderAndEpoch(tp1))); // Fetch some records and establish an incremental fetch session. 
LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions1 = new LinkedHashMap<>(); @@ -3276,7 +3277,7 @@ public class FetcherTest { // Seek with a position and leader+epoch Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch( metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne)); - subscriptions.seekAndValidate(tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch)); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch)); assertFalse(client.isConnected(node.idString())); assertTrue(subscriptions.awaitingValidation(tp0)); @@ -3325,7 +3326,7 @@ public class FetcherTest { // Seek with a position and leader+epoch Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch( metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne)); - subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); // Update metadata to epoch=2, enter validation metadata.update(TestUtils.metadataUpdateWith("dummy", 1, @@ -3337,6 +3338,45 @@ public class FetcherTest { } @Test + public void testOffsetValidationHandlesSeekWithInflightOffsetForLeaderRequest() { + buildFetcher(); + assignFromUser(singleton(tp0)); + + Map<String, Integer> partitionCounts = new HashMap<>(); + partitionCounts.put(tp0.topic(), 4); + + final int epochOne = 1; + + metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne), 0L); + + // Offset validation requires OffsetForLeaderEpoch request v3 or higher + Node node = metadata.fetch().nodes().get(0); + apiVersions.update(node.idString(), NodeApiVersions.create()); + + Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne)); + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); + + fetcher.validateOffsetsIfNeeded(); + consumerClient.poll(time.timer(Duration.ZERO)); + assertTrue(subscriptions.awaitingValidation(tp0)); + assertTrue(client.hasInFlightRequests()); + + // While the OffsetForLeaderEpoch request is in-flight, we seek to a different offset. + subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(5, Optional.of(epochOne), leaderAndEpoch)); + assertTrue(subscriptions.awaitingValidation(tp0)); + + client.respond(request -> { + OffsetsForLeaderEpochRequest epochRequest = (OffsetsForLeaderEpochRequest) request; + OffsetsForLeaderEpochRequest.PartitionData partitionData = epochRequest.epochsByTopicPartition().get(tp0); + return partitionData.currentLeaderEpoch.equals(Optional.of(epochOne)) && partitionData.leaderEpoch == epochOne; + }, new OffsetsForLeaderEpochResponse(singletonMap(tp0, new EpochEndOffset(0, 0L)))); + consumerClient.poll(time.timer(Duration.ZERO)); + + // The response should be ignored since we were validating a different position. 
+ assertTrue(subscriptions.awaitingValidation(tp0)); + } + + @Test public void testOffsetValidationFencing() { buildFetcher(); assignFromUser(singleton(tp0)); @@ -3357,7 +3397,7 @@ public class FetcherTest { // Seek with a position and leader+epoch Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne)); - subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); + subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch)); // Update metadata to epoch=2, enter validation metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochTwo), 0L); @@ -3371,7 +3411,7 @@ public class FetcherTest { Optional.of(epochTwo), new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochTwo))); subscriptions.position(tp0, nextPosition); - subscriptions.maybeValidatePosition(tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree))); + subscriptions.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree))); // Prepare offset list response from async validation with epoch=2 Map<TopicPartition, EpochEndOffset> endOffsetMap = new HashMap<>(); @@ -3427,7 +3467,7 @@ public class FetcherTest { // Seek Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(1)); - subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), leaderAndEpoch)); + subscriptions.seekValidated(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), leaderAndEpoch)); // Check for truncation, this should cause tp0 to go into validation fetcher.validateOffsetsIfNeeded(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 528c2b9..484b9de 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -18,9 +18,11 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.EpochEndOffset; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; @@ -42,9 +44,7 @@ import static org.junit.Assert.assertTrue; public class SubscriptionStateTest { - private final SubscriptionState state = new SubscriptionState( - new LogContext(), - OffsetResetStrategy.EARLIEST); + private SubscriptionState state = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); private final String topic = "test"; private final String topic1 = "test1"; private final TopicPartition tp0 = new TopicPartition(topic, 0); @@ -340,13 +340,245 @@ public class SubscriptionStateTest { assertFalse(state.preferredReadReplica(tp0, 31L).isPresent()); } + @Test + public void testSeekUnvalidatedWithNoOffsetEpoch() { + Node broker1 = new Node(1, "localhost", 9092); + 
state.assignFromUser(Collections.singleton(tp0)); + + // Seek with no offset epoch requires no validation no matter what the current leader is + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), + new Metadata.LeaderAndEpoch(broker1, Optional.of(5)))); + assertTrue(state.hasValidPosition(tp0)); + assertFalse(state.awaitingValidation(tp0)); + + assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.empty()))); + assertTrue(state.hasValidPosition(tp0)); + assertFalse(state.awaitingValidation(tp0)); + + assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.of(10)))); + assertTrue(state.hasValidPosition(tp0)); + assertFalse(state.awaitingValidation(tp0)); + } + + @Test + public void testSeekUnvalidatedWithNoEpochClearsAwaitingValidation() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + // Seek with no offset epoch requires no validation no matter what the current leader is + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2), + new Metadata.LeaderAndEpoch(broker1, Optional.of(5)))); + assertFalse(state.hasValidPosition(tp0)); + assertTrue(state.awaitingValidation(tp0)); + + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), + new Metadata.LeaderAndEpoch(broker1, Optional.of(5)))); + assertTrue(state.hasValidPosition(tp0)); + assertFalse(state.awaitingValidation(tp0)); + } + + @Test + public void testSeekUnvalidatedWithOffsetEpoch() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, Optional.of(2), + new Metadata.LeaderAndEpoch(broker1, Optional.of(5)))); + assertFalse(state.hasValidPosition(tp0)); + assertTrue(state.awaitingValidation(tp0)); + + // Update using the current leader and epoch + assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.of(5)))); + assertFalse(state.hasValidPosition(tp0)); + assertTrue(state.awaitingValidation(tp0)); + + // Update with a newer leader and epoch + assertTrue(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.of(15)))); + assertFalse(state.hasValidPosition(tp0)); + assertTrue(state.awaitingValidation(tp0)); + + // If the updated leader has no epoch information, then skip validation and begin fetching + assertFalse(state.maybeValidatePositionForCurrentLeader(tp0, new Metadata.LeaderAndEpoch(broker1, Optional.empty()))); + assertTrue(state.hasValidPosition(tp0)); + assertFalse(state.awaitingValidation(tp0)); + } + + @Test + public void testSeekValidatedShouldClearAwaitingValidation() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5), + new Metadata.LeaderAndEpoch(broker1, Optional.of(10)))); + assertFalse(state.hasValidPosition(tp0)); + assertTrue(state.awaitingValidation(tp0)); + assertEquals(10L, state.position(tp0).offset); + + state.seekValidated(tp0, new SubscriptionState.FetchPosition(8L, Optional.of(4), + new Metadata.LeaderAndEpoch(broker1, Optional.of(10)))); + assertTrue(state.hasValidPosition(tp0)); + assertFalse(state.awaitingValidation(tp0)); + assertEquals(8L, state.position(tp0).offset); + } + + @Test + public void 
testCompleteValidationShouldClearAwaitingValidation() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5), + new Metadata.LeaderAndEpoch(broker1, Optional.of(10)))); + assertFalse(state.hasValidPosition(tp0)); + assertTrue(state.awaitingValidation(tp0)); + assertEquals(10L, state.position(tp0).offset); + + state.completeValidation(tp0); + assertTrue(state.hasValidPosition(tp0)); + assertFalse(state.awaitingValidation(tp0)); + assertEquals(10L, state.position(tp0).offset); + } + + @Test + public void testOffsetResetWhileAwaitingValidation() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(5), + new Metadata.LeaderAndEpoch(broker1, Optional.of(10)))); + assertTrue(state.awaitingValidation(tp0)); + + state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST); + assertFalse(state.awaitingValidation(tp0)); + assertTrue(state.isOffsetResetNeeded(tp0)); + } + + @Test + public void testMaybeCompleteValidation() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + int currentEpoch = 10; + long initialOffset = 10L; + int initialOffsetEpoch = 5; + + SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset, + Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch))); + state.seekUnvalidated(tp0, initialPosition); + assertTrue(state.awaitingValidation(tp0)); + + Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition, + new EpochEndOffset(initialOffsetEpoch, initialOffset + 5)); + assertEquals(Optional.empty(), divergentOffsetMetadataOpt); + assertFalse(state.awaitingValidation(tp0)); + assertEquals(initialPosition, state.position(tp0)); + } + + @Test + public void testMaybeCompleteValidationAfterPositionChange() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + int currentEpoch = 10; + long initialOffset = 10L; + int initialOffsetEpoch = 5; + long updateOffset = 20L; + int updateOffsetEpoch = 8; + + SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset, + Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch))); + state.seekUnvalidated(tp0, initialPosition); + assertTrue(state.awaitingValidation(tp0)); + + SubscriptionState.FetchPosition updatePosition = new SubscriptionState.FetchPosition(updateOffset, + Optional.of(updateOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch))); + state.seekUnvalidated(tp0, updatePosition); + + Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition, + new EpochEndOffset(initialOffsetEpoch, initialOffset + 5)); + assertEquals(Optional.empty(), divergentOffsetMetadataOpt); + assertTrue(state.awaitingValidation(tp0)); + assertEquals(updatePosition, state.position(tp0)); + } + + @Test + public void testMaybeCompleteValidationAfterOffsetReset() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + int currentEpoch = 10; + long initialOffset = 10L; + int initialOffsetEpoch = 5; + + SubscriptionState.FetchPosition initialPosition = new 
SubscriptionState.FetchPosition(initialOffset, + Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch))); + state.seekUnvalidated(tp0, initialPosition); + assertTrue(state.awaitingValidation(tp0)); + + state.requestOffsetReset(tp0); + + Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition, + new EpochEndOffset(initialOffsetEpoch, initialOffset + 5)); + assertEquals(Optional.empty(), divergentOffsetMetadataOpt); + assertFalse(state.awaitingValidation(tp0)); + assertTrue(state.isOffsetResetNeeded(tp0)); + } + + @Test + public void testTruncationDetectionWithResetPolicy() { + Node broker1 = new Node(1, "localhost", 9092); + state.assignFromUser(Collections.singleton(tp0)); + + int currentEpoch = 10; + long initialOffset = 10L; + int initialOffsetEpoch = 5; + long divergentOffset = 5L; + int divergentOffsetEpoch = 7; + + SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset, + Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch))); + state.seekUnvalidated(tp0, initialPosition); + assertTrue(state.awaitingValidation(tp0)); + + Optional<OffsetAndMetadata> divergentOffsetMetadata = state.maybeCompleteValidation(tp0, initialPosition, + new EpochEndOffset(divergentOffsetEpoch, divergentOffset)); + assertEquals(Optional.empty(), divergentOffsetMetadata); + assertFalse(state.awaitingValidation(tp0)); + + SubscriptionState.FetchPosition updatedPosition = new SubscriptionState.FetchPosition(divergentOffset, + Optional.of(divergentOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch))); + assertEquals(updatedPosition, state.position(tp0)); + } + + @Test + public void testTruncationDetectionWithoutResetPolicy() { + Node broker1 = new Node(1, "localhost", 9092); + state = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); + state.assignFromUser(Collections.singleton(tp0)); + + int currentEpoch = 10; + long initialOffset = 10L; + int initialOffsetEpoch = 5; + long divergentOffset = 5L; + int divergentOffsetEpoch = 7; + + SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset, + Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(broker1, Optional.of(currentEpoch))); + state.seekUnvalidated(tp0, initialPosition); + assertTrue(state.awaitingValidation(tp0)); + + Optional<OffsetAndMetadata> divergentOffsetMetadata = state.maybeCompleteValidation(tp0, initialPosition, + new EpochEndOffset(divergentOffsetEpoch, divergentOffset)); + assertEquals(Optional.of(new OffsetAndMetadata(divergentOffset, Optional.of(divergentOffsetEpoch), "")), + divergentOffsetMetadata); + assertTrue(state.awaitingValidation(tp0)); + } + private static class MockRebalanceListener implements ConsumerRebalanceListener { public Collection<TopicPartition> revoked; public Collection<TopicPartition> assigned; public int revokedCount = 0; public int assignedCount = 0; - @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { this.assigned = partitions;
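
The heart of the fix is easiest to see in isolation. Below is a minimal, runnable sketch (hypothetical names: PositionSnapshot stands in for SubscriptionState.FetchPosition, with the leader epoch simplified to a bare Optional) of why the old epoch-only comparison let a stale OffsetsForLeaderEpoch response complete validation after an in-flight seek, while comparing the full request-time position does not:

import java.util.Objects;
import java.util.Optional;

// A minimal sketch, not Kafka's internal class: the snapshot of a fetch
// position captured when an OffsetsForLeaderEpoch request is sent.
final class PositionSnapshot {
    final long offset;
    final Optional<Integer> offsetEpoch;        // epoch of the last consumed record
    final Optional<Integer> currentLeaderEpoch; // leader epoch from metadata

    PositionSnapshot(long offset, Optional<Integer> offsetEpoch, Optional<Integer> currentLeaderEpoch) {
        this.offset = offset;
        this.offsetEpoch = offsetEpoch;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof PositionSnapshot))
            return false;
        PositionSnapshot that = (PositionSnapshot) o;
        return offset == that.offset
                && offsetEpoch.equals(that.offsetEpoch)
                && currentLeaderEpoch.equals(that.currentLeaderEpoch);
    }

    @Override
    public int hashCode() {
        return Objects.hash(offset, offsetEpoch, currentLeaderEpoch);
    }
}

public class InflightSeekDemo {
    public static void main(String[] args) {
        Optional<Integer> epochOne = Optional.of(1);

        // Position when the OffsetsForLeaderEpoch request went out.
        PositionSnapshot requestPosition = new PositionSnapshot(0L, epochOne, epochOne);

        // The application seeks to offset 5 while the request is in flight;
        // the leader epoch is unchanged.
        PositionSnapshot currentPosition = new PositionSnapshot(5L, epochOne, epochOne);

        // Old check: leader epoch only. The stale response would wrongly
        // complete validation for the new position.
        boolean oldCheckAccepts = currentPosition.currentLeaderEpoch.equals(requestPosition.currentLeaderEpoch);

        // New check: the whole position must still match what was in flight.
        boolean newCheckAccepts = currentPosition.equals(requestPosition);

        System.out.println(oldCheckAccepts); // true  (stale response applied)
        System.out.println(newCheckAccepts); // false (response ignored, validation retried)
    }
}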
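
With that snapshot in hand, the completion logic the patch moves into SubscriptionState reduces to a short decision tree. A condensed sketch follows, reusing PositionSnapshot from the previous sketch; the real maybeCompleteValidation also consults assignment state, logs each branch, seeks to the divergent position when a reset policy exists, and returns an OffsetAndMetadata rather than a bare offset:

import java.util.OptionalLong;

// Condensed, illustrative sketch of the validation-completion decision tree.
// endOffsetFromBroker mirrors the EpochEndOffset carried in the response.
final class ValidationSketch {
    // Returns the diverging offset if truncation was detected and no reset
    // policy is configured; empty otherwise.
    static OptionalLong maybeCompleteValidation(PositionSnapshot requestPosition,
                                                PositionSnapshot currentPosition,
                                                boolean awaitingValidation,
                                                long endOffsetFromBroker,
                                                boolean hasDefaultResetPolicy) {
        if (!awaitingValidation) {
            // A seek with no epoch, an offset reset, or an earlier completion
            // already cleared the AWAIT_VALIDATION state: nothing to do.
            return OptionalLong.empty();
        }
        if (!currentPosition.equals(requestPosition)) {
            // The position moved while the request was in flight; drop the
            // stale response and let the fetcher revalidate the new position.
            return OptionalLong.empty();
        }
        if (endOffsetFromBroker < currentPosition.offset) {
            // Log truncation: the broker's log diverges before our offset.
            if (hasDefaultResetPolicy) {
                // Reset to the first offset known to diverge (seek elided here).
                return OptionalLong.empty();
            }
            // No reset policy: surface the divergent offset to the caller.
            return OptionalLong.of(endOffsetFromBroker);
        }
        // Offset is fine: clear the validation state and resume fetching.
        return OptionalLong.empty();
    }
}

The new FetcherTest case testOffsetValidationHandlesSeekWithInflightOffsetForLeaderRequest exercises exactly the position-mismatch branch: the seek from offset 0 to 5 happens while the request is in flight, the stale response is dropped, and the partition stays in awaitingValidation.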
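
The broker-downgrade fix shows up as the extra condition in validatePosition: a position only enters AWAIT_VALIDATION when both the offset epoch and the current leader's epoch are known. A sketch of that gate, with illustrative names:

import java.util.Optional;

// Sketch of the gate deciding whether a newly set position must enter
// AWAIT_VALIDATION or can be fetched from immediately.
final class ValidationGate {
    static boolean needsValidation(Optional<Integer> offsetEpoch,
                                   Optional<Integer> currentLeaderEpoch) {
        // Both epochs are needed to build the OffsetsForLeaderEpoch request:
        // the offset epoch is what we ask the broker about, and the current
        // leader epoch fences the request against stale metadata. If the
        // latest metadata carries no epoch (e.g. after a broker downgrade),
        // skip validation and fetch from the position directly.
        return offsetEpoch.isPresent() && currentLeaderEpoch.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(needsValidation(Optional.of(5), Optional.of(10)));   // true: validate first
        System.out.println(needsValidation(Optional.empty(), Optional.of(10))); // false: seek with no offset epoch
        System.out.println(needsValidation(Optional.of(5), Optional.empty()));  // false: metadata without epoch info
    }
}

This is the behavior the new testSeekUnvalidatedWithNoOffsetEpoch and testSeekUnvalidatedWithOffsetEpoch cases pin down: without an offset epoch, or once the current leader loses epoch information, the position is immediately valid.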
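
Independently, the Fetcher declines to send the request at all when the target broker is too old: per the log message in the diff, fetch-offset validation requires OffsetsForLeaderEpoch v3 or higher (introduced in Kafka 2.3), and for unsupported brokers completeValidation is called immediately so partitions do not sit in AWAIT_VALIDATION forever. Sketched with an assumed max-version lookup in place of NodeApiVersions:

// Hypothetical stand-in for the broker-capability check guarding the request.
final class EpochVersionGate {
    // Per the patch, fetch-offset validation requires OffsetsForLeaderEpoch
    // v3 or higher (introduced in Kafka 2.3).
    static final short MIN_USABLE_VERSION = 3;

    static boolean hasUsableVersion(short brokerMaxVersion) {
        return brokerMaxVersion >= MIN_USABLE_VERSION;
    }

    public static void main(String[] args) {
        // A 2.3+ broker: send OffsetsForLeaderEpoch and await the result.
        System.out.println(hasUsableVersion((short) 3)); // true
        // An older broker: skip the request, complete validation directly.
        System.out.println(hasUsableVersion((short) 2)); // false
    }
}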