[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006992#comment-17006992 ]
Guozhang Wang commented on KAFKA-8037:
--------------------------------------

For non-global KTables, it makes me wonder whether our current mechanism for restoring source KTables is sound. Upon (re-)starting the application, there are only three cases:

1) The local checkpointed offset equals the committed offset, in which case no restoring is needed at all.

2) The local checkpointed offset is smaller than the committed offset (including the case where no checkpointed offset exists, which amounts to the beginning offset), in which case we can safely restore up to the committed offset, since those records went through serde during normal processing in the last run.

3) There is no committed offset, in which case there MUST be no checkpointed offset either. This is the case I'm wondering about: today we just restore up to the once-read end offset, which may load bad data, since those records did not go through normal serde in the last run. Should we really do that, or should we transit to normal processing right away?

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Patrik Kleindl
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a
> `deserialization.exception.handler` to drop corrupted records on read.
> However, this mechanism may be by-passed on restore. Assume a
> `builder.table()` call reads and drops a corrupted record. If the table state
> is lost and restored from the changelog topic, the corrupted record may be
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up
> the record would fail with a deserialization exception if the value part
> cannot be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for the
> GlobalKTable case). It's unclear to me atm how this issue could be addressed
> for KTables though.
> Note that user state stores are not affected, because they always have a
> dedicated changelog topic (and don't reuse an input topic) and thus the
> corrupted record would not be written into the changelog.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
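The bypass described above can be illustrated with a minimal, self-contained sketch in plain Java (no Kafka dependencies). All names here are hypothetical stand-ins, not actual Kafka Streams internals: `deserialize` plays the role of the value serde plus a log-and-continue exception handler, and the two loops contrast the normal processing path with the byte-copying restore path.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RestoreBypassSketch {
    // Stand-in for a value serde: only digit strings are valid values.
    // Returning null models a log-and-continue handler dropping the record.
    static Integer deserialize(byte[] value) {
        String s = new String(value, StandardCharsets.UTF_8);
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException e) {
            return null; // corrupted record is dropped on read
        }
    }

    public static void main(String[] args) {
        List<byte[]> topic = List.of(
            "1".getBytes(StandardCharsets.UTF_8),
            "oops".getBytes(StandardCharsets.UTF_8), // corrupted record
            "3".getBytes(StandardCharsets.UTF_8));

        // Normal processing: the serde runs, the corrupted record is dropped.
        List<Integer> processedStore = new ArrayList<>();
        for (byte[] record : topic) {
            Integer v = deserialize(record);
            if (v != null) processedStore.add(v);
        }
        System.out.println(processedStore); // [1, 3]

        // Restore: plain bytes are copied, bypassing the serde entirely,
        // so the corrupted record lands in the store.
        List<byte[]> restoredStore = new ArrayList<>(topic);

        // A later lookup that deserializes the value (e.g. store.get()
        // during a KTable join) hits the bad entry only now.
        System.out.println(deserialize(restoredStore.get(1))); // null
    }
}
```

In the real code path the failed lookup surfaces as a deserialization exception at join time rather than a null, which is exactly why the handler configured for the read path does not help.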