[ 
https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093416#comment-17093416
 ] 

Bruno Cadonna commented on KAFKA-9916:
--------------------------------------

[~guozhang] Thank you for adding all the details!

While I agree with [~mjsax] that this issue of "modifying the field directly" 
may still exist in other operators, I think that the provided example is 
different from the case in which the {{ValueJoiner}} is called twice. In the 
example, the access to the modified object in the second filter is quite 
explicit. However, in the situation that I described above the second call to 
the {{ValueJoiner}} is not explicit at all because it is well hidden in 
Streams' internals. Only users that are very familiar with the internals of 
Streams would be able to identify the second call to the {{ValueJoiner}} as the 
cause of incorrect results.




> Materialize Table-Table Join Result to Avoid Performing Same Join Twice
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-9916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9916
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>
> If a table-table join processor performs a join and the join needs to forward 
> downstream the old join result (e.g. due to an aggregation operation 
> downstream), it performs the same join (i.e. calls the {{ValueJoiner}}) twice.
> Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}} 
> with the same keys and input into the join operation in this order, the join 
> processor at some point will join {{L1}} with {{R1}}. When the new right 
> value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again 
> {{L1}} with {{R1}}.
> We could avoid calling the {{ValueJoiner}} twice by materializing the join 
> result. We would trade a call to the {{ValueJoiner}} with a lookup into a 
> state store. Depending on the logic in the {{ValueJoiner}} this may or may 
> not improve the performance. However, calling the {{ValueJoiner}} once will 
> only access the input values of the {{ValueJoiner}} once, which avoids the 
> need to copy the input values each time the {{ValueJoiner}} is called. For 
> example, consider the following {{ValueJoiner}}:
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>         leftValue.setSomeValue(rightValue);
>         return leftValue;
> }
> {code}
> With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice 
> when {{R2}} trigger the join, the first time with {{R2}} and the second time 
> with {{R1}}. That means, {{R2}} will be overwritten by {{R1}}, which is 
> probably not what the users want. To get the correct result,  the 
> {{ValueJoiner}} should be implemented as follows:
>   
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>         ComplexValue copy = copy(leftValue);
>         copy.setSomeValue(rightValue);
>         return copy;
> }
> {code}
> Copying values during joins could be avoided if the join result were 
> materialized. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to