Hi Wangsan,
the behavior of DataStreamRel#translateToPlan is more or less intended.
That is why you call `toAppendStream` on the table environment: doing so
adds your pipeline (from the source to the current operator) to the environment.
However, the explain() method should not cause those side effects.
Regards,
Timo
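
One way to picture an explain() without side effects is to translate into a throwaway environment and keep the user's environment untouched. The following is only a minimal sketch on a toy model, not Flink code and not necessarily how a fix would look: `ToyEnv`, `SideEffectFreeExplain` and the operator names are hypothetical stand-ins, and a real scratch environment would presumably also have to mirror the original configuration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an execution environment: it only records
// operator names. This is NOT Flink's StreamExecutionEnvironment.
final class ToyEnv {
  private val operators = ArrayBuffer.empty[String]
  def add(name: String): Unit = operators += name
  def stageCount: Int = operators.size
  def plan: String =
    operators.zipWithIndex
      .map { case (op, i) => s"Stage ${i + 1} : $op" }
      .mkString("\n")
}

object SideEffectFreeExplain {
  // Models the translation: appends the scan pipeline to the given environment.
  def translateToPlan(env: ToyEnv): Unit = {
    env.add("Data Source")
    env.add("Operator: CsvTableSource")
    env.add("Operator: Map")
  }

  // Intended side effect: the pipeline is registered on the caller's environment.
  def toAppendStream(env: ToyEnv): Unit = {
    translateToPlan(env)
    env.add("Operator: to: Row")
  }

  // Assumption of this sketch: explain() translates into a scratch
  // environment that is discarded afterwards.
  def explain(): String = {
    val scratch = new ToyEnv
    translateToPlan(scratch)
    scratch.plan
  }

  // Returns the user's stage count after two explain() calls and then
  // after one toAppendStream() call.
  def demo(): Seq[Int] = {
    val userEnv = new ToyEnv
    explain()
    explain()
    val afterExplain = userEnv.stageCount
    toAppendStream(userEnv)
    Seq(afterExplain, userEnv.stageCount)
  }
}
```

In this model only `toAppendStream` changes the user's environment; calling `explain()` any number of times always yields the same three stages.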
On 21.08.18 at 17:29, wangsan wrote:
Hi Timo,
I think this may not only affect the explain() method.
DataStreamRel#translateToPlan is called whenever we need to translate a FlinkRelNode
into a DataStream or DataSet, and it adds the required operators to the execution environment.
By side effect, I mean that if we call DataStreamRel#translateToPlan on the same
RelNode several times, the same operators are added to the execution environment
more than once, although we only need them once. Correct me if I
misunderstood that.
I will open an issue later today if this is indeed a problem.
Best,
wangsan
On Aug 21, 2018, at 10:16 PM, Timo Walther <twal...@apache.org> wrote:
Hi,
this sounds like a bug to me. Maybe the explain() method is not implemented
correctly. Can you open an issue for it in Jira?
Thanks,
Timo
On 21.08.18 at 15:04, wangsan wrote:
Hi all,
I noticed that DataStreamRel#translateToPlan is not idempotent, which
may cause the execution plan to differ from what we expect. Every time we call
DataStreamRel#translateToPlan (in TableEnvironment#explain,
TableEnvironment#writeToSink, etc.), the same operators are added to the execution
environment again.
Should we eliminate the side effect of DataStreamRel#translateToPlan?
Best, Wangsan
Appendix:
tenv.registerTableSource("test_source", sourceTable)
val t = tenv.sqlQuery("SELECT * from test_source")
println(tenv.explain(t))
println(tenv.explain(t))
implicit val typeInfo = TypeInformation.of(classOf[Row])
tenv.toAppendStream(t)
println(tenv.explain(t))
We call explain() three times, and the three Physical Execution Plans are all different.
== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])
== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2],
source=[CsvTableSource(read fields: f1, f2)])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 3 : Operator
content : Map
ship_strategy : FORWARD
== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])
== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2],
source=[CsvTableSource(read fields: f1, f2)])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 3 : Operator
content : Map
ship_strategy : FORWARD
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
LogicalTableScan(table=[[test_source]])
== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2],
source=[CsvTableSource(read fields: f1, f2)])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 3 : Operator
content : Map
ship_strategy : FORWARD
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Stage 5 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
Stage 7 : Data Source
content : collect elements with CollectionInputFormat
Stage 8 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 9 : Operator
content : Map
ship_strategy : FORWARD
Stage 10 : Operator
content : to: Row
ship_strategy : FORWARD
Stage 11 : Data Source
content : collect elements with CollectionInputFormat
Stage 12 : Operator
content : CsvTableSource(read fields: f1, f2)
ship_strategy : FORWARD
Stage 13 : Operator
content : Map
ship_strategy : FORWARD
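
The stage counts in the three plans above (3, 6 and 13) are what one would expect if every translation appends to the same environment. The following toy model replays the call sequence from the appendix. It is only a sketch under that assumption, not Flink code: `ToyEnv`, `NonIdempotentTranslation` and the operator names are hypothetical stand-ins.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an execution environment: it only records
// operator names. This is NOT Flink's StreamExecutionEnvironment.
final class ToyEnv {
  private val operators = ArrayBuffer.empty[String]
  def add(name: String): Unit = operators += name
  def stageCount: Int = operators.size
  def plan: String =
    operators.zipWithIndex
      .map { case (op, i) => s"Stage ${i + 1} : $op" }
      .mkString("\n")
}

object NonIdempotentTranslation {
  // Mimics the side effect discussed in the thread: every call appends the
  // scan pipeline (source, table source, map) to the shared environment.
  def translateToPlan(env: ToyEnv): Unit = {
    env.add("Data Source")
    env.add("Operator: CsvTableSource")
    env.add("Operator: Map")
  }

  // explain() translates first and then renders everything registered so far.
  def explain(env: ToyEnv): String = {
    translateToPlan(env)
    env.plan
  }

  // toAppendStream() translates and adds a conversion operator on top.
  def toAppendStream(env: ToyEnv): Unit = {
    translateToPlan(env)
    env.add("Operator: to: Row")
  }

  // Replays the appendix: explain, explain, toAppendStream, explain.
  // Returns the number of stages visible to each explain() call.
  def stageCounts(): Seq[Int] = {
    val env = new ToyEnv
    explain(env)
    val first = env.stageCount
    explain(env)
    val second = env.stageCount
    toAppendStream(env)
    explain(env)
    val third = env.stageCount
    Seq(first, second, third)
  }

  def main(args: Array[String]): Unit =
    println(stageCounts().mkString(", ")) // prints "3, 6, 13"
}
```

In the model, the third plan has 13 stages because `toAppendStream` contributes four operators (including `to: Row` as stage 10) before the last explain() adds three more, which matches the layout of the third dump.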