[
https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kuan Po Tseng reassigned KAFKA-19051:
-------------------------------------
Assignee: Kuan Po Tseng
> Fix implicit acknowledgement cannot be overridden when
> RecordDeserializationException occurs
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-19051
> URL: https://issues.apache.org/jira/browse/KAFKA-19051
> Project: Kafka
> Issue Type: Sub-task
> Components: consumer
> Reporter: Frédérik ROULEAU
> Assignee: Kuan Po Tseng
> Priority: Major
>
> When a record generates a RecordDeserializationException, KIP mentioned that
> with explicit acknowledgement the default Release can be overridden.
> When tried, I have:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: The record cannot
> be acknowledged.
> at
> org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
> at
> org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
> at
> org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
> at org.example.frouleau.kip932.Main.main(Main.java:62) {code}
> It looks like the record was already released.
> Code used:
> {code:java}
> //....
> } catch (RecordDeserializationException re) {
> long offset = re.offset();
> Throwable t = re.getCause();
> LOGGER.error("Failed to deserialize record at partition={} offset={}",
> re.topicPartition().partition(), offset, t);
> ConsumerRecord<String,String> record = new
> ConsumerRecord<>(re.topicPartition().topic(),
> re.topicPartition().partition(), offset, "", "");
> consumer.acknowledge(record, AcknowledgeType.REJECT);
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)