[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161565#comment-17161565 ]
Guozhang Wang commented on KAFKA-8037:
--------------------------------------

There are some more discussions on related issues in KAFKA-10179, and I'd like to continue that discussion here since that ticket is marked as resolved. This effectively enlarges the scope of this ticket. In quick summary:

1) We need to decide whether we can reuse the source topic as the changelog for source tables (including global KTables) at all; today, as long as optimization is enabled, the source topic is always reused blindly.

2) If 1) is "yes", we still need to decide whether, when restoring from the source topic, we can safely skip the serde / customized processors entirely.

3) If 1) is "yes", we still need to decide how, when restoring, to skip loading ill-formatted data into the store. Note that we cannot control whether the source topic is log compacted, so whether bad new data ever obsoletes old good data during log compaction is out of Streams' control; but Streams itself needs to make sure the behavior during normal processing and during restoration is consistent.

For 1) and 2), there are a couple of scenarios we need to consider:
* Are the bytes in the source topic exactly the same as the bytes in the state store (i.e. are the serdes symmetric)? For example, if users specified both {{Consumed}} and {{Materialized}} in the table / globalTable API, they may use different serdes.
* Does the serde implementation incur any side effects? For example, does it register a subject with the schema registry service?
* Are there any customized processors between the consumer and the materialization? Today this is only possible with global KTables / state stores.
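The first scenario above, serde symmetry, can be checked mechanically: deserialize the source-topic bytes and re-serialize them with the store-side serde, then compare. The following is a minimal self-contained sketch of that check; the {{Function}}-based serializer/deserializer arguments are stand-ins for Kafka's {{Serde}} interfaces, not the actual Streams API, and the trimming serde is a made-up example of an asymmetric pair.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

public class SerdeSymmetryCheck {

    // Returns true iff deserializing with the source-side ("Consumed") serde and
    // re-serializing with the store-side ("Materialized") serde reproduces the
    // input bytes, i.e. the serde pair is symmetric for this record.
    static <T> boolean roundTripsExactly(byte[] sourceBytes,
                                         Function<byte[], T> sourceDeserializer,
                                         Function<T, byte[]> storeSerializer) {
        T value = sourceDeserializer.apply(sourceBytes);
        return Arrays.equals(sourceBytes, storeSerializer.apply(value));
    }

    public static void main(String[] args) {
        byte[] record = "  hello  ".getBytes(StandardCharsets.UTF_8);

        // Symmetric pair: source and store serdes agree byte-for-byte.
        System.out.println(roundTripsExactly(record,
                b -> new String(b, StandardCharsets.UTF_8),
                s -> s.getBytes(StandardCharsets.UTF_8)));          // true

        // Asymmetric pair: the store serde trims whitespace, so the bytes
        // written to the store would differ from the source-topic bytes.
        System.out.println(roundTripsExactly(record,
                b -> new String(b, StandardCharsets.UTF_8),
                s -> s.trim().getBytes(StandardCharsets.UTF_8)));   // false
    }
}
```

When this round trip does not hold, a byte-for-byte restore from the source topic produces store contents that differ from what normal processing would have written.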
My thoughts are that, given we are going to optimize restoration onto separate threads (which would have less impact on the HA of Streams), the second goal, "make restoration as fast as possible without serdes", is nice to have but not critical; on the other hand, transitioning from "do not create a separate changelog topic" to "create a separate changelog topic" would be considered a big regression from the users' perspective. So I'm proposing that we keep the first goal of "reuse the source topic to avoid replicating data" while making the second an opt-in feature that users specify. More specifically:

1) When restoring state stores, the restoration module would differentiate normal changelog topics (no serdes, just a byte copy) from source topics. For the latter case we would handle restoration just like normal processing: deserialize (using the {{Consumed}} serde) with error handling, apply customized processing if needed, serialize (using the {{Materialized}} serde), and then write to the store.

2) As an opt-in feature for non-global KTables, when users only specify {{Consumed}} but not {{Materialized}} (so that the store piggy-backs on the {{Consumed}} serde), we can skip the serialization step: we only deserialize to check for bad data, and then write the original raw bytes to the state store. I.e. we push the responsibility to users: if they think the serde has side effects or is not symmetric, they should always specify the {{Materialized}} object to indicate that the serde step must not be skipped.

> KTable restore may load bad data
> --------------------------------
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Minor
> Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a
> `deserialization.exception.handler` to drop corrupted records on read.
> However, this mechanism may be by-passed on restore. Assume a
> `builder.table()` call reads and drops a corrupted record. If the table state
> is lost and restored from the changelog topic, the corrupted record may be
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up the
> record would fail with a deserialization exception if the value part cannot
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for the
> GlobalKTable case). It's unclear to me atm how this issue could be addressed
> for KTables, though.
> Note that user state stores are not affected, because they always have a
> dedicated changelog topic (and don't reuse an input topic), and thus the
> corrupted record would not be written into the changelog.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
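The restore behavior proposed in the comment above can be sketched as follows. This is a minimal self-contained illustration, not Kafka's actual restore code: the {{Function}} arguments stand in for the {{Consumed}} and {{Materialized}} serdes, and catching the deserialization failure plays the role of a log-and-continue `deserialization.exception.handler`, so that corrupted records are dropped during restoration just as they would be during normal processing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class RestoreSketch {

    // Restore a store from source-topic records: deserialize each record with
    // the source-side serde, skip records that fail (mimicking a log-and-continue
    // deserialization exception handler), and either re-serialize with the
    // store-side serde (default path) or write the raw bytes (opt-in fast path).
    static List<byte[]> restore(List<byte[]> sourceTopicRecords,
                                Function<byte[], Integer> consumedDeserializer,
                                Function<Integer, byte[]> materializedSerializer,
                                boolean skipReserialization) {
        List<byte[]> store = new ArrayList<>();
        for (byte[] raw : sourceTopicRecords) {
            Integer value;
            try {
                value = consumedDeserializer.apply(raw);
            } catch (RuntimeException e) {
                continue; // corrupted record: drop it instead of byte-copying it
            }
            // Opt-in fast path: the user asserts the serde is symmetric and
            // side-effect free, so raw bytes can go to the store unchanged.
            store.add(skipReserialization ? raw : materializedSerializer.apply(value));
        }
        return store;
    }

    public static void main(String[] args) {
        List<byte[]> topic = List.of(
                "1".getBytes(StandardCharsets.UTF_8),
                "oops".getBytes(StandardCharsets.UTF_8),  // corrupted record
                "2".getBytes(StandardCharsets.UTF_8));

        List<byte[]> store = restore(topic,
                b -> Integer.parseInt(new String(b, StandardCharsets.UTF_8)),
                v -> v.toString().getBytes(StandardCharsets.UTF_8),
                false);

        System.out.println(store.size()); // 2: the corrupted record was skipped
    }
}
```

A plain byte-for-byte restore, by contrast, would have copied all three records, including the corrupted one, into the store.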