[
https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18028599#comment-18028599
]
Uladzislau Blok commented on KAFKA-19430:
-----------------------------------------
Hello. I did more tests locally, and it looks to me as if this exception _could_
sometimes be retryable:
# There is corruption on the disk. In this case the partition will fail with an
exception when trying to read the log file, and return an error response to the consumer.
# There is a network issue while transferring the message over the network. For more
details you can check https://issues.apache.org/jira/browse/KAFKA-19613
I have created a KIP to expose the topic-partition and offset as part of this exception,
but I still see a few gaps. If we just skip the corrupted part, we can lose the messages
between the current offset and the corrupted part, which I believe is not acceptable.
I can see a new approach to fix it (inspired by Rust's Result type). As [~mjsax] said,
the DeserializationExceptionHandler is the read-path handler, so we can try to
catch the exception when polling the messages and then propagate it to the current
handler.
Now:
{code:java}
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
    records = mainConsumer.poll(pollTime);
} catch (final InvalidOffsetException e) {
    // Only InvalidOffsetException is handled here; any other exception thrown
    // by poll() (e.g. a RecordCorruptedException) propagates and kills the thread.
    resetOffsets(e.partitions(), e);
}
return records;
{code}
Proposal:
{code:java}
// Sketch: the poll loop returning a Result instead of throwing.
Result<ConsumerRecords<byte[], byte[]>, Exception> result =
    Result.of(ConsumerRecords.empty());
try {
    result = Result.of(mainConsumer.poll(pollTime));
} catch (final InvalidOffsetException e) {
    // Offset-reset handling stays on the read path, exactly as today.
    resetOffsets(e.partitions(), e);
} catch (final Exception e) {
    // Carry the exception in the Result instead of letting it kill the thread,
    // so it can be handed to the exception handler downstream.
    result = Result.error(e);
}
return result;
{code}
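For reference, here is a minimal sketch of the Result type this proposal assumes. It
does not exist in Kafka today; the names (Result.of, Result.error, unwrap) are
placeholders:
{code:java}
// Hypothetical helper, for illustration only; not an existing Kafka class.
// Holds either a success value or an error, similar to Rust's Result<T, E>.
public final class Result<T, E extends Exception> {
    private final T value;
    private final E error;

    private Result(final T value, final E error) {
        this.value = value;
        this.error = error;
    }

    // Wraps a successful value.
    public static <T, E extends Exception> Result<T, E> of(final T value) {
        return new Result<>(value, null);
    }

    // Wraps a caught exception.
    public static <T, E extends Exception> Result<T, E> error(final E error) {
        return new Result<>(null, error);
    }

    public boolean isError() {
        return error != null;
    }

    // Returns the success value (null if this is an error).
    public T value() {
        return value;
    }

    // Returns the stored exception (null if this is a success).
    public E error() {
        return error;
    }

    // Returns the value on success, or rethrows the stored error.
    public T unwrap() throws E {
        if (error != null) {
            throw error;
        }
        return value;
    }
}
{code}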
This way we can capture either the records or the exception, and 'unpack' the result
inside the exception handler, as sketched below.
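For example (again with the hypothetical Result type from above; handleRecordException
is a placeholder for wherever we decide to plug the handler in):
{code:java}
// Sketch of the caller side, assuming the Result-returning pollRequests() above.
final Result<ConsumerRecords<byte[], byte[]>, Exception> result = pollRequests(pollTime);
if (result.isError()) {
    // Hand the exception to the handler instead of crashing the stream thread;
    // the handler response could then be CONTINUE or FAIL, as with the
    // existing DeserializationExceptionHandler contract.
    handleRecordException(result.error());
} else {
    final ConsumerRecords<byte[], byte[]> records = result.value();
    // ... continue with normal processing of the polled records ...
}
{code}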
[~mjsax] [~lianetm] Any thoughts?
> Don't fail on RecordCorruptedException
> --------------------------------------
>
> Key: KAFKA-19430
> URL: https://issues.apache.org/jira/browse/KAFKA-19430
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Uladzislau Blok
> Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
> Currently, the existing `DeserializationExceptionHandler` is applied when
> de-serializing the record key/value byte[] inside Kafka Streams. This implies
> that a `RecordCorruptedException` is not handled.
> We should explore not letting Kafka Streams crash, but maybe retrying this error
> automatically (as `RecordCorruptedException extends RetriableException`), and
> finding a way to pump the error into the existing exception handler.
> If the error is transient, users can still use `REPLACE_THREAD` in the
> uncaught exception handler, but this is a rather heavyweight approach.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)