[https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163002#comment-17163002]

Guozhang Wang commented on KAFKA-8037:
--------------------------------------

Hey guys, thanks for your input.

I think [~vvcephei] has a point that relying on an implicit implication of 
{{StreamsBuilder#table(Consumed, Materialized)}} might cause users more 
trouble than convenience. On the other hand, we cannot tell whether the two 
serdes specified in those two control objects are exactly the same, and even 
if they are, we cannot guarantee that the serde is symmetric and has no side 
effects. But I'm not sure I follow why we want to "deprecate the topology 
constructor that takes Properties", because the Properties parameter is used 
not only for reading the OPTIMIZE config but also for other purposes, such as 
reading the application ID.

So at the end of the day we still need to push this decision to users. More 
concretely, we can ask them "whether Streams should create a new changelog 
topic or not"; if the answer is "no", we just treat the source topic as a 
normal changelog topic, i.e., we would not apply the serde on restore either, 
so users need to be aware that by replying "no" they indicate that the serde 
is symmetric and has no side effects, etc.

This question would be asked per source store, and we could either add this 
option to the `StreamsBuilder#table` API or extend the `Materialized` class 
with an `enableSourceAsChangelog` option.
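
A stdlib-only sketch of what the per-store answer would change, assuming a 
hypothetical `StoreOptions` holder whose `enableSourceAsChangelog` flag only 
mirrors the idea above (it is not an existing Kafka Streams API). Dedicated 
changelog topics in Kafka Streams are named 
`<application.id>-<storeName>-changelog`; the sketch just picks between that 
name and the source topic.

```java
public class ChangelogChoice {

    // Hypothetical per-store options; enableSourceAsChangelog is the proposed flag, not a real API.
    static final class StoreOptions {
        final String storeName;
        final boolean enableSourceAsChangelog;

        StoreOptions(String storeName, boolean enableSourceAsChangelog) {
            this.storeName = storeName;
            this.enableSourceAsChangelog = enableSourceAsChangelog;
        }
    }

    // Picks the topic a source-table store would be restored from.
    static String restoreTopic(String applicationId, String sourceTopic, StoreOptions options) {
        if (options.enableSourceAsChangelog) {
            // User answered "no new changelog": they assert the serde is symmetric and
            // side-effect free, so the input topic doubles as the changelog (raw bytes on restore).
            return sourceTopic;
        }
        // Otherwise a dedicated changelog is created, named after application.id and the store.
        return applicationId + "-" + options.storeName + "-changelog";
    }

    public static void main(String[] args) {
        StoreOptions reuse = new StoreOptions("users-store", true);
        StoreOptions dedicated = new StoreOptions("users-store", false);
        System.out.println(restoreTopic("my-app", "users", reuse));     // users
        System.out.println(restoreTopic("my-app", "users", dedicated)); // my-app-users-store-changelog
    }
}
```

It also hints at one reason the application ID from the Properties is still 
needed: it is part of the dedicated changelog topic name.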

As for global state stores, I'm still more inclined toward my original 
proposal: treat their restoration just like normal processing and never 
create changelogs.
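
A minimal stdlib-only sketch of "restoration as normal processing" for a 
global store, under stated assumptions: the `GlobalRestoreAsProcessing` class 
and its integer-only deserializer are hypothetical, and dropping bad records 
mimics a log-and-continue `deserialization.exception.handler`. Because every 
source record goes through the deserializer again, corrupted records are 
skipped on restore exactly as they were during processing.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GlobalRestoreAsProcessing {

    // Hypothetical value deserializer: values must be decimal integers.
    static Integer deserialize(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    }

    // Restore by replaying the source topic through the deserializer.
    // Corrupted records are logged and dropped, so they never reach the store.
    static Map<String, Integer> restore(List<Map.Entry<String, byte[]>> sourceRecords) {
        Map<String, Integer> store = new TreeMap<>();
        for (Map.Entry<String, byte[]> record : sourceRecords) {
            try {
                store.put(record.getKey(), deserialize(record.getValue()));
            } catch (NumberFormatException e) {
                System.err.println("Skipping corrupted record with key " + record.getKey());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> topic = List.of(
                Map.entry("a", "1".getBytes(StandardCharsets.UTF_8)),
                Map.entry("b", "oops".getBytes(StandardCharsets.UTF_8)),
                Map.entry("c", "3".getBytes(StandardCharsets.UTF_8)));
        System.out.println(restore(topic)); // {a=1, c=3}
    }
}
```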

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for the 
> GlobalKTable case). It's unclear to me at the moment how this issue could be 
> addressed for KTables, though.
> Note that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.
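
The failure mode described in the issue can be reproduced in a small 
stdlib-only Java model. Everything here is hypothetical (the `RawRestoreBug` 
class, the integer-only deserializer, and a map standing in for a byte-oriented 
key-value store); it is a sketch of the mechanism, not Kafka Streams code. 
Normal processing drops the corrupted record, the raw-byte restore copies it 
into the store, and a later join-style lookup fails while deserializing.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class RawRestoreBug {

    // Hypothetical value serde: values must be decimal integers.
    static int deserialize(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    }

    static byte[] serialize(int value) {
        return Integer.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    // The store keeps serialized bytes, like a persistent key-value store.
    final Map<String, byte[]> store = new HashMap<>();

    // Normal processing: deserialize first; a log-and-continue handler drops bad records.
    void process(String key, byte[] value) {
        try {
            store.put(key, serialize(deserialize(value)));
        } catch (NumberFormatException e) {
            System.err.println("Dropped corrupted record with key " + key);
        }
    }

    // Restore from the source topic reused as changelog: plain bytes are copied, no serde applied.
    void restoreRaw(String key, byte[] value) {
        store.put(key, value);
    }

    // A join lookup deserializes on read and throws on corrupted bytes.
    int get(String key) {
        return deserialize(store.get(key));
    }

    public static void main(String[] args) {
        byte[] bad = "oops".getBytes(StandardCharsets.UTF_8);
        RawRestoreBug processed = new RawRestoreBug();
        processed.process("k", bad);
        System.out.println(processed.store.containsKey("k")); // false

        RawRestoreBug restored = new RawRestoreBug();
        restored.restoreRaw("k", bad);
        try {
            restored.get("k");
        } catch (NumberFormatException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
    }
}
```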



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
