[ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176224#comment-17176224
 ] 

feyman commented on KAFKA-10307:
--------------------------------

[~vvcephei] Thanks for letting me know that we can have cycles. :)

Thanks [~bchen225242] and [~bbejeck] for the help!

But after playing with the test *shouldInnerJoinMultiPartitionQueryable* for a while, I found that the current implementation of StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions seems problematic. *My understanding of the problem* is that the topics in repartitionTopicMetadata should satisfy this invariant: if a topic is a sink topic of some sub-topology, then its number of partitions should be the maximum of the numbers of partitions of all source topics of that sub-topology. The repartitionTopicMetadata produced by setRepartitionTopicMetadataNumberOfPartitions conflicts with this invariant; the attached sketch illustrates the conflict.
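To make the invariant concrete, here is a minimal checker sketch. It is *not* the StreamsPartitionAssignor code: it assumes a hypothetical, simplified model in which a sub-topology is just a set of source topic names and a set of sink topic names, and the names InvariantCheck, violations, rep-a and rep-b are made up for illustration.

{code:java}
import java.util.*;

// Hypothetical sketch, not Kafka code: checks that every repartition sink topic has
// max(partition counts of its sub-topology's source topics) partitions.
final class InvariantCheck {
    static List<String> violations(Map<Integer, Set<String>> sources,
                                   Map<Integer, Set<String>> sinks,
                                   Set<String> repartitionTopics,
                                   Map<String, Integer> partitions) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, Set<String>> e : sources.entrySet()) {
            // Expected count: the maximum over this sub-topology's source topics.
            int expected = e.getValue().stream().mapToInt(partitions::get).max().orElse(0);
            for (String sink : sinks.getOrDefault(e.getKey(), Set.of())) {
                // Only repartition topics are constrained; user sink topics are skipped.
                if (repartitionTopics.contains(sink) && partitions.get(sink) != expected) {
                    out.add(sink + " has " + partitions.get(sink) + ", expected " + expected);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two sub-topologies in a cycle: 0 writes rep-a (read by 1), 1 writes rep-b (read by 0).
        Map<Integer, Set<String>> sources = Map.of(0, Set.of("input", "rep-b"), 1, Set.of("rep-a"));
        Map<Integer, Set<String>> sinks = Map.of(0, Set.of("rep-a"), 1, Set.of("rep-b"));
        // Hypothetical partition counts in which rep-b breaks the invariant.
        Map<String, Integer> partitions = Map.of("input", 4, "rep-a", 4, "rep-b", 2);
        System.out.println(violations(sources, sinks, Set.of("rep-a", "rep-b"), partitions));
        // prints [rep-b has 2, expected 4]
    }
}
{code}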

Furthermore, the result of setRepartitionTopicMetadataNumberOfPartitions depends on the order of topicGroups: when I tweaked the order of topicGroups, it produced a different repartitionTopicMetadata. A correct calculation should not depend on the iteration order.
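One order-independent way to enforce that invariant, sketched below under the same hypothetical model, is to iterate to a fixed point: start every repartition topic at 0 and keep applying sink = max(sink, max(sources)) until nothing changes. Counts only grow and are bounded by the largest external topic, so the loop terminates, and because the update is monotone it reaches the least fixed point whatever the visiting order. This only illustrates the idea; it is not a proposal for the actual setRepartitionTopicMetadataNumberOfPartitions code. The topic names abbreviate the ones in the topology from the issue description, and the partition counts are made up.

{code:java}
import java.util.*;

// Hypothetical sketch, not Kafka code: fixed-point computation of repartition topic partition counts.
final class RepartitionFixedPoint {
    static Map<String, Integer> compute(List<Integer> order,
                                        Map<Integer, Set<String>> sources,
                                        Map<Integer, Set<String>> sinks,
                                        Set<String> repartitionTopics,
                                        Map<String, Integer> externalPartitions) {
        Map<String, Integer> counts = new HashMap<>(externalPartitions);
        for (String t : repartitionTopics) counts.putIfAbsent(t, 0); // 0 = not known yet
        boolean changed = true;
        while (changed) {
            changed = false;
            for (int id : order) {
                int max = 0;
                for (String src : sources.get(id)) max = Math.max(max, counts.getOrDefault(src, 0));
                for (String sink : sinks.getOrDefault(id, Set.of())) {
                    // Only raise repartition topics; counts never decrease, so the loop terminates.
                    if (repartitionTopics.contains(sink) && counts.get(sink) < max) {
                        counts.put(sink, max);
                        changed = true;
                    }
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Abbreviated topic names from the KAFKA-10307 topology; partition counts are hypothetical.
        Map<Integer, Set<String>> sources = Map.of(
                0, Set.of("table1", "RESPONSE-17", "RESPONSE-30"),
                1, Set.of("table2", "REGISTRATION-9"),
                2, Set.of("table3", "REGISTRATION-22"));
        Map<Integer, Set<String>> sinks = Map.of(
                0, Set.of("REGISTRATION-9", "REGISTRATION-22", "output-"),
                1, Set.of("RESPONSE-17"),
                2, Set.of("RESPONSE-30"));
        Set<String> repartition = Set.of("REGISTRATION-9", "REGISTRATION-22", "RESPONSE-17", "RESPONSE-30");
        Map<String, Integer> external = Map.of("table1", 2, "table2", 3, "table3", 1);
        Map<String, Integer> a = compute(List.of(0, 1, 2), sources, sinks, repartition, external);
        Map<String, Integer> b = compute(List.of(2, 1, 0), sources, sinks, repartition, external);
        System.out.println(a.equals(b) + " " + a.get("RESPONSE-30")); // prints "true 3"
    }
}
{code}

With these made-up counts every repartition topic in the cycle ends up at 3, i.e. the maximum of table1, table2 and table3, for both visiting orders.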

 

If my understanding of the problem is correct, and cycles are allowed, then to solve this problem we should use an algorithm based on [strongly connected components|https://www.google.com/search?q=strongly+connected+components&rlz=1C1CHBD_enSG860SG860&oq=strongly+connected&aqs=chrome.0.0j69i57j0l6.5477j1j7&sourceid=chrome&ie=UTF-8].
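For reference, the textbook way to find SCCs is Tarjan's algorithm. The sketch below runs it on the sub-topology graph of this test, where an edge A -> B means a sink topic of A is a source topic of B (read off the topology listing in the issue description); all three sub-topologies land in a single component. Tarjan emits components in reverse topological order of the condensed DAG, so one could, for example, resolve each component as a unit before moving on; how partition counts should be chosen inside a component is still the open question. The class and method names are hypothetical.

{code:java}
import java.util.*;

// Hypothetical sketch, not Kafka code: Tarjan's SCC algorithm over a sub-topology graph.
final class SubTopologySccs {
    private final Map<Integer, List<Integer>> graph;
    private final Map<Integer, Integer> index = new HashMap<>();
    private final Map<Integer, Integer> lowLink = new HashMap<>();
    private final Deque<Integer> stack = new ArrayDeque<>();
    private final Set<Integer> onStack = new HashSet<>();
    private final List<List<Integer>> components = new ArrayList<>();
    private int counter = 0;

    SubTopologySccs(Map<Integer, List<Integer>> graph) {
        this.graph = graph;
    }

    // Returns the SCCs in reverse topological order of the condensed DAG.
    List<List<Integer>> run() {
        for (Integer v : graph.keySet()) {
            if (!index.containsKey(v)) strongConnect(v);
        }
        return components;
    }

    private void strongConnect(int v) {
        index.put(v, counter);
        lowLink.put(v, counter);
        counter++;
        stack.push(v);
        onStack.add(v);
        for (int w : graph.getOrDefault(v, List.of())) {
            if (!index.containsKey(w)) {
                strongConnect(w); // tree edge: recurse, then pull up w's low-link
                lowLink.put(v, Math.min(lowLink.get(v), lowLink.get(w)));
            } else if (onStack.contains(w)) {
                lowLink.put(v, Math.min(lowLink.get(v), index.get(w))); // edge back into the current SCC
            }
        }
        if (lowLink.get(v).equals(index.get(v))) { // v is an SCC root: pop its component
            List<Integer> component = new ArrayList<>();
            int w;
            do {
                w = stack.pop();
                onStack.remove(w);
                component.add(w);
            } while (w != v);
            Collections.sort(component);
            components.add(component);
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> graph = new TreeMap<>();
        graph.put(0, List.of(1, 2)); // REGISTRATION-9 -> 1, REGISTRATION-22 -> 2
        graph.put(1, List.of(0));    // RESPONSE-17 -> 0
        graph.put(2, List.of(0));    // RESPONSE-30 -> 0
        System.out.println(new SubTopologySccs(graph).run()); // prints [[0, 1, 2]]
    }
}
{code}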
 

 

Looking forward to your thoughts!

 

!repartition_calc.jpg|width=850,height=637!

 

> Topology cycles in 
> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10307
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10307
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0
>            Reporter: Boyang Chen
>            Priority: Major
>         Attachments: repartition_calc.jpg
>
>
> We have spotted a cyclic topology in the foreign-key join test
> *shouldInnerJoinMultiPartitionQueryable*, and are not sure yet whether this is a bug
> in the algorithm or only in the test. We used
> [https://zz85.github.io/kafka-streams-viz/] to visualize it:
> {code:java}
> Sub-topology: 0
>     Source: KTABLE-SOURCE-0000000019 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000017-topic])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020
>     Source: KTABLE-SOURCE-0000000032 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000030-topic])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033
>     Source: KSTREAM-SOURCE-0000000001 (topics: [table1])
>       --> KTABLE-SOURCE-0000000002
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020 (stores: [table1-STATE-STORE-0000000000])
>       --> KTABLE-FK-JOIN-OUTPUT-0000000021
>       <-- KTABLE-SOURCE-0000000019
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033 (stores: [INNER-store1])
>       --> KTABLE-FK-JOIN-OUTPUT-0000000034
>       <-- KTABLE-SOURCE-0000000032
>     Processor: KTABLE-FK-JOIN-OUTPUT-0000000021 (stores: [INNER-store1])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023
>       <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000020
>     Processor: KTABLE-FK-JOIN-OUTPUT-0000000034 (stores: [INNER-store2])
>       --> KTABLE-TOSTREAM-0000000035
>       <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-0000000033
>     Processor: KTABLE-SOURCE-0000000002 (stores: [table1-STATE-STORE-0000000000])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010
>       <-- KSTREAM-SOURCE-0000000001
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010 (stores: [])
>       --> KTABLE-SINK-0000000011
>       <-- KTABLE-SOURCE-0000000002
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023 (stores: [])
>       --> KTABLE-SINK-0000000024
>       <-- KTABLE-FK-JOIN-OUTPUT-0000000021
>     Processor: KTABLE-TOSTREAM-0000000035 (stores: [])
>       --> KSTREAM-SINK-0000000036
>       <-- KTABLE-FK-JOIN-OUTPUT-0000000034
>     Sink: KSTREAM-SINK-0000000036 (topic: output-)
>       <-- KTABLE-TOSTREAM-0000000035
>     Sink: KTABLE-SINK-0000000011 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic)
>       <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010
>     Sink: KTABLE-SINK-0000000024 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000022-topic)
>       <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000023
>
> Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000004 (topics: [table2])
>       --> KTABLE-SOURCE-0000000005
>     Source: KTABLE-SOURCE-0000000012 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000013])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015
>       <-- KTABLE-SOURCE-0000000012
>     Processor: KTABLE-SOURCE-0000000005 (stores: [table2-STATE-STORE-0000000003])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016
>       <-- KSTREAM-SOURCE-0000000004
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015 (stores: [table2-STATE-STORE-0000000003])
>       --> KTABLE-SINK-0000000018
>       <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000014
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000013])
>       --> KTABLE-SINK-0000000018
>       <-- KTABLE-SOURCE-0000000005
>     Sink: KTABLE-SINK-0000000018 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000017-topic)
>       <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000015, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000016
>
> Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000007 (topics: [table3])
>       --> KTABLE-SOURCE-0000000008
>     Source: KTABLE-SOURCE-0000000025 (topics: [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000022-topic])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000026])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028
>       <-- KTABLE-SOURCE-0000000025
>     Processor: KTABLE-SOURCE-0000000008 (stores: [table3-STATE-STORE-0000000006])
>       --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029
>       <-- KSTREAM-SOURCE-0000000007
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028 (stores: [table3-STATE-STORE-0000000006])
>       --> KTABLE-SINK-0000000031
>       <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000027
>     Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029 (stores: [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000026])
>       --> KTABLE-SINK-0000000031
>       <-- KTABLE-SOURCE-0000000008
>     Sink: KTABLE-SINK-0000000031 (topic: KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000030-topic)
>       <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000028, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-0000000029
> {code}
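
A quick way to confirm the cycle mechanically is sketched below: derive an edge A -> B whenever a sink topic of sub-topology A is a source topic of sub-topology B, then run a three-color DFS, where reaching a node that is still in progress means a back edge and therefore a cycle. The topic names are abbreviated from the listing above, and TopologyCycleCheck, edges and hasCycle are hypothetical names, not part of Kafka Streams.

{code:java}
import java.util.*;

// Hypothetical sketch, not Kafka code: derive sub-topology edges from topic names and detect a cycle.
final class TopologyCycleCheck {
    // Edge A -> B whenever a sink topic of sub-topology A is a source topic of sub-topology B.
    static Map<Integer, Set<Integer>> edges(Map<Integer, Set<String>> sources, Map<Integer, Set<String>> sinks) {
        Map<Integer, Set<Integer>> out = new TreeMap<>();
        for (Map.Entry<Integer, Set<String>> a : sinks.entrySet()) {
            for (Map.Entry<Integer, Set<String>> b : sources.entrySet()) {
                if (!Collections.disjoint(a.getValue(), b.getValue())) {
                    out.computeIfAbsent(a.getKey(), k -> new TreeSet<>()).add(b.getKey());
                }
            }
        }
        return out;
    }

    // Three-color DFS: absent = unvisited, 1 = in progress, 2 = done.
    static boolean hasCycle(Map<Integer, Set<Integer>> graph) {
        Map<Integer, Integer> state = new HashMap<>();
        for (Integer v : graph.keySet()) {
            if (!state.containsKey(v) && dfs(v, graph, state)) return true;
        }
        return false;
    }

    private static boolean dfs(int v, Map<Integer, Set<Integer>> graph, Map<Integer, Integer> state) {
        state.put(v, 1);
        for (int w : graph.getOrDefault(v, Set.of())) {
            Integer s = state.get(w);
            if (s != null && s == 1) return true;            // back edge: cycle found
            if (s == null && dfs(w, graph, state)) return true;
        }
        state.put(v, 2);
        return false;
    }

    public static void main(String[] args) {
        // REGISTRATION-9 abbreviates KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000009-topic, etc.
        Map<Integer, Set<String>> sources = Map.of(
                0, Set.of("table1", "RESPONSE-17", "RESPONSE-30"),
                1, Set.of("table2", "REGISTRATION-9"),
                2, Set.of("table3", "REGISTRATION-22"));
        Map<Integer, Set<String>> sinks = Map.of(
                0, Set.of("REGISTRATION-9", "REGISTRATION-22", "output-"),
                1, Set.of("RESPONSE-17"),
                2, Set.of("RESPONSE-30"));
        Map<Integer, Set<Integer>> graph = edges(sources, sinks);
        System.out.println(graph + " cycle=" + hasCycle(graph)); // prints {0=[1, 2], 1=[0], 2=[0]} cycle=true
    }
}
{code}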



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
