Bruno Cadonna created KAFKA-9916:
------------------------------------
Summary: Materialize Table-Table Join Result to Avoid Performing
Same Join Twice
Key: KAFKA-9916
URL: https://issues.apache.org/jira/browse/KAFKA-9916
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 2.5.0
Reporter: Bruno Cadonna
If a table-table join processor performs a join and needs to forward the old
join result downstream (e.g. because of a downstream aggregation), it performs
the same join (i.e. calls the {{ValueJoiner}}) twice.
Given a left value {{L1}}, a right value {{R1}}, and a new right value {{R2}},
all with the same key and fed into the join operation in this order, the join
processor will at some point join {{L1}} with {{R1}}. When the new right value
{{R2}} triggers the join, it will join {{L1}} with {{R2}} and then join {{L1}}
with {{R1}} again to recompute the old result.
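
The sequence above can be sketched without any Kafka dependency. This is a simplified model, not the actual Streams processor code: {{DoubleJoinSketch}}, {{Change}}, {{onRightUpdate}} and {{countingJoiner}} are hypothetical names, and a {{BiFunction}} stands in for the {{ValueJoiner}}.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

final class DoubleJoinSketch {
    /** Hypothetical stand-in for the new/old pair a table-table join forwards downstream. */
    static final class Change<V> {
        final V newValue;
        final V oldValue;
        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    /** Counts how often the joiner is invoked. */
    static final AtomicInteger CALLS = new AtomicInteger();

    /** Stand-in for a ValueJoiner; it only concatenates its inputs. */
    static String countingJoiner(String left, String right) {
        CALLS.incrementAndGet();
        return left + "+" + right;
    }

    /**
     * Simplified right-side update when old values must be forwarded:
     * the joiner runs once for the new right value and once more for the old one.
     */
    static <L, R, V> Change<V> onRightUpdate(L left, R newRight, R oldRight,
                                             BiFunction<L, R, V> joiner) {
        V newResult = joiner.apply(left, newRight);
        V oldResult = oldRight == null ? null : joiner.apply(left, oldRight);
        return new Change<>(newResult, oldResult);
    }

    public static void main(String[] args) {
        BiFunction<String, String, String> joiner = DoubleJoinSketch::countingJoiner;
        onRightUpdate("L1", "R1", null, joiner);   // initial join of L1 with R1
        CALLS.set(0);
        Change<String> change = onRightUpdate("L1", "R2", "R1", joiner);
        System.out.println(change.newValue + " / " + change.oldValue);
        System.out.println(CALLS.get());           // prints 2: L1+R2 and L1+R1 again
    }
}
```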
We could avoid calling the {{ValueJoiner}} twice by materializing the join
result. We would trade a call to the {{ValueJoiner}} for a lookup in a state
store. Depending on the logic in the {{ValueJoiner}}, this may or may not
improve performance. However, calling the {{ValueJoiner}} only once means its
input values are accessed only once, which removes the need to copy the input
values each time the {{ValueJoiner}} is called. For example, consider the
following {{ValueJoiner}}:
{code:java}
private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
    leftValue.setSomeValue(rightValue);
    return leftValue;
}
{code}
With this {{ValueJoiner}}, {{setSomeValue(rightValue)}} will be called twice on
the same {{leftValue}} object when {{R2}} triggers the join, the first time
with {{R2}} and the second time with {{R1}}. That means {{R2}} will be
overwritten by {{R1}}, which is probably not what users want. To get the
correct result, the {{ValueJoiner}} should be implemented as follows:
{code:java}
private ComplexValue eventFeesJoin(ComplexValue leftValue, Long rightValue) {
    ComplexValue copy = copy(leftValue);
    copy.setSomeValue(rightValue);
    return copy;
}
{code}
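
The difference between the two joiners can be reproduced in isolation. The following is a minimal sketch under assumptions: {{ComplexValue}} is reduced to a single hypothetical field, the {{copy}} helper is replaced by a constructor call, and the two joiner invocations are made by hand in the order described above (first {{R2}}, then {{R1}}).

```java
final class JoinerAliasingSketch {
    /** Hypothetical minimal version of ComplexValue with one mutable field. */
    static final class ComplexValue {
        private Long someValue;
        ComplexValue(Long someValue) { this.someValue = someValue; }
        void setSomeValue(Long value) { this.someValue = value; }
        Long getSomeValue() { return someValue; }
    }

    /** Mutates and returns the input: both join results alias the same object. */
    static ComplexValue mutatingJoin(ComplexValue leftValue, Long rightValue) {
        leftValue.setSomeValue(rightValue);
        return leftValue;
    }

    /** Copies before mutating: each join result is an independent object. */
    static ComplexValue copyingJoin(ComplexValue leftValue, Long rightValue) {
        ComplexValue copy = new ComplexValue(leftValue.getSomeValue());
        copy.setSomeValue(rightValue);
        return copy;
    }

    public static void main(String[] args) {
        Long r1 = 1L;
        Long r2 = 2L;

        ComplexValue l1 = new ComplexValue(0L);
        ComplexValue newResult = mutatingJoin(l1, r2);  // join L1 with R2
        ComplexValue oldResult = mutatingJoin(l1, r1);  // join L1 with R1 again
        System.out.println(newResult.getSomeValue());   // prints 1: R2 was overwritten by R1
        System.out.println(newResult == oldResult);     // prints true: same object

        ComplexValue l1b = new ComplexValue(0L);
        ComplexValue newCopy = copyingJoin(l1b, r2);
        ComplexValue oldCopy = copyingJoin(l1b, r1);
        System.out.println(newCopy.getSomeValue());     // prints 2
        System.out.println(oldCopy.getSomeValue());     // prints 1
    }
}
```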
Copying values during joins could be avoided if the join result were
materialized.
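
One way the proposed materialization could look is sketched below. It is an illustration only, not a design for the actual change: {{MaterializedJoinSketch}}, {{MaterializedJoin}}, {{resultStore}} and {{onRightUpdate}} are hypothetical names, and a {{HashMap}} stands in for a state store. The sketch uses immutable {{String}} results; a real state store would hold serialized values, so a stored result would not alias a mutable input object.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

final class MaterializedJoinSketch {
    /** Hypothetical stand-in for the new/old pair forwarded downstream. */
    static final class Change<V> {
        final V newValue;
        final V oldValue;
        Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    /** Join that remembers the last result per key instead of recomputing it. */
    static final class MaterializedJoin<K, L, R, V> {
        private final Map<K, V> resultStore = new HashMap<>(); // stand-in for a state store
        private final BiFunction<L, R, V> joiner;              // stand-in for a ValueJoiner
        int joinerCalls = 0;

        MaterializedJoin(BiFunction<L, R, V> joiner) {
            this.joiner = joiner;
        }

        Change<V> onRightUpdate(K key, L left, R newRight) {
            V oldResult = resultStore.get(key);          // lookup replaces the second join
            joinerCalls++;
            V newResult = joiner.apply(left, newRight);  // the only joiner call per update
            resultStore.put(key, newResult);
            return new Change<>(newResult, oldResult);
        }
    }

    public static void main(String[] args) {
        MaterializedJoin<String, String, String, String> join =
            new MaterializedJoin<>((l, r) -> l + "+" + r);
        join.onRightUpdate("k", "L1", "R1");
        Change<String> change = join.onRightUpdate("k", "L1", "R2");
        System.out.println(change.newValue);   // prints L1+R2
        System.out.println(change.oldValue);   // prints L1+R1, read from the store
        System.out.println(join.joinerCalls);  // prints 2: one call per update
    }
}
```

The trade-off mentioned above is visible here: each update costs one store read and one store write in exchange for one fewer joiner call.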