Matthias J. Sax created KAFKA-18692:
---------------------------------------
Summary: Consider to unify KStreamImpl "repartitionRequired" with
GraphNode "keyChangingOperation"
Key: KAFKA-18692
URL: https://issues.apache.org/jira/browse/KAFKA-18692
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Matthias J. Sax
In `KStreamImpl`, we use a flag `repartitionRequired` to track whether an
operator (or one of its transitive ancestors) modified the key, so that when a
key-dependent operation like an aggregation or a join is executed, we know
whether we need to insert an auto-repartition step.
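To make the flag's behavior concrete, here is a heavily simplified, hypothetical model of how a flag like `repartitionRequired` propagates along a chain of operators. `SketchStream` and its methods are illustrative stand-ins, not the real `KStreamImpl` API, and a key-dependent operation is collapsed into a single hypothetical `aggregate()` step.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of how a flag like KStreamImpl#repartitionRequired
// propagates through a chain of operators. This is NOT the Kafka Streams code.
public final class RepartitionFlagSketch {

    // Immutable stand-in for a KStream: carries the flag plus a log of applied steps.
    static final class SketchStream {
        final boolean repartitionRequired;
        final List<String> ops;

        SketchStream(boolean repartitionRequired, List<String> ops) {
            this.repartitionRequired = repartitionRequired;
            this.ops = List.copyOf(ops);
        }

        private SketchStream derive(String op, boolean repartitionRequired) {
            List<String> next = new ArrayList<>(ops);
            next.add(op);
            return new SketchStream(repartitionRequired, next);
        }

        // Key-changing operator (like selectKey() or map()): sets the flag.
        SketchStream selectKey() {
            return derive("selectKey", true);
        }

        // Key-preserving operator (like mapValues() or filter()): inherits the flag.
        SketchStream mapValues() {
            return derive("mapValues", repartitionRequired);
        }

        // Key-dependent operator (aggregation/join): insert a repartition step first
        // if any upstream operator may have changed the key.
        SketchStream aggregate() {
            SketchStream input = repartitionRequired ? derive("auto-repartition", false) : this;
            return input.derive("aggregate", false);
        }
    }

    public static void main(String[] args) {
        SketchStream source = new SketchStream(false, new ArrayList<>());
        // prints [mapValues, aggregate]
        System.out.println(source.mapValues().aggregate().ops);
        // prints [selectKey, mapValues, auto-repartition, aggregate]
        System.out.println(source.selectKey().mapValues().aggregate().ops);
    }
}
```

The key point is that a key-preserving operator like `mapValues()` does not clear the flag; it simply carries the ancestor information forward.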
In parallel, we also track key-changing operations in our internal "topology
graph" representation (which is used by our topology optimization layer), via
`GraphNode#keyChangingOperation()`.
Thus, we have two independent code paths that do similar things (note that the
two are not semantically identical, so we need to be careful to get this right;
more details below). To avoid subtle bugs, it might be worth refactoring the
code to unify both.
The high-level idea (without my having looked into the details) would be to
remove the `KStreamImpl#repartitionRequired` flag altogether, as we already pass
a `GraphNode` into `KStreamImpl` and thus can access `isKeyChangingOperation()`.
However, `isKeyChangingOperation()` only tracks whether the _current_ node is
key-changing; if it returns `false`, we know nothing about its ancestors. Thus,
its semantics differ from those of the `repartitionRequired` flag, which already
considers ancestor information. Hence, we might need to traverse the
`GraphNode` structure backwards to check whether a parent changed the key (cf.
`InternalStreamsBuilder#getKeyChangingParentNode()`, which I believe we could
re-use).
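A minimal sketch of such a backward traversal, assuming a hypothetical `Node` class that exposes only its parents and a per-node key-changing flag (the real `GraphNode` and `InternalStreamsBuilder#getKeyChangingParentNode()` may look quite different). It reports `true` if the node itself or any transitive parent is key-changing, which assumes that a single key-changing upstream path (e.g. one input of a merge) is enough to require repartitioning.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; Node is an illustrative stand-in for GraphNode, not the real class.
public final class KeyChangingAncestorSketch {

    static final class Node {
        final String name;
        final boolean keyChangingOperation; // describes THIS node only, not its ancestors
        final List<Node> parents = new ArrayList<>();

        Node(String name, boolean keyChangingOperation, Node... parents) {
            this.name = name;
            this.keyChangingOperation = keyChangingOperation;
            Collections.addAll(this.parents, parents);
        }
    }

    // Walks backwards from `node` over all transitive parents and returns true if
    // the node itself or any ancestor is key-changing.
    static boolean anyKeyChangingAncestor(Node node) {
        Deque<Node> toVisit = new ArrayDeque<>();
        Set<Node> visited = new HashSet<>();
        toVisit.push(node);
        while (!toVisit.isEmpty()) {
            Node current = toVisit.pop();
            if (!visited.add(current)) {
                continue; // diamond-shaped topologies can reach a node twice
            }
            if (current.keyChangingOperation) {
                return true;
            }
            for (Node parent : current.parents) {
                toVisit.push(parent);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Node source = new Node("source", false);
        Node selectKey = new Node("selectKey", true, source);
        Node mapValues = new Node("mapValues", false, selectKey);
        Node filter = new Node("filter", false, source);
        System.out.println(anyKeyChangingAncestor(mapValues)); // true
        System.out.println(anyKeyChangingAncestor(filter));    // false
    }
}
```

Note that with only one boolean per node, a `repartition()` node modeled as a plain non-key-changing node looks just like `mapValues()`, so this naive walk still reports `true` for everything downstream of it. That is the gap discussed in the next paragraph.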
Another thing to consider is that some operators like `repartition()`
explicitly reset `repartitionRequired=false` in a hard-coded way. However,
`keyChangingOperation=false` does not carry this information: we don't know
whether the current operator just does not touch the key, or whether it
_ensures_ that the data is partitioned by the current key. I.e., we need a new
way to track this information on `GraphNode`, so that the backward traversal can
stop when it hits a node that does such a "reset".
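One possible (hypothetical) way to record the "reset" information is to replace the boolean with a three-valued key effect and to stop walking a path as soon as it reaches a node that guarantees partitioning by the current key. `KeyEffect`, its constants, and `Node` are illustrative names only, not existing Kafka Streams types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: KeyEffect and Node are illustrative names, not Kafka Streams types.
public final class KeyEffectTraversalSketch {

    enum KeyEffect {
        PRESERVES_KEY,             // does not touch the key, e.g. mapValues(), filter()
        CHANGES_KEY,               // may change the key, e.g. selectKey(), map()
        PARTITIONED_BY_CURRENT_KEY // guarantees partitioning by the current key, e.g. repartition()
    }

    static final class Node {
        final String name;
        final KeyEffect effect;
        final List<Node> parents = new ArrayList<>();

        Node(String name, KeyEffect effect, Node... parents) {
            this.name = name;
            this.effect = effect;
            Collections.addAll(this.parents, parents);
        }
    }

    // Returns true if some backward path from `node` reaches a CHANGES_KEY node
    // before reaching a PARTITIONED_BY_CURRENT_KEY node (which "resets" that path).
    static boolean repartitionRequired(Node node) {
        Deque<Node> toVisit = new ArrayDeque<>();
        Set<Node> visited = new HashSet<>();
        toVisit.push(node);
        while (!toVisit.isEmpty()) {
            Node current = toVisit.pop();
            if (!visited.add(current)) {
                continue;
            }
            if (current.effect == KeyEffect.CHANGES_KEY) {
                return true;
            }
            if (current.effect == KeyEffect.PARTITIONED_BY_CURRENT_KEY) {
                continue; // stop this path: upstream key changes no longer matter
            }
            for (Node parent : current.parents) {
                toVisit.push(parent);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Node source = new Node("source", KeyEffect.PRESERVES_KEY);
        Node selectKey = new Node("selectKey", KeyEffect.CHANGES_KEY, source);
        Node repartition = new Node("repartition", KeyEffect.PARTITIONED_BY_CURRENT_KEY, selectKey);
        Node afterRepartition = new Node("mapValues", KeyEffect.PRESERVES_KEY, repartition);
        Node withoutRepartition = new Node("filter", KeyEffect.PRESERVES_KEY, selectKey);
        Node merged = new Node("merge", KeyEffect.PRESERVES_KEY, afterRepartition, withoutRepartition);
        System.out.println(repartitionRequired(afterRepartition));   // false
        System.out.println(repartitionRequired(withoutRepartition)); // true
        System.out.println(repartitionRequired(merged));             // true
    }
}
```

A visited set is safe here because whether a node triggers a repartition depends only on its own effect, not on the path used to reach it. The stop is also per path: merging a repartitioned stream with one that still carries an unresolved key change continues to require repartitioning.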
Overall, I believe it would be good to do this rewrite (based on an educated
guess), but it might also turn out (after some POC coding) that it's not a good
idea and not worth doing, as it could add more problems than it solves; in that
case, we might just close this ticket as "won't fix". Only a PR can tell for
sure.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)