[ 
https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598168#comment-17598168
 ] 

Luke Jackson commented on KAFKA-13386:
--------------------------------------

Thanks [~guozhang] for the detail, and explaining why a check solely on FK 
value could result in duplicate join outputs.

I work with [~nikuis] and we've subsequently also hit the issue which [~sduran] 
has reported, in our case related to an issue with Schema Registry IDs being 
incorrectly assigned by the cached schema registry client (a bug which is fixed 
in later versions).

I wonder whether using the event (LHS) message offset instead of the value hash 
would be better?

It wouldn't depend on the serialiser being deterministic (as the serialiser 
wouldn't be used at all), and it would avoid the overhead of having to 
re-serialise the event (LHS) value, having just deserialised it on retrieving 
it from the state store.

At present this extra serialisation is happening on every event (LHS) message 
to compute the original hash and on every entity (RHS) join attempt to compute 
the hash again to see if the value has changed, which doesn't seem particularly 
efficient, especially for cases like ours where we join fast moving data.
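
To illustrate the difference, here is a rough sketch of the two checks. It is 
not the Kafka Streams implementation: the class and method names are 
hypothetical, SHA-256 just stands in for whatever hash is really used, the 
serialiser is a toy that writes a schema ID in front of the payload, and I'm 
assuming the LHS record offset could be carried with the subscription and kept 
alongside the stored value.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical sketch only; none of these names exist in Kafka Streams.
final class StalenessCheckSketch {

    // Toy serialiser: the schema ID is written ahead of the payload, so a
    // different ID yields different bytes for the same logical value.
    static byte[] serialise(int schemaId, String value) {
        byte[] payload = value.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + payload.length)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    // Stand-in hash (assumption: the real hash function may differ).
    static byte[] hash(byte[] serialised) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(serialised);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Current style of check: needs a re-serialisation and a hash per attempt.
    static boolean isCurrentByHash(byte[] messageHash, byte[] currentHash) {
        return Arrays.equals(messageHash, currentHash);
    }

    // Proposed style of check: compare offsets, no serialiser involved.
    static boolean isCurrentByOffset(long subscriptionOffset, long storedOffset) {
        return subscriptionOffset == storedOffset;
    }

    public static void main(String[] args) {
        String value = "order-42";
        // Same logical value, but the schema ID changed between the two calls.
        byte[] hashAtSubscription = hash(serialise(1, value));
        byte[] hashAtJoin = hash(serialise(2, value));

        System.out.println(isCurrentByHash(hashAtSubscription, hashAtJoin)); // false
        System.out.println(isCurrentByOffset(1007L, 1007L));                 // true
    }
}
```

In the sketch the hash check rejects a record whose value never changed, 
purely because the serialised bytes differ, while the offset check is 
unaffected and does no serialisation at all.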

> Foreign Key Join filtering out valid records after a code change / schema 
> evolved
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-13386
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13386
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.2
>            Reporter: Sergio Duran Vegas
>            Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant 
> across upgrades. So in case of changes this optimization will drop 
> invalid/intermediate records. In other situations we have relied on the same 
> property, for example when computing whether an update is a duplicate result 
> or not.
>  
> The problem is that some serializers are sadly not deterministic.
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>  
> {code:java}
> // If this value doesn't match the current value from the original table,
> // it is stale and should be discarded.
> if (java.util.Arrays.equals(messageHash, currentHash)) {
> {code}
>  
> A solution for this problem would be for the comparison to use the 
> foreign-key reference itself instead of the whole message hash.
>  
> The bug fix proposal is to allow the user to choose between the two methods 
> of comparison (whole hash or FK reference). This would fix the problem of 
> dropping valid records in certain cases, while still letting the user choose 
> the current optimized way of checking valid records and dropping 
> intermediate results.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
