[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maximilian Michels resolved FLINK-10566.
----------------------------------------
    Resolution: Fixed
 Fix Version/s: 1.8.0
                1.7.1
                1.6.3
                1.5.6

Backported the fix to the 1.5, 1.6, and 1.7 release branches.

> Flink Planning is exponential in the number of stages
> -----------------------------------------------------
>
>                 Key: FLINK-10566
>                 URL: https://issues.apache.org/jira/browse/FLINK-10566
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 1.5.4, 1.6.1, 1.7.0
>            Reporter: Robert Bradshaw
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>
>         Attachments: chart.png
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The
> execution itself is still sub-second, but job submission takes increasingly
> long.) I can reproduce this with the following pipeline, which resembles my
> real-world workloads (with depth up to 10 and width up to, and past, 50). On
> Flink, pushing the width beyond 10 seems problematic (it times out after
> hours). Note the log scale on the chart for time.
>
> {code:java}
> import java.util.Collection;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
>
> public static void runPipeline(int depth, int width) throws Exception {
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<String> input = env.fromElements("a", "b", "c");
>     DataSet<String> stats = null;
>     for (int i = 0; i < depth; i++) {
>         stats = analyze(input, stats, width / (i + 1) + 1);
>     }
>     stats.writeAsText("out.txt");
>     env.execute("depth " + depth + " width " + width);
> }
>
> public static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
>     System.out.println("analyze " + branches);
>     for (int i = 0; i < branches; i++) {
>         final int ii = i;
>         if (stats != null) {
>             input = input.map(new RichMapFunction<String, String>() {
>                 @Override
>                 public void open(Configuration parameters) throws Exception {
>                     Collection<String> broadcastSet =
>                         getRuntimeContext().getBroadcastVariable("stats");
>                 }
>
>                 @Override
>                 public String map(String value) throws Exception {
>                     return value;
>                 }
>             }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>         }
>         DataSet<String> branch = input
>             .map(s -> new Tuple2<Integer, String>(0, s + ii))
>             .groupBy(0)
>             .minBy(1)
>             .map(kv -> kv.f1);
>         if (stats == null) {
>             stats = branch;
>         } else {
>             stats = stats.union(branch);
>         }
>     }
>     return stats.map(s -> "(" + s + ").stats");
> }
> {code}


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
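For context on the failure mode the reporter describes: if a plan enumerator re-explores the alternatives of each upstream stage independently every time it is reached, a chain of stages with two alternatives each costs O(2^depth) work, whereas memoizing per-stage results keeps the work linear in the number of stages. The following toy Java sketch (hypothetical illustration only, not Flink's actual optimizer code or the patch that resolved this issue) counts recursive calls both ways:

```java
import java.util.HashSet;
import java.util.Set;

public class PlanEnumDemo {
    static long calls;

    // Naive enumeration: every stage re-explores both alternatives of the
    // stage below from scratch, so the call count doubles per level of depth.
    static void naive(int depth) {
        calls++;
        if (depth == 0) {
            return;
        }
        naive(depth - 1);
        naive(depth - 1);
    }

    // Memoized enumeration: a stage already in `seen` is not expanded again,
    // so each level contributes at most two calls (one expanded, one pruned).
    static void memo(int depth, Set<Integer> seen) {
        calls++;
        if (depth == 0 || !seen.add(depth)) {
            return;
        }
        memo(depth - 1, seen);
        memo(depth - 1, seen);
    }

    public static void main(String[] args) {
        calls = 0;
        naive(20);
        System.out.println("naive calls: " + calls); // 2^21 - 1 = 2097151

        calls = 0;
        memo(20, new HashSet<>());
        System.out.println("memo calls:  " + calls); // 1 + 2*20 = 41
    }
}
```

The pipeline in the report stresses exactly this pattern: each `analyze` level both branches (the `union` of `branches` sub-plans) and feeds broadcast inputs back in, so an enumerator without caching revisits the same upstream sub-plans an exponential number of times.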