[ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751212#comment-17751212
 ] 

Matthias J. Sax commented on KAFKA-15303:
-----------------------------------------

Thanks for filing this ticket.

Kafka Streams has only limited support for schema evolution, because Kafka 
Streams is schema agnostic – the runtime does not even know what format is 
used, and thus cannot reason about it. I re-classified the ticket as an 
"improvement" because it's not a bug: the system works as designed.

In the end, Kafka Streams uses whatever Serde you provide, so it's not clear 
whether we could even fix it on our end. Maybe you could put some hack into the 
Serde you provide to work around it? Unfortunately, it is currently not possible 
to get the original raw bytes (that would allow us to avoid the re-serialization 
to begin with).
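
To illustrate where such a hack could live (a rough sketch only, not a 
recommendation – the wrapper below and its normalization step are made up, and 
whether a byte-stable normalization is achievable at all depends on your Avro 
setup):

{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class NormalizingSerde {

    // Wraps a user-provided Serde and exposes one place where the serialized
    // bytes could be rewritten into a canonical form, so that re-serializing
    // an old record yields the same bytes as the original write.
    public static <T> Serde<T> wrap(final Serde<T> inner) {
        final Serializer<T> normalizing = (topic, data) -> {
            final byte[] bytes = inner.serializer().serialize(topic, data);
            // A "hack" would go here: normalize `bytes` so they stay stable
            // across compatible schema changes. Left as-is in this sketch.
            return bytes;
        };
        return Serdes.serdeFrom(normalizing, inner.deserializer());
    }
}
{code}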

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15303
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15303
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Charles-Eddy
>            Priority: Major
>         Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We are using Avro to serialize/deserialize the input topics and the data 
> stored in the Kafka Streams state stores.
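> For context, the relevant part of our topology looks roughly like this (a 
> simplified sketch: topic names are placeholders, and our real values are 
> Avro-generated classes rather than plain Strings):
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.Materialized;
> import org.apache.kafka.streams.kstream.Produced;
> 
> public class OfferEnrichmentTopology {
> 
>     public static StreamsBuilder build() {
>         final StreamsBuilder builder = new StreamsBuilder();
> 
>         // Left side of the FK join: offers keyed by offer id; the value
>         // carries the product id it refers to ("productId|offerPayload").
>         final KTable<String, String> offers = builder.table(
>             "offers", Consumed.with(Serdes.String(), Serdes.String()));
> 
>         // Right side of the FK join: products keyed by product id.
>         final KTable<String, String> products = builder.table(
>             "products", Consumed.with(Serdes.String(), Serdes.String()));
> 
>         // Foreign-key join: extract the product id from the offer value.
>         final KTable<String, String> enriched = offers.join(
>             products,
>             offerValue -> offerValue.split("\\|")[0],
>             (offerValue, productValue) -> offerValue + " + " + productValue,
>             Materialized.with(Serdes.String(), Serdes.String()));
> 
>         enriched.toStream().to("enriched-offers",
>             Produced.with(Serdes.String(), Serdes.String()));
>         return builder;
>     }
> }
> {code}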
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of our Kafka Streams application that contain new, compatible 
> Avro schemas for our input topics.
> The symptom is that after deploying our new version, which contains no 
> topology changes but only changes to the Avro schemas used, every event 
> coming from the right side of the joins affected by these schema changes is 
> discarded until we receive a new event on the left side of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the guidelines for which priority to assign, I set it to 
> *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to downstream joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature, specifically in the class 
> *SubscriptionResolverJoinProcessorSupplier* on the left side of a foreign key 
> join. 
> Its process(...) method computes a hash by re-serializing the value it has 
> deserialized from the left state store and compares it with the hash of the 
> original message from the subscription-response topic. 
>  
> It means that when we deploy a new version of our Kafka Streams application 
> with a new, compatible Avro schema on the left side of a join, every join 
> triggered by the right side of the join is invalidated until we receive all 
> the events again on the left side. These right-side-triggered joins are 
> discarded because the hashes now computed by Kafka Streams differ from the 
> hashes of the original messages.
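> To make that concrete, here is a simplified, self-contained sketch of the 
> check performed on the left side (this is not the actual Kafka Streams code; 
> the real implementation and its hash function live in 
> SubscriptionResolverJoinProcessorSupplier):
> {code:java}
> import java.security.MessageDigest;
> import java.security.NoSuchAlgorithmException;
> import java.util.Arrays;
> 
> public class SubscriptionHashCheckSketch {
> 
>     // Stand-in for the hash Kafka Streams computes over the serialized
>     // value; SHA-256 is used only to keep the sketch self-contained.
>     static byte[] hash(final byte[] serializedValue) {
>         try {
>             return MessageDigest.getInstance("SHA-256").digest(serializedValue);
>         } catch (final NoSuchAlgorithmException e) {
>             throw new IllegalStateException(e);
>         }
>     }
> 
>     // The hash carried by the subscription response (computed from the
>     // bytes originally written on the left side) is compared against a
>     // hash of the *re-serialized* value read back from the left store.
>     static boolean joinResultIsEmitted(final byte[] hashFromResponse,
>                                        final byte[] reSerializedStoreValue) {
>         final byte[] currentHash = hash(reSerializedStoreValue);
>         // If the serializer now produces different bytes (e.g. a new Avro
>         // writer schema), the hashes no longer match and the join result
>         // is dropped.
>         return Arrays.equals(hashFromResponse, currentHash);
>     }
> }
> {code}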
>  
> *How to reproduce it:*
> If we compare a working workflow with the broken one, it looks something 
> like this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data is looked up in the store and the offer hash from the 
> store (v1) is compared with the hash in the response event (also v1).
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data is looked up in the store and the offer hash from the 
> store (v1) is compared with the hash in the response event (also v1).
>  # Finally, the join result is sent.
>  
> +Breaking workflow:+ 
> The offer serializer is changed to the offer v2 schema.
> New product event from the right part of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data is looked up in the store and the offer hash from the 
> store (now v2) is compared with the hash in the response event (still v1).
>  # The join result is not sent since the hashes differ (see the Avro sketch 
> below this workflow).
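> The root cause can be demonstrated with plain Avro, independently of Kafka 
> Streams: a backward-compatible schema change (adding a field with a default) 
> still changes the serialized bytes, so any hash over those bytes changes too. 
> A minimal sketch (the schemas and field names are made up for illustration):
> {code:java}
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.Arrays;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.BinaryEncoder;
> import org.apache.avro.io.EncoderFactory;
> 
> public class AvroReserializationDemo {
> 
>     static final Schema OFFER_V1 = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Offer\",\"fields\":["
>             + "{\"name\":\"offerId\",\"type\":\"string\"},"
>             + "{\"name\":\"price\",\"type\":\"double\"}]}");
> 
>     // Compatible evolution: one extra field with a default value.
>     static final Schema OFFER_V2 = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"Offer\",\"fields\":["
>             + "{\"name\":\"offerId\",\"type\":\"string\"},"
>             + "{\"name\":\"price\",\"type\":\"double\"},"
>             + "{\"name\":\"currency\",\"type\":\"string\","
>             + "\"default\":\"EUR\"}]}");
> 
>     static byte[] serialize(final GenericRecord record, final Schema writer)
>             throws IOException {
>         final ByteArrayOutputStream out = new ByteArrayOutputStream();
>         final BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
>         new GenericDatumWriter<GenericRecord>(writer).write(record, enc);
>         enc.flush();
>         return out.toByteArray();
>     }
> 
>     public static void main(final String[] args) throws IOException {
>         final GenericRecord offerV1 = new GenericData.Record(OFFER_V1);
>         offerV1.put("offerId", "offer-42");
>         offerV1.put("price", 9.99);
> 
>         // The same logical offer, written with the new schema, as the
>         // upgraded application does after reading it back from the store.
>         final GenericRecord offerV2 = new GenericData.Record(OFFER_V2);
>         offerV2.put("offerId", "offer-42");
>         offerV2.put("price", 9.99);
>         offerV2.put("currency", "EUR");
> 
>         final byte[] v1Bytes = serialize(offerV1, OFFER_V1);
>         final byte[] v2Bytes = serialize(offerV2, OFFER_V2);
> 
>         // Different bytes -> different hashes -> the FK join drops results.
>         System.out.println("bytes equal: " + Arrays.equals(v1Bytes, v2Bytes));
>     }
> }
> {code}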
>  
> *Potential Fix:*
> Currently, the offer’s hash is computed by serializing the deserialized 
> offer data again. This means that the binary offer data v1 is deserialized 
> into a v2 data class using a v1-compatible deserializer, and then serialized 
> into binary v2 data.
> As a result, we are comparing a v2 hash (from the store) with a v1 hash (from 
> the response event).
> Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by 
> directly retrieving the bytes from the store to compute the hash?
> This way, the hash would be the same.
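> As a sketch of what we mean (illustrative only, not the actual Kafka Streams 
> internals): instead of hashing the bytes produced by re-serializing the 
> deserialized store value, the processor could hash the raw bytes it read 
> from the store, which are still the original v1 bytes:
> {code:java}
> import java.util.Arrays;
> import java.util.function.Function;
> 
> public class RawBytesHashSketch {
> 
>     // Today: deserialize (done earlier) -> re-serialize -> hash. A new
>     // writer schema changes these bytes, so the hashes diverge.
>     static <V> boolean currentBehaviour(final byte[] hashFromResponse,
>                                         final V deserializedStoreValue,
>                                         final Function<V, byte[]> serializer,
>                                         final Function<byte[], byte[]> hash) {
>         final byte[] reSerialized = serializer.apply(deserializedStoreValue);
>         return Arrays.equals(hashFromResponse, hash.apply(reSerialized));
>     }
> 
>     // Proposed: hash the bytes exactly as they sit in the state store, so
>     // the comparison stays stable across compatible schema changes.
>     static boolean proposedBehaviour(final byte[] hashFromResponse,
>                                      final byte[] rawBytesFromStore,
>                                      final Function<byte[], byte[]> hash) {
>         return Arrays.equals(hashFromResponse, hash.apply(rawBytesFromStore));
>     }
> }
> {code}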
>  
> Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
