[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17166090#comment-17166090
 ] 

Guozhang Wang commented on KAFKA-8037:
--------------------------------------

Thank you all for providing your thoughts! In consolidating them I'm going to 
propose the following:

1) when source tables have source topics as changelog topics, we will make the 
restoration behavior consistent with normal processing by:

1.a) During normal processing, use the deserializer from Consumed and handle 
exceptions if necessary, and then put the raw bytes into the store; the 
object deserialized with the Consumed serde will be passed along downstream. 
During interactive queries (IQ) the object deserialized with the Materialized 
serde will be returned.
1.b) During restoration, use the deserializer from Consumed and handle 
exceptions if necessary, then put the raw bytes into the store.

By doing so only the deserializers of Consumed / Materialized would be used, 
hence no symmetry between serializer and deserializer is required, and since 
the Materialized serializer is not used there's no risk of side-effects. 
Admittedly 1.b) is a bit tricky to do and may need poking some holes in the 
class hierarchies; we will see if an elegant solution can be found while 
implementing it.
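
To make 1.b) concrete, here is a minimal sketch in plain Java, without any 
Kafka classes. Everything in it is hypothetical and only illustrates the idea: 
`RestoreSketch`, `restore` and `INT_DESERIALIZER` are made-up names, a 
`Function` stands in for the Consumed deserializer, a `Map` stands in for the 
state store, and the catch block mimics a "log and continue" style 
deserialization exception handler. Tombstones are not handled.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class RestoreSketch {

    // Hypothetical value deserializer standing in for the Consumed serde:
    // parses UTF-8 text as an integer and throws on corrupted input.
    static final Function<byte[], Integer> INT_DESERIALIZER =
        raw -> Integer.parseInt(new String(raw, StandardCharsets.UTF_8));

    // Replays changelog records into a store. Each value is deserialized only
    // to validate it; the store keeps the original raw bytes, as in 1.b).
    static Map<String, byte[]> restore(List<Map.Entry<String, byte[]>> changelog,
                                       Function<byte[], ?> deserializer) {
        Map<String, byte[]> store = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> record : changelog) {
            try {
                deserializer.apply(record.getValue());
            } catch (RuntimeException e) {
                // Mimics a "log and continue" handler: drop the bad record.
                continue;
            }
            store.put(record.getKey(), record.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> changelog = List.of(
            Map.entry("a", "1".getBytes(StandardCharsets.UTF_8)),
            Map.entry("b", "oops".getBytes(StandardCharsets.UTF_8)),
            Map.entry("c", "3".getBytes(StandardCharsets.UTF_8)));
        Map<String, byte[]> store = restore(changelog, INT_DESERIALIZER);
        System.out.println(store.keySet()); // prints [a, c]
    }
}
```

Since the store only ever receives the untouched input bytes, no serializer is 
involved on this path; the deserializer acts purely as a filter.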

2) we will no longer make reusing the source topic as the changelog topic part 
of the optimization, but would add a new public API to allow users to turn 
this optimization on / off per store. As a result we would deprecate the 
`OPTIMIZE_ALL` config value, and instead use version numbers as possible config 
values, e.g. "topology.optimization = 2.6" means enable all optimization rules 
that are available as of Kafka version 2.6.x. We do not guarantee that an 
optimization rule will exist forever in future versions, e.g. if in the future 
we find another optimization that may have side-effects and hence is not 
ubiquitously better, we may remove it. Users wanting to maintain backward 
compatibility can stick with the original version number for optimization.
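
As a rough illustration of the versioned config value, the sketch below uses 
only `java.util.Properties`. The version-valued setting is the proposal from 
this comment, not an existing config value, and `OptimizationVersionSketch`, 
`isRuleEnabled` and `compare` are hypothetical names. It assumes a rule is 
enabled when the configured version is at least the version that introduced 
it, which ignores the case of rules being removed later.

```java
import java.util.Properties;

class OptimizationVersionSketch {

    // Hypothetical check: a rule introduced in `ruleVersion` is enabled when
    // the configured "topology.optimization" version is at least that version.
    static boolean isRuleEnabled(Properties props, String ruleVersion) {
        String configured = props.getProperty("topology.optimization", "none");
        if (configured.equals("none")) {
            return false; // optimization switched off
        }
        return compare(configured, ruleVersion) >= 0;
    }

    // Compares "major.minor" strings numerically, so that 2.10 > 2.6.
    static int compare(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        int major = Integer.compare(Integer.parseInt(x[0]), Integer.parseInt(y[0]));
        if (major != 0) {
            return major;
        }
        return Integer.compare(Integer.parseInt(x[1]), Integer.parseInt(y[1]));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("topology.optimization", "2.6");
        System.out.println(isRuleEnabled(props, "2.4")); // prints true
        System.out.println(isRuleEnabled(props, "2.7")); // prints false
    }
}
```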

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
