Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, which can make the execution plan differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), the same operators are added to the execution environment again.
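To make the mechanism concrete, here is a toy model in plain Scala with no Flink dependency. ToyEnv, ToyScan, explainShared and explainIsolated are hypothetical names for illustration only, not Flink classes or APIs. translateToPlan appends operators to a shared environment, so explaining the same node twice doubles the plan. Translating into a throwaway environment instead keeps explain free of side effects. That is only one possible way to remove the side effect, and it is an assumption here.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: these names are hypothetical, not Flink classes.
// ToyEnv stands in for the shared execution environment.
final class ToyEnv {
  val operators: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def addOperator(name: String): Unit = operators += name

  // Render the registered operators as numbered "stages".
  def plan: Seq[String] =
    operators.zipWithIndex.map { case (op, i) => s"Stage ${i + 1} : $op" }.toSeq
}

// A stand-in for a leaf node such as StreamTableSourceScan.
final case class ToyScan(source: String) {
  // Non-idempotent: every call appends the same three operators again.
  def translateToPlan(env: ToyEnv): Unit = {
    env.addOperator("Data Source")
    env.addOperator(source)
    env.addOperator("Map")
  }
}

object ToyExplain {
  // Mirrors the reported behavior: explain translates into the shared env,
  // so each call leaves extra operators behind.
  def explainShared(env: ToyEnv, node: ToyScan): Seq[String] = {
    node.translateToPlan(env)
    env.plan
  }

  // Hypothetical side-effect-free variant: translate into a scratch env
  // so the shared env is never modified.
  def explainIsolated(node: ToyScan): Seq[String] = {
    val scratch = new ToyEnv
    node.translateToPlan(scratch)
    scratch.plan
  }

  def main(args: Array[String]): Unit = {
    val env = new ToyEnv
    val scan = ToyScan("CsvTableSource(read fields: f1, f2)")
    println(explainShared(env, scan).size) // 3
    println(explainShared(env, scan).size) // 6: the same operators were added twice
    println(explainIsolated(scan).size)    // 3
    println(explainIsolated(scan).size)    // 3
  }
}
```

In the toy model the shared environment keeps growing with every explain, which matches the growing stage numbers in the appendix below.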
Should we eliminate this side effect of DataStreamRel#translateToPlan?

Best,
Wangsan

Appendix:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row

tenv.registerTableSource("test_source", sourceTable)
val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))
implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))

We call explain three times, and the Physical Execution Plans are all different: each one also contains the operators added by every earlier translation. In the third plan, stages 1-3 come from the first explain, stages 4-6 from the second explain, stages 7-10 from toAppendStream, and stages 11-13 from the third explain itself.

First explain:

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
Stage 2 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 3 : Operator
  content : Map
  ship_strategy : FORWARD

Second explain:

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
Stage 2 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 3 : Operator
  content : Map
  ship_strategy : FORWARD
Stage 4 : Data Source
  content : collect elements with CollectionInputFormat
Stage 5 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 6 : Operator
  content : Map
  ship_strategy : FORWARD

Third explain (after toAppendStream):

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
Stage 2 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 3 : Operator
  content : Map
  ship_strategy : FORWARD
Stage 4 : Data Source
  content : collect elements with CollectionInputFormat
Stage 5 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 6 : Operator
  content : Map
  ship_strategy : FORWARD
Stage 7 : Data Source
  content : collect elements with CollectionInputFormat
Stage 8 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 9 : Operator
  content : Map
  ship_strategy : FORWARD
Stage 10 : Operator
  content : to: Row
  ship_strategy : FORWARD
Stage 11 : Data Source
  content : collect elements with CollectionInputFormat
Stage 12 : Operator
  content : CsvTableSource(read fields: f1, f2)
  ship_strategy : FORWARD
Stage 13 : Operator
  content : Map
  ship_strategy : FORWARD