[ https://issues.apache.org/jira/browse/KAFKA-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16759019#comment-16759019 ]
Lee Dongjin commented on KAFKA-7293:
------------------------------------
[~mjsax] So, in short: if none of the input {{KStream}}s passed to the {{KStream#merge}}
method is flagged with {{repartitionRequired}}, the output {{KStream}}
(with {{repartitionRequired = false}}) may not be correctly partitioned, so it
can yield an incorrect result if the {{groupByKey}} or {{join}} method is called
on it. Am I understanding this correctly?
If so, I have an idea:
1. Add a flag denoting that a runtime inspection is required to verify whether the data is
correctly (co-)partitioned (similar to the {{repartitionRequired}} flag).
2. When {{merge}} method is called with non-repartitioned {{KStream}}s, set
the flag above.
3. In {{KStream#[join, groupByKey, ...]}}, call
{{THIS.ensureJoinableWith(THAT)}} to inspect co-partitioning if the flag above
is set. (This method is called in {{KStream#[doJoin, doStreamTableJoin]}} now.)
If this is what you want, I would like to open a PR.
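
To make the three steps concrete, here is a minimal sketch on a toy model. It is not the actual Kafka Streams code: {{ToyStream}}, {{copartitionCheckRequired}}, and {{ensureCopartitioned}} are hypothetical names made up for illustration, the check simply compares source partition counts, and {{IllegalStateException}} stands in for whatever exception the real check would raise.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: ToyStream is NOT the real KStream, and copartitionCheckRequired
// is a hypothetical flag name used for illustration.
final class MergeCopartitionSketch {

    static final class ToyStream {
        final List<Integer> sourcePartitionCounts; // partition count of each source topic
        final boolean repartitionRequired;         // existing flag named in the issue
        final boolean copartitionCheckRequired;    // step 1: hypothetical new flag

        ToyStream(List<Integer> counts, boolean repartitionRequired,
                  boolean copartitionCheckRequired) {
            this.sourcePartitionCounts = counts;
            this.repartitionRequired = repartitionRequired;
            this.copartitionCheckRequired = copartitionCheckRequired;
        }

        static ToyStream source(int numPartitions) {
            return new ToyStream(Collections.singletonList(numPartitions), false, false);
        }

        // Step 2: merging streams that are not flagged for repartitioning sets the check flag.
        ToyStream merge(ToyStream other) {
            List<Integer> counts = new ArrayList<>(sourcePartitionCounts);
            counts.addAll(other.sourcePartitionCounts);
            boolean repartition = repartitionRequired || other.repartitionRequired;
            return new ToyStream(counts, repartition, !repartition);
        }

        // Step 3: run the co-partitioning check only when the flag is set.
        // Returns true if a repartition topic would be inserted.
        boolean groupByKey() {
            if (repartitionRequired) {
                return true; // data is repartitioned anyway, so no check is needed
            }
            if (copartitionCheckRequired) {
                ensureCopartitioned();
            }
            return false;
        }

        // Stand-in for the ensureJoinableWith call mentioned in step 3.
        void ensureCopartitioned() {
            int expected = sourcePartitionCounts.get(0);
            for (int count : sourcePartitionCounts) {
                if (count != expected) {
                    throw new IllegalStateException(
                        "Merged inputs are not co-partitioned: " + sourcePartitionCounts);
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyStream ok = ToyStream.source(4).merge(ToyStream.source(4));
        System.out.println("same counts, repartition inserted: " + ok.groupByKey());

        ToyStream bad = ToyStream.source(2).merge(ToyStream.source(3));
        try {
            bad.groupByKey();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the flag is only set when no input already requires repartitioning, since a repartition topic would restore correct partitioning in that case anyway.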
> Merge followed by groupByKey/join might violate co-partitioning
> --------------------------------------------------------------
>
> Key: KAFKA-7293
> URL: https://issues.apache.org/jira/browse/KAFKA-7293
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Major
>
> The merge() operation can be applied to input KStreams that have a different
> number of tasks (i.e., input topic partitions). In this case, the input topics
> are not co-partitioned, and thus the resulting KStream is not partitioned even if
> each input KStream is partitioned on its own.
> Because no "repartitionRequired" flag is set on the input KStreams, the flag
> is also not set on the output KStream. Hence, if a groupByKey() or join()
> operation is applied to the output KStream, we don't insert a repartition topic.
> However, repartitioning would be required because the KStream is not
> partitioned.
> We cannot detect this at compile time because the number of partitions
> is unknown, and thus we cannot decide whether repartitioning is required.
> However, we can add a runtime check, similar to the one for join(), that checks
> whether data is correctly (co-)partitioned and, if not, raises a runtime exception.
> Note that for merge(), in contrast to join(), we should only check for
> co-partitioning if the merge() is followed by a groupByKey() or join()
> operation.
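
To illustrate why differing partition counts break partitioning of the merged stream, here is a small self-contained sketch. It assumes a simplified hash-modulo partitioner ({{partitionFor}} is a hypothetical helper; Kafka's default partitioner hashes the serialized key differently) and assumes that task i of the merged stream reads partition i of each input topic.

```java
// Illustration only: a simplified hash-modulo partitioner, not Kafka's actual one.
final class PartitionMismatchDemo {

    // Hypothetical helper: maps a key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String key = "c"; // "c".hashCode() == 99

        int inTopicA = partitionFor(key, 2); // topic A has 2 partitions
        int inTopicB = partitionFor(key, 3); // topic B has 3 partitions

        System.out.println("topic A partition: " + inTopicA); // prints 1
        System.out.println("topic B partition: " + inTopicB); // prints 0

        // Under the task-assignment assumption above, records with the same key
        // end up in different tasks, so a groupByKey() without repartitioning
        // would aggregate them separately.
        System.out.println("same task: " + (inTopicA == inTopicB)); // prints false
    }
}
```

Each topic is correctly partitioned by key on its own, yet the same key maps to different partition indices, which is the situation the runtime check is meant to catch.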