Lianet Magrans created KAFKA-16777:
--------------------------------------
Summary: New consumer should throw NoOffsetForPartitionException
on continuous poll zero if no reset strategy
Key: KAFKA-16777
URL: https://issues.apache.org/jira/browse/KAFKA-16777
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Lianet Magrans
If the consumer does not define an offset reset strategy, a call to poll should
fail with NoOffsetForPartitionException. That works as expected on the new
consumer when polling with a timeout > 0 (existing integration test
[here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
but fails when polling continuously with ZERO timeout.
This can be easily reproduced with a new integration test like this (passes for
the legacy consumer but fails for the new consumer). We should add it as part
of the fix, for better coverage:
{code:java}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String,
groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"none")
val consumer = createConsumer(configOverrides = this.consumerConfig)
consumer.assign(List(tp).asJava)
// continuous poll should eventually fail because there is no offset reset
strategy set (fail only when resetting positions after coordinator is known)
TestUtils.tryUntilNoAssertionError() {
assertThrows(classOf[NoOffsetForPartitionException], () =>
consumer.poll(Duration.ZERO))
}
}
{code}
Also this is covered in the unit test
[KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
that is currently enabled only for the LegacyConsumer. After fixing this issue
we should be able to enable it for the new consumer too.
The issue seems to be around calling poll with ZERO timeout, that even when
continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so
the updateFetchPositions never makes it to
[resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
where the exception is thrown.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)