[ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15303:
------------------------------------
    Issue Type: Improvement  (was: Bug)

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15303
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15303
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Charles-Eddy
>            Priority: Critical
>         Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We use Avro to serialize and deserialize the input topics and the values 
> stored in the Kafka Streams state stores.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of our Kafka Streams application containing new compatible Avro 
> schemas for our input topics.
> The symptom is that after deploying the new version, which contains no 
> topology changes but only changes to the Avro schemas used, we discard every 
> event coming from the right side of the joins affected by these Avro schema 
> changes until we receive something on the left side of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local guidance on which priority to assign, I have set it 
> to *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to downstream joins, so some products remain visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After several hours of investigation we found that the bug is located in the 
> foreign key join feature, specifically in the 
> *SubscriptionResolverJoinProcessorSupplier* class used on the left side of a 
> foreign key join. 
> Its process(...) method computes a hash by re-serializing the value it has 
> just deserialized from the left state store, and compares that hash with the 
> hash of the original message carried by the 
> subscription-response-topic. 
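>  
> As an illustration, the comparison looks roughly like this (a simplified 
> sketch based on our reading of the code, not the exact Kafka Streams source; 
> names like valueGetter, valueSerializer and the Murmur3 hash are 
> approximations):
> {code:java}
> // Simplified sketch of the check in process(...), as we understand it.
> // The stored value is deserialized by the state store layer and then
> // re-serialized here with the *currently deployed* serializer:
> final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(record.key());
> final long[] currentHash = currentValueWithTimestamp == null
>     ? null
>     : Murmur3.hash128(valueSerializer.serialize(topic, currentValueWithTimestamp.value()));
> 
> // Hash of the original left-side message, carried by the
> // subscription-response-topic:
> final long[] messageHash = record.value().getOriginalValueHash();
> 
> // The join result is only forwarded when the two hashes match:
> if (Arrays.equals(messageHash, currentHash)) {
>     // ... compute and forward the join result
> }
> {code}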
>  
> This means that when we deploy a new version of our Kafka Streams instance 
> with a new compatible Avro schema on the left side of a join, every join 
> triggered by the right side is invalidated until we receive all the events 
> again on the left side. These right-side joins are discarded because the 
> hashes Kafka Streams now computes no longer match the hashes carried by the 
> original messages.
>  
> *How to reproduce it:*
> Comparing a working and a non-working workflow looks like this (a 
> hypothetical topology sketch of such a join follows the two normal 
> workflows):
> +Normal non-breaking+ workflow from the left side of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right side of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
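>  
> For context, a foreign-key join like the one in these workflows is declared 
> roughly as follows (a hypothetical sketch; topic names, classes and serdes 
> are placeholders, not our actual topology):
> {code:java}
> StreamsBuilder builder = new StreamsBuilder();
> 
> // Left side of the FK join: offers, keyed by offer id (placeholder names):
> KTable<String, Offer> offers =
>     builder.table("offers", Consumed.with(Serdes.String(), offerAvroSerde));
> 
> // Right side of the FK join: products, keyed by product id:
> KTable<String, Product> products =
>     builder.table("products", Consumed.with(Serdes.String(), productAvroSerde));
> 
> // Kafka Streams wires the subscription-registration / subscription-response
> // topics described above behind this join(...) call:
> KTable<String, EnrichedOffer> joined = offers.join(
>     products,
>     Offer::getProductId,                                   // foreign key extractor
>     (offer, product) -> new EnrichedOffer(offer, product)  // joiner
> );
> 
> joined.toStream().to("enriched-offers");
> {code}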
>  
> +Breaking workflow:+ 
> The offer serializer is upgraded to the offer v2 schema (a self-contained 
> Avro demonstration of the resulting hash mismatch follows the steps below).
> New product event from the right side of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v2) and response event (still v1) are compared.
>  # The join result is not sent since the hash is different.
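>  
> The byte-level cause can be reproduced in isolation with plain Avro, outside 
> of Kafka Streams (a self-contained example; the schemas are made up): reading 
> v1 bytes with a compatible v2 reader schema and re-serializing them with v2 
> changes the bytes, and therefore the hash.
> {code:java}
> import java.io.ByteArrayOutputStream;
> import java.util.Arrays;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.*;
> import org.apache.avro.io.*;
> 
> public class HashDriftDemo {
>     public static void main(String[] args) throws Exception {
>         Schema v1 = new Schema.Parser().parse(
>             "{\"type\":\"record\",\"name\":\"Offer\",\"fields\":["
>             + "{\"name\":\"id\",\"type\":\"string\"}]}");
>         // v2 adds an optional field with a default, a compatible change:
>         Schema v2 = new Schema.Parser().parse(
>             "{\"type\":\"record\",\"name\":\"Offer\",\"fields\":["
>             + "{\"name\":\"id\",\"type\":\"string\"},"
>             + "{\"name\":\"discount\",\"type\":[\"null\",\"int\"],\"default\":null}]}");
> 
>         // Bytes as originally produced with v1; this is what was hashed
>         // into the subscription response:
>         GenericRecord original = new GenericData.Record(v1);
>         original.put("id", "offer-1");
>         byte[] v1Bytes = encode(original, v1);
> 
>         // What the upgraded instance does: read the stored v1 bytes with
>         // reader schema v2, then re-serialize with v2 before hashing:
>         Decoder decoder = DecoderFactory.get().binaryDecoder(v1Bytes, null);
>         GenericRecord upgraded =
>             new GenericDatumReader<GenericRecord>(v1, v2).read(null, decoder);
>         byte[] v2Bytes = encode(upgraded, v2);
> 
>         // The v2 encoding carries a union branch marker for "discount", so
>         // the bytes (and any hash over them) no longer match:
>         System.out.println(Arrays.equals(v1Bytes, v2Bytes)); // prints: false
>     }
> 
>     private static byte[] encode(GenericRecord rec, Schema schema) throws Exception {
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>         new GenericDatumWriter<GenericRecord>(schema).write(rec, encoder);
>         encoder.flush();
>         return out.toByteArray();
>     }
> }
> {code}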
>  
> *Potential Fix:*
> Currently, the offer’s hash is computed by serializing the deserialized 
> offer data. This means that binary offer data v1 is deserialized into a v2 
> data class using a v1-compatible deserializer, and then serialized back into 
> binary v2 data.
> As a result, we are comparing a v2 hash (from the store) with a v1 hash (from 
> the response event).
> Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by 
> directly retrieving the bytes from the store to compute the hash?
> This way, the hash would be the same.
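>  
> A sketch of what we have in mind (hypothetical: a bytes-returning getter 
> does not exist in the current API and is only meant to illustrate the idea):
> {code:java}
> // Hypothetical variant of the check: hash the raw bytes exactly as stored,
> // skipping the deserialize/re-serialize round trip entirely.
> final byte[] storedBytes = rawValueGetter.getRawBytes(record.key()); // assumed API
> final long[] currentHash = storedBytes == null
>     ? null
>     : Murmur3.hash128(storedBytes);
> 
> // No re-serialization happens, so the hash stays stable across serializer
> // upgrades and still matches the hash in the subscription response:
> if (Arrays.equals(record.value().getOriginalValueHash(), currentHash)) {
>     // ... forward the join result as before
> }
> {code}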
>  
> Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
