[ https://issues.apache.org/jira/browse/KAFKA-7480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743252#comment-16743252 ]

John Roesler commented on KAFKA-7480:
-------------------------------------

Hi [~mjsax], just to confirm the scope of work here, after reading the other 
ticket, it sounds like we just need to set the policy to "none", catch the 
InvalidOffsetException, and in that case, seek to the beginning and rebuild the 
store accordingly.
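
Just to spell that out in plain consumer terms, here is a rough stand-alone 
sketch of what I mean (the HashMap store, topic name, and bootstrap address are 
placeholders; this is of course not the actual GlobalStreamThread code):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.InvalidOffsetException;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class GlobalRestoreSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // "none" makes the consumer throw InvalidOffsetException instead of resetting silently
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

            Map<byte[], byte[]> store = new HashMap<>(); // stand-in for the global state store

            try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                // the global consumer assigns partitions manually (single partition assumed here)
                consumer.assign(Collections.singletonList(new TopicPartition("global-topic", 0)));

                while (true) {
                    try {
                        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
                            store.put(record.key(), record.value());
                        }
                    } catch (InvalidOffsetException e) {
                        // our position is gone: wipe the store and replay the affected
                        // partitions from the earliest retained offset
                        store.clear();
                        consumer.seekToBeginning(e.partitions());
                    }
                }
            }
        }
    }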

But your later comment confused me. What could be a more resilient 
configuration than starting from the beginning and rebuilding the store? From 
my understanding of the InvalidOffsetException, it happens when a state store 
hasn't consumed its input topic for a long time (e.g., the task was assigned 
to a different node for a while and then migrated back, so the store hasn't 
kept up), and its "current offset" has already been deleted by the retention 
policy.

I suppose that if we know this has happened, then we could actually keep the 
store, but seek to the beginning (verifying that the beginning offset is larger 
than our invalid "current" offset) and consume from there. If the topic is 
compacted, then anything that needs to be updated will get updated in due 
course, 
and anything that needs to stay the same will idempotently remain the same. The 
only problem would be anything that needs to be tombstoned, but I think we keep 
the tombstones in the compacted topic for a much longer time for this reason.
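
In code, the check I have in mind would be roughly the following (illustrative 
only; the real recovery would have to integrate with the checkpointing and 
restore logic, and the class and method names here are made up):

    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
    import org.apache.kafka.common.TopicPartition;

    public class KeepStoreRecovery {

        // "Keep the store" idea: if our last position has fallen off the front of
        // the log, keep the store contents and simply resume from the earliest
        // retained offset instead of wiping and rebuilding.
        static void recoverKeepingStore(Consumer<?, ?> consumer, OffsetOutOfRangeException e) {
            Map<TopicPartition, Long> invalid = e.offsetOutOfRangePartitions();
            Map<TopicPartition, Long> beginning = consumer.beginningOffsets(invalid.keySet());

            for (Map.Entry<TopicPartition, Long> entry : invalid.entrySet()) {
                TopicPartition tp = entry.getKey();
                long earliest = beginning.get(tp);
                if (earliest > entry.getValue()) {
                    // records in [invalidOffset, earliest) were deleted by retention or
                    // compaction; replaying from `earliest` over the existing store is
                    // idempotent for a compacted topic, modulo the tombstone question
                    consumer.seek(tp, earliest);
                } else {
                    // we were somehow ahead of the log end, which is a different
                    // failure; re-throw and let the thread die as it does today
                    throw e;
                }
            }
        }
    }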

Was this what you had in mind?

It seems like we have several choices for scope:

A: smaller: just make the global store behave like a local store and rebuild 
from the beginning when it gets the exception

B: medium: consider optimizing by only discarding and rebuilding the state for 
the partition that gets the exception. E.g., we could keep each partition in a 
separate column family (CF) and employ a localized CF drop-and-restore 
strategy (see the sketch after this list). This would depend on knowing the 
partition function so we can route queries both from Streams operators and 
from IQ.

C: medium: try to keep the existing store, and still rebuild on top of it. This 
*should* be fine, but requires some detail-oriented analysis to make sure we 
won't produce corrupted state. This same strategy could apply to local stores 
as well, and in fact is orthogonal to B. (We could do both B and C).
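
To make the CF idea in B concrete, here is a rough RocksDB sketch (the path, 
partition count, and example keys are placeholders, and this is not how the 
Streams RocksDB store is structured today):

    import java.util.ArrayList;
    import java.util.List;

    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class PerPartitionColumnFamilies {

        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();

            // one column family per input-topic partition (3 partitions assumed),
            // plus the mandatory "default" CF
            List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
            descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, new ColumnFamilyOptions()));
            for (int p = 0; p < 3; p++) {
                descriptors.add(new ColumnFamilyDescriptor(("partition-" + p).getBytes(), new ColumnFamilyOptions()));
            }

            List<ColumnFamilyHandle> handles = new ArrayList<>();
            try (DBOptions options = new DBOptions()
                     .setCreateIfMissing(true)
                     .setCreateMissingColumnFamilies(true);
                 RocksDB db = RocksDB.open(options, "/tmp/global-store-sketch", descriptors, handles)) {

                // say partition 1 hit the InvalidOffsetException: drop only its CF,
                // recreate it empty, and replay just that partition from the beginning
                ColumnFamilyHandle partition1 = handles.get(2); // index 0 is "default"
                db.dropColumnFamily(partition1);
                partition1.close();
                ColumnFamilyHandle fresh = db.createColumnFamily(
                    new ColumnFamilyDescriptor("partition-1".getBytes(), new ColumnFamilyOptions()));

                // during the replay, writes for keys from partition 1 go to the fresh CF
                db.put(fresh, "some-key".getBytes(), "some-value".getBytes());
            }
        }
    }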

> GlobalThread should honor custom auto.offset.reset policy
> ---------------------------------------------------------
>
>                 Key: KAFKA-7480
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7480
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: needs-kip
>
> With KAFKA-6121 we improved Kafka Streams resilience and correctness with 
> regard to consumer auto.offset.reset and state cleanup.
> Back then, we decided to let GlobalStreamThread die and not handle 
> InvalidOffsetException during regular processing, because this error 
> indicates a fatal issue and the user should be notified about it. However, as 
> reported on the user mailing list, the only thing a user can do is to restart 
> the application (and investigate the root cause). During restart, the state 
> will be cleaned up and bootstrapped correctly.
> Thus, we might want to allow users to specify a more resilient configuration 
> for this case and log an ERROR message if the error occurs. To ensure 
> consistency, we might not allow setting the reset policy to "latest" though 
> (needs discussion)? By default, we can still keep "none" and fail.
> Note: `Topology.addGlobalStore` does not allow setting a reset policy. Thus, 
> this might require a KIP to extend `Topology.addGlobalStore` accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
