[
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176224#comment-17176224
]
feyman commented on KAFKA-10307:
--------------------------------
[~vvcephei] Thanks for letting me know that we can have cycles. :)
Thanks [~bchen225242] [~bbejeck] for the help!
But after playing with the test *shouldInnerJoinMultiPartitionQueryable* for a
while, I found that the current implementation of
StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions seems
problematic. *My understanding of the problem* is that the topics in
repartitionTopicMetadata should satisfy the following: if a topic is a sink
topic of some sub-topology, then its number of partitions should be the maximum
of the numbers of partitions of all source topics of that sub-topology. However,
this conflicts with the repartitionTopicMetadata as updated by
setRepartitionTopicMetadataNumberOfPartitions. Attached is a simple sketch that
describes the conflict.
Furthermore, another piece of evidence is that the result of
setRepartitionTopicMetadataNumberOfPartitions depends on the order of
topicGroups: I tried changing the order of topicGroups and found that it yields
a different result for repartitionTopicMetadata.
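To make the order dependence concrete, here is a minimal sketch. It is a
deliberately simplified model and not the actual StreamsPartitionAssignor code:
the class Group, the method assign, the topic names (table1, table2, reg, resp)
and the partition counts (1 and 4) are all hypothetical. The model assumes that
an unset repartition topic takes the maximum over whichever source-topic counts
are already known at that moment and is never revised afterwards.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RepartitionOrderSketch {

    /** Hypothetical stand-in for a sub-topology: only its topic names matter here. */
    static final class Group {
        final Set<String> sources;
        final Set<String> sinks;

        Group(List<String> sources, List<String> sinks) {
            this.sources = new LinkedHashSet<>(sources);
            this.sinks = new LinkedHashSet<>(sinks);
        }
    }

    /**
     * Simplified model (an assumption, not Kafka code): visit the groups in the
     * given order; an unset repartition topic gets the max over the currently
     * known source counts of the groups that write to it, and is never revised.
     */
    static Map<String, Integer> assign(List<Group> groups,
                                       Map<String, Integer> external,
                                       Set<String> repartition) {
        Map<String, Integer> known = new LinkedHashMap<>(external);
        boolean pending;
        do {
            pending = false;
            boolean progress = false;
            for (Group g : groups) {
                for (String topic : g.sources) {
                    if (!repartition.contains(topic) || known.containsKey(topic)) {
                        continue; // external topic, or already assigned
                    }
                    Integer max = null;
                    for (Group writer : groups) {
                        if (!writer.sinks.contains(topic)) {
                            continue;
                        }
                        for (String src : writer.sources) {
                            Integer c = known.get(src);
                            if (c != null && (max == null || c > max)) {
                                max = c;
                            }
                        }
                    }
                    if (max == null) {
                        pending = true;
                    } else {
                        known.put(topic, max);
                        progress = true;
                    }
                }
            }
            if (pending && !progress) {
                throw new IllegalStateException("no source partition count available");
            }
        } while (pending);
        return known;
    }

    /** Two groups forming a cycle: g0 writes reg and reads resp, g1 does the opposite. */
    static Map<String, Integer> example(boolean reversed) {
        Group g0 = new Group(Arrays.asList("table1", "resp"), Arrays.asList("reg"));
        Group g1 = new Group(Arrays.asList("table2", "reg"), Arrays.asList("resp"));
        Map<String, Integer> external = new LinkedHashMap<>();
        external.put("table1", 1);
        external.put("table2", 4);
        Set<String> repartition = new LinkedHashSet<>(Arrays.asList("reg", "resp"));
        List<Group> order = reversed ? Arrays.asList(g1, g0) : Arrays.asList(g0, g1);
        return assign(order, external, repartition);
    }

    public static void main(String[] args) {
        // Same groups, different iteration order, different count for "reg".
        System.out.println(example(false).get("reg") + " vs " + example(true).get("reg")); // 4 vs 1
    }
}
```

In the second ordering, reg ends up with 1 partition although its writer also
reads resp, which has 4, so the "maximum of all source topics" property no
longer holds in this model.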
If my understanding of the problem is correct and cycles are allowed, then to
solve this problem we should leverage an algorithm related to [strongly
connected
components|[https://www.google.com/search?q=strongly+connected+components&rlz=1C1CHBD_enSG860SG860&oq=strongly+connected&aqs=chrome.0.0j69i57j0l6.5477j1j7&sourceid=chrome&ie=UTF-8]].
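As a rough illustration of what such an algorithm could look like, the sketch
below runs Tarjan's strongly connected components algorithm over a graph of
sub-topologies, where an edge A -> B means that some sink topic of A is a source
topic of B. The class name SubTopologySccSketch and the adjacency-list
representation are assumptions made for this example only; the edges in
example() are read off the topology description quoted below. How partition
counts would then be resolved within each component is left open here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class SubTopologySccSketch {
    private final List<List<Integer>> adj;
    private final int[] index;      // discovery order, -1 = not visited yet
    private final int[] low;        // smallest index reachable from the node
    private final boolean[] onStack;
    private final Deque<Integer> stack = new ArrayDeque<>();
    private final List<List<Integer>> components = new ArrayList<>();
    private int counter = 0;

    private SubTopologySccSketch(List<List<Integer>> adj) {
        this.adj = adj;
        int n = adj.size();
        index = new int[n];
        low = new int[n];
        onStack = new boolean[n];
        Arrays.fill(index, -1);
    }

    /** Returns the strongly connected components of the graph (Tarjan's algorithm). */
    static List<List<Integer>> scc(List<List<Integer>> adj) {
        SubTopologySccSketch t = new SubTopologySccSketch(adj);
        for (int v = 0; v < adj.size(); v++) {
            if (t.index[v] == -1) {
                t.visit(v);
            }
        }
        return t.components;
    }

    private void visit(int v) {
        index[v] = low[v] = counter++;
        stack.push(v);
        onStack[v] = true;
        for (int next : adj.get(v)) {
            if (index[next] == -1) {
                visit(next);
                low[v] = Math.min(low[v], low[next]);
            } else if (onStack[next]) {
                low[v] = Math.min(low[v], index[next]);
            }
        }
        if (low[v] == index[v]) {
            // v is the root of a component: pop everything above it on the stack.
            List<Integer> component = new ArrayList<>();
            int w;
            do {
                w = stack.pop();
                onStack[w] = false;
                component.add(w);
            } while (w != v);
            components.add(component);
        }
    }

    /**
     * Sub-topology graph of the quoted topology:
     * 0 -> 1 via ...REGISTRATION-0000000009-topic, 1 -> 0 via ...RESPONSE-0000000017-topic,
     * 0 -> 2 via ...REGISTRATION-0000000022-topic, 2 -> 0 via ...RESPONSE-0000000030-topic.
     */
    static List<List<Integer>> example() {
        return scc(Arrays.asList(
            Arrays.asList(1, 2),
            Arrays.asList(0),
            Arrays.asList(0)));
    }

    public static void main(String[] args) {
        // A component with more than one sub-topology indicates a cycle.
        System.out.println(example());
    }
}
```

For this graph all three sub-topologies fall into a single component, which
matches the cycle reported in the issue.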
Looking forward to your thoughts!
!repartition_calc.jpg|width=850,height=637!
> Topology cycles in
> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-10307
> URL: https://issues.apache.org/jira/browse/KAFKA-10307
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0, 2.5.0, 2.6.0
> Reporter: Boyang Chen
> Priority: Major
> Attachments: repartition_calc.jpg
>
>
> We have spotted a cyclic topology for the foreign-key join test
> *shouldInnerJoinMultiPartitionQueryable*; not sure yet whether this is a bug
> in the algorithm or only in the test. Used
> [https://zz85.github.io/kafka-streams-viz/] to visualize:
> {code:java}
> Sub-topology: 0
> Source: KTABLE-SOURCE-0000000019 (topics:
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000017-topic])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020
> Source: KTABLE-SOURCE-0000000032 (topics:
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000030-topic])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033
> Source: KSTREAM-SOURCE-0000000001 (topics: [table1])
> --> KTABLE-SOURCE-0000000002
> Processor:
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020 (stores:
> [table1-STATE-STORE-0000000000])
> --> KTABLE-FK-JOIN-OUTPUT-0000000021
> <-- KTABLE-SOURCE-0000000019
> Processor:
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033 (stores:
> [INNER-store1])
> --> KTABLE-FK-JOIN-OUTPUT-0000000034
> <-- KTABLE-SOURCE-0000000032
> Processor: KTABLE-FK-JOIN-OUTPUT-0000000021 (stores: [INNER-store1])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023
> <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020
> Processor: KTABLE-FK-JOIN-OUTPUT-0000000034 (stores: [INNER-store2])
> --> KTABLE-TOSTREAM-0000000035
> <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033
> Processor: KTABLE-SOURCE-0000000002 (stores:
> [table1-STATE-STORE-0000000000])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010
> <-- KSTREAM-SOURCE-0000000001
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010 (stores:
> [])
> --> KTABLE-SINK-0000000011
> <-- KTABLE-SOURCE-0000000002
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023 (stores:
> [])
> --> KTABLE-SINK-0000000024
> <-- KTABLE-FK-JOIN-OUTPUT-0000000021
> Processor: KTABLE-TOSTREAM-0000000035 (stores: [])
> --> KSTREAM-SINK-0000000036
> <-- KTABLE-FK-JOIN-OUTPUT-0000000034
> Sink: KSTREAM-SINK-0000000036 (topic: output-)
> <-- KTABLE-TOSTREAM-0000000035
> Sink: KTABLE-SINK-0000000011 (topic:
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic)
> <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010
> Sink: KTABLE-SINK-0000000024 (topic:
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000022-topic)
> <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023
> Sub-topology: 1
> Source: KSTREAM-SOURCE-0000000004 (topics: [table2])
> --> KTABLE-SOURCE-0000000005
> Source: KTABLE-SOURCE-0000000012 (topics:
> [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014 (stores:
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000013])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015
> <-- KTABLE-SOURCE-0000000012
> Processor: KTABLE-SOURCE-0000000005 (stores:
> [table2-STATE-STORE-0000000003])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016
> <-- KSTREAM-SOURCE-0000000004
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015 (stores:
> [table2-STATE-STORE-0000000003])
> --> KTABLE-SINK-0000000018
> <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016 (stores:
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000013])
> --> KTABLE-SINK-0000000018
> <-- KTABLE-SOURCE-0000000005
> Sink: KTABLE-SINK-0000000018 (topic:
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000017-topic)
> <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015,
> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016
> Sub-topology: 2
> Source: KSTREAM-SOURCE-0000000007 (topics: [table3])
> --> KTABLE-SOURCE-0000000008
> Source: KTABLE-SOURCE-0000000025 (topics:
> [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000022-topic])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027 (stores:
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000026])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028
> <-- KTABLE-SOURCE-0000000025
> Processor: KTABLE-SOURCE-0000000008 (stores:
> [table3-STATE-STORE-0000000006])
> --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029
> <-- KSTREAM-SOURCE-0000000007
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028 (stores:
> [table3-STATE-STORE-0000000006])
> --> KTABLE-SINK-0000000031
> <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029 (stores:
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000026])
> --> KTABLE-SINK-0000000031
> <-- KTABLE-SOURCE-0000000008
> Sink: KTABLE-SINK-0000000031 (topic:
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000030-topic)
> <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028,
> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029
> {code}