Bruno Cadonna created KAFKA-9916:
------------------------------------

             Summary: Materialize Table-Table Join Result to Avoid Performing Same Join Twice
                 Key: KAFKA-9916
                 URL: https://issues.apache.org/jira/browse/KAFKA-9916
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.5.0
            Reporter: Bruno Cadonna

If a table-table join processor performs a join and needs to forward the old join result downstream (e.g. because of a downstream aggregation), it performs the same join (i.e. calls the {{ValueJoiner}}) twice.

Consider a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}}, all with the same key and fed into the join operation in this order. At some point the join processor joins {{L1}} with {{R1}}. When the new right value {{R2}} triggers the join, the processor joins {{L1}} with {{R2}} and then joins {{L1}} with {{R1}} again to recompute the old result.

We could avoid calling the {{ValueJoiner}} twice by materializing the join result. We would trade a call to the {{ValueJoiner}} for a lookup into a state store. Depending on the logic in the {{ValueJoiner}}, this may or may not improve performance. However, calling the {{ValueJoiner}} only once means its input values are accessed only once, which avoids the need to copy the input values each time the {{ValueJoiner}} is called.

For example, consider the following {{ValueJoiner}}:

{code:java}
private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
    leftValue.setSomeValue(rightValue);
    return leftValue;
}
{code}

With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} is called twice when {{R2}} triggers the join, the first time with {{R2}} and the second time with {{R1}}. That means {{R2}} is overwritten by {{R1}}, which is probably not what users want. To get the correct result, the {{ValueJoiner}} should be implemented as follows:

{code:java}
private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
    ComplexValue copy = copy(leftValue);
    copy.setSomeValue(rightValue);
    return copy;
}
{code}

Copying values during joins could be avoided if the join result were materialized.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
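The double invocation described in this issue can be reproduced outside Kafka Streams. The following is a minimal sketch, not the actual join processor code. It assumes a hypothetical {{ComplexValue}} class with a single field, uses {{java.util.function.BiFunction}} as a stand-in for {{ValueJoiner}}, and models the state store as a {{HashMap}} that holds a copy of each result (standing in for serialization). The names {{recompute}} and {{materialized}} are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the issue's ComplexValue; only one field is modeled.
final class ComplexValue {
    private Long someValue;
    ComplexValue(Long someValue) { this.someValue = someValue; }
    Long getSomeValue() { return someValue; }
    void setSomeValue(Long value) { this.someValue = value; }
    ComplexValue copy() { return new ComplexValue(someValue); }
}

final class JoinTwiceSketch {
    // Joiner from the issue that mutates and returns its left input.
    static ComplexValue mutatingJoin(ComplexValue left, Long right) {
        left.setSomeValue(right);
        return left;
    }

    // Joiner from the issue that works on a copy of the left input.
    static ComplexValue copyingJoin(ComplexValue left, Long right) {
        ComplexValue copy = left.copy();
        copy.setSomeValue(right);
        return copy;
    }

    // Models the behavior described in the issue for a right-side update:
    // join with the new right value, then recompute the old result.
    // Returns {newResult, oldResult}.
    static ComplexValue[] recompute(BiFunction<ComplexValue, Long, ComplexValue> joiner,
                                    ComplexValue left, Long oldRight, Long newRight) {
        ComplexValue newResult = joiner.apply(left, newRight);
        ComplexValue oldResult = joiner.apply(left, oldRight);
        return new ComplexValue[] {newResult, oldResult};
    }

    // Models the proposal: the old result is looked up in a store, so the
    // joiner is called only once. Returns {newResult, oldResult}.
    static ComplexValue[] materialized(BiFunction<ComplexValue, Long, ComplexValue> joiner,
                                       Map<String, ComplexValue> store, String key,
                                       ComplexValue left, Long newRight) {
        ComplexValue oldResult = store.get(key);
        ComplexValue newResult = joiner.apply(left, newRight);
        store.put(key, newResult.copy()); // the copy stands in for serialization
        return new ComplexValue[] {newResult, oldResult};
    }

    public static void main(String[] args) {
        // R1 = 1, R2 = 2; the mutating joiner returns the same object twice.
        ComplexValue[] bad = recompute(JoinTwiceSketch::mutatingJoin, new ComplexValue(0L), 1L, 2L);
        System.out.println("mutating:     new=" + bad[0].getSomeValue() + ", old=" + bad[1].getSomeValue());

        ComplexValue[] good = recompute(JoinTwiceSketch::copyingJoin, new ComplexValue(0L), 1L, 2L);
        System.out.println("copying:      new=" + good[0].getSomeValue() + ", old=" + good[1].getSomeValue());

        // With a materialized result, even the mutating joiner yields distinct values.
        Map<String, ComplexValue> store = new HashMap<>();
        ComplexValue left = new ComplexValue(0L);
        store.put("k", mutatingJoin(left, 1L).copy()); // initial join of L1 with R1
        ComplexValue[] mat = materialized(JoinTwiceSketch::mutatingJoin, store, "k", left, 2L);
        System.out.println("materialized: new=" + mat[0].getSomeValue() + ", old=" + mat[1].getSomeValue());
    }
}
```

In this sketch the mutating joiner reports the old right value for both results under {{recompute}}, because both results are the same object. The copying joiner and the materialized variant each report the new and old values separately.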