Charles-Eddy created KAFKA-15303:
------------------------------------
Summary: Foreign key joins no longer triggered by events on the
right side of the join after deployment with a new compatible Avro schema
Key: KAFKA-15303
URL: https://issues.apache.org/jira/browse/KAFKA-15303
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.4.0
Reporter: Charles-Eddy
Hello everyone, I am currently working on a project that uses Kafka Streams
(version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
Our goal is to join offer information from our sellers with additional data
from various input topics, and then feed the resulting joined information into
an output topic.
Our application is deployed in Kubernetes using the StatefulSet feature, with
one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
We are using Avro to serialize and deserialize the input topics and the data
stored in the Kafka Streams state stores.
We have encountered a bug in Kafka Streams that prevents us from deploying new
versions of Kafka Streams containing new compatible Avro schemas of our input
topics.
The symptom is that after deploying our new version, which contains no topology
changes but only changes to the Avro schemas used, every event coming from the
right side of the join affected by these schema changes is discarded until we
receive something on the left side of the join.
As a result, we lose events and corrupt our output topics and stores with
outdated information.
After checking the local guidelines for which priority to assign, I have set it
to *CRITICAL* because we are losing data (for example, tombstones are not
propagated to the downstream joins, so some products are still visible on our
website when they should not be).
Please feel free to change the priority if you think it is not appropriate.
*The bug:*
After multiple hours of investigation, we found that the bug is located in the
foreign key join feature, specifically in the
*SubscriptionResolverJoinProcessorSupplier* class on the left side of a foreign
key join.
This class's process(...) method computes a hash of the local store value by
serializing a value that was deserialized from the left state store, and
compares it with the hash carried in the original message from the
subscription-response-topic.
This means that when we deploy a new version of our Kafka Streams instance with
a new compatible Avro schema on the left side of a join, every join triggered
by the right side of the join is invalidated until we receive all the events
again on the left side. Every join triggered by the right side is discarded
because all the hashes now computed by Kafka Streams differ from the hashes in
the original messages.
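The mechanism can be illustrated in isolation. This is a minimal sketch, not the actual Kafka Streams code: the two "serializers" below merely stand in for Avro writers with two compatible schema versions, and the hash function is an arbitrary stand-in for whatever hash the join compares.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HashMismatchSketch {
    // Hypothetical v1 "serializer": writes only the offer id.
    static byte[] serializeV1(String offerId) {
        return offerId.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical v2 "serializer": a compatible schema evolution that adds a
    // new optional field, so the same logical value produces different bytes.
    static byte[] serializeV2(String offerId) {
        return (offerId + "|newField=default").getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for the hash the join compares; the exact function is irrelevant,
    // only that it is computed over the serialized bytes.
    static int hash(byte[] bytes) {
        return Arrays.hashCode(bytes);
    }

    public static void main(String[] args) {
        String offer = "offer-42";

        // Hash carried in the subscription-response message, computed before
        // the deployment with the v1 schema.
        int responseHash = hash(serializeV1(offer));

        // Hash recomputed after the deployment: the stored value is
        // deserialized and then re-serialized with the v2 schema.
        int storeHash = hash(serializeV2(offer));

        // The join result is discarded whenever this is false.
        System.out.println(responseHash == storeHash);
    }
}
```

Because the re-serialized bytes differ, the hashes differ, even though the underlying offer never changed.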
*How to reproduce it:*
If we compare a working and a non-working workflow, they look like this:
+Normal non-breaking+ workflow from the left part of the FK join:
# A new offer event occurs. The offer is received and stored (v1).
# A subscription registration is sent with the offer-hash (v1).
# The subscription is saved to the store with the v1 offer-hash.
# Product data is searched for.
# If product data is found, a subscription response is sent back, including the v1 offer-hash.
# The offer data in the store is searched for and the offer hashes between the store (v1) and the response event (also v1) are compared.
# Finally, the join result is sent.
New product event from the right part of the FK join:
# The product is received and stored.
# All related offers in the registration store are searched for.
# A subscription response is sent for each offer, including its offer-hash (v1).
# The offer data in the store is searched for and the offer hashes between the store (v1) and the response event (also v1) are compared.
# Finally, the join result is sent.
+Breaking workflow:+
The offer serializer is changed to offer v2.
New product event from the right part of the FK join:
# The product is received and stored.
# All related offers in the registration store are searched for.
# A subscription response is sent for each offer, including its offer-hash (v1).
# The offer data in the store is searched for and the offer hashes between the store (v2) and the response event (still v1) are compared.
# The join result is not sent since the hashes differ.
*Potential Fix:*
Currently, the offer’s hash is computed from the serialization of deserialized
offer data. This means that binary offer data v1 is deserialized into a v2 data
class using a v1-compatible deserializer, and then serialized into binary v2
data.
As a result, we are comparing a v2 hash (from the store) with a v1 hash (from
the response event).
Is it possible to avoid this binary v1 -> data class v2 -> binary v2 step by
directly retrieving the bytes from the store to compute the hash?
This way, the hash would be the same.
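The suggested change can be sketched under the same illustrative assumptions as above (the byte store below is a hypothetical stand-in, not the real Kafka Streams state store API): hashing the raw bytes read from the store skips the re-serialization round trip, so the comparison stays stable across compatible schema changes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class RawBytesHashSketch {
    // Hypothetical byte store: keyed by offer id, holding the bytes exactly
    // as they were originally written (v1).
    static final Map<String, byte[]> store = new HashMap<>();

    // Same stand-in hash function as before.
    static int hash(byte[] bytes) {
        return Arrays.hashCode(bytes);
    }

    public static void main(String[] args) {
        byte[] v1Bytes = "offer-42".getBytes(StandardCharsets.UTF_8);
        store.put("offer-42", v1Bytes);

        // Hash in the subscription-response message, computed from the v1 bytes.
        int responseHash = hash(v1Bytes);

        // Proposed comparison: hash the stored bytes directly, avoiding the
        // binary v1 -> data class v2 -> binary v2 step. The serializer version
        // running in the new deployment no longer matters.
        int storeHash = hash(store.get("offer-42"));

        System.out.println(responseHash == storeHash); // prints "true"
    }
}
```

Since both sides hash the identical stored bytes, the hashes match regardless of which schema version the current deployment would serialize with.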
Thanks.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)