[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lianet Magrans updated KAFKA-16777:
-----------------------------------
    Priority: Blocker  (was: Major)

> New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16777
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16777
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Lianet Magrans
>            Priority: Blocker
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>
> If the consumer does not define an offset reset strategy, a call to poll
> should fail with NoOffsetForPartitionException. That works as expected on the
> new consumer when polling with a timeout > 0 (existing integration test
> [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
> but fails when polling continuously with ZERO timeout.
> This can be easily reproduced with a new integration test like the one below
> (it passes for the legacy consumer but fails for the new consumer). We should
> add it as part of the fix, for better coverage:
> {code:java}
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
>   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
>   val consumer = createConsumer(configOverrides = this.consumerConfig)
>   consumer.assign(List(tp).asJava)
>   // continuous poll should eventually fail because there is no offset reset strategy set
>   // (fail only when resetting positions after coordinator is known)
>   TestUtils.tryUntilNoAssertionError() {
>     assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
>   }
> }
> {code}
> This is also covered by the unit test
> [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
> which is currently enabled only for the LegacyConsumer. After fixing this
> issue we should be able to enable it for the new consumer too.
> The issue seems to be that when poll is called with a ZERO timeout, even
> continuously, the consumer is not able to complete
> initWithCommittedOffsetsIfNeeded, so updateFetchPositions never makes it to
> [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
> where the exception is thrown.
>
> There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637,
> but I am filing this one to provide more context and to point out the test
> failures and the suggested new tests. All of them fail even with the current
> patch in KAFKA-16637, so this needs investigation.

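To make the suspected failure mode in the description above easier to picture, here is a toy model in plain Java. It is only a sketch under a loud assumption: that each zero-timeout poll abandons the in-flight committed-offsets lookup and starts a new one, so the lookup never gets enough time to complete. The issue does not confirm that this is the actual cause. None of the types below are Kafka classes; ToyConsumer, OffsetLookup, NoOffsetException and the "ticks" counter are all hypothetical stand-ins.

```java
/** Toy model only: nothing here is part of the Kafka client. */
public class PollZeroSketch {

    /** Hypothetical stand-in for NoOffsetForPartitionException. */
    static class NoOffsetException extends RuntimeException {}

    /** Pretend async lookup of committed offsets; needs a number of background "ticks". */
    static class OffsetLookup {
        private int ticksLeft;

        OffsetLookup(int ticksNeeded) {
            this.ticksLeft = ticksNeeded;
        }

        void tick() {
            if (ticksLeft > 0) {
                ticksLeft--;
            }
        }

        boolean done() {
            return ticksLeft == 0;
        }
    }

    /** Consumer with no reset strategy and no committed offset for its partition. */
    static class ToyConsumer {
        private final boolean reusePendingLookup;
        private final int ticksNeeded;
        private OffsetLookup pending;

        ToyConsumer(boolean reusePendingLookup, int ticksNeeded) {
            this.reusePendingLookup = reusePendingLookup;
            this.ticksNeeded = ticksNeeded;
        }

        /** One zero-timeout poll: the background work advances one tick, the caller never waits. */
        void pollZero() {
            if (pending == null || !reusePendingLookup) {
                // Assumed buggy behaviour when reusePendingLookup is false:
                // the previous in-flight lookup is dropped and a new one is started.
                pending = new OffsetLookup(ticksNeeded);
            }
            pending.tick();
            if (pending.done()) {
                // Lookup finished: no committed offset and no reset strategy, so fail.
                throw new NoOffsetException();
            }
            // Zero timeout expired before the lookup finished: return and retry on the next poll.
        }
    }

    /** Returns the 1-based poll on which the exception is thrown, or -1 if it never is. */
    static int pollsUntilFailure(boolean reusePendingLookup, int ticksNeeded, int maxPolls) {
        ToyConsumer consumer = new ToyConsumer(reusePendingLookup, ticksNeeded);
        for (int i = 1; i <= maxPolls; i++) {
            try {
                consumer.pollZero();
            } catch (NoOffsetException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Lookup restarted on every poll: the exception is never reached.
        System.out.println(pollsUntilFailure(false, 3, 1000));
        // Lookup kept across polls: the exception surfaces once the lookup completes.
        System.out.println(pollsUntilFailure(true, 3, 1000));
    }
}
```

In this model, continuous zero-timeout polling only reaches the exception when the pending lookup survives across poll calls, which matches the "should eventually fail" expectation in the proposed integration test.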
-- This message was sent by Atlassian Jira (v8.20.10#820010)