[ https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176224#comment-17176224 ]
feyman commented on KAFKA-10307:
--------------------------------

[~vvcephei] Thanks for letting me know that cycles are allowed. :) Thanks [~bchen225242] [~bbejeck] for the help!

However, after playing with the test *shouldInnerJoinMultiPartitionQueryable* for a while, I found that the current implementation of StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions seems problematic.

*My understanding of the problem* is that the topics in repartitionTopicMetadata should satisfy the following rule: if a topic is a sink topic of some sub-topology, then its number of partitions should be the maximum of the numbers of partitions of all source topics of that sub-topology. The repartitionTopicMetadata produced by setRepartitionTopicMetadataNumberOfPartitions conflicts with this rule; the attached sketch illustrates the conflict.

Furthermore, the result of setRepartitionTopicMetadataNumberOfPartitions depends on the order of topicGroups: when I tweaked that order, it produced a different repartitionTopicMetadata.

If my understanding of the problem is correct and cycles are allowed, then to solve this problem we should leverage an algorithm based on [strongly connected components|https://www.google.com/search?q=strongly+connected+components&rlz=1C1CHBD_enSG860SG860&oq=strongly+connected&aqs=chrome.0.0j69i57j0l6.5477j1j7&sourceid=chrome&ie=UTF-8].

Looking forward to your thoughts!

!repartition_calc.jpg|width=850,height=637!
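A rough sketch of what an SCC-based calculation could look like, written as standalone Java (9+) rather than against the real StreamsPartitionAssignor internals. SccPartitionSketch, SubTopology and repartitionCounts are hypothetical names, and the rule being enforced is the assumption stated above (a sink repartition topic gets the maximum partition count over its sub-topology's source topics). Sub-topologies become graph nodes with an edge from i to j when i writes a topic that j reads; Tarjan's algorithm groups every cycle into one strongly connected component, and components are then processed upstream-first so that all sub-topologies in a cycle share one count. Since each component only depends on components processed before it, the outcome should not depend on the order of the input list.

```java
import java.util.*;

// Hypothetical sketch, not the actual StreamsPartitionAssignor code: collapse
// cycles of sub-topologies into strongly connected components (Tarjan), then
// propagate the maximum source-topic partition count through the SCC DAG.
public class SccPartitionSketch {

    // Hypothetical model: topics a sub-topology reads, and the repartition
    // (internal) topics it writes. User output topics are left out.
    static final class SubTopology {
        final List<String> sources;
        final List<String> sinks;

        SubTopology(List<String> sources, List<String> sinks) {
            this.sources = sources;
            this.sinks = sinks;
        }
    }

    private final List<SubTopology> subs;
    private final List<List<Integer>> edges = new ArrayList<>();
    final List<List<Integer>> components = new ArrayList<>(); // reverse topological order
    private final Deque<Integer> stack = new ArrayDeque<>();
    private final int[] index, low;
    private final boolean[] onStack;
    private int counter = 0;

    SccPartitionSketch(List<SubTopology> subs) {
        this.subs = subs;
        int n = subs.size();
        index = new int[n];
        low = new int[n];
        onStack = new boolean[n];
        Arrays.fill(index, -1);
        // Edge i -> j when sub-topology i writes a topic that sub-topology j reads.
        for (int i = 0; i < n; i++) {
            List<Integer> out = new ArrayList<>();
            for (int j = 0; j < n; j++) {
                if (!Collections.disjoint(subs.get(i).sinks, subs.get(j).sources)) out.add(j);
            }
            edges.add(out);
        }
        for (int v = 0; v < n; v++) {
            if (index[v] == -1) strongConnect(v);
        }
    }

    // Tarjan's algorithm: an SCC is emitted only after every SCC reachable from it.
    private void strongConnect(int v) {
        index[v] = low[v] = counter++;
        stack.push(v);
        onStack[v] = true;
        for (int w : edges.get(v)) {
            if (index[w] == -1) {
                strongConnect(w);
                low[v] = Math.min(low[v], low[w]);
            } else if (onStack[w]) {
                low[v] = Math.min(low[v], index[w]);
            }
        }
        if (low[v] == index[v]) {
            List<Integer> component = new ArrayList<>();
            int w;
            do {
                w = stack.pop();
                onStack[w] = false;
                component.add(w);
            } while (w != v);
            components.add(component);
        }
    }

    // Partition count per repartition topic; independent of the order of `subs`.
    Map<String, Integer> repartitionCounts(Map<String, Integer> externalCounts) {
        Map<String, Integer> known = new HashMap<>(externalCounts);
        Map<String, Integer> result = new TreeMap<>();
        List<List<Integer>> order = new ArrayList<>(components);
        Collections.reverse(order); // upstream SCCs first
        for (List<Integer> scc : order) {
            // All sub-topologies in one SCC share a value: the max over every source
            // topic whose count is already known (external or written upstream).
            int max = 0;
            for (int s : scc) {
                for (String src : subs.get(s).sources) max = Math.max(max, known.getOrDefault(src, 0));
            }
            if (max == 0) throw new IllegalStateException("No known input partitions for " + scc);
            for (int s : scc) {
                for (String sink : subs.get(s).sinks) {
                    known.merge(sink, max, Integer::max);
                    result.merge(sink, max, Integer::max);
                }
            }
        }
        return result;
    }
}
```

For the topology in this ticket, sub-topology 0 reads both RESPONSE topics and writes both REGISTRATION topics, while sub-topologies 1 and 2 do the reverse, so all three end up in a single component. With hypothetical input counts of 4, 2 and 6 for table1, table2 and table3, every repartition topic would be assigned 6, whichever order the sub-topologies are listed in, and each sink then equals the maximum over its own sub-topology's sources.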
> Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10307
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10307
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0
>            Reporter: Boyang Chen
>            Priority: Major
>         Attachments: repartition_calc.jpg
>
>
> We have spotted a cycled topology for the foreign-key join test *shouldInnerJoinMultiPartitionQueryable*, not sure yet whether this is a bug in the algorithm or the test only. Used [https://zz85.github.io/kafka-streams-viz/] to visualize:
> {code:java}
> Sub-topology: 0
>   Source: KTABLE-SOURCE-0000000019 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000017-topic])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020
>   Source: KTABLE-SOURCE-0000000032 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000030-topic])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033
>   Source: KSTREAM-SOURCE-0000000001 (topics: [table1])
>     --> KTABLE-SOURCE-0000000002
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020 (stores: [table1-STATE-STORE-0000000000])
>     --> KTABLE-FK-JOIN-OUTPUT-0000000021
>     <-- KTABLE-SOURCE-0000000019
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033 (stores: [INNER-store1])
>     --> KTABLE-FK-JOIN-OUTPUT-0000000034
>     <-- KTABLE-SOURCE-0000000032
>   Processor: KTABLE-FK-JOIN-OUTPUT-0000000021 (stores: [INNER-store1])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020
>   Processor: KTABLE-FK-JOIN-OUTPUT-0000000034 (stores: [INNER-store2])
>     --> KTABLE-TOSTREAM-0000000035
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033
>   Processor: KTABLE-SOURCE-0000000002 (stores: [table1-STATE-STORE-0000000000])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010
>     <-- KSTREAM-SOURCE-0000000001
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010 (stores: [])
>     --> KTABLE-SINK-0000000011
>     <-- KTABLE-SOURCE-0000000002
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023 (stores: [])
>     --> KTABLE-SINK-0000000024
>     <-- KTABLE-FK-JOIN-OUTPUT-0000000021
>   Processor: KTABLE-TOSTREAM-0000000035 (stores: [])
>     --> KSTREAM-SINK-0000000036
>     <-- KTABLE-FK-JOIN-OUTPUT-0000000034
>   Sink: KSTREAM-SINK-0000000036 (topic: output-)
>     <-- KTABLE-TOSTREAM-0000000035
>   Sink: KTABLE-SINK-0000000011 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic)
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010
>   Sink: KTABLE-SINK-0000000024 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000022-topic)
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023
>
> Sub-topology: 1
>   Source: KSTREAM-SOURCE-0000000004 (topics: [table2])
>     --> KTABLE-SOURCE-0000000005
>   Source: KTABLE-SOURCE-0000000012 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000013])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015
>     <-- KTABLE-SOURCE-0000000012
>   Processor: KTABLE-SOURCE-0000000005 (stores: [table2-STATE-STORE-0000000003])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016
>     <-- KSTREAM-SOURCE-0000000004
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015 (stores: [table2-STATE-STORE-0000000003])
>     --> KTABLE-SINK-0000000018
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000013])
>     --> KTABLE-SINK-0000000018
>     <-- KTABLE-SOURCE-0000000005
>   Sink: KTABLE-SINK-0000000018 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000017-topic)
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016
>
> Sub-topology: 2
>   Source: KSTREAM-SOURCE-0000000007 (topics: [table3])
>     --> KTABLE-SOURCE-0000000008
>   Source: KTABLE-SOURCE-0000000025 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000022-topic])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000026])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028
>     <-- KTABLE-SOURCE-0000000025
>   Processor: KTABLE-SOURCE-0000000008 (stores: [table3-STATE-STORE-0000000006])
>     --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029
>     <-- KSTREAM-SOURCE-0000000007
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028 (stores: [table3-STATE-STORE-0000000006])
>     --> KTABLE-SINK-0000000031
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027
>   Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000026])
>     --> KTABLE-SINK-0000000031
>     <-- KTABLE-SOURCE-0000000008
>   Sink: KTABLE-SINK-0000000031 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000030-topic)
>     <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)