[ https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091963#comment-17091963 ]

Guozhang Wang edited comment on KAFKA-9916 at 4/24/20, 11:15 PM:
-----------------------------------------------------------------

Thanks [~cadonna] for filing the JIRA! I'd like to leave some more thoughts on 
this:

I think the key here is not to always materialize the join results, but to 
materialize them only when a downstream operator requires its parent to 
"sendOldValues". At a high level, today we may propagate the "sendOldValues" 
requirement a long way up through parent and ancestor operators until we reach 
one that is materialized. For example, given the topology:

{{A -> B -> C -> D}}

where D requires its parent C to sendOldValues, this requirement may be traced 
back through B to A, eventually requiring A to "materialize" and send old 
values to B, then B to C, then C to D. Instead of doing that, we should consider 
either materializing {{C}} directly, which cuts the trace at {{A -> B -> ..}}, 
or even just letting D materialize itself so that its direct parent {{C}} does 
not need to sendOldValues.
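A simplified model of the two strategies follows. {{OperatorNode}} and both methods are hypothetical, not Kafka Streams classes, and the requirement coming from D is modeled as a call on C. It assumes that a non-materialized operator can only produce old values by recomputing them from its parent's old values.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a chain of KTable operators.
// It only tracks flags; it is NOT the Kafka Streams API.
class OperatorNode {
    final String name;
    final OperatorNode parent;  // null for the source table
    boolean materialized;       // true if the operator has its own state store
    boolean sendOldValues;      // true if it forwards (newValue, oldValue) pairs

    OperatorNode(String name, OperatorNode parent, boolean materialized) {
        this.name = name;
        this.parent = parent;
        this.materialized = materialized;
    }

    // Propagation strategy: the requirement walks upstream until a
    // materialized operator is reached, or the source, which is then
    // forced to materialize. Returns all senders, source first.
    List<String> requireOldValuesByPropagation() {
        List<String> senders = new ArrayList<>();
        OperatorNode current = this;
        while (true) {
            current.sendOldValues = true;
            senders.add(0, current.name);
            if (current.materialized) {
                break;  // old values can be read from this operator's store
            }
            if (current.parent == null) {
                current.materialized = true;  // source table forced to materialize
                break;
            }
            current = current.parent;
        }
        return senders;
    }

    // Alternative strategy: materialize this operator and serve old values
    // from its own store, leaving all ancestors untouched.
    List<String> requireOldValuesByMaterializing() {
        materialized = true;
        sendOldValues = true;
        List<String> senders = new ArrayList<>();
        senders.add(name);
        return senders;
    }
}
{code}

With A, B and C all non-materialized, the propagation strategy returns [A, B, C] and forces A to materialize, while the alternative returns only [C] and leaves A and B untouched, at the cost of a store for C.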

Today, only the following scenarios require parents to send old values:

1. KTable-KTable left / outer joins: we want to avoid sending unnecessary 
tombstones and hence need to know the old values. This is explained in KIP-77.
2. KTable aggregation: we need old values to be sent so that we can subtract 
them from the aggregate.
3. KTable foreign-key join: we need both parents to send their old values for 
optimization / correctness purposes.
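To make scenario 2 concrete, here is a minimal sketch of why an aggregation needs old values. {{Change}} and {{CountPerValue}} are hypothetical classes, not the Kafka Streams API; the sketch assumes the parent forwards each update as a (newValue, oldValue) pair, e.g. for a KTable<userId, region> that is counted per region.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical change record: the new value plus, when the parent sends old
// values, the value it replaces. null means "absent".
class Change<V> {
    final V newValue;
    final V oldValue;

    Change(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Hypothetical aggregation that counts table rows per value, e.g. users per
// region. Not the Kafka Streams implementation.
class CountPerValue {
    final Map<String, Long> counts = new HashMap<>();

    void process(Change<String> change) {
        // Subtractor: without the old value we would not know which
        // group the row has to be removed from.
        if (change.oldValue != null) {
            counts.merge(change.oldValue, -1L, Long::sum);
        }
        // Adder: count the row in its new group.
        if (change.newValue != null) {
            counts.merge(change.newValue, 1L, Long::sum);
        }
    }
}
{code}

If a user moves from EU to US, the update {{Change("US", "EU")}} decrements EU and increments US; without "EU" as the old value, EU would keep counting a user who has left.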

---------------------------

The idea is that:

For 1, instead of requiring its parent to sendOldValues, we just materialize 
the join result and hence avoid the second `joiner.apply(... oldValue)` call 
(which also helps fix the issue described in this JIRA ticket). Of course, with 
chained joins this means we may materialize intermediate results and hence 
incur a larger IO cost; I think this can be addressed further by supporting 
N-way joins more naturally than by chaining two-way joins.
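A rough sketch of the difference for 1, assuming a simplified inner join in which only right-hand updates arrive: {{RightSideJoin}}, {{leftStore}}, {{resultStore}} and {{onRightUpdate}} are hypothetical names, and plain HashMaps stand in for state stores. With {{materializeResult}} set, the old join result is read from the store, so the joiner runs once per update and the right parent's old value is not needed.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, heavily simplified KTable-KTable inner join that only models
// updates arriving on the right-hand side. This is NOT the Kafka Streams code.
class RightSideJoin {
    final Map<String, String> leftStore = new HashMap<>();   // latest left value per key
    final Map<String, String> resultStore = new HashMap<>(); // materialized join results
    final BiFunction<String, String, String> joiner;
    final boolean materializeResult;
    int joinerCalls = 0;

    RightSideJoin(BiFunction<String, String, String> joiner, boolean materializeResult) {
        this.joiner = joiner;
        this.materializeResult = materializeResult;
    }

    private String join(String left, String right) {
        joinerCalls++;
        return joiner.apply(left, right);
    }

    // Handles a right-hand update for key; oldRight is only available if the
    // right parent sends old values. Returns {newResult, oldResult}.
    String[] onRightUpdate(String key, String newRight, String oldRight) {
        String left = leftStore.get(key);
        if (left == null) {
            return new String[] {null, null};  // inner join: nothing to emit
        }
        String newResult = (newRight == null) ? null : join(left, newRight);
        String oldResult;
        if (materializeResult) {
            // Old result comes from the store; oldRight is not used at all.
            oldResult = (newResult == null)
                    ? resultStore.remove(key)
                    : resultStore.put(key, newResult);
        } else {
            // Old result is recomputed: this is the second joiner call.
            oldResult = (oldRight == null) ? null : join(left, oldRight);
        }
        return new String[] {newResult, oldResult};
    }
}
{code}

Feeding L1, then R1, then R2 for the same key, the recomputing variant calls the joiner three times (L1+R1, L1+R2, and L1+R1 again) while the materialized one calls it twice. Left-hand updates, which would also have to refresh {{resultStore}}, are left out of this sketch.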

For 2, the direct parent of the aggregation would just materialize itself and 
not propagate `sendOldValues` to its ancestors. I think we may already do this 
today; if not, we should fix it.

For 3, the left-hand side's sendOldValues is required for correctness, and we 
can consider doing the same as for 2; the right-hand side's sendOldValues is 
only an optimization and is no longer necessary after KIP-557, so we can remove 
that requirement later.

--------------------------------------

Also [~mjsax] pointed out to me that this issue of "modifying the field 
directly" may still exist in other operators. For example:

{{stream.filter((k, v) -> { v.setA("a"); return true; }).filter((k, v) -> …)}}

The first filter modifies the object in place, so the second filter sees the 
"modified" object rather than the original one, which could break correctness.
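The following stdlib-only sketch reproduces the effect. {{MutableValue}} and {{FilterChain}} are hypothetical stand-ins, not Kafka Streams classes; they assume, as with in-process forwarding between chained operators, that each filter receives the same object reference rather than a copy.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical mutable value type standing in for a user POJO with a setter.
class MutableValue {
    String a;

    MutableValue(String a) { this.a = a; }

    void setA(String a) { this.a = a; }
}

// Minimal stand-in for chained filters: every predicate receives the SAME
// object reference. It is NOT the Kafka Streams API.
class FilterChain {
    private final List<Predicate<MutableValue>> filters = new ArrayList<>();

    FilterChain filter(Predicate<MutableValue> predicate) {
        filters.add(predicate);
        return this;
    }

    boolean passes(MutableValue value) {
        for (Predicate<MutableValue> p : filters) {
            if (!p.test(value)) {
                return false;  // record dropped by this filter
            }
        }
        return true;
    }
}
{code}

A record whose field {{a}} is "b" is dropped by a second filter testing for "b" when the first filter calls {{setA("a")}}, but passes when the first filter has no side effects.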




> Materialize Table-Table Join Result to Avoid Performing Same Join Twice
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-9916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9916
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>
> If a table-table join processor performs a join and needs to forward the old 
> join result downstream (e.g. due to an aggregation operation downstream), it 
> performs the same join (i.e. calls the {{ValueJoiner}}) twice.
> Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}} 
> with the same key, input into the join operation in this order, the join 
> processor at some point will join {{L1}} with {{R1}}. When the new right 
> value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again 
> {{L1}} with {{R1}}.
> We could avoid calling the {{ValueJoiner}} twice by materializing the join 
> result, trading a call to the {{ValueJoiner}} for a lookup into a state 
> store. Depending on the logic in the {{ValueJoiner}}, this may or may not 
> improve performance. However, calling the {{ValueJoiner}} only once means its 
> input values are accessed only once, which avoids the need to copy them each 
> time the {{ValueJoiner}} is called. For example, consider the following 
> {{ValueJoiner}}:
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>         leftValue.setSomeValue(rightValue);
>         return leftValue;
> }
> {code}
> With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice 
> when {{R2}} triggers the join, the first time with {{R2}} and the second time 
> with {{R1}}. That means {{R2}} will be overwritten by {{R1}}, which is 
> probably not what users want. To get the correct result, the 
> {{ValueJoiner}} should be implemented as follows:
>
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>         ComplexValue copy = copy(leftValue);
>         copy.setSomeValue(rightValue);
>         return copy;
> }
> {code}
> Copying values during joins could be avoided if the join result were 
> materialized. 
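
The double call described in the issue can be reproduced with a small stdlib-only sketch. {{ComplexValue}} (with a hypothetical {{copy()}} helper), {{DoubleJoinDemo}} and its methods are illustrative stand-ins, not Kafka Streams classes; {{onRightUpdate}} assumes a non-materialized join that calls the joiner first with {{R2}} and then with {{R1}}, as the issue describes.

{code:java}
import java.util.function.BiFunction;

// Hypothetical stand-in for the ComplexValue type in the issue description.
class ComplexValue {
    Long someValue;

    void setSomeValue(Long v) { this.someValue = v; }

    // Hypothetical copy helper used by the copying joiner.
    ComplexValue copy() {
        ComplexValue c = new ComplexValue();
        c.someValue = someValue;
        return c;
    }
}

class DoubleJoinDemo {
    // Mutating joiner from the issue: returns the modified left value itself.
    static ComplexValue mutatingJoin(ComplexValue leftValue, Long rightValue) {
        leftValue.setSomeValue(rightValue);
        return leftValue;
    }

    // Copying joiner from the issue: leaves the left value untouched.
    static ComplexValue copyingJoin(ComplexValue leftValue, Long rightValue) {
        ComplexValue copy = leftValue.copy();
        copy.setSomeValue(rightValue);
        return copy;
    }

    // Mimics a non-materialized join handling R2: the joiner is called with
    // the new right value, then again with the old one.
    // Returns {newResult, oldResult}.
    static ComplexValue[] onRightUpdate(BiFunction<ComplexValue, Long, ComplexValue> joiner,
                                        ComplexValue l1, Long r2, Long r1) {
        ComplexValue newResult = joiner.apply(l1, r2);
        ComplexValue oldResult = joiner.apply(l1, r1);
        return new ComplexValue[] {newResult, oldResult};
    }
}
{code}

With the mutating joiner, both returned results are the same object and carry {{R1}}'s value, so the new result is lost; with the copying joiner, the new result carries {{R2}} and the old one {{R1}}.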



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
