[ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Charles-Eddy updated KAFKA-15303:
---------------------------------
    Description: 
Hello everyone, I am currently working on a project that uses Kafka Streams 
(version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
Our goal is to join offer information from our sellers with additional data 
from various input topics, and then feed the resulting joined information into 
an output topic.

Our application is deployed in Kubernetes using the StatefulSet feature, with 
one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.

We are using avro to serialize / deserialize input topics and storing in the 
state stores of Kafka streams.

We have encountered a bug in Kafka Streams that prevents us from deploying new 
versions of Kafka Streams containing new compatible Avro schemas of our input 
topics.

The symptom is that after deploying our new version, which contains no changes 
in topology but only changes to the Avro schema used, we discard every event 
coming from the right part of the join concerned by these Avro schema changes 
until we receive something from the left part of the join.

As a result, we are losing events and corrupting our output topics and stores 
with outdated information.

After checking the local help for the priority to assign, I have assigned it as 
*CRITICAL* because we are losing data (for example, tombstones are not 
propagated to the following joins, so some products are still visible on our 
website when they should not be).

Please feel free to change the priority if you think it is not appropriate.

 

*The bug:*
After multiple hours of investigation we found out that the bug is located in 
the foreign key join feature and specifically in this class: 
*SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
join. 
This class and his method process(...) is computing a hash from the local store 
via a serialization of a deserialized value from the left state store and 
comparing it with the hash of the original message from the 
subscription-response-topic. 
 
It means that when we deploy a new version of our kafka streams instance with a 
new compatible avro schema from the left side of a join, every join triggered 
by the right part of the join are invalidated until we receive all the events 
again on the left side. Every join triggered by the right part of the join are 
discarded because all the hashes computed by kafka streams are different now 
from the original messages.
 
*How to reproduce it:*
If we take a working and a non-working workflow, it will do something like this:
+Normal non-breaking+ workflow from the left part of the FK join:
 # A new offer event occurs. The offer is received and stored (v1).
 # A subscription registration is sent with the offer-hash (v1).
 # The subscription is saved to the store with the v1 offer-hash.
 # Product data is searched for.
 # If product data is found, a subscription response is sent back, including 
the v1 offer-hash.
 # The offer data in the store is searched for and the offer hashes between the 
store (v1) and response event (also v1) are compared.
 # Finally, the join result is sent.

New product event from the right part of the FK join:
 # The product is received and stored.
 # All related offers in the registration store are searched for.
 # A subscription response is sent for each offer, including their offer hash 
(v1).
 # The offer data in the store is searched for and the offer hashes between the 
store (v1) and response event (also v1) are compared.
 # Finally, the join result is sent.

 
+Breaking workflow:+ 
The offer serializer is changed to offer v2
New product event from the right part of the FK join: 
 # The product is received and stored.
 # All related offers in the registration store are searched for.
 # A subscription response is sent for each offer, including their offer hash 
(v1).
 # The offer data in the store is searched for and the offer hashes between the 
store (v2) and response event (still v1) are compared.
 # The join result is not sent since the hash is different.

 

*Potential Fix:*
Currently, the offer’s hash is computed from the serialization of deserialized 
offer data. This means that binary offer data v1 is deserialized into a v2 data 
class using a v1-compatible deserializer, and then serialized into binary v2 
data.
As a result, we are comparing a v2 hash (from the store) with a v1 hash (from 
the response event).
Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by 
directly retrieving the bytes from the store to compute the hash?
This way, the hash would be the same.
 
Thanks.

  was:
Hello everyone, I am currently working on a project that uses Kafka Streams 
(version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
Our goal is to join offer information from our sellers with additional data 
from various input topics, and then feed the resulting joined information into 
an output topic.

Our application is deployed in Kubernetes using the StatefulSet feature, with 
one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.

We are using avro to serialize / deserialize input topics and storing in the 
state stores of Kafka streams.

We have encountered a bug in Kafka Streams that prevents us from deploying new 
versions of Kafka Streams containing new compatible Avro schemas of our input 
topics.

The symptom is that after deploying our new version, which contains no changes 
in topology but only changes to the Avro schema used, we discard every event 
coming from the right part of the join concerned by these Avro schema changes 
until we receive something from the left part of the join.

As a result, we are losing events and corrupting our output topics and stores 
with outdated information.

After checking the local help for the priority to assign, I have assigned it as 
*CRITICAL* because we are losing data (for example, tombstones are not 
propagated to the following joins, so some products are still visible on our 
website when they should not be).

Please feel free to change the priority if you think it is not appropriate.

 

*The bug:*
After multiple hours of investigation we found out that the bug is located in 
the foreign key join feature and specifically in this class: 
*SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
join. 
This class and his method process(...) is computing a hash from the local store 
via a serialization of a deserialized value from the left state store and 
comparing it with the hash of the original message from the 
subscription-response-topic. 
 
It means that when we deploy a new version of our kafka streams instance with a 
new compatible avro schema from the left side of a join, every join triggered 
by the right part of the join are invalidated until we receive all the events 
again on the left side. Every join triggered by the right part of the join are 
discarded because all the hashes computed by kafka streams are different now 
from the original messages.
 
*How to reproduce it:*
If we take a working and a non-working workflow, it will do something like this:
+Normal non-breaking+ workflow from the left part of the FK join: # A new offer 
event occurs. The offer is received and stored (v1).
 # A subscription registration is sent with the offer-hash (v1).
 # The subscription is saved to the store with the v1 offer-hash.
 # Product data is searched for.
 # If product data is found, a subscription response is sent back, including 
the v1 offer-hash.
 # The offer data in the store is searched for and the offer hashes between the 
store (v1) and response event (also v1) are compared.
 # Finally, the join result is sent.

New product event from the right part of the FK join: # The product is received 
and stored.
 # All related offers in the registration store are searched for.
 # A subscription response is sent for each offer, including their offer hash 
(v1).
 # The offer data in the store is searched for and the offer hashes between the 
store (v1) and response event (also v1) are compared.
 # Finally, the join result is sent.

 
+Breaking workflow:+ 
The offer serializer is changed to offer v2
New product event from the right part of the FK join: # The product is received 
and stored.
 # All related offers in the registration store are searched for.
 # A subscription response is sent for each offer, including their offer hash 
(v1).
 # The offer data in the store is searched for and the offer hashes between the 
store (v2) and response event (still v1) are compared.
 # The join result is not sent since the hash is different.

 

*Potential Fix:*
Currently, the offer’s hash is computed from the serialization of deserialized 
offer data. This means that binary offer data v1 is deserialized into a v2 data 
class using a v1-compatible deserializer, and then serialized into binary v2 
data.
As a result, we are comparing a v2 hash (from the store) with a v1 hash (from 
the response event).
Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by 
directly retrieving the bytes from the store to compute the hash?
This way, the hash would be the same.
 
Thanks.


> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15303
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15303
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Charles-Eddy
>            Priority: Critical
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We are using avro to serialize / deserialize input topics and storing in the 
> state stores of Kafka streams.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> This class and his method process(...) is computing a hash from the local 
> store via a serialization of a deserialized value from the left state store 
> and comparing it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our kafka streams instance with 
> a new compatible avro schema from the left side of a join, every join 
> triggered by the right part of the join are invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join are discarded because all the hashes computed by kafka streams are 
> different now from the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
>  
> +Breaking workflow:+ 
> The offer serializer is changed to offer v2
> New product event from the right part of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v2) and response event (still v1) are compared.
>  # The join result is not sent since the hash is different.
>  
> *Potential Fix:*
> Currently, the offer’s hash is computed from the serialization of 
> deserialized offer data. This means that binary offer data v1 is deserialized 
> into a v2 data class using a v1-compatible deserializer, and then serialized 
> into binary v2 data.
> As a result, we are comparing a v2 hash (from the store) with a v1 hash (from 
> the response event).
> Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by 
> directly retrieving the bytes from the store to compute the hash?
> This way, the hash would be the same.
>  
> Thanks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to