Re: Side effect of DataStreamRel#translateToPlan
Hi Wangsan,

the behavior of DataStreamRel#translateToPlan is more or less intended. That's why you call `toAppendStream` on the table environment: it adds your pipeline (from source to the current operator) to that environment. However, the explain() method should not cause those side effects.

Regards,
Timo

Am 21.08.18 um 17:29 schrieb wangsan:

Hi Timo,

I think this may not only affect the explain() method. DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, and it adds the desired operators to the execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on the same RelNode several times, the same operators are added to the execution environment more than once, although we need them only once. Correct me if I misunderstood. I will open an issue later today if this is indeed a problem.

Best,
wangsan

On Aug 21, 2018, at 10:16 PM, Timo Walther wrote:

Hi,

this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?

Thanks,
Timo

Am 21.08.18 um 15:04 schrieb wangsan:

Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, which may cause the execution plan to differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), we add the same operators to the execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,
Wangsan

appendix

tenv.registerTableSource("test_source", sourceTable)

val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))

implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))

We call explain three times, and the Physical Execution Plans are all different.
== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 3 : Operator
    content : Map
    ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 3 : Operator
    content : Map
    ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

Stage 5 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 6 : Operator
    content : Map
    ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
  LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 3 : Operator
    content : Map
    ship_strategy : FORWARD

Stage 4 : Data Source
    content : collect elements with CollectionInputFormat

Stage 5 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 6 : Operator
    content : Map
    ship_strategy : FORWARD

Stage 7 : Data Source
    content : collect elements with CollectionInputFormat

Stage 8 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 9 : Operator
    content : Map
    ship_strategy : FORWARD

Stage 10 : Operator
    content : to: Row
    ship_strategy : FORWARD

Stage 11 : Data Source
    content : collect elements with CollectionInputFormat

Stage 12 : Operator
    content : CsvTableSource(read fields: f1, f2)
    ship_strategy : FORWARD

Stage 13 : Operator
    content : Map
    ship_strategy : FORWARD
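[Editorial aside: Timo's point that explain() should not mutate the environment can be sketched in miniature. The classes below are invented stand-ins for illustration only (Environment, RelNode, and TableEnv are not Flink's real API): here explain() translates into a throwaway scratch environment, so repeated calls leave the caller's environment untouched. Flink's actual fix may differ.]

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an execution environment that collects operators.
class Environment {
    final List<String> operators = new ArrayList<>();
}

// Hypothetical stand-in for a relational node whose translation registers operators.
class RelNode {
    final String name;
    RelNode(String name) { this.name = name; }

    void translateToPlan(Environment env) {
        env.operators.add("Source(" + name + ")");
        env.operators.add("Map(" + name + ")");
    }
}

class TableEnv {
    final Environment env = new Environment();

    // Side-effect-free explain: translate into a scratch environment,
    // render the plan from it, and discard it afterwards.
    String explain(RelNode rel) {
        Environment scratch = new Environment();
        rel.translateToPlan(scratch);
        return String.join("\n", scratch.operators);
    }
}

public class ExplainDemo {
    public static void main(String[] args) {
        TableEnv tenv = new TableEnv();
        RelNode rel = new RelNode("test_source");
        String p1 = tenv.explain(rel);
        String p2 = tenv.explain(rel);
        // The plans are identical and the real environment accumulated nothing.
        System.out.println(p1.equals(p2));          // true
        System.out.println(tenv.env.operators.size()); // 0
    }
}
```

With this structure, calling explain() any number of times yields the same plan text, mirroring the expectation in the thread that only toAppendStream/writeToSink should extend the real pipeline.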
Re: Side effect of DataStreamRel#translateToPlan
Hi Timo,

I think this may not only affect the explain() method. DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode into a DataStream or DataSet, and it adds the desired operators to the execution environment. By side effect, I mean that if we call DataStreamRel#translateToPlan on the same RelNode several times, the same operators are added to the execution environment more than once, although we need them only once. Correct me if I misunderstood. I will open an issue later today if this is indeed a problem.

Best,
wangsan

> On Aug 21, 2018, at 10:16 PM, Timo Walther wrote:
>
> Hi,
>
> this sounds like a bug to me. Maybe the explain() method is not implemented
> correctly. Can you open an issue for it in Jira?
>
> Thanks,
> Timo
>
> Am 21.08.18 um 15:04 schrieb wangsan:
>> Hi all,
>>
>> I noticed that DataStreamRel#translateToPlan is non-idempotent, which may
>> cause the execution plan to differ from what we expect. Every time we call
>> DataStreamRel#translateToPlan (in TableEnvironment#explain,
>> TableEnvironment#writeToSink, etc.), we add the same operators to the
>> execution environment again.
>>
>> Should we eliminate the side effect of DataStreamRel#translateToPlan?
>>
>> Best,
>> Wangsan
>>
>> appendix
>>
>> tenv.registerTableSource("test_source", sourceTable)
>>
>> val t = tenv.sqlQuery("SELECT * from test_source")
>> println(tenv.explain(t))
>> println(tenv.explain(t))
>>
>> implicit val typeInfo = TypeInformation.of(classOf[Row])
>> tenv.toAppendStream(t)
>> println(tenv.explain(t))
>>
>> We call explain three times, and the Physical Execution Plans are all
>> different.
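[Editorial aside: the duplication wangsan describes can be reproduced without Flink. This is a minimal, hypothetical model (Env, Rel, and MemoizingTranslator are invented names, not Flink classes): translateToPlan appends its operators on every call, and a per-RelNode memoization guard is one way to make repeated translation idempotent.]

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical execution environment: a bag of registered operators.
class Env {
    final List<String> operators = new ArrayList<>();
}

// Hypothetical relational node with a non-idempotent translation step:
// every call appends the same operators to the environment again.
class Rel {
    final String name;
    Rel(String name) { this.name = name; }

    String translateToPlan(Env env) {
        env.operators.add("Source(" + name + ")");
        env.operators.add("Map(" + name + ")");
        return "plan(" + name + ")";
    }
}

// Idempotence guard: translate each node at most once and reuse the result.
class MemoizingTranslator {
    private final Map<Rel, String> cache = new IdentityHashMap<>();

    String translate(Rel rel, Env env) {
        return cache.computeIfAbsent(rel, r -> r.translateToPlan(env));
    }
}

public class Demo {
    public static void main(String[] args) {
        Rel rel = new Rel("test_source");

        // Direct calls duplicate operators, like repeated explain()/writeToSink().
        Env env = new Env();
        rel.translateToPlan(env);
        rel.translateToPlan(env);
        System.out.println(env.operators.size()); // 4: each call added 2 operators

        // Memoized calls register the operators only once.
        Env env2 = new Env();
        MemoizingTranslator t = new MemoizingTranslator();
        t.translate(rel, env2);
        t.translate(rel, env2);
        System.out.println(env2.operators.size()); // 2
    }
}
```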
Re: Side effect of DataStreamRel#translateToPlan
Hi,

this sounds like a bug to me. Maybe the explain() method is not implemented correctly. Can you open an issue for it in Jira?

Thanks,
Timo

Am 21.08.18 um 15:04 schrieb wangsan:

Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, which may cause the execution plan to differ from what we expect. Every time we call DataStreamRel#translateToPlan (in TableEnvironment#explain, TableEnvironment#writeToSink, etc.), we add the same operators to the execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,
Wangsan

appendix

tenv.registerTableSource("test_source", sourceTable)

val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))

implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))

We call explain three times, and the Physical Execution Plans are all different.