[ https://issues.apache.org/jira/browse/KAFKA-9916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091963#comment-17091963 ]
Guozhang Wang edited comment on KAFKA-9916 at 4/24/20, 11:15 PM:
-----------------------------------------------------------------

Thanks [~cadonna] for filing the JIRA! I'd like to leave some more thoughts on this:

I think the key here is not to always materialize the join results, but to materialize only when a downstream operator requires its parent to "sendOldValues".

At a high level, today we may propagate "sendOldValues" a long way up through parent and ancestor operators until materialization is introduced. For example, given a topology {{A -> B -> C -> D}} where D requires its parent C to sendOldValues, the requirement may be traced back to B and A, eventually requiring A to "materialize" and then sendOldValues to B, then to C, then to D. Instead of doing that, we should consider either materializing {{C}} directly and cutting the trace back through {{A -> B -> ..}}, or even just letting D materialize itself so that its direct parent {{C}} does not need to sendOldValues.

Today only the following scenarios require parents to send old values:

1. KTable-KTable left / outer joins: we want to avoid sending unnecessary tombstones and hence need to know the old values. This is explained in KIP-77.
2. KTable aggregation: we require old values to be sent so that we can subtract them.
3. KTable foreign-key join: we need both parents to send their old values for optimization / correctness purposes.

---------------------------

The idea is that:

For 1, instead of requiring its parent to sendOldValues, we just materialize the joined result and hence avoid the second `joiner.apply(... oldValue)` call (which would also help fix the issue described in this JIRA ticket). Of course, with chained joins this means we may materialize intermediate results and hence incur a larger IO cost; I think this can be further addressed by supporting N-way joins natively rather than in the chained manner.

For 2, the direct parent of the aggregation would then just materialize and not propagate this `sendOldValues` requirement to its ancestors. I think we may already do this today, but if not, we should fix it.

For 3, the left-hand-side sendOldValues is required for correctness, and we can consider doing the same as in 2); the right-hand-side sendOldValues is only an optimization and is no longer necessary after KIP-557, so we can remove that requirement later.

--------------------------------------

Also, [~mjsax] pointed out to me that this issue of "modifying the field directly" may still exist in other operators. For example:

{{stream.filter((k, v) -> { v.setA("a"); return true; }).filter((k, v) -> …)}}

The first filter modifies the object, so the second filter does not see the original object but the "modified" one, which could break correctness.


was (Author: guozhang):
Thanks [~cadonna] for filing the JIRA! I'd like to leave some more thoughts on this:

I think the key here is not to always materialize the join results, but to materialize only when a downstream operator requires its parent to "sendOldValues".

At a high level, today we may propagate "sendOldValues" a long way up through parent and ancestor operators until materialization is introduced. For example, given a topology {{A -> B -> C -> D}} where D requires its parent C to sendOldValues, the requirement may be traced back to B and A, eventually requiring A to "materialize" and then sendOldValues to B, then to C, then to D. Instead of doing that, we should consider either materializing {{C}} directly and cutting the trace back through {{A -> B -> ..}}, or even just letting D materialize itself so that its direct parent {{C}} does not need to sendOldValues.

Today only the following scenarios require parents to send old values:

1. KTable-KTable left / outer joins: we want to avoid sending unnecessary tombstones and hence need to know the old values. This is explained in KIP-77.
2. KTable aggregation: we require old values to be sent so that we can subtract them.
3. KTable foreign-key join: we need both parents to send their old values for optimization / correctness purposes.

---------------------------

The idea is that:

For 1, instead of requiring its parent to sendOldValues, we just materialize the joined result and hence avoid the second `joiner.apply(... oldValue)` call (which would also help fix the issue described in this JIRA ticket). Of course, with chained joins this means we may materialize intermediate results and hence incur a larger IO cost; I think this can be further addressed by supporting N-way joins natively rather than in the chained manner.

For 2, the direct parent of the aggregation would then just materialize and not propagate this `sendOldValues` requirement to its ancestors. I think we may already do this today, but if not, we should fix it.

For 3, the left-hand-side sendOldValues is required for correctness, and we can consider doing the same as in 2); the right-hand-side sendOldValues is for optimization (and it seems we have not done this yet), and we should reconsider whether this is a good trade-off to make.

--------------------------------------

Also, [~mjsax] pointed out to me that this issue of "modifying the field directly" may still exist in other operators. For example:

{{stream.filter((k, v) -> { v.setA("a"); return true; }).filter((k, v) -> …)}}

The first filter modifies the object, so the second filter does not see the original object but the "modified" one, which could break correctness.
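
The shared-reference problem in the filter example above can be reproduced without Kafka Streams. The following is a minimal plain-Java sketch, under the assumption (taken from the comment) that both predicates receive the same object reference. {{Value}}, {{getA}}/{{setA}} and {{seenBySecondFilter}} are hypothetical names used only for illustration; none of them is Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

final class SharedReferenceDemo {

    /** Hypothetical mutable value type standing in for the record value v. */
    static final class Value {
        private String a;
        Value(String a) { this.a = a; }
        String getA() { return a; }
        void setA(String a) { this.a = a; }
    }

    /**
     * Runs two predicates back to back on the SAME object reference
     * (assumption: no copy is made between the two operators) and
     * records what the second predicate observes.
     */
    static List<String> seenBySecondFilter(Value value, BiPredicate<String, Value> first) {
        List<String> seen = new ArrayList<>();
        BiPredicate<String, Value> second = (k, v) -> {
            seen.add(v.getA()); // observes whatever state the first predicate left behind
            return true;
        };
        if (first.test("key", value)) {
            second.test("key", value);
        }
        return seen;
    }

    public static void main(String[] args) {
        Value original = new Value("original");
        // The first "filter" mutates its input instead of only inspecting it.
        List<String> seen = seenBySecondFilter(original, (k, v) -> {
            v.setA("a");
            return true;
        });
        System.out.println(seen);            // prints [a]
        System.out.println(original.getA()); // prints a
    }
}
```

The second predicate never sees "original", because the mutation in the first predicate is visible through the shared reference. Copying the value before modifying it, as suggested for the {{ValueJoiner}} in the issue description, avoids this.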


> Materialize Table-Table Join Result to Avoid Performing Same Join Twice
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-9916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9916
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>
> If a table-table join processor performs a join and the join needs to forward
> downstream the old join result (e.g. due to an aggregation operation
> downstream), it performs the same join (i.e. calls the {{ValueJoiner}}) twice.
> Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}}
> with the same keys and input into the join operation in this order, the join
> processor at some point will join {{L1}} with {{R1}}. When the new right
> value {{R2}} triggers the join, it will join {{L1}} with {{R2}} and again
> {{L1}} with {{R1}}.
> We could avoid calling the {{ValueJoiner}} twice by materializing the join
> result. We would trade a call to the {{ValueJoiner}} for a lookup into a
> state store. Depending on the logic in the {{ValueJoiner}} this may or may
> not improve the performance. However, calling the {{ValueJoiner}} once will
> only access the input values of the {{ValueJoiner}} once, which avoids the
> need to copy the input values each time the {{ValueJoiner}} is called. For
> example, consider the following {{ValueJoiner}}:
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>     leftValue.setSomeValue(rightValue);
>     return leftValue;
> }
> {code}
> With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice
> when {{R2}} triggers the join, the first time with {{R2}} and the second time
> with {{R1}}. That means {{R2}} will be overwritten by {{R1}}, which is
> probably not what the users want. To get the correct result, the
> {{ValueJoiner}} should be implemented as follows:
>
> {code:java}
> private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
>     ComplexValue copy = copy(leftValue);
>     copy.setSomeValue(rightValue);
>     return copy;
> }
> {code}
> Copying values during joins could be avoided if the join result were
> materialized.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)