[ 
https://issues.apache.org/jira/browse/KAFKA-13386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554526#comment-17554526
 ] 

Kin Siu commented on KAFKA-13386:
---------------------------------

[~guozhang], I am not sure whether what I faced is the same as what [~sduran] 
reported, but I hit an issue while load testing my application, which contains 
an FK join: there can be no output until the very last left-hand-side record 
has been processed.
 
Simplified test data from my load test:
|| - || Key Fields || Value Fields ||
| Left hand side table | K1 | FK1, DF1, UpdateTs |
| Right hand side table | FK1 | V1, V2 |

It is a simple FK join of the left-hand-side table and the right-hand-side 
table on field "FK1". For each left-hand-side update, I changed DF1 and 
UpdateTs.
When we increase the left-hand-side publishing rate, at some point, once the 
rate is high enough, the application stops generating any output until the 
last left-hand-side record has been processed. For example, in a 10-minute 
test run we can end up receiving output only in the last few seconds. And 
instead of getting as many join outputs as left-hand-side updates + 
right-hand-side updates, we received far fewer.

I believe it is the same issue as described above: when the right-hand-side 
response is processed, the "hash" it carries is compared with the hash of the 
latest left-hand-side value. In my test data the FK relation remains the same, 
but the "hash" changes because the values of "DF1" and "UpdateTs" change.
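
To illustrate the effect, here is a minimal sketch in plain Java. It is not Kafka Streams code: the class name, the `countEmitted` helper, the fixed `lag` model and the use of SHA-256 as a stand-in for the hash Streams computes are all assumptions made for illustration. It simulates one left-hand-side key whose FK never changes, with each right-hand-side response arriving `lag` updates late, and counts how many join outputs survive the staleness check.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

final class FkJoinStaleCheckDemo {

    /** Hypothetical in-flight response: remembers the left value's hash and FK at send time. */
    static final class Response {
        final byte[] valueHash;
        final String fk;

        Response(byte[] valueHash, String fk) {
            this.valueHash = valueHash;
            this.fk = fk;
        }
    }

    /** Toy serialization of the left value (FK1, DF1, UpdateTs). */
    static byte[] serialize(String fk1, String df1, long updateTs) {
        return (fk1 + "|" + df1 + "|" + updateTs).getBytes(StandardCharsets.UTF_8);
    }

    /** SHA-256 is only a stand-in for the hash used by the real join. */
    static byte[] hash(byte[] bytes) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(bytes);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Applies `updates` left-side updates to a single key. Each response comes
     * back `lag` updates after it was requested. Returns the number of join
     * outputs that pass the check (whole-value hash, or FK only).
     */
    static int countEmitted(int updates, int lag, boolean compareFkOnly) {
        Deque<Response> inFlight = new ArrayDeque<>();
        String currentFk = null;
        byte[] currentHash = null;
        int emitted = 0;

        for (int i = 0; i < updates; i++) {
            // The FK stays the same; DF1 and UpdateTs change on every update.
            currentFk = "FK1";
            currentHash = hash(serialize(currentFk, "df-" + i, i));
            inFlight.addLast(new Response(currentHash, currentFk));

            if (inFlight.size() > lag) {
                Response r = inFlight.removeFirst();
                if (accept(r, currentHash, currentFk, compareFkOnly)) {
                    emitted++;
                }
            }
        }
        // Input has stopped: the remaining responses are checked against the final left value.
        while (!inFlight.isEmpty()) {
            if (accept(inFlight.removeFirst(), currentHash, currentFk, compareFkOnly)) {
                emitted++;
            }
        }
        return emitted;
    }

    static boolean accept(Response r, byte[] currentHash, String currentFk, boolean compareFkOnly) {
        return compareFkOnly
                ? r.fk.equals(currentFk)                    // alternative from the issue description
                : Arrays.equals(r.valueHash, currentHash);  // whole-value hash comparison
    }

    public static void main(String[] args) {
        System.out.println("hash check, lag 5: " + countEmitted(1000, 5, false));
        System.out.println("hash check, lag 0: " + countEmitted(1000, 0, false));
        System.out.println("FK check,   lag 5: " + countEmitted(1000, 5, true));
    }
}
```

Under these assumptions, the whole-value hash check with a lag of 5 emits only the output for the very last update, while a lag of 0 or the FK-only check emits one output per update. That matches what I saw: nothing comes out until the left-hand-side input stops.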


> Foreign Key Join filtering out valid records after a code change / schema 
> evolved
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-13386
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13386
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.2
>            Reporter: Sergio Duran Vegas
>            Priority: Major
>
> The join optimization assumes the serializer is deterministic and invariant 
> across upgrades. So when the serialized form changes, this optimization, 
> which is meant to drop invalid/intermediate records, drops valid ones as 
> well. In other situations we have relied on the same property, for example 
> when computing whether an update is a duplicate result or not.
>  
> The problem is that some serializers are sadly not deterministic.
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplier.java]
>  
> {code:java}
> // If this value doesn't match the current value from the original table, it is stale and should be discarded.
> if (java.util.Arrays.equals(messageHash, currentHash)) {
> {code}
>  
> A solution for this problem would be for the comparison to use the 
> foreign-key reference itself instead of the whole message hash.
>  
> The bug fix proposal is to allow the user to choose between the two methods 
> of comparison (whole hash or FK reference). This would fix the problem of 
> dropping valid records in certain cases, while still allowing the user to 
> choose the current optimized way of checking valid records and dropping 
> intermediate results.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
