[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161565#comment-17161565
 ] 

Guozhang Wang commented on KAFKA-8037:
--------------------------------------

There are some more discussions on related issues in KAFKA-10179, and since 
that ticket is marked as resolved I'd like to continue the discussion here. 
This would effectively enlarge the scope of this ticket; in short: 

1) we need to decide whether we can reuse the source topic as the changelog 
for source tables (including global KTables) at all; today, as long as the 
optimization is enabled, it always reuses the source topic blindly.
2) if 1) is "yes", we still need to decide whether, when restoring from the 
source topic, we can safely skip the serdes / customized processors at all;
3) if 1) is "yes", we still need to decide how, when restoring, to avoid 
loading ill-formatted data into the store. Note that we cannot control whether 
the source topic is log compacted, so whether bad new data ever obsoletes old 
good data during compaction is out of Streams' control; but Streams itself 
needs to make sure the behavior during normal processing and during restoration 
is consistent.
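To make point 3) concrete, here is a minimal toy sketch (plain Java, not the actual Streams restoration code; the "deserializer" and all names are made up) contrasting normal processing, which drops records that fail deserialization, with a raw-byte restore, which blindly copies everything into the store:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class RestoreSketch {

    // Toy "deserializer": accepts only ASCII digits, throws otherwise.
    static int deserialize(byte[] value) {
        return Integer.parseInt(new String(value, StandardCharsets.US_ASCII));
    }

    // Normal processing path: deserialization errors are handled, record dropped
    // (analogous to a deserialization exception handler that logs and continues).
    static Map<String, byte[]> processNormally(Map<String, byte[]> sourceTopic) {
        Map<String, byte[]> store = new HashMap<>();
        for (Map.Entry<String, byte[]> record : sourceTopic.entrySet()) {
            try {
                deserialize(record.getValue()); // validate before storing
                store.put(record.getKey(), record.getValue());
            } catch (NumberFormatException dropped) {
                // corrupted record skipped
            }
        }
        return store;
    }

    // Raw-byte restore path: plain byte copy, no validation at all.
    static Map<String, byte[]> restoreRaw(Map<String, byte[]> sourceTopic) {
        return new HashMap<>(sourceTopic);
    }

    public static void main(String[] args) {
        Map<String, byte[]> topic = new HashMap<>();
        topic.put("a", "42".getBytes(StandardCharsets.US_ASCII));
        topic.put("b", "oops".getBytes(StandardCharsets.US_ASCII)); // corrupted record

        System.out.println(processNormally(topic).containsKey("b")); // false: dropped
        System.out.println(restoreRaw(topic).containsKey("b"));      // true: bad data loaded
    }
}
```

The inconsistency between the two paths is exactly what the ticket describes: the store contents after a restore differ from what normal processing would have produced.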

For 1) and 2), there are a couple of scenarios we need to consider: 

* are the bytes in the source topic exactly the same as the bytes in the state 
store (i.e. are the serdes symmetric)? For example, if users specify both 
{{Consumed}} and {{Materialized}} in the table / globalTable API, they may 
use different serdes.
* are there any side effects that the serde implementation incurs? For example, 
does it register a subject with the schema registry service?
* are there any customized processors between the consumer and the 
materialization --- today this is only possible with global KTables / state 
stores.
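The first bullet can be illustrated with a small sketch (toy serdes, not Kafka Streams API): if the "Consumed" side reads integers as ASCII text while the "Materialized" side stores them as 4-byte binary, the topic bytes and the store bytes diverge, so a raw byte copy from the source topic would restore the wrong representation:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerdeSymmetry {

    // "Consumed" side: the topic stores integers as ASCII text.
    static int consumedDeserialize(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.US_ASCII));
    }

    // "Materialized" side: the store keeps integers as 4-byte big-endian binary.
    static byte[] materializedSerialize(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    public static void main(String[] args) {
        byte[] topicBytes = "42".getBytes(StandardCharsets.US_ASCII);
        byte[] storeBytes = materializedSerialize(consumedDeserialize(topicBytes));
        // Asymmetric serdes: a raw byte copy from the topic would not match
        // what normal processing writes to the store.
        System.out.println(Arrays.equals(topicBytes, storeBytes)); // false
    }
}
```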

My thoughts are that, given we are going to move restoration to separate 
threads, which would reduce its impact on the high availability of Streams, the 
second goal ("make restoration as fast as possible, without serdes") is good to 
have but not critical, whereas on the other hand, transitioning from "do not 
create a separate changelog topic" to "create a separate changelog topic" would 
be considered a big regression from the user's perspective. So I'm proposing 
that we try to keep the first goal of "reuse the source topic to avoid 
replicating data" while making the second an opt-in feature that users specify. 
More specifically:

1) when restoring the state stores, the restoration module would differentiate 
normal changelog topic restoration (no serdes, just a byte copy) from source 
topic restoration; for the latter we would handle records just like in normal 
processing, i.e. deserialization (using the {{Consumed}} serde) with error 
handling, customized processing if needed, then serialization (using the 
{{Materialized}} serde), and then a write to the store.
2) as an opt-in feature for non-global KTables, when users only specify 
{{Consumed}} but not {{Materialized}} (so that the store piggy-backs on the 
{{Consumed}} serde), we can skip the serialization step, i.e. we only do the 
deserialization to check for bad data, and then write the original raw bytes 
to the state store. This basically pushes the responsibility to users: if they 
think the serde has side effects or is not symmetric, they should always 
specify the {{Materialized}} object to indicate that the serde step should not 
be skipped.
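The two proposed restore modes can be sketched as follows (a toy model with made-up names, not the actual Streams restoration code; the "serdes" here are ASCII-text integers): the full path deserializes and re-serializes, while the opt-in path deserializes only to reject bad data and then writes the original raw bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ProposedRestore {

    static Map<String, byte[]> restore(Map<String, byte[]> sourceTopic,
                                       boolean skipSerialization) {
        Map<String, byte[]> store = new HashMap<>();
        for (Map.Entry<String, byte[]> rec : sourceTopic.entrySet()) {
            final int value;
            try {
                // "Consumed" serde: the topic stores integers as ASCII text.
                value = Integer.parseInt(new String(rec.getValue(), StandardCharsets.US_ASCII));
            } catch (NumberFormatException bad) {
                continue; // error handling: drop the ill-formatted record, as in normal processing
            }
            byte[] stored = skipSerialization
                    ? rec.getValue() // opt-in: raw bytes, serde assumed symmetric and side-effect free
                    : Integer.toString(value).getBytes(StandardCharsets.US_ASCII); // "Materialized" serde
            store.put(rec.getKey(), stored);
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, byte[]> topic = new HashMap<>();
        topic.put("a", "42".getBytes(StandardCharsets.US_ASCII));
        topic.put("b", "oops".getBytes(StandardCharsets.US_ASCII)); // corrupted record
        // Both modes reject the corrupted record, unlike a blind byte copy.
        System.out.println(restore(topic, false).containsKey("b")); // false
        System.out.println(restore(topic, true).containsKey("b"));  // false
    }
}
```

Note that both modes still pay the deserialization cost during restore; only the re-serialization is skipped in the opt-in case, which is what makes it safe to gate on the user not specifying a separate {{Materialized}} serde.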

> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for 
> the GlobalKTable case). It's unclear to me atm how this issue could be 
> addressed for KTables, though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
