[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166090#comment-17166090 ]
Guozhang Wang commented on KAFKA-8037:
--------------------------------------

Thank you all for providing your thoughts! Consolidating them, I'm going to propose the following:

1) When source tables use their source topics as changelog topics, we will make the restoration behavior consistent with normal processing:

1.a) During normal processing, use the deserializer from Consumed, handle exceptions if necessary, and then put the raw bytes into the store. The object deserialized with the Consumed serde is passed along to downstream; during IQ, the object deserialized with the Materialized serde is returned.

1.b) During restoration, use the deserializer from Consumed, handle exceptions if necessary, and then put the raw bytes into the store.

By doing so, only the deserializers of Consumed / Materialized are used, hence no serde symmetry is required; and since the Materialized serializer is never invoked, there is no risk of side effects. Admittedly 1.b) is a bit tricky to do and may need poking some holes in class hierarchies; we will see whether an elegant solution can be found while implementing it.

2) We will no longer reuse the source topics as changelog topics as part of the optimization, but will instead add a new public API that lets users turn this optimization on / off per store. As a result, we would deprecate the `OPTIMIZE_ALL` config value and instead use version numbers as possible config values, e.g. "topology.optimization = 2.6" means "enable all optimization rules available as of Kafka version 2.6.x". We do not guarantee that an optimization rule will exist forever in future versions: if in the future we find another optimization that has side effects and hence is not ubiquitously better, we may remove it. Users wanting to maintain backward compatibility can stick with their original version number for optimization.
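The restore path in 1.b) can be sketched as follows. This is a minimal illustration with plain-JDK stand-ins, not the Kafka Streams internals: the deserializer, changelog, and store here are all hypothetical. Each changelog value is first validated with the Consumed deserializer; a corrupted record is dropped, as the deserialization exception handler would do during normal processing, and only the raw bytes of valid records are written to the store.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RestoreSketch {
    // Stand-in for the Consumed value deserializer: treats any value
    // containing a NUL byte as "corrupted".
    static String deserialize(byte[] raw) {
        String s = new String(raw);
        if (s.indexOf('\0') >= 0) {
            throw new RuntimeException("corrupt record");
        }
        return s;
    }

    // Restore: validate each changelog value with the Consumed
    // deserializer, then store the ORIGINAL raw bytes, so the
    // Materialized serializer is never invoked.
    static Map<String, byte[]> restore(Map<String, byte[]> changelog) {
        Map<String, byte[]> store = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> rec : changelog.entrySet()) {
            try {
                deserialize(rec.getValue());             // validate only
                store.put(rec.getKey(), rec.getValue()); // keep raw bytes
            } catch (RuntimeException e) {
                // Handler decided to CONTINUE: skip the corrupted record,
                // matching normal-processing behavior.
            }
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, byte[]> changelog = new LinkedHashMap<>();
        changelog.put("k1", "good".getBytes());
        changelog.put("k2", "bad\0bytes".getBytes());
        System.out.println(restore(changelog).keySet()); // [k1]
    }
}
```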
> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a
> `deserialization.exception.handler` to drop corrupted records on read.
> However, this mechanism may be by-passed on restore. Assume a
> `builder.table()` call reads and drops a corrupted record. If the table state
> is lost and restored from the changelog topic, the corrupted record may be
> copied into the store, because on restore plain bytes are copied.
>
> If the KTable is used in a join, an internal `store.get()` call to look up the
> record would fail with a deserialization exception if the value part cannot
> be deserialized.
>
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for the
> GlobalKTable case). It's unclear to me at the moment how this issue could be
> addressed for KTables though.
>
> Note that user state stores are not affected, because they always have a
> dedicated changelog topic (and don't reuse an input topic) and thus the
> corrupted record would not be written into the changelog.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
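For reference, the read-time handler the issue description refers to is configured as below. This is a minimal sketch using only the string config key (the full key in Kafka Streams is `default.deserialization.exception.handler`); `LogAndContinueExceptionHandler` is the built-in handler that logs and drops corrupted records during normal processing, which is exactly the behavior that is by-passed on restore.

```java
import java.util.Properties;

public class HandlerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same key as StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG.
        // LogAndContinueExceptionHandler drops corrupted records on read
        // instead of failing the application.
        props.put("default.deserialization.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        System.out.println(
            props.getProperty("default.deserialization.exception.handler"));
    }
}
```

Note that, per this issue, the handler only runs on the normal read path; during changelog restoration the raw bytes are copied without deserialization, so corrupted records can re-enter the store.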