Charles-Eddy updated KAFKA-15303:
---------------------------------
    Attachment: image (1).png

Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema
--------------------------------------------------------------------------------------------------------------------------------

                 Key: KAFKA-15303
                 URL: https://issues.apache.org/jira/browse/KAFKA-15303
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.4.0
            Reporter: Charles-Eddy
            Priority: Critical
         Attachments: image (1).png

Hello everyone, I am currently working on a project that uses Kafka Streams (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK. Our goal is to join offer information from our sellers with additional data from various input topics, and then feed the resulting joined information into an output topic.

Our application is deployed on Kubernetes using the StatefulSet feature, with one EBS volume per Kafka Streams pod and 5 stream threads per pod. We use Avro to serialize/deserialize the input topics and the data kept in the Kafka Streams state stores.

We have encountered a bug in Kafka Streams that prevents us from deploying new versions of our application containing new, compatible Avro schemas for our input topics. The symptom is that after deploying the new version, which contains no topology changes but only changes to the Avro schemas used, every event coming from the right side of the affected join is discarded until we receive something on the left side. As a result, we are losing events and corrupting our output topics and stores with outdated information.

After checking the local help on which priority to assign, I set it to *CRITICAL* because we are losing data (for example, tombstones are not propagated to the following joins, so some products are still visible on our website when they should not be). Please feel free to change the priority if you think it is not appropriate.

*The bug:*

After multiple hours of investigation we found that the bug is located in the foreign key join feature, specifically in the class *SubscriptionResolverJoinProcessorSupplier* on the left side of a foreign key join. Its process(...) method computes a hash by serializing a value that was just deserialized from the left state store, and compares it with the hash of the original message carried on the subscription-response topic.

This means that when we deploy a new version of our Kafka Streams instance with a new compatible Avro schema on the left side of a join, every join triggered by the right side is invalidated until we receive all the events again on the left side: every hash now computed by Kafka Streams differs from the hash of the original message.
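For illustration, here is a minimal, self-contained sketch of that check, paraphrased from the Kafka Streams 3.4 behavior described above. The class, field names, and functional types are placeholders for this sketch, not the exact source; only the shape of the comparison matters:

{code:java}
import java.util.Arrays;
import java.util.function.Function;

// Sketch of the check performed in SubscriptionResolverJoinProcessorSupplier.process(...).
// Paraphrased, not the actual Kafka Streams source; names are placeholders.
final class HashCheckSketch<K, V> {
    Function<K, V> storeValueGetter;      // reads the left-side value from the local state store
    Function<V, byte[]> valueSerializer;  // the NEW (v2) serializer after redeployment
    Function<byte[], long[]> hash128;     // a Murmur3-style 128-bit hash, as used by Kafka Streams

    boolean responseIsStillValid(K key, long[] originalValueHashFromResponse) {
        // The store holds bytes written with the OLD (v1) serializer; the getter
        // deserializes them into the NEW (v2) data class...
        final V leftValue = storeValueGetter.apply(key);

        // ...and they are re-serialized with the NEW serializer before hashing,
        // yielding a "v2" hash even though the stored data never changed.
        final long[] currentHash = leftValue == null
                ? null
                : hash128.apply(valueSerializer.apply(leftValue));

        // The subscription response still carries the hash of the ORIGINAL v1 bytes, so
        // after a compatible schema change the hashes differ and the join result is dropped.
        return Arrays.equals(originalValueHashFromResponse, currentHash);
    }
}
{code}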
*How to reproduce it:*

If we compare a working and a non-working workflow, it looks like this:

+Normal, non-breaking workflow+ from the left side of the FK join:
# A new offer event occurs. The offer is received and stored (v1).
# A subscription registration is sent with the offer hash (v1).
# The subscription is saved to the store with the v1 offer hash.
# Product data is searched for.
# If product data is found, a subscription response is sent back, including the v1 offer hash.
# The offer data in the store is looked up and the offer hashes from the store (v1) and from the response event (also v1) are compared.
# Finally, the join result is sent.

New product event from the right side of the FK join:
# The product is received and stored.
# All related offers in the registration store are searched for.
# A subscription response is sent for each offer, including its offer hash (v1).
# The offer data in the store is looked up and the offer hashes from the store (v1) and from the response event (also v1) are compared.
# Finally, the join result is sent.

+Breaking workflow:+

The offer serializer is changed to offer v2.

New product event from the right side of the FK join:
# The product is received and stored.
# All related offers in the registration store are searched for.
# A subscription response is sent for each offer, including its offer hash (v1).
# The offer data in the store is looked up and the offer hashes from the store (now v2) and from the response event (still v1) are compared.
# The join result is not sent since the hashes differ.

*Potential Fix:*

Currently, the offer's hash is computed from the serialization of deserialized offer data. This means that binary offer data v1 is deserialized into a v2 data class using a v1-compatible deserializer, and then serialized back into binary v2 data. As a result, we are comparing a v2 hash (from the store) with a v1 hash (from the response event).

Would it be possible to skip this binary v1 -> data class v2 -> binary v2 round trip and compute the hash directly from the bytes retrieved from the store? That way, the hash would stay the same across compatible schema upgrades (see the sketch below).

Thanks.
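A minimal sketch of that idea, mirroring the sketch above. Note that Kafka Streams does not expose a raw-bytes value getter at this point in the foreign key join today; rawStoreValueGetter is a hypothetical accessor illustrating the proposal, not an existing API:

{code:java}
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical variant illustrating the proposed fix: hash the raw bytes kept in the
// state store instead of re-serializing the deserialized value. "rawStoreValueGetter"
// is an assumption for this sketch; no such accessor exists in Kafka Streams today.
final class RawBytesHashSketch<K> {
    Function<K, byte[]> rawStoreValueGetter;  // assumed: returns the stored bytes untouched
    Function<byte[], long[]> hash128;         // the same Murmur3-style hash as before

    boolean responseIsStillValid(K key, long[] originalValueHashFromResponse) {
        // The stored bytes are still the original v1 bytes, so this hash matches the
        // hash carried by the subscription response regardless of compatible schema
        // upgrades applied to the deserializer in the meantime.
        final byte[] storedBytes = rawStoreValueGetter.apply(key);
        final long[] currentHash = storedBytes == null ? null : hash128.apply(storedBytes);
        return Arrays.equals(originalValueHashFromResponse, currentHash);
    }
}
{code}

The trade-off, if this idea holds, is that the comparison then tracks the exact stored bytes rather than their re-encoded form, which is precisely what the subscription response hashed in the first place.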