hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441680618



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -782,6 +787,13 @@ private void reset(OffsetResetStrategy strategy) {
             });
         }
 
+        /**
+         * Check if the position exists and needs to be validated. If so, enter the AWAIT_VALIDATION state. This method
+         * also will update the position with the current leader and epoch.
+         *
+         * @param currentLeaderAndEpoch leader and epoch to compare the offset with
+         * @return

Review comment:
       nit: document return 

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -783,7 +783,7 @@ public boolean rejoinNeededOrPending() {
      * @return true iff the operation completed within the timeout
      */
     public boolean refreshCommittedOffsetsIfNeeded(Timer timer) {
-        final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
+        final Set<TopicPartition> missingFetchPositions = subscriptions.initializingPartitions();

Review comment:
       nit: rename variable as well?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1281,11 +1291,12 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
                 Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
                 if (!clearedReplicaId.isPresent()) {
                     // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                    if (fetchOffset != subscriptions.position(tp).offset) {
+                    FetchPosition position = subscriptions.position(tp);
+                    if (position != null && fetchOffset != position.offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                                "does not match the current offset {}", tp, fetchOffset, position);
                     } else {
-                        handleOffsetOutOfRange(subscriptions.position(tp), tp, "error response in offset fetch");
+                        handleOffsetOutOfRange(position, tp, "error response in offset fetch");

Review comment:
       Hmm, if the position is null, we raise out of range?
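
       To make the concern concrete, here is a minimal, self-contained sketch of the branch logic. `Position`, `Outcome`, and both methods are hypothetical stand-ins written for illustration; they are not the actual Fetcher or SubscriptionState code, and the null guard is only one possible alternative, not necessarily the fix the PR should adopt.

```java
public class StaleFetchCheckSketch {
    // Hypothetical stand-in for SubscriptionState.FetchPosition; only the offset matters here.
    static final class Position {
        final long offset;

        Position(long offset) {
            this.offset = offset;
        }
    }

    // Hypothetical labels for the three ways the response could be handled.
    enum Outcome { DISCARD_STALE, DISCARD_NO_POSITION, OUT_OF_RANGE }

    // Mirrors the condition in the diff: a null position fails the first test
    // and therefore falls through to the out-of-range branch.
    static Outcome asWritten(Position position, long fetchOffset) {
        if (position != null && fetchOffset != position.offset) {
            return Outcome.DISCARD_STALE;
        }
        return Outcome.OUT_OF_RANGE;
    }

    // Assumed alternative: handle a missing position separately so that only a
    // present, matching position reaches the out-of-range handling.
    static Outcome withNullGuard(Position position, long fetchOffset) {
        if (position == null) {
            return Outcome.DISCARD_NO_POSITION;
        }
        if (fetchOffset != position.offset) {
            return Outcome.DISCARD_STALE;
        }
        return Outcome.OUT_OF_RANGE;
    }

    public static void main(String[] args) {
        System.out.println(asWritten(null, 42L));
        System.out.println(withNullGuard(null, 42L));
    }
}
```

       With a null position, `asWritten` reports the out-of-range outcome while `withNullGuard` discards the response; the two agree whenever a position is present.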

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
##########
@@ -673,4 +673,37 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 
     }
 
+    @Test
+    public void resetOffsetNoValidation() {
+        // Check that offset reset works when we can't validate offsets (older brokers)
+
+        Node broker1 = new Node(1, "localhost", 9092);
+        state.assignFromUser(Collections.singleton(tp0));
+
+        // Reset offsets
+        state.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+
+        // Attempt to validate with older API version, should do nothing
+        ApiVersions oldApis = new ApiVersions();
+        oldApis.update("1", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
+        assertFalse(state.maybeValidatePositionForCurrentLeader(oldApis, tp0, new Metadata.LeaderAndEpoch(
+                Optional.of(broker1), Optional.empty())));
+        assertFalse(state.hasValidPosition(tp0));
+        assertFalse(state.awaitingValidation(tp0));
+        assertTrue(state.isOffsetResetNeeded(tp0));
+
+        // Complete the reset via unvalidated seek
+        state.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(10L));

Review comment:
       This test might be more interesting if we did a seek which 
required validation. Could we provide an epoch in the fetch position? Maybe 
both cases should be covered?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -924,10 +951,19 @@ default FetchState transitionTo(FetchState newState) {
             }
         }
 
+        /**
+         * Return the valid states which this state can transition to
+         */
         Collection<FetchState> validTransitions();
 
-        boolean hasPosition();
+        /**
+         * Test if this state has a position

Review comment:
       nit: fix doc

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -799,6 +806,21 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
             return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
         }
 
+        /**
+         * For older versions of the API, we cannot perform offset validation so we simply transition directly to FETCHING
+         *
+         * @param currentLeaderAndEpoch
+         */
+        private void updatePositionLeaderNoValidation(Metadata.LeaderAndEpoch currentLeaderAndEpoch) {
+            if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
+                FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);

Review comment:
       +1. Moving this into the transition function below seems reasonable.



