Junrui Li created FLINK-35766:
---------------------------------

             Summary: When the job contains many YieldingOperatorFactory instances, compiling the JobGraph hangs
                 Key: FLINK-35766
                 URL: https://issues.apache.org/jira/browse/FLINK-35766
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
            Reporter: Junrui Li


When a job contains YieldingOperatorFactory instances, compiling the JobGraph
has a very high time complexity (O(N!), where N is the number of
YieldingOperatorFactory instances). As a result, job compilation hangs while
creating operator chains when there are many YieldingOperatorFactory instances
(e.g., more than 30).

This bug is rare, but some of our users run SQL jobs in production that contain
many LookupJoins, and these LookupJoins use YieldingOperatorFactory. A simple
reproducible case is as follows:
{code:java}
@Test
void test() {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(1);
    DataStream<Long> stream =
            env.fromSource(
                            new NumberSequenceSource(0, 10),
                            WatermarkStrategy.noWatermarks(),
                            "input")
                    .map((x) -> x);
    // add 32 YieldingOperatorFactory instances, one after another
    for (int i = 0; i < 32; i++) {
        stream =
                stream.transform(
                        "test",
                        BasicTypeInfo.LONG_TYPE_INFO,
                        new YieldingTestOperatorFactory<>());
    }
    stream.addSink(new DiscardingSink<>());
    // hangs here while creating operator chains
    env.getStreamGraph().getJobGraph();
} {code}
The root cause is that the result of checking whether an edge is chainable is
not cached. Each time a YieldingOperatorFactory is encountered, the generator
traverses backward through the upstream operators, and because nothing is
cached, these backward traversals are repeated for every downstream
YieldingOperatorFactory (see code:
[https://github.com/apache/flink/blob/90fc679df073754b93eb5c220373daad7dca0a32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L1602]).
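To make the root cause more concrete, here is a toy model. It is hypothetical and is not Flink's actual StreamingJobGraphGenerator code; the names ChainabilityModel, isChainable, headOf and isLegacySource are illustrative only. The model is a linear pipeline where node 0 is a non-legacy source (as NumberSequenceSource is in the reproducer) and nodes 1..n each use a yielding operator factory. Checking the edge into a yielding node walks backward to find the chain head, and that walk re-checks every upstream edge. Without caching, the number of check evaluations roughly doubles with each additional yielding node in this model. Caching each edge's result, which is one possible mitigation and not necessarily the fix adopted upstream, reduces this to one evaluation per edge.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of chaining checks on a linear pipeline:
 * node 0 is a source, nodes 1..n each use a "yielding" operator factory.
 */
class ChainabilityModel {
    private final int n;
    private final boolean memoize;
    private final Map<Integer, Boolean> cache = new HashMap<>();
    long chainableEvaluations = 0; // how often the check body actually runs

    ChainabilityModel(int n, boolean memoize) {
        this.n = n;
        this.memoize = memoize;
    }

    /** Is the edge (k-1 -> k) chainable? Node k is always yielding here. */
    boolean isChainable(int k) {
        if (memoize && cache.containsKey(k)) {
            return cache.get(k); // cached: no new backward traversal
        }
        chainableEvaluations++;
        // A yielding operator must not be chained behind a legacy source,
        // so walk backward to find the head of the upstream chain.
        int head = headOf(k - 1);
        boolean result = !isLegacySource(head);
        if (memoize) {
            cache.put(k, result);
        }
        return result;
    }

    /** Backward traversal: follow chainable in-edges to the chain head. */
    int headOf(int node) {
        if (node > 0 && isChainable(node)) {
            return headOf(node - 1);
        }
        return node;
    }

    /** Assumption: the source is a non-legacy (FLIP-27) source. */
    boolean isLegacySource(int node) {
        return false;
    }

    /** Check every edge once, as a job graph generator would. */
    long checkAllEdges() {
        for (int k = 1; k <= n; k++) {
            isChainable(k);
        }
        return chainableEvaluations;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 20, 25}) {
            long uncached = new ChainabilityModel(n, false).checkAllEdges();
            long cached = new ChainabilityModel(n, true).checkAllEdges();
            System.out.println("n=" + n + " uncached=" + uncached + " cached=" + cached);
        }
    }
}
```

In this model the uncached total is 2^n - 1 evaluations (1023 for n=10, 1048575 for n=20), while the cached variant runs each edge's check exactly n times in total. The cached headOf walk is still linear per call; memoizing the chain head as well would remove that remaining repeated work.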



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
