[ https://issues.apache.org/jira/browse/KAFKA-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson updated KAFKA-10123:
------------------------------------
    Description:
We saw this error in system tests:

{code}
java.lang.NullPointerException
        at org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1111)
        at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:246)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:437)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
{code}

The logs showed that the consumer was in the middle of an offset reset when this happened. We changed the validation logic in KAFKA-9724 to include the following check with the intent of skipping validation for old brokers:

{code}
NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
    return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
} else {
    // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
    completeValidation(tp);
    return false;
}
{code}

The problem seems to be the shortcut call to `completeValidation`, which executes the following logic:

{code}
if (hasPosition()) {
    transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
}
{code}

We should be protected by the call to `hasPosition` here, but in the case of the `AWAIT_RESET` state, we are incorrectly returning true. This causes us to enter the `FETCHING` state without a position, which ultimately leads to the NPE.

  was:
We saw this error in system tests:

{code}
java.lang.NullPointerException
        at org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1111)
        at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:246)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:437)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
{code}

The logs showed that the consumer was in the middle of an offset reset when this happened. We changed the logic in KAFKA-9724 to include the following check:

{code}
NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
    return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
} else {
    // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
    completeValidation(tp);
    return false;
}
{code}

The problem seems to be the shortcut call to `completeValidation`, which executes the following logic:

{code}
if (hasPosition()) {
    transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
}
{code}

We should be protected by the call to `hasPosition` here, but in the case of the `AWAIT_RESET` state, we are incorrectly returning true. This causes us to enter the `FETCHING` state without a position, which ultimately leads to the NPE.
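To make the failure mode in the description concrete, here is a minimal, self-contained sketch of the state transition. It is a toy model and not the Kafka source: `FetchStateSketch`, `PartitionState`, `hasPositionBuggy`, `hasPositionGuarded`, `requestReset`, `seek` and `nextFetchOffset` are hypothetical names invented for illustration. The guarded variant shows one possible way to tie `hasPosition` to the position actually being set; it is an assumption, not necessarily the fix that was committed.

```java
import java.util.Objects;

// Toy model of a per-partition fetch state machine. All names are hypothetical.
class FetchStateSketch {

    enum FetchState { INITIALIZING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION }

    static final class PartitionState {
        FetchState state = FetchState.INITIALIZING;
        Long position = null; // null until an offset is known

        // An offset reset discards the current position until the reset completes.
        void requestReset() {
            state = FetchState.AWAIT_RESET;
            position = null;
        }

        // Completing a reset (or an explicit seek) supplies a position.
        void seek(long offset) {
            position = offset;
            state = FetchState.FETCHING;
        }

        // Models the reported behaviour: answers by state only, and
        // incorrectly reports a position while in AWAIT_RESET.
        boolean hasPositionBuggy() {
            return state != FetchState.INITIALIZING;
        }

        // Assumed guard: a position exists only if one has actually been set.
        boolean hasPositionGuarded() {
            return position != null;
        }

        // Shortcut taken when validation is skipped for an old broker.
        void completeValidation(boolean guarded) {
            boolean hasPosition = guarded ? hasPositionGuarded() : hasPositionBuggy();
            if (hasPosition) {
                state = FetchState.FETCHING;
            }
        }

        // Stand-in for building a fetch request: dereferences the position.
        long nextFetchOffset() {
            return Objects.requireNonNull(position, "no position in state " + state);
        }
    }

    public static void main(String[] args) {
        PartitionState buggy = new PartitionState();
        buggy.requestReset();
        buggy.completeValidation(false); // moves to FETCHING with a null position
        System.out.println("unguarded: state=" + buggy.state + ", position=" + buggy.position);
        try {
            buggy.nextFetchOffset();
        } catch (NullPointerException e) {
            System.out.println("NPE while preparing fetch: " + e.getMessage());
        }

        PartitionState guarded = new PartitionState();
        guarded.requestReset();
        guarded.completeValidation(true); // stays in AWAIT_RESET until the reset finishes
        System.out.println("guarded: state=" + guarded.state + ", position=" + guarded.position);
    }
}
```

In this model the unguarded path reaches `FETCHING` with a null position, so the later dereference throws, while the guarded path leaves the partition in `AWAIT_RESET` until `seek` supplies an offset.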
> Regression resetting offsets in consumer when fetching from old broker
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-10123
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10123
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: David Arthur
>            Priority: Blocker
>             Fix For: 2.6.0
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)