[ 
https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241992#comment-17241992
 ] 

Guozhang Wang commented on KAFKA-10688:
---------------------------------------

Had some more discussions with [~cadonna] about different scenarios, and I 
think we can potentially enlarge the scope of this ticket to include all the 
following cases:

1) When starting the application for the first time, the repartition is newly 
created. In this case we should set the starting offset on the repartition 
topics according to the global reset policy.

2) When restarting the application, where the repartition topic already exist 
and may have some data. In this case we would try to read the committed offset 
and start from there.
    2.a) If the committed offset is already out of the range --- i.e. a 
truncation happens before restarting the application --- we should treat it as 
a fatal error.
    2.b) if there is no committed offset, indicating that either the 
application was not gracefully shutdown before (since otherwise the committed 
offset should be found), or the committed offset is somehow lost. We should 
treat it as a fatal error.

3) During normal processing, suddenly the consumer found itself out of the 
range --- i.e. a truncation happens at the same time --- we should treat it as 
a fatal error.

The challenge today is that we cannot easily distinguish case 1) from case 2) 
and 3), since the consumer would throw the same invalid offset exception and 
Streams would handle it universally. Instead of relying on consumer to improve 
(KAFKA-3370), we can do it at the Streams layer only, as the following:

* Whenever we create the repartition topic, we commit an offset as 0 regardless 
to the global offset reset policy, since in either earliest or latest it should 
just be 0.
* Whenever we get an invalid offset exception (note we still keep the 
consumer's configuration as `none`), we check if it is from the repartition 
topic, if yes we always treat it as fatal error; if not we use the reset policy 
on the corresponding source topic accordingly.

> Handle accidental truncation of repartition topics as exceptional failure
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-10688
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10688
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> Today we always handle InvalidOffsetException from the main consumer by the 
> resetting policy assuming they are for source topics. But repartition topics 
> are also source topics and should never be truncated and hence cause 
> InvalidOffsetException.
> We should differentiate these repartition topics from external source topics 
> and treat the InvalidOffsetException from repartition topics as fatal and 
> close the whole application.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to