[ 
https://issues.apache.org/jira/browse/KAFKA-6599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600611#comment-17600611
 ] 

Jan Filipiak edited comment on KAFKA-6599 at 9/9/22 9:07 AM:
-------------------------------------------------------------

Just jumping in here as I'm prepping slides with this included now.

Has anyone looked at the fix I suggested of just letting streams use the underlying 
store? That should fix it instantly?

Maybe someone can explain the whole
{quote}I think a more general issue here is that the joined table is not 
materialized
{quote}
? [~guozhang] maybe? If the joined table is not materialized, we don't have 
correct KTable-KTable join semantics anyway. So I don't know how one ends up in 
this position, but I would think it's not too relevant to fixing it?

 
{quote}the old joined value == new joined value,
{quote}
I made a point 5 years back that filtering these should actually be a user 
operation. It is not possible for streams to know at which point this makes 
sense to do. After the join there could be an immediate mapper preserving only a 
few fields, and the comparison would run much more efficiently there.
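
To illustrate the point about doing the comparison after the mapper, here is a minimal sketch in plain Java. It deliberately does not use the Kafka Streams API; `NoOpFilterSketch`, `changed` and `KEEP_NAME_AND_CITY` are hypothetical names, and the `name|city|timestamp` value layout is only an assumption for the example.

```java
import java.util.Objects;
import java.util.function.Function;

public class NoOpFilterSketch {
    // Hypothetical mapper: keeps "name|city" and drops the trailing "|timestamp" field.
    static final Function<String, String> KEEP_NAME_AND_CITY =
            v -> v.substring(0, v.lastIndexOf('|'));

    // Returns true if the update should be forwarded, i.e. the projected
    // old and new values differ. A null value stands for a delete.
    static <V, R> boolean changed(V oldValue, V newValue, Function<V, R> projection) {
        R oldProjected = oldValue == null ? null : projection.apply(oldValue);
        R newProjected = newValue == null ? null : projection.apply(newValue);
        return !Objects.equals(oldProjected, newProjected);
    }

    public static void main(String[] args) {
        // Only the timestamp differs, so after the projection this is a no-op.
        System.out.println(changed("alice|berlin|t1", "alice|berlin|t2", KEEP_NAME_AND_CITY));
        // The city differs, so the update is forwarded.
        System.out.println(changed("alice|berlin|t1", "alice|paris|t2", KEEP_NAME_AND_CITY));
    }
}
```

Comparing the full joined values would have forwarded the first update, while comparing after the projection drops it. Only the user knows that this is the right place to compare.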



> KTable KTable join semantics violated when caching enabled
> ----------------------------------------------------------
>
>                 Key: KAFKA-6599
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6599
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Jan Filipiak
>            Priority: Critical
>              Labels: bug
>
> Say a tuple A,B was emitted after joining, and the delete for A goes into the 
> cache. After that, the B record is deleted as well. B's join processor 
> looks up A and sees `null` while computing the old and new values (at this 
> point we can execute the joiner with A being null and still emit something, but 
> it is not going to represent the actual oldValue). Then, when A's cache flushes, it 
> doesn't see B, so it is also not going to emit a proper oldValue. The output can 
> then not be used for, say, any aggregate, as a delete would not reliably find the 
> old aggregate it needs to be removed from. A filter will also break, as it 
> stops null,null changes from propagating. So to me it looks pretty clear 
> that caching with joins breaks KTable semantics, be it my new join or the 
> currently existing ones.
>  
> This if branch here
> [https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L155]
> is not useful. I think it is there because if one delegated the true 
> case to the underlying store, one would get proper semantics for streams, but the 
> weirdest cache I've seen.
>  
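
The sequence described in the issue can be replayed with a small plain-Java model. This is only a sketch under simplifying assumptions, not the Kafka Streams implementation: the stores are plain maps, the cache holds a single buffered tombstone, the join is an inner join, and every name (`CachedJoinSketch`, `run`, `lookupBypassesCache`) is hypothetical. The flag switches between a join lookup that sees the cached delete and one that reads the underlying store, as in the suggested fix.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CachedJoinSketch {
    // Inner-join semantics: a joined value exists only if both sides are present.
    static String join(String left, String right) {
        return (left == null || right == null) ? null : left + "," + right;
    }

    // Replays "delete A (buffered in cache), delete B, flush A's cache" and
    // returns the emitted changes formatted as "oldValue -> newValue".
    static List<String> run(boolean lookupBypassesCache) {
        Map<String, String> leftStore = new HashMap<>(Map.of("k", "A"));
        Map<String, String> rightStore = new HashMap<>(Map.of("k", "B"));
        Map<String, String> leftCache = new HashMap<>();
        List<String> emitted = new ArrayList<>();

        // 1. The delete for A is buffered as a tombstone; nothing is forwarded yet.
        leftCache.put("k", null);

        // 2. The delete for B reaches B's join processor, which looks up A.
        String leftSeen = (!lookupBypassesCache && leftCache.containsKey("k"))
                ? leftCache.get("k")   // sees the buffered delete, i.e. null
                : leftStore.get("k");  // sees the value that was last forwarded
        String oldB = rightStore.remove("k");
        emitted.add(join(leftSeen, oldB) + " -> " + join(leftSeen, null));

        // 3. A's cache flushes: the delete is applied and forwarded to A's join processor.
        leftCache.remove("k");
        String oldA = leftStore.remove("k");
        String rightSeen = rightStore.get("k"); // B is already gone
        emitted.add(join(oldA, rightSeen) + " -> " + join(null, rightSeen));
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("lookup through cache:      " + run(false));
        System.out.println("lookup in underlying store: " + run(true));
    }
}
```

In this model, the lookup through the cache yields two `null -> null` changes, so the previously emitted `A,B` is never retracted. The lookup in the underlying store yields `A,B -> null` first, which is the oldValue a downstream aggregate needs.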



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
