hachikuji commented on a change in pull request #8841:
URL: https://github.com/apache/kafka/pull/8841#discussion_r441776235



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1281,11 +1291,12 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
                 Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
                 if (!clearedReplicaId.isPresent()) {
                     // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
-                    if (fetchOffset != subscriptions.position(tp).offset) {
+                    FetchPosition position = subscriptions.position(tp);
+                    if (position != null && fetchOffset != position.offset) {
                         log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                                "does not match the current offset {}", tp, fetchOffset, position);
                     } else {
-                        handleOffsetOutOfRange(subscriptions.position(tp), tp, "error response in offset fetch");
+                        handleOffsetOutOfRange(position, tp, "error response in offset fetch");

Review comment:
       Since we have the check for `hasValidPosition` at the start of this 
method, we _could_ raise an exception. However, in the success case, we 
currently just ignore the response if the position is null. I'm ok with either 
option.
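
To make the two options concrete, here is a minimal, self-contained sketch. It is not the actual `Fetcher` code: `NullPositionSketch`, `Position`, `Outcome`, `handleStrict`, and `handleLenient` are hypothetical names invented for illustration, and `Position` only stands in for `FetchPosition`.

```java
// Hypothetical sketch of the two ways to treat a null position in the
// OFFSET_OUT_OF_RANGE branch. None of these names exist in Kafka.
final class NullPositionSketch {

    // Stand-in for FetchPosition; only the offset matters for this sketch.
    static final class Position {
        final long offset;

        Position(long offset) {
            this.offset = offset;
        }
    }

    // What the handler decided to do with the error response.
    enum Outcome { DISCARDED_STALE, RESET_OFFSET, IGNORED }

    // Option 1: a null position is unexpected after the earlier validity
    // check, so fail fast with an exception.
    static Outcome handleStrict(Position position, long fetchOffset) {
        if (position == null) {
            throw new IllegalStateException("Missing position for out-of-range fetch at offset " + fetchOffset);
        }
        return classify(position, fetchOffset);
    }

    // Option 2: mirror the success path and silently ignore the response
    // when there is no position.
    static Outcome handleLenient(Position position, long fetchOffset) {
        if (position == null) {
            return Outcome.IGNORED;
        }
        return classify(position, fetchOffset);
    }

    // Shared logic: a response for an offset other than the current one is
    // stale; otherwise the out-of-range error applies to the current offset.
    private static Outcome classify(Position position, long fetchOffset) {
        return fetchOffset != position.offset ? Outcome.DISCARDED_STALE : Outcome.RESET_OFFSET;
    }
}
```

Both variants behave the same when a position is present. They differ only in whether a missing position is treated as a bug to surface or as a response to drop.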




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

