[ https://issues.apache.org/jira/browse/KAFKA-19471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18004133#comment-18004133 ]

Lan Ding commented on KAFKA-19471:
----------------------------------

While fixing this issue, I seem to have discovered another one.

When parseRecord fails and throws a {{SerializationException}}, the following code executes:
{code:java}
inFlightBatch.addAcknowledgement(lastRecord.offset(), AcknowledgeType.RELEASE);
inFlightBatch.setException(cachedRecordException);
{code}
In the outer code, the exception is not caught and is thrown directly:
{code:java}
if (batch.getException() != null) {
    throw batch.getException();
}
{code}
As a result, the assignment {{currentFetch = fetch;}} in ShareConsumerImpl#poll() is never executed, so the acknowledgement information is not saved. When acknowledgements are subsequently sent, this specific record is not released and remains locked until the lock timeout expires.

> Enable acknowledgement for a record which could not be deserialized
> -------------------------------------------------------------------
>
>                 Key: KAFKA-19471
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19471
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Andrew Schofield
>            Assignee: Lan Ding
>            Priority: Major
>             Fix For: 4.2.0
>
>
> If a record fetched by a share consumer fails to be deserialized, the KIP
> states that it is automatically released and that the application cannot
> override this behavior. Actually, experience with KafkaShareConsumer shows
> that it would be helpful to be able to override this to REJECT such records
> instead.
> We can add an override `KafkaShareConsumer.acknowledge(String topic, int
> partition, long offset, AcknowledgeType type)` for this purpose where the
> user does not have a `ConsumerRecord` instance available, but they do know
> the topic, partition and offset from the DeserializationException. The
> validation of this information is exactly the same as for the same
> information when it is implied by the `ConsumerRecord`.

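
The ordering problem described in the comment can be reproduced in isolation. The following is a minimal, self-contained sketch and not Kafka code: the classes Batch and Consumer, the AckType enum, and the methods pollThrowFirst, pollStoreFirst, failedBatch and ackRetained are all hypothetical stand-ins invented for illustration. It only shows that throwing before the fetch is stored loses the recorded RELEASE acknowledgement, and that storing the fetch first is one possible ordering that retains it. Whether the actual fix takes that form is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

public class LostAckSketch {
    enum AckType { ACCEPT, RELEASE, REJECT }

    /** Hypothetical stand-in for a fetched batch carrying acks and an optional exception. */
    static final class Batch {
        final Map<Long, AckType> acks = new HashMap<>();
        RuntimeException exception;
    }

    /** Hypothetical stand-in for the consumer state that poll() updates. */
    static final class Consumer {
        Batch currentFetch; // stays null if poll throws before assigning it

        // Mirrors the reported flow: the exception is thrown before the fetch is stored.
        void pollThrowFirst(Batch fetch) {
            if (fetch.exception != null) {
                throw fetch.exception;
            }
            currentFetch = fetch;
        }

        // One possible alternative (an assumption): store the fetch, then surface the exception.
        void pollStoreFirst(Batch fetch) {
            currentFetch = fetch;
            if (fetch.exception != null) {
                throw fetch.exception;
            }
        }
    }

    /** Builds a batch whose record at the given offset failed to deserialize. */
    static Batch failedBatch(long offset) {
        Batch b = new Batch();
        b.acks.put(offset, AckType.RELEASE);
        b.exception = new IllegalStateException("deserialization failed at offset " + offset);
        return b;
    }

    /** Returns true if the RELEASE ack for offset 42 is still reachable after poll throws. */
    static boolean ackRetained(boolean storeFirst) {
        Consumer c = new Consumer();
        try {
            if (storeFirst) {
                c.pollStoreFirst(failedBatch(42L));
            } else {
                c.pollThrowFirst(failedBatch(42L));
            }
        } catch (RuntimeException e) {
            // The application sees the exception; what matters is the state left behind.
        }
        return c.currentFetch != null && c.currentFetch.acks.get(42L) == AckType.RELEASE;
    }

    public static void main(String[] args) {
        System.out.println("throw first, ack retained: " + ackRetained(false));
        System.out.println("store first, ack retained: " + ackRetained(true));
    }
}
```

In the throw-first variant the consumer has no reference to the batch afterwards, which corresponds to the record staying locked until the lock timeout.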
-- This message was sent by Atlassian Jira (v8.20.10#820010)