Frédérik ROULEAU created KAFKA-19051:
----------------------------------------
Summary: Fix implicit acknowledgement cannot be override when
RecordDeserializationException occurs
Key: KAFKA-19051
URL: https://issues.apache.org/jira/browse/KAFKA-19051
Project: Kafka
Issue Type: Sub-task
Components: consumer
Reporter: Frédérik ROULEAU
When a record generates a RecordDeserializationException, KIP mentioned that
with explicit acknowledgement the default Release can be overridden.
When tried, I have:
{code:java}
Exception in thread "main" java.lang.IllegalStateException: The record cannot
be acknowledged.
at
org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
at
org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
at
org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
at org.example.frouleau.kip932.Main.main(Main.java:62) {code}
It looks like the record was already released.
Code used:
{code:java}
//....
} catch (RecordDeserializationException re) {
long offset = re.offset();
Throwable t = re.getCause();
LOGGER.error("Failed to deserialize record at partition={} offset={}",
re.topicPartition().partition(), offset, t);
ConsumerRecord<String,String> record = new
ConsumerRecord<>(re.topicPartition().topic(), re.topicPartition().partition(),
offset, "", "");
consumer.acknowledge(record, AcknowledgeType.REJECT);
} {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)