[ https://issues.apache.org/jira/browse/FLINK-12424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-12424:
-------------------------------
Description:
Currently, the Flink planner optimizes the plan in the {{writeToSink}} method. If a query has more than one sink, each sink tree is optimized independently, and the resulting execution plans are also completely independent. As a result, a multiple-sinks query is very likely to perform duplicate computation. This issue aims to resolve this problem.

The basic idea of the solution is as follows:
1. Lazy optimization: the {{writeToSink}} method does not optimize the plan; it just puts the plan into a collection.
2. Whole-plan optimization and execution: a new {{execute}} method is added to {{TableEnvironment}}. This method triggers optimization of the whole plan and executes the job.

The basic idea of DAG (multiple-sinks query) optimization is:
1. Decompose the DAG into blocks; the leaf blocks are the common sub-plans.
2. Optimize the blocks in order from the leaf blocks to the root blocks; each block needs to be optimized only once.

For example:
{code:scala}
// MyTable has columns a, b, c; Sink1 and Sink2 are placeholders for two table sinks.
val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not null) t2 where a1 = b2")
tableEnv.registerTable("TempTable", table)

// Both sink queries read from TempTable, so the join above is a common sub-plan.
val table1 = tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
tableEnv.writeToSink(table1, Sink1)

val table2 = tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
tableEnv.writeToSink(table2, Sink2)
{code}
!image-2019-05-07-13-33-02-793.png!

The plan above will be decomposed into 3 blocks: block1 is the input of block2 and block3, so block2 and block3 will be optimized after block1 has been optimized.

> Supports dag (multiple-sinks query) optimization
> ------------------------------------------------
>
>                 Key: FLINK-12424
>                 URL: https://issues.apache.org/jira/browse/FLINK-12424
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>         Attachments: image-2019-05-07-13-33-02-793.png
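The lazy-optimization and whole-plan-execution steps in the issue description can be illustrated with a toy model. This is a minimal sketch in plain Scala, not Flink code: {{LazyTableEnv}}, {{Plan}}, {{SinkPlan}} and {{optimizeWholePlan}} are hypothetical names, and the "optimizer" only records that all buffered sink plans reach it in a single call.

{code:scala}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for a logical plan and a sink; NOT Flink classes.
case class Plan(sql: String)
case class SinkPlan(sinkName: String, plan: Plan)

// Toy environment: writeToSink only buffers, execute() optimizes everything together.
class LazyTableEnv {
  private val pendingSinks = ListBuffer.empty[SinkPlan]
  private var optimizerCalls = 0

  // 1. Lazy optimization: record the sink plan, do not optimize it yet.
  def writeToSink(plan: Plan, sinkName: String): Unit =
    pendingSinks += SinkPlan(sinkName, plan)

  // 2. Whole-plan optimization and execution: pass every buffered sink plan
  //    to the optimizer in a single call, then clear the buffer.
  def execute(): List[String] = {
    val sinks = pendingSinks.toList
    pendingSinks.clear()
    optimizeWholePlan(sinks)
  }

  // Hypothetical placeholder for a multi-sink optimizer: it only counts its
  // invocations and lists the sinks it received together.
  private def optimizeWholePlan(sinks: List[SinkPlan]): List[String] = {
    optimizerCalls += 1
    sinks.map(s => s"${s.sinkName} <- ${s.plan.sql}")
  }

  def pendingCount: Int = pendingSinks.size
  def optimizerInvocations: Int = optimizerCalls
}

val env = new LazyTableEnv
env.writeToSink(Plan("select a1, b1 from TempTable where a1 >= 70"), "Sink1")
env.writeToSink(Plan("select a1, c2 from TempTable where a1 < 70"), "Sink2")
val pendingBeforeExecute = env.pendingCount // both sinks buffered, none optimized yet
val result = env.execute()                  // a single optimizer call sees both sinks
{code}

The design point is only that {{writeToSink}} returns without optimizing, so the optimizer can later see all sink trees at once and share their common parts.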
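The block decomposition in the issue description can be sketched in plain Scala under a simplifying rule that is our assumption, not necessarily the planner's: a node starts a new block when it is a sink or when more than one parent consumes it (a common sub-plan). {{Node}}, {{Block}}, {{decompose}} and the node ids are hypothetical; the example query is modeled by hand, with the two scans of {{MyTable}} treated as separate nodes.

{code:scala}
// Hypothetical plan node: an id plus the ids of the nodes it reads from.
case class Node(id: String, inputs: List[String])

// A block: its top node, the nodes inside it, and the roots of the blocks it reads from.
case class Block(root: String, members: Set[String], inputBlocks: Set[String])

def decompose(nodes: Map[String, Node], sinks: Set[String]): List[Block] = {
  // How many parent nodes consume each node.
  val consumers: Map[String, Int] =
    nodes.values.toList.flatMap(_.inputs).groupBy(identity).map { case (id, uses) => id -> uses.size }

  // Assumed rule: sinks and shared (multi-consumer) nodes each start a block.
  def startsBlock(id: String): Boolean =
    sinks.contains(id) || consumers.getOrElse(id, 0) > 1

  nodes.keys.filter(startsBlock).toList.sorted.map { root =>
    var members = Set.empty[String]
    var inputBlocks = Set.empty[String]
    // Walk down from the block root; stop at nodes that start another block.
    def visit(id: String): Unit =
      if (id != root && startsBlock(id)) inputBlocks += id
      else if (!members.contains(id)) {
        members += id
        nodes(id).inputs.foreach(visit)
      }
    visit(root)
    Block(root, members, inputBlocks)
  }
}

// The example query, modeled by hand (node ids are illustrative only).
val plan = List(
  Node("scan1", Nil),                   // MyTable scan for t1
  Node("calc1", List("scan1")),         // where a > 0, project a1, b1
  Node("scan2", Nil),                   // MyTable scan for t2
  Node("calc2", List("scan2")),         // where c is not null, project b2, c2
  Node("join", List("calc1", "calc2")), // t1 join t2 on a1 = b2 (TempTable)
  Node("calc3", List("join")),          // where a1 >= 70
  Node("sink1", List("calc3")),
  Node("calc4", List("join")),          // where a1 < 70
  Node("sink2", List("calc4"))
).map(n => n.id -> n).toMap

val blocks = decompose(plan, Set("sink1", "sink2"))
blocks.foreach(println)
{code}

Under this rule the shared join subtree forms one leaf block (block1 in the description), and each sink forms a block whose only input is that leaf block, giving the 3 blocks the issue describes.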
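Optimizing blocks from leaf to root, each exactly once, can be sketched as a memoized post-order traversal of the block graph. This is an illustration under stated assumptions, not the planner's implementation: {{blockInputs}}, {{OptimizedBlock}} and {{optimize}} are hypothetical, and "optimizing" a block here only wraps its already-optimized inputs.

{code:scala}
import scala.collection.mutable

// Hypothetical block graph for the example: block name -> names of its input blocks.
val blockInputs: Map[String, List[String]] = Map(
  "block1" -> Nil,            // common sub-plan (leaf block)
  "block2" -> List("block1"), // Sink1 branch
  "block3" -> List("block1")  // Sink2 branch
)

// Stand-in for an optimized block; a real planner would hold an optimized plan here.
case class OptimizedBlock(name: String, inputs: List[OptimizedBlock])

// Insertion-ordered memo table, so it also records the optimization order.
val optimizedBlocks = mutable.LinkedHashMap.empty[String, OptimizedBlock]
var optimizationCount = 0

// Optimize all input blocks first (leaf to root) and reuse memoized results,
// so each block is optimized exactly once no matter how many blocks read it.
def optimize(name: String): OptimizedBlock =
  optimizedBlocks.get(name) match {
    case Some(done) => done
    case None =>
      val inputs = blockInputs(name).map(optimize)
      optimizationCount += 1
      val result = OptimizedBlock(name, inputs)
      optimizedBlocks(name) = result
      result
  }

// Root blocks are those that no other block reads from (block2 and block3 here).
val rootBlocks = blockInputs.keys
  .filterNot(b => blockInputs.values.exists(_.contains(b)))
  .toList
  .sorted
rootBlocks.foreach(optimize)
println(optimizedBlocks.keys.mkString(" -> "))
{code}

Running it prints {{block1 -> block2 -> block3}}: block1 is optimized first, and the same optimized result is reused as the input of both block2 and block3.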