[ https://issues.apache.org/jira/browse/SAMZA-64?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13976214#comment-13976214 ]
Yan Fang commented on SAMZA-64:
-------------------------------
Cool.
Attached the patch.
RB: https://reviews.apache.org/r/20542/
1. Added handling for InvalidMessageException, UnknownTopicOrPartitionException,
and InvalidMessageSizeException in the (exception, loop) function so that they
fail the Samza job. Each one causes a KafkaCheckpointException to be thrown. I
matched on the exception types rather than the error codes because checking the
codes would have meant scattering throw statements across different parts of
the retryBackoff method, which would clutter the code and make it harder to
unit test.
2. In the unit test, I inject a serde that throws Kafka exceptions whenever
serde.fromBytes is called.
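To illustrate the idea in item 1, here is a minimal sketch, not the code from the patch. The exception classes are local stand-ins so that it compiles without Kafka or Samza on the classpath. The retry helper, the maxAttempts bound, the handler name failOnUnrecoverable, and wrapping the original exception as the cause are all assumptions made for illustration. The backoff sleep is omitted.

```scala
// Local stand-ins (assumption): the real exception classes come from Kafka and Samza.
class InvalidMessageException extends RuntimeException
class UnknownTopicOrPartitionException extends RuntimeException
class InvalidMessageSizeException extends RuntimeException
class KafkaCheckpointException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

object RetrySketch {
  // Hypothetical, simplified retry helper. It runs `op` until it succeeds.
  // On failure it calls `onError`, which returns to allow a retry or throws to abort.
  // `maxAttempts` is an assumption that keeps the sketch bounded.
  def retry[T](maxAttempts: Int)(op: () => T)(onError: (Exception, Int) => Unit): T = {
    var attempt = 1
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(op())
      } catch {
        case e: Exception =>
          onError(e, attempt) // throws for unrecoverable errors, returns otherwise
          if (attempt >= maxAttempts)
            throw new KafkaCheckpointException("Gave up after " + attempt + " attempts", e)
          attempt += 1
      }
    }
    result.get
  }

  // Plays the role of the (exception, loop) function: all the "fail the job"
  // decisions live in one place instead of being spread through the retry logic.
  def failOnUnrecoverable(e: Exception, attempt: Int): Unit = e match {
    case _: InvalidMessageException |
         _: UnknownTopicOrPartitionException |
         _: InvalidMessageSizeException =>
      throw new KafkaCheckpointException("Unrecoverable Kafka error on attempt " + attempt, e)
    case _ => () // anything else is treated as transient and retried
  }
}
```

A test in the spirit of item 2 could pass an `op` that throws one of these exceptions (as the injected serde does) and check that a KafkaCheckpointException comes out after a single attempt.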
> Fail KafkaCheckpointManager on unrecoverable errors
> ---------------------------------------------------
>
> Key: SAMZA-64
> URL: https://issues.apache.org/jira/browse/SAMZA-64
> Project: Samza
> Issue Type: Bug
> Components: kafka
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Attachments: SAMZA-64.patch
>
>
> KafkaCheckpointManager currently handles all errors from Kafka by waiting
> and retrying. These error codes are located in Kafka's ErrorMapping class:
> {noformat}
> val UnknownCode : Short = -1
> val NoError : Short = 0
> val OffsetOutOfRangeCode : Short = 1
> val InvalidMessageCode : Short = 2
> val UnknownTopicOrPartitionCode : Short = 3
> val InvalidFetchSizeCode : Short = 4
> val LeaderNotAvailableCode : Short = 5
> val NotLeaderForPartitionCode : Short = 6
> val RequestTimedOutCode: Short = 7
> val BrokerNotAvailableCode: Short = 8
> val ReplicaNotAvailableCode: Short = 9
> val MessageSizeTooLargeCode: Short = 10
> val StaleControllerEpochCode: Short = 11
> {noformat}
> There was some discussion on SAMZA-62's RB about potentially not retrying on
> ALL errors. Some are not recoverable (e.g. InvalidMessageCode).