[ https://issues.apache.org/jira/browse/KAFKA-13350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440760#comment-17440760 ]
Guozhang Wang commented on KAFKA-13350:
---------------------------------------

The broker returns on the first `OffsetOutOfRangeException` it encounters
when fetching records, and hence does not fetch the other partitions but
returns the error early. You are right that the next request, which would
exclude the partitions that previously hit `OffsetOutOfRangeException`,
would hit the second partition and return the error again. So our proposed
improvement should be okay.

> Handle task corrupted exception on a per state store basis
> ----------------------------------------------------------
>
>                 Key: KAFKA-13350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13350
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> When we hit an `OffsetOutOfRangeException` during restore, we close the
> task as dirty and retry the restore process from scratch. For this case,
> we wipe out the task's state stores.
> If a task has multiple state stores, we also wipe out state that is
> actually clean and thus redo work for no reason. Instead of wiping out all
> state stores, we should only wipe out the single state store that
> corresponds to the changelog topic partition that hit the
> `OffsetOutOfRangeException`, but preserve the restore progress for all
> other state stores.
> We need to consider persistent and in-memory stores: for persistent
> stores, it would be fine to close the unaffected stores cleanly and also
> write the checkpoint file. For in-memory stores, however, we should not
> close the store, to avoid dropping the in-memory data. (See the per-store
> sketch after the quoted description below.)
> *TODO:* verify consumer behavior: if a consumer subscribes to multiple
> partitions, and two or more partitions are on the same broker, both could
> trigger an `OffsetOutOfRangeException` from a single fetch request at the
> same time; however, it seems that the consumer only reports a single
> `TopicPartition` when it raises an `OffsetOutOfRangeException`. Thus, we
> need to ensure that we do not lose information, and maybe we need to
> update the consumer to report all affected partitions at once? Or maybe it
> won't be an issue, because the next fetch request would send the same
> offset for the "missed" partitions and thus we would get a new
> `OffsetOutOfRangeException` anyway (it might still be more efficient to
> get all affected partitions at once). (See the poll-loop sketch below.)
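For the TODO above, here is a minimal sketch, not Streams' actual restore
code, of how a restore poll loop could collect every affected changelog
partition. It assumes `auto.offset.reset=none` on the restore consumer so
that `poll()` surfaces the exception, and relies on
`OffsetOutOfRangeException.partitions()` (inherited from
`InvalidOffsetException`) returning all partitions carried by the
exception; per the comment above, each fetch may surface only the first bad
partition, so callers would accumulate the set across polls:

{code:java}
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

public final class RestorePollSketch {

    // Polls the restore consumer once and returns the changelog partitions
    // that turned out to be out of range (empty if the poll succeeded).
    public static Set<TopicPartition> pollForCorrupted(
            final Consumer<byte[], byte[]> restoreConsumer) {
        final Set<TopicPartition> corrupted = new HashSet<>();
        try {
            // Requires auto.offset.reset=none on the restore consumer;
            // otherwise the position is reset silently instead of throwing.
            restoreConsumer.poll(Duration.ofMillis(100));
            // ... apply the fetched records to the corresponding stores ...
        } catch (final OffsetOutOfRangeException e) {
            // partitions() returns every partition the exception carries;
            // today a single fetch typically reports only the first one,
            // so this set should be accumulated across successive polls.
            corrupted.addAll(e.partitions());
        }
        return corrupted;
    }
}
{code}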
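Given the corrupted set collected above, a rough sketch of the proposed
per-store handling follows. `wipeLocalData` and `writeCheckpoint` are
hypothetical helpers standing in for Streams internals, while
`persistent()`, `flush()`, and `close()` are the real
`org.apache.kafka.streams.processor.StateStore` methods:

{code:java}
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;

public final class PerStoreWipeSketch {

    // Hypothetical inputs: each store mapped to its changelog partition,
    // plus the changelog partitions that hit OffsetOutOfRangeException.
    public static void handleCorruption(
            final Map<StateStore, TopicPartition> storesWithChangelogs,
            final Set<TopicPartition> corruptedChangelogs) {
        for (final Map.Entry<StateStore, TopicPartition> entry :
                storesWithChangelogs.entrySet()) {
            final StateStore store = entry.getKey();
            final TopicPartition changelog = entry.getValue();
            if (corruptedChangelogs.contains(changelog)) {
                // Affected store: close dirty and wipe its local data so
                // the restore starts from scratch for this store only.
                store.close();
                wipeLocalData(store);               // hypothetical helper
            } else if (store.persistent()) {
                // Unaffected persistent store: close cleanly and record
                // its restore progress in the checkpoint file.
                store.flush();
                store.close();
                writeCheckpoint(store, changelog);  // hypothetical helper
            }
            // Unaffected in-memory store: keep it open; closing it would
            // drop exactly the restored data we want to preserve.
        }
    }

    private static void wipeLocalData(final StateStore store) {
        // hypothetical: delete the store's on-disk state directory
    }

    private static void writeCheckpoint(final StateStore store,
                                        final TopicPartition changelog) {
        // hypothetical: persist the restored offset for this changelog
    }
}
{code}

The key design point is the implicit last branch: an unaffected in-memory
store is left untouched, matching the description's requirement to not
drop in-memory data.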