[ https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17140075#comment-17140075 ]

Matthias J. Sax commented on KAFKA-10179:
-----------------------------------------

{quote}I'm not sure it's correct to use the same "topic" name for materializing 
optimized source tables, as it's logically different data. In the normal flow 
(not recovery), we're taking the topic data, validating/transforming it by 
deserializing it (which might apply some transforms like projecting just fields 
of interest), and then serializing it, and then writing it into the store. So 
the "topic" we pass to the serializer should be different since it represents 
different data from the source topic.

For this case, the source-topic-changelog optimization does not apply, and the 
store would always have its own changelog topic. Thus, the input-topic 
schema registered in the SR should not be "touched", and the write to the 
changelog topic should register a new schema using the changelog topic name. 
Hence, no naming issue in SR should happen.
{quote}
The source-topic-changelog optimization really only applies if the data in the 
input topic is exactly the same as in the changelog topic, and thus we can avoid 
creating the changelog topic. To ensure this, we don't allow any processing to 
happen in between. The data would be deserialized and re-serialized using the 
same Serde (this is an inefficiency we pay, as we also need to send the 
deserialized data downstream for further processing).
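
For illustration, a minimal sketch of a topology where this optimization kicks in; the topic, store, and class names are made up, and newer versions use {{TOPOLOGY_OPTIMIZATION_CONFIG}} instead of the deprecated constant:

{code:java}
// Minimal sketch (illustrative names only): a source KTable whose store can
// reuse the input topic as its changelog when topology optimization is enabled.
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class SourceTableOptimizationSketch {

    public static Topology build() {
        final Properties props = new Properties();
        // With optimization enabled, Streams can use the source topic as the
        // changelog for the source-table store instead of creating
        // <application ID>-input-table-store-changelog.
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> table = builder.table(
            "input-topic",                                    // hypothetical input topic
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("input-table-store"));

        // No processing between the source topic and the store; downstream use is fine.
        table.toStream().to("output-topic");                  // hypothetical output topic

        // The optimization is only applied when the topology is built with the config.
        return builder.build(props);
    }
}
{code}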
{quote}Another issue that I think exists (I need to try to reproduce it), and that 
deserializing/serializing would solve, is skipped validation. The source topic 
deserializer functions as a sort of validator for records from the source 
topic. When the Streams app is configured to skip on deserialization errors, 
bad source records are just skipped. However, if we restore by just writing 
those records to the state store, we now hit the deserialization error when 
reading the state store, which is a query-killing error.
{quote}
This is a known issue, tracked via: 
https://issues.apache.org/jira/browse/KAFKA-8037
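
The skip-on-error behavior described above comes from the default deserialization exception handler; a minimal sketch of that configuration (application id and bootstrap servers are placeholders):

{code:java}
// Minimal sketch: configure Streams to log and skip records that fail deserialization.
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class SkipOnDeserializationErrorSketch {

    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // On the regular processing path, records that fail deserialization are
        // logged and skipped. A restore path that copies raw bytes into the store
        // never deserializes them, so such records can still end up in the store
        // (the gap tracked by KAFKA-8037).
        props.put(
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class);
        return props;
    }
}
{code}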

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-10179
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10179
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Bruno Cadonna
>            Assignee: Bruno Cadonna
>            Priority: Major
>             Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application 
> ID>-<store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.
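
To illustrate why the topic name passed to the serdes matters, here is a simplified, hypothetical topic-aware serializer in the spirit of the Schema Registry serdes (not the actual Confluent implementation):

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical topic-aware serializer, simplified to show how the topic name
// handed over by MeteredKeyValueStore influences (de)serialization. Schema
// Registry-style serdes derive a subject such as "<topic>-value" and fail if
// no compatible schema is registered under that subject.
public class TopicAwareSerializerSketch implements Serializer<String> {

    @Override
    public byte[] serialize(final String topic, final String data) {
        if (data == null) {
            return null;
        }
        final String subject = topic + "-value"; // subject derived from the topic name
        if (!schemaRegisteredFor(subject)) {
            // With the bug, the store passes "<application ID>-<store name>-changelog"
            // instead of the source topic, so a lookup like this one fails.
            throw new SerializationException("No schema registered for subject " + subject);
        }
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Placeholder for a schema-registry lookup; always true in this sketch.
    private boolean schemaRegisteredFor(final String subject) {
        return true;
    }
}
{code}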


