[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163011#comment-17163011 ]

Sophie Blee-Goldman edited comment on KAFKA-8037 at 7/22/20, 6:57 PM:
----------------------------------------------------------------------

I'm tempted to agree with Matthias that asymmetric/side-effect serdes should be 
discouraged rather than going out of our way to support them. If you have an 
asymmetric serde that does some kind of mapping of the record value, you should 
insert a .mapValues step instead of putting that transformation in the serde. 
Then the source changelog optimization would be automatically disabled. Of 
course if you have some trivial operation like dropping a field during 
deserialization, that might be ok.
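As a toy illustration of the distinction (plain Java rather than the actual Streams API; the value type and the mapping are made up), the transformation an asymmetric serde might hide can instead live in an explicit mapValues-style step, leaving the serde a pure byte round-trip that is safe to skip during a plain-bytes restore:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class AsymmetricSerdeExample {
    // A symmetric serde: deserialize(serialize(v)) == v, so copying plain
    // bytes from the source topic on restore is equivalent to re-reading them.
    static byte[] serialize(String v) {
        return v.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // The transformation an asymmetric serde might have buried in
    // deserialize(), expressed as a separate mapValues-style step instead.
    static final Function<String, String> mapValues = v -> v.toUpperCase();

    public static void main(String[] args) {
        byte[] stored = serialize("hello");           // bytes in the source topic
        String restored = deserialize(stored);        // pure round-trip on restore
        String processed = mapValues.apply(restored); // explicit downstream step
        System.out.println(restored + " -> " + processed);
    }
}
```

With this separation, the topology contains a real processing step after the source table, so the source-changelog optimization would not apply and restore stays correct.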

Similarly if your serde has side effects, I would argue that those operations 
do not belong in a serde but in their own separate processing step.
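In the same toy style (again plain Java with illustrative names, not the Streams API), a side effect such as counting records belongs in its own peek-style step rather than inside deserialize(), so that replaying or skipping deserialization on restore cannot re-trigger or miss it:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class SideEffectExample {
    static final AtomicLong recordsSeen = new AtomicLong();

    // Pure deserializer: no side effects, safe to repeat or skip on restore.
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // The side effect as a separate processing step (analogous to peek()),
    // which runs only during normal processing, never during restore.
    static final Consumer<String> countRecord = v -> recordsSeen.incrementAndGet();

    public static void main(String[] args) {
        String value = deserialize("a".getBytes(StandardCharsets.UTF_8));
        countRecord.accept(value);
        System.out.println("records seen: " + recordsSeen.get());
    }
}
```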

Obviously that doesn't solve the problem with corrupt data, but as Matthias 
pointed out we can check if the default deserialization exception handler is 
used and if so we don't need to re-deserialize everything since we know it's 
all clean. If another deserialization exception handler is used, we'll have to 
do the deserialization again and there doesn't seem to be a way around that. 
But I don't see why we would force this extra deserialization to occur if it's 
not necessary – people already complain about how long it takes to restore and
probably wouldn't be happy if we made that time even longer for reasons that 
don't affect them. 
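A minimal sketch of the check being proposed: the config key and the LogAndFailExceptionHandler default are real Kafka Streams names, but the surrounding class and decision method are hypothetical. If the default (fail-fast) handler is configured, a corrupt record would have crashed the application rather than been skipped, so every byte in the source topic is known-clean and plain-byte restore is safe; any other handler forces re-deserialization on restore.

```java
import java.util.Map;

public class RestoreDecision {
    // Real Kafka Streams config key and its default value.
    static final String HANDLER_CONFIG = "default.deserialization.exception.handler";
    static final String DEFAULT_HANDLER =
        "org.apache.kafka.streams.errors.LogAndFailExceptionHandler";

    // Hypothetical decision: must restore re-deserialize each record?
    static boolean mustRedeserializeOnRestore(Map<String, Object> configs) {
        Object handler = configs.getOrDefault(HANDLER_CONFIG, DEFAULT_HANDLER);
        // With the fail-fast default, corrupt records can never have been
        // dropped on read, so copying plain bytes is safe.
        return !DEFAULT_HANDLER.equals(String.valueOf(handler));
    }

    public static void main(String[] args) {
        System.out.println(mustRedeserializeOnRestore(Map.of()));
    }
}
```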

Another advantage of this is that we don't have to ask the user to tell Streams 
whether the source changelog optimization should be applied. We can just figure 
it out ourselves. It's not a particularly complicated optimization to 
understand, but if the existence of this bug and the length of this discussion 
says anything, it's that it's not trivial to understand the full implications 
of the optimization. Asking users to configure this would add just one more 
thing to learn and keep in mind every time they create a new table.

My one caveat is that I think it would be nice to move towards giving users 
more fine grained control over optimizations. But maybe if we solve this 
problem, then there is less of a reason to need that.



> KTable restore may load bad data
> --------------------------------
>
>                 Key: KAFKA-8037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8037
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Minor
>              Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
