[ 
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16777:
------------------------------
    Fix Version/s: 3.8.0

> New consumer should throw NoOffsetForPartitionException on continuous poll 
> zero if no reset strategy
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16777
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16777
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Priority: Major
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>
> If the consumer does not define an offset reset strategy, a call to poll 
> should fail with NoOffsetForPartitionException. That works as expected on the 
> new consumer when polling with a timeout > 0 (existing integration test 
> [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
>  but the exception is never thrown when polling continuously with a ZERO timeout.
> This can be easily reproduced with a new integration test like this (passes 
> for the legacy consumer but fails for the new consumer). We should add it as 
> part of the fix, for better coverage:
> {code:java}
>   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
>   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
>   def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
>     this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
>     val consumer = createConsumer(configOverrides = this.consumerConfig)
>     consumer.assign(List(tp).asJava)
>     // Continuous poll should eventually fail because there is no offset reset strategy set
>     // (it fails only when resetting positions, after the coordinator is known).
>     TestUtils.tryUntilNoAssertionError() {
>       assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
>     }
>   }
> {code}
> This is also covered by the unit test 
> [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
>  which is currently enabled only for the LegacyConsumer. After fixing this 
> issue we should be able to enable it for the new consumer too.
> The issue seems to be in calling poll with a ZERO timeout: even when poll is 
> called continuously, the consumer is never able to complete 
> initWithCommittedOffsetsIfNeeded, so updateFetchPositions never reaches 
> [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
>  where the exception is thrown.
>  
> There is a related issue, https://issues.apache.org/jira/browse/KAFKA-16637, 
> but I am filing this one to provide more context and to point out the test 
> failures and the suggested new tests. All of them fail even with the current 
> patch in KAFKA-16637, so this needs investigation.
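
The suspected failure mode described in the issue can be illustrated with a toy model. This is only a sketch under stated assumptions, not Kafka code: the class PollZeroModel, its Lookup helper, the "tick" notion of background progress, and the local NoOffsetException are all hypothetical names invented here. The assumption being modelled is that a committed-offset lookup needs several rounds of background work, and that a zero-timeout poll either drops the unfinished lookup and starts over, or keeps it pending across calls.

```java
/** Toy model, not Kafka code: why poll with a zero timeout may never reach the reset step. */
final class PollZeroModel {

    /** Local stand-in for NoOffsetForPartitionException (hypothetical, for this sketch only). */
    static final class NoOffsetException extends RuntimeException {
        NoOffsetException(String message) {
            super(message);
        }
    }

    /** An in-flight committed-offset lookup that needs a fixed number of background ticks. */
    static final class Lookup {
        private int remainingTicks;

        Lookup(int latencyTicks) {
            this.remainingTicks = latencyTicks;
        }

        void tick() {
            if (remainingTicks > 0) {
                remainingTicks--;
            }
        }

        boolean isDone() {
            return remainingTicks == 0;
        }
    }

    private final boolean reusePendingLookup;
    private final int latencyTicks;
    private Lookup pending;

    PollZeroModel(boolean reusePendingLookup, int latencyTicks) {
        this.reusePendingLookup = reusePendingLookup;
        this.latencyTicks = latencyTicks;
    }

    /** One poll with zero timeout: it never waits for the lookup to finish. */
    void pollZero() {
        if (pending == null || !reusePendingLookup) {
            // Start a fresh lookup; without reuse, earlier progress is thrown away.
            pending = new Lookup(latencyTicks);
        }
        pending.tick(); // the background side makes one step of progress per call
        if (!pending.isDone()) {
            return; // zero timeout expired; positions are still uninitialized
        }
        // Lookup finished: no committed offset was found and no reset strategy is set.
        throw new NoOffsetException("no committed offset and auto.offset.reset=none");
    }

    /** Returns the 1-based poll call that threw, or -1 if none did within maxPolls. */
    static int pollsUntilFailure(boolean reusePendingLookup, int latencyTicks, int maxPolls) {
        PollZeroModel consumer = new PollZeroModel(reusePendingLookup, latencyTicks);
        for (int i = 1; i <= maxPolls; i++) {
            try {
                consumer.pollZero();
            } catch (NoOffsetException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(pollsUntilFailure(false, 3, 1000)); // prints -1
        System.out.println(pollsUntilFailure(true, 3, 1000));  // prints 3
    }
}
```

In this model, discarding the unfinished lookup on every call means the exception is never raised no matter how many times poll is invoked, while keeping the lookup pending lets a later call observe its result. Whether the new consumer actually behaves this way is exactly what the issue asks to investigate.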



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
