[ 
https://issues.apache.org/jira/browse/KAFKA-6599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602124#comment-17602124
 ] 

Guozhang Wang commented on KAFKA-6599:
--------------------------------------

Hi [~jfilipiak], great to hear back from you.

Yeah, I think I agree with you that whether the joined tables are materialized 
or not is orthogonal to this issue. What I thought is that, if it is 
materialized, then we can potentially mitigate the issue by comparing the new 
joined value with the old value (retrieved from the materialized store), and 
then we would know whether the new value should be emitted. But that's not the 
key to this issue itself.
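
To illustrate the mitigation I mean, here is a minimal sketch in plain Java. 
It does not use any Kafka Streams classes: the class name DedupJoinEmitter, 
the HashMap standing in for the materialized store, and the string format of 
the emitted changes are all hypothetical, just to show the comparison idea.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class DedupJoinEmitter {
    // Stand-in for the materialized join-result store (hypothetical, not a Kafka Streams class).
    private final Map<String, String> store = new HashMap<>();
    // Changes forwarded downstream, recorded as "key:old->new" strings.
    private final List<String> emitted = new ArrayList<>();

    /** Forwards a change only if the new joined value differs from the stored old value. */
    public boolean maybeEmit(String key, String newValue) {
        String oldValue = store.get(key);
        if (Objects.equals(oldValue, newValue)) {
            return false; // no-op update (including null -> null): nothing to emit
        }
        if (newValue == null) {
            store.remove(key); // a null joined value is a delete
        } else {
            store.put(key, newValue);
        }
        // The old value comes from the store, not from a possibly stale lookup.
        emitted.add(key + ":" + oldValue + "->" + newValue);
        return true;
    }

    public List<String> emitted() {
        return emitted;
    }
}
```

With something like this, a delete would carry the old value that was actually 
emitted before, and a repeated delete for the same key would not be forwarded.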

For this issue itself, I think the key is still decoupling caching from the 
emitting policy. We are already working towards that by allowing users to 
specify explicitly when to emit / suppress, and by default, even with caching, 
we always emit results.

> KTable KTable join semantics violated when caching enabled
> ----------------------------------------------------------
>
>                 Key: KAFKA-6599
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6599
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Jan Filipiak
>            Priority: Critical
>              Labels: bug
>
> Say a tuple A,B got emitted after joining and the delete for A goes into the 
> cache. After that, the B record would be deleted as well. B's join processor 
> would look up A and see `null` while computing the old and new values (at 
> this point we can execute the joiner with A being null and still emit 
> something, but it's not going to represent the actual oldValue). Then, when 
> A's cache flushes, it doesn't see B, so it's also not going to put a proper 
> oldValue. The output can then not be used for, say, any aggregate, as a 
> delete would not reliably find its old aggregate where it needs to be removed 
> from. Filter will also break, as it stops null,null changes from propagating. 
> So to me it looks pretty clear that caching with join breaks KTable 
> semantics, be it my new join or the currently existing ones.
>  
> This if branch here
> [https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L155]
> is not useful. I think it's there because if one delegated the true case to 
> the underlying store, one would get proper semantics for streams, but the 
> weirdest cache I've seen.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
