[ https://issues.apache.org/jira/browse/KAFKA-10123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-10123:
------------------------------------
    Description: 
We saw this error in system tests:
{code}
java.lang.NullPointerException
        at org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1111)
        at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:246)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:437)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
{code}

The logs showed that the consumer was in the middle of an offset reset when 
this happened. We changed the validation logic in KAFKA-9724 to include the 
following check with the intent of skipping validation for old brokers:
{code}
            NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
            if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
            } else {
                // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
                completeValidation(tp);
                return false;
            }
{code}
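
To make the gating decision in the check above concrete, here is a minimal, self-contained sketch that reduces it to a pure function of the broker's maximum supported OffsetsForLeaderEpoch version. It is not the Fetcher code: `decide`, `Action`, and the threshold `ASSUMED_MIN_USABLE_VERSION = 3` are hypothetical names and an assumed value standing in for `NodeApiVersions` and `hasUsableOffsetForLeaderEpochVersion`. As in the quoted check, an unknown version (`null`) takes the validation path; only a broker known to be too old takes the shortcut that calls `completeValidation`.

```java
public class ValidationGateSketch {
    // Hypothetical threshold; the real client derives usability from NodeApiVersions.
    static final short ASSUMED_MIN_USABLE_VERSION = 3;

    enum Action { VALIDATE, SKIP_VALIDATION }

    // Same shape as the quoted check: unknown API versions (null) or a
    // new-enough broker -> validate; otherwise skip validation entirely.
    static Action decide(Short maxOffsetsForLeaderEpochVersion) {
        if (maxOffsetsForLeaderEpochVersion == null
                || maxOffsetsForLeaderEpochVersion >= ASSUMED_MIN_USABLE_VERSION) {
            return Action.VALIDATE;
        }
        return Action.SKIP_VALIDATION;
    }

    public static void main(String[] args) {
        System.out.println(decide(null));       // VALIDATE
        System.out.println(decide((short) 3));  // VALIDATE
        System.out.println(decide((short) 2));  // SKIP_VALIDATION
    }
}
```

The SKIP_VALIDATION branch is the one that matters for this bug: it is taken regardless of whether the partition currently has a position.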

The problem seems to be the shortcut call to `completeValidation`, which 
executes the following logic:
{code}
            if (hasPosition()) {
                transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
            }
{code}

We should be protected by the call to `hasPosition` here, but in the 
`AWAIT_RESET` state `hasPosition` incorrectly returns true. This causes us to 
enter the `FETCHING` state without a position, which ultimately leads to the 
NPE.
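
The failure sequence described above can be reproduced in a small model. This is a hypothetical simplification, not the consumer's actual state classes: it keeps only the two states named in this ticket, a per-state `hasPosition` answer with a flag that toggles the `AWAIT_RESET` behavior reported here, the guarded transition to `FETCHING`, and a fetch step that unboxes a `null` position. `afterSkippedValidation` and `fetchOffset` are illustrative names.

```java
public class FetchStateSketch {
    // Only the two states this ticket involves; the real consumer has more.
    enum FetchState { FETCHING, AWAIT_RESET }

    // Per-state answer to "is there a usable position?". The flag toggles
    // between the behavior reported in the ticket (AWAIT_RESET -> true)
    // and the intended one (AWAIT_RESET -> false).
    static boolean hasPosition(FetchState state, boolean awaitResetHasPosition) {
        if (state == FetchState.AWAIT_RESET) {
            return awaitResetHasPosition;
        }
        return state == FetchState.FETCHING;
    }

    // Models the skip-validation shortcut on a partition that is waiting for
    // an offset reset: move to FETCHING only if hasPosition() says so.
    static FetchState afterSkippedValidation(boolean awaitResetHasPosition) {
        FetchState state = FetchState.AWAIT_RESET;
        if (hasPosition(state, awaitResetHasPosition)) {
            state = FetchState.FETCHING;
        }
        return state;
    }

    // Models the fetch path: a FETCHING partition's position is unboxed to
    // build the request, which throws NullPointerException if it is null.
    static long fetchOffset(FetchState state, Long position) {
        if (state != FetchState.FETCHING) {
            throw new IllegalStateException("partition is not fetchable: " + state);
        }
        return position;
    }

    public static void main(String[] args) {
        Long position = null; // no offset chosen yet while awaiting a reset
        FetchState reported = afterSkippedValidation(true);
        FetchState intended = afterSkippedValidation(false);
        System.out.println(reported + " / " + intended); // FETCHING / AWAIT_RESET
        try {
            fetchOffset(reported, position);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while preparing the fetch");
        }
    }
}
```

With the flag set to true the partition reaches `FETCHING` with a `null` position and the fetch step throws, mirroring the stack trace; with it set to false the partition stays in `AWAIT_RESET` until a reset supplies an offset.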

  was:
We saw this error in system tests:
{code}
java.lang.NullPointerException
        at org.apache.kafka.clients.consumer.internals.Fetcher.prepareFetchRequests(Fetcher.java:1111)
        at org.apache.kafka.clients.consumer.internals.Fetcher.sendFetches(Fetcher.java:246)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:437)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
{code}

The logs showed that the consumer was in the middle of an offset reset when 
this happened. We changed the logic in KAFKA-9724 to include the following 
check:
{code}
            NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString());
            if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
            } else {
                // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation
                completeValidation(tp);
                return false;
            }
{code}

The problem seems to be the shortcut call to `completeValidation`, which 
executes the following logic:
{code}
            if (hasPosition()) {
                transitionState(FetchStates.FETCHING, () -> this.nextRetryTimeMs = null);
            }
{code}

We should be protected by the call to `hasPosition` here, but in the case of 
the `AWAIT_RESET` state, we are incorrectly returning true. This causes us to 
enter the `FETCHING` state without a position, which ultimately leads to the 
NPE.


> Regression resetting offsets in consumer when fetching from old broker
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-10123
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10123
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: David Arthur
>            Priority: Blocker
>             Fix For: 2.6.0



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
