[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163011#comment-17163011 ]
Sophie Blee-Goldman edited comment on KAFKA-8037 at 7/22/20, 6:57 PM:
----------------------------------------------------------------------

I'm tempted to agree with Matthias that asymmetric/side-effect serdes should be discouraged rather than going out of our way to support them. If you have an asymmetric serde that does some kind of mapping of the record value, you should insert a .mapValues step instead of putting that transformation in the serde. Then the source changelog optimization would be automatically disabled. Of course, if you have some trivial operation like dropping a field during deserialization, that might be ok. Similarly, if your serde has side effects, I would argue that those operations do not belong in a serde but in their own separate processing step.

Obviously that doesn't solve the problem with corrupt data, but as Matthias pointed out, we can check whether the default deserialization exception handler is used, and if so we don't need to re-deserialize everything since we know it's all clean. If another deserialization exception handler is used, we'll have to do the deserialization again, and there doesn't seem to be a way around that. But I don't see why we would force this extra deserialization to occur if it's not necessary – people already complain about how long it takes to restore and probably wouldn't be happy if we made that time even longer for reasons that don't affect them.

Another advantage of this is that we don't have to ask the user to tell Streams whether the source changelog optimization should be applied; we can just figure it out ourselves. It's not a particularly complicated optimization to understand, but if the existence of this bug and the length of this discussion says anything, it's that it's not trivial to understand the full implications of the optimization. Asking users to configure this is adding just one more thing to learn and keep in mind every time they create a new table.
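To illustrate the suggestion above, here is a minimal sketch of moving a value transformation out of the serde and into an explicit .mapValues step in the Kafka Streams DSL. The topic names and the trim() transformation are hypothetical, and this is just a sketch of the pattern, not a definitive implementation:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

public class MapValuesInsteadOfAsymmetricSerde {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Read the table with a plain, symmetric serde -- no transformation
        // hidden inside deserialize()/serialize().
        KTable<String, String> raw = builder.table(
                "input-topic",                                  // hypothetical topic name
                Consumed.with(Serdes.String(), Serdes.String()));

        // Do the mapping as an explicit downstream processing step instead of
        // inside the serde. With the transformation visible in the topology,
        // Streams can tell that the transformed state no longer matches the
        // source topic byte-for-byte, so the source changelog optimization
        // would not apply to it.
        KTable<String, String> mapped = raw.mapValues(value -> value.trim());

        mapped.toStream().to("output-topic");                   // hypothetical topic name
    }
}
```

This snippet assumes the kafka-streams dependency on the classpath; it only builds a topology and does not start a KafkaStreams instance.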
My one caveat is that I think it would be nice to move towards giving users more fine-grained control over optimizations. But maybe if we solve this problem, then there's less of a reason to need that.
> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a `deserialization.exception.handler` to drop corrupted records on read. However, this mechanism may be by-passed on restore. Assume a `builder.table()` call reads and drops a corrupted record. If the table state is lost and restored from the changelog topic, the corrupted record may be copied into the store, because on restore plain bytes are copied.
>
> If the KTable is used in a join, an internal `store.get()` call to look up the record would fail with a deserialization exception if the value part cannot be deserialized.
>
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for the GlobalKTable case). It's unclear to me atm how this issue could be addressed for KTables, though.
>
> Note that user state stores are not affected, because they always have a dedicated changelog topic (and don't reuse an input topic), and thus the corrupted record would not be written into the changelog.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)