[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161675#comment-17161675 ]

John Roesler commented on KAFKA-8037:
-------------------------------------

Thanks for your thoughts, [~guozhang].

It really seems like this optimization has grown into quite a lot of trouble, 
but it also seems that for some users the duplicated changelog topic might be 
a heavy burden, despite its advantages.

Your reasoning about Consumed vs. Materialized serdes is sound. However, from 
a usability perspective, there seems to be a mismatch between the subtlety of 
the feature and the size of its effect. I'm also a little concerned because 
I'm now looking back at all the times I have recommended that people just 
specify serdes everywhere when they're having trouble with the inheritance 
rules. It also occurs to me that this is exactly what the Scala API does: 
every single operator gets serdes implicitly provided, if they can be resolved.
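
Concretely, "specify serdes everywhere" in the Java API looks something like 
the sketch below; the topic name and types are just placeholders:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    public class ExplicitSerdes {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            // Serdes pinned on the read path (Consumed) and on the store
            // (Materialized), so nothing depends on the inheritance rules:
            KTable<String, Long> table = builder.table(
                "input-topic",
                Consumed.with(Serdes.String(), Serdes.Long()),
                Materialized.with(Serdes.String(), Serdes.Long()));
            builder.build();
        }
    }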

Also, it does seem like some aspects of the optimization's correctness depend 
on semantics of the source topic that we can't know, and that might vary 
between two topics in the same application.

I'm wondering if we could just make the optimization an explicit opt-in on 
the source node itself. I.e., we'd deprecate the topology constructor that 
takes Properties and add some kind of `useSourceAsChangelog()` option to the 
StreamsBuilder#table and StreamsBuilder#globalTable methods.

Then, the mental model is very simple: you always get a changelog for stores 
unless you specifically ask not to. If you opt in to the optimization, we do 
just what you said in (1), and that's it. It's a simple and clear tradeoff 
between the storage (and network) cost of the changelog vs. CPU time on 
restore, but either way we get a correct and safe outcome. The one fly in the 
ointment is a non-deterministic serde or ingest function (as in global 
tables), but that's out of our hands.
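
To make the shape of that opt-in concrete, it could look roughly like the 
fragment below. This is purely illustrative: `useSourceAsChangelog()` does 
not exist today, and whether it belongs on Materialized, on Consumed, or as 
a separate argument is an open question:

    // (imports as in the sketch above)
    StreamsBuilder builder = new StreamsBuilder();
    // Hypothetical opt-in: reuse the source topic as the changelog, trading
    // the storage/network cost of a second topic for CPU time on restore
    // (re-deserializing and re-applying the deserialization handler).
    KTable<String, Long> table = builder.table(
        "input-topic",
        Consumed.with(Serdes.String(), Serdes.Long()),
        Materialized.with(Serdes.String(), Serdes.Long())
            .useSourceAsChangelog());  // <- hypothetical, per this proposal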

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `default.deserialization.exception.handler` to drop corrupted records on 
> read. However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table 
> state is lost and restored from the changelog topic, the corrupted record 
> may be copied into the store, because on restore plain bytes are copied.
>
> If the KTable is used in a join, an internal `store.get()` call to look up 
> the record would fail with a deserialization exception if the value part 
> cannot be deserialized.
>
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for 
> the GlobalKTable case). It's unclear to me atm how this issue could be 
> addressed for KTables, though.
>
> Note that user state stores are not affected: they always have a dedicated 
> changelog topic (they don't reuse an input topic), so a corrupted record 
> would not be written into the changelog.
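
For reference, the read-path guard described above is configured like this; 
a minimal sketch, with application id, bootstrap servers, topic, and serdes 
as placeholders:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
    import org.apache.kafka.streams.kstream.Consumed;

    public class DroppingCorruptRecords {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-restore-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Drop corrupted records on the read path instead of failing:
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                      LogAndContinueExceptionHandler.class);

            StreamsBuilder builder = new StreamsBuilder();
            // The handler guards this deserialization; restore, however,
            // copies changelog bytes verbatim, so a corrupted record can
            // still land in the store when the source topic doubles as
            // the changelog.
            builder.table("input-topic",
                    Consumed.with(Serdes.String(), Serdes.String()));

            new KafkaStreams(builder.build(), props).start();
        }
    }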



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
