[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161423#comment-17161423 ] Guozhang Wang commented on KAFKA-10179:

I agree with Almog and Rohan's arguments here. What I'm thinking about is how we could define a principle for users to indicate that:

1) the bytes in the source topic are exactly the same as the bytes in the state store (i.e., the serdes are symmetric);
2) the serde incurs no side effects. Only 1) and 2) together mean it is safe to skip the serde during restoration;
3) and also, there is no corrupted or ill-formatted data in the source topics that should be skipped when loading into state stores (during restoration, as compared with normal processing time). This is https://issues.apache.org/jira/browse/KAFKA-8037.

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Bruno Cadonna
> Assignee: Bruno Cadonna
> Priority: Major
> Fix For: 2.7.0
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state store to the state store serdes. Currently, it always passes {{<application ID>-<store name>-changelog}} as the changelog topic name. However, for optimized source tables the changelog topic is the source topic.
> Most serdes do not use the topic name passed to them. However, if the serdes actually use the topic name for (de)serialization, e.g., when Kafka Streams is used with Confluent's Schema Registry, a {{org.apache.kafka.common.errors.SerializationException}} is thrown.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
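A serde satisfying conditions 1) and 2) can be spot-checked with a round-trip property. A minimal plain-Java sketch (the `Function`-based parameters here are stand-ins for the real `org.apache.kafka.common.serialization` interfaces, and the sample data is made up):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

public class SerdeChecker {
    // Returns true if serialize(deserialize(bytes)) reproduces the input
    // bytes exactly -- the "symmetric serde" property that makes it safe
    // to copy raw source-topic bytes into the store during restoration.
    static <T> boolean isRoundTripIdentity(Function<byte[], T> deserialize,
                                           Function<T, byte[]> serialize,
                                           byte[] sample) {
        return Arrays.equals(sample, serialize.apply(deserialize.apply(sample)));
    }

    public static void main(String[] args) {
        byte[] sample = "user-42,na,extra".getBytes(StandardCharsets.UTF_8);

        // A String serde is symmetric: bytes -> String -> bytes is lossless.
        boolean symmetric = isRoundTripIdentity(
                b -> new String(b, StandardCharsets.UTF_8),
                s -> s.getBytes(StandardCharsets.UTF_8),
                sample);

        // A projecting serde is not: it drops everything after the first field.
        boolean projecting = isRoundTripIdentity(
                b -> new String(b, StandardCharsets.UTF_8).split(",")[0],
                s -> s.getBytes(StandardCharsets.UTF_8),
                sample);

        System.out.println(symmetric + " " + projecting); // prints "true false"
    }
}
```

Note that such a check can only sample the property, not prove it, which is Almog's point below that it may be impossible to determine symmetry for an arbitrary serde.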
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146707#comment-17146707 ] Almog Gavra commented on KAFKA-10179:

[~ableegoldman] I confirmed locally that nothing "breaks" if we use a deserializer that projects a subset of the fields in the record, as you suspected, but consider the following points:

1. Some of the most popular serdes are asymmetric (e.g., Avro builds the concept of reader/writer schemas into its APIs).
2. It may be impossible to determine, for a given serde, whether it is symmetric.
3. State after recovery should be identical to state before recovery for predictable operations (especially in cloud environments).
4. Some of the most popular serdes have side effects (e.g., Confluent Schema Registry serdes will create subjects on your behalf).

In practice, the first three points, in conjunction with what [~mjsax] said (the source-topic-changelog optimization really only applies if the data in the input topic is exactly the same as in the changelog topic, and thus we avoid creating the changelog topic), mean that we can't safely turn on the source-topic-changelog optimization unless the user indicates either (a) they are using a symmetric serde or (b) they are willing to waive point 3 in order to speed up recovery ([~cadonna] if we consider point 3 a matter of correctness, we can't sacrifice correctness for performance without the user's consent).

Even if the user indicates (a) or (b) above, I still don't think we can implement the fix described here, because of the fourth point. It may be possible that the user is using a symmetric serde but their schema is not identical to the one that wrote to the Kafka topic (ksql, for example, generates a new schema where all the fields are the same but the schema has a different name; I can also easily imagine a schema with _more_ fields that would write the same value as it read from an event with fewer fields).
I'm not sure I understand this comment: "The data would be deserialized and re-serialized using the same Serde (this is inefficiency we pay, as we also need to send the de-serialized data downstream for further processing)." Why can't we just always pass the data through into the state store if the optimization is enabled?
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143238#comment-17143238 ] Sophie Blee-Goldman commented on KAFKA-10179:

[~desai.p.rohan] I'm not sure I understand why it's a problem for the deserializer to modify the value slightly, by dropping fields to take your example. We would end up restoring the full bytes into the store, sure, but the plain bytes are never actually used, right? We would always go through the deserializer when reading the value from the store and using it in an operation, so the "extra" fields would still get dropped. Maybe if your values are bloated with a lot of information that you didn't want to store, this could blow up the disk usage. But I think there's a difference between a simple operation on data to extract only the relevant bits (e.g. dropping a field you don't care about) and fundamentally transforming the data to get it into a different form. The former seems reasonable to do during deserialization, but the latter should be its own operation in the topology.

Of course, this just applies to modifying the values. If your deserializer modifies the key in any way, this would be a problem, since lookups by key would fail after a restoration copies over the plain bytes. But I would argue that it's illegal to modify the key during de/serialization at all, not because of the restoration issue but because it can cause incorrect partitioning. Anyways, I'm probably overlooking something obvious, but I'm struggling to see exactly where and how this breaks.
That said, I do agree we should clarify that `serialize(deserialize())` must be the identity for keys.
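A toy illustration of why a key-modifying deserializer breaks lookups after a raw-byte restore (a `HashMap` stands in for the state store, and the lower-casing "serde" is purely hypothetical):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class KeyIdentityDemo {
    // Hypothetical "deserializer" that normalizes keys to lower case --
    // exactly the kind of key-modifying serde the comment argues is illegal.
    static String deserializeKey(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8).toLowerCase();
    }

    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Restoration: raw source-topic key bytes are copied into the store as-is.
        Map<String, String> store = new HashMap<>();
        byte[] rawKey = "User-42".getBytes(StandardCharsets.UTF_8);
        store.put(new String(rawKey, StandardCharsets.UTF_8), "some-value");

        // Normal processing: lookups go through deserialize + serialize,
        // producing "user-42", which no longer matches the restored "User-42".
        String lookupKey = new String(serializeKey(deserializeKey(rawKey)),
                                      StandardCharsets.UTF_8);
        System.out.println(store.containsKey(lookupKey)); // prints "false": the lookup misses
    }
}
```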
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140328#comment-17140328 ] Bruno Cadonna commented on KAFKA-10179:

[~desai.p.rohan] While I find the idea of optimizing the materialization in the deserializer intriguing, I think the performance penalty that we would pay by deserializing and serializing each record during restoration is not worthwhile. Additionally -- if the optimization is turned on -- we would need to read the original data from the source topic instead of the projected data from the changelog topic during each restoration, which would again hurt performance. Of course, we would need experiments to better understand the implications.

An alternative idea would be to allow plugging in a byte-based transformation that does not need to deserialize and serialize each record. However, that would not solve the issue of having to read the unprojected data during each restoration.

If you are concerned about the amount of data to materialize, a solution could be to optimize at the topology level by introducing a {{map()}} that performs the projection, followed by a {{toTable()}} that materializes the data. The data read from the input topic would be the unprojected data, but the data materialized would be the projected data, and during restoration we would also just read the projected data. An additional advantage of this approach is that you can leave the source table optimization turned on, because it would not apply to this case.

In summary, the source table optimization was not introduced for the case you describe. IMO, it is not even an optimization in that case.
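The suggested topology-level projection could look roughly like this in the Streams DSL. This is a sketch, not the ticket's fix; the topic name, store name, and the `project` helper are made up for illustration, and it assumes `kafka-streams` (2.6+) on the classpath:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class ProjectedTableTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Read the full records, project down to the fields of interest,
        // then materialize the projected data. The store gets its own
        // changelog topic, so restoration reads only the projected data,
        // and the source-table optimization simply does not apply here.
        KTable<String, String> table = builder
                .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .map((key, value) -> KeyValue.pair(key, project(value)))
                .toTable(Materialized.as("projected-store"));

        // builder.build() would now yield the Topology to pass to KafkaStreams.
    }

    // Illustrative projection: keep only the first field of a CSV value.
    static String project(String value) {
        return value.split(",")[0];
    }
}
```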
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140208#comment-17140208 ] Matthias J. Sax commented on KAFKA-10179:

What you say is fair, I guess. Given the current code, if you want to do any of those, you need to disable the optimization.

However, for the actual bug this ticket is about, the problem seems to be that if the optimization is turned on, at some point in the code we pass the changelog topic name into the serde instead of the source topic name. And thus the schema cannot be found and the serde crashes. Thus, this ticket should focus on that bug.

Not sure if KAFKA-8037 covers all the cases you describe. Maybe you want to follow up on that ticket (so we can extend its scope) or create a new ticket that describes the shortcomings of the current implementation.
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140166#comment-17140166 ] Rohan Desai commented on KAFKA-10179:

Also, it's not really clear from the documentation that `serialize(deserialize())` is assumed to be the identity function for `ktable(..)`.
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140158#comment-17140158 ] Rohan Desai commented on KAFKA-10179:

Deserialization may itself be a transformation. For example, suppose I have source data with 10 fields, but I only care about 3 of them for my stream processing app. It seems reasonable to provide a deserializer that just extracts those 3 fields. I suppose you could express this as a projection after creating the table, but that precludes optimizations that use selective deserialization. And it may be much more expensive to do the materialization (since you're potentially materializing lots of data unnecessarily). I think there should be some way to achieve each of the following:

* optimized, and the data in the store is exactly the same as the topic data. In this case (what's implemented today) the data can be restored by writing the source records into the store.
* optimized, and the deserializer transforms the data somehow. In this case the data can be restored by deserializing/serializing each row from the source topic before writing it into the store. I don't think this is possible today.
* not optimized (which would let you have a transforming deserializer and faster recovery, at the cost of extra data in Kafka). I don't think this is possible today without turning all optimizations off.

> This is a known issue and tracked via: https://issues.apache.org/jira/browse/KAFKA-8037

ack - thanks!
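The first two restore strategies listed above can be contrasted in a self-contained sketch (plain byte arrays stand in for changelog records, and the CSV-projecting "serde" is hypothetical):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RestoreStrategies {
    // Projecting deserializer: of ten CSV fields, keep only the first three.
    static String deserialize(byte[] raw) {
        String[] fields = new String(raw, StandardCharsets.UTF_8).split(",");
        return String.join(",", fields[0], fields[1], fields[2]);
    }

    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Strategy 1 (what is implemented today): copy the raw source bytes.
    // Only correct when the serde is symmetric.
    static List<byte[]> restoreByCopy(List<byte[]> sourceRecords) {
        return new ArrayList<>(sourceRecords);
    }

    // Strategy 2 (not possible today): run each record through the serde,
    // so the restored bytes match what normal processing would have stored.
    static List<byte[]> restoreBySerde(List<byte[]> sourceRecords) {
        List<byte[]> restored = new ArrayList<>();
        for (byte[] raw : sourceRecords) {
            restored.add(serialize(deserialize(raw)));
        }
        return restored;
    }

    public static void main(String[] args) {
        List<byte[]> source =
                List.of("a,b,c,d,e,f,g,h,i,j".getBytes(StandardCharsets.UTF_8));

        // Copying restores all ten fields; the serde path restores only three.
        System.out.println(new String(restoreByCopy(source).get(0),
                StandardCharsets.UTF_8));  // prints "a,b,c,d,e,f,g,h,i,j"
        System.out.println(new String(restoreBySerde(source).get(0),
                StandardCharsets.UTF_8));  // prints "a,b,c"
    }
}
```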
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140075#comment-17140075 ] Matthias J. Sax commented on KAFKA-10179:

{quote}I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just fields of interest), then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different since it represents different data from the source topic.{quote}
For this case, the source-topic-changelog optimization does not apply, and the store would always have its own changelog topic. And thus, the input-topic schema registered in the SR should not be "touched", and the write to the changelog topic should register a new schema using the changelog topic name. Thus, no naming issue in SR should happen.

The source-topic-changelog optimization really only applies if the data in the input topic is exactly the same as in the changelog topic, and thus we avoid creating the changelog topic. To ensure this, we don't allow any processing to happen in between. The data would be deserialized and re-serialized using the same Serde (this is an inefficiency we pay, as we also need to send the deserialized data downstream for further processing).
{quote}Another issue that I think exists (need to try to reproduce) that deserializing/serializing would solve is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. However, if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error.{quote}
This is a known issue and tracked via: https://issues.apache.org/jira/browse/KAFKA-8037
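A self-contained simulation of the skipped-validation problem described above (a toy deserializer stands in for a real `Deserializer`, and the skip-on-error catch stands in for a `LogAndContinueExceptionHandler`-style configuration):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SkippedValidationDemo {
    // Toy deserializer that rejects records lacking a "key=value" separator,
    // standing in for a real deserializer that throws SerializationException.
    static String deserialize(byte[] raw) {
        String s = new String(raw, StandardCharsets.UTF_8);
        if (!s.contains("=")) {
            throw new IllegalArgumentException("corrupt record: " + s);
        }
        return s;
    }

    public static void main(String[] args) {
        List<byte[]> sourceTopic = List.of(
                "k1=v1".getBytes(StandardCharsets.UTF_8),
                "garbage".getBytes(StandardCharsets.UTF_8),   // corrupt record
                "k2=v2".getBytes(StandardCharsets.UTF_8));

        // Normal processing with skip-on-error: the corrupt record never
        // reaches the store.
        List<String> processed = new ArrayList<>();
        for (byte[] raw : sourceTopic) {
            try {
                processed.add(deserialize(raw));
            } catch (IllegalArgumentException e) {
                // skip-and-continue: log and move on
            }
        }
        System.out.println(processed.size()); // prints "2"

        // Raw-byte restore: all three records, corrupt one included, land in
        // the store, and the error resurfaces on the first read that hits it.
        List<byte[]> restoredStore = new ArrayList<>(sourceTopic);
        int readable = 0;
        for (byte[] raw : restoredStore) {
            try {
                deserialize(raw);
                readable++;
            } catch (IllegalArgumentException e) {
                System.out.println("store read failed: " + e.getMessage());
            }
        }
        System.out.println(readable); // prints "2": one restored record is unreadable
    }
}
```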
[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139926#comment-17139926 ] Rohan Desai commented on KAFKA-10179:

I'm not sure it's correct to use the same "topic" name for materializing optimized source tables, as it's logically different data. In the normal flow (not recovery), we're taking the topic data, validating/transforming it by deserializing it (which might apply some transforms like projecting just fields of interest), then serializing it, and then writing it into the store. So the "topic" we pass to the serializer should be different, since it represents different data from the source topic.

This has consequences in practice when used with a schema registry via the Confluent serializers. If we use the same topic, `serialize` might register a different schema with the source subject, which we probably don't want. I think the technically correct thing to do (though this is of course more expensive) would be, when the source table is optimized, to deserialize and serialize each record when restoring.

Another issue that I think exists (need to try to reproduce) that deserializing/serializing would solve is skipped validation. The source topic deserializer functions as a sort of validator for records from the source topic. When the streams app is configured to skip on deserialization errors, bad source records are just skipped. However, if we restore by just writing those records to the state store, we now hit the deserialization error when reading the state store, which is a query-killing error.
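To see why the topic name handed to the serde matters here: Confluent's default TopicNameStrategy derives the Schema Registry subject from that topic name. A sketch (the topic names are made up, and the real strategy lives in the schema-registry serializers, not in this helper):

```java
public class SubjectNames {
    // Confluent's default TopicNameStrategy: the value subject is the topic
    // name plus a "-value" suffix (and "-key" for keys).
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    public static void main(String[] args) {
        // With the optimization on, the changelog *is* the source topic, so
        // the serde should be handed "orders" and look up "orders-value".
        System.out.println(valueSubject("orders")); // prints "orders-value"

        // The bug passes the internal changelog name instead, producing a
        // subject under which no schema was ever registered -- hence the
        // SerializationException described in the ticket.
        System.out.println(valueSubject("my-app-orders-store-changelog"));
        // prints "my-app-orders-store-changelog-value"
    }
}
```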