Re: Side effect of DataStreamRel#translateToPlan

2018-08-21 Thread Timo Walther

Hi Wangsan,

the behavior of DataStreamRel#translateToPlan is more or less intended. 
That's why you call `toAppendStream` on the table environment: it adds 
your pipeline (from source to current operator) to the environment.
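To illustrate the accumulation, here is a minimal sketch (a toy model with invented names like `FakeExecEnv`, not actual Flink code): every translation appends the whole pipeline to the shared environment, which is exactly what a terminal call such as `toAppendStream` relies on.

```scala
import scala.collection.mutable.ListBuffer

// Toy stand-in for an execution environment that accumulates operators.
class FakeExecEnv {
  val operators = ListBuffer.empty[String]
  def addOperator(name: String): Unit = operators += name
}

// Each "translation" registers the full pipeline (source -> map),
// mirroring the behavior reported in this thread.
def translateToPlan(env: FakeExecEnv): Unit = {
  env.addOperator("CsvTableSource")
  env.addOperator("Map")
}

val env = new FakeExecEnv
translateToPlan(env) // e.g. triggered by a first explain()
translateToPlan(env) // a second explain() appends the same operators again
println(env.operators.mkString(", "))
// CsvTableSource, Map, CsvTableSource, Map
```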


However, the explain() method should not cause those side-effects.

Regards,
Timo

Am 21.08.18 um 17:29 schrieb wangsan:

Hi Timo,

I think this may affect more than the explain() method. 
DataStreamRel#translateToPlan is called whenever we need to translate a 
FlinkRelNode into a DataStream or DataSet, and it adds the desired operators to 
the execution environment. By side effect, I mean that if we call 
DataStreamRel#translateToPlan on the same RelNode several times, the same 
operators are added to the execution environment more than once, although we 
only need them once. Correct me if I misunderstood.
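One way the translation could be made idempotent is to memoize the result per RelNode, so repeated calls return the cached stream instead of re-registering operators. A rough sketch of that idea (hypothetical toy types, not Flink's actual classes):

```scala
import scala.collection.mutable

// Toy environment and node types for illustration only.
class FakeEnv { val operators = mutable.ListBuffer.empty[String] }
case class FakeRelNode(id: Int, ops: List[String])

val translated = mutable.Map.empty[FakeRelNode, String]

// The side effect (registering operators) happens only on the first call
// for a given node; later calls hit the cache.
def translateToPlan(node: FakeRelNode, env: FakeEnv): String =
  translated.getOrElseUpdate(node, {
    node.ops.foreach(env.operators += _)
    s"stream-for-node-${node.id}"
  })

val env = new FakeEnv
val node = FakeRelNode(1, List("CsvTableSource", "Map"))
translateToPlan(node, env)
translateToPlan(node, env) // cached: no duplicate operators
println(env.operators.toList)
// List(CsvTableSource, Map)
```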

I will open an issue later today if this is indeed a problem.

Best,
wangsan




On Aug 21, 2018, at 10:16 PM, Timo Walther wrote:

Hi,

this sounds like a bug to me. Maybe the explain() method is not implemented 
correctly. Can you open an issue for it in Jira?

Thanks,
Timo


Am 21.08.18 um 15:04 schrieb wangsan:

Hi all,

I noticed that DataStreamRel#translateToPlan is non-idempotent, which may cause 
the execution plan to differ from what we expect. Every time we call 
DataStreamRel#translateToPlan (in TableEnvironment#explain, 
TableEnvironment#writeToSink, etc.), the same operators are added to the 
execution environment again.

Should we eliminate the side effect of DataStreamRel#translateToPlan?

Best,  Wangsan

appendix

 tenv.registerTableSource("test_source", sourceTable)

 val t = tenv.sqlQuery("SELECT * from test_source")
 println(tenv.explain(t))
 println(tenv.explain(t))

 implicit val typeInfo = TypeInformation.of(classOf[Row])
 tenv.toAppendStream(t)
 println(tenv.explain(t))

We call explain three times, and the Physical Execution Plans are all different.

== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 4 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 5 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 6 : Operator
 content : Map
 ship_strategy : FORWARD


== Abstract Syntax Tree ==
LogicalProject(f1=[$0], f2=[$1])
   LogicalTableScan(table=[[test_source]])

== Optimized Logical Plan ==
StreamTableSourceScan(table=[[test_source]], fields=[f1, f2], 
source=[CsvTableSource(read fields: f1, f2)])

== Physical Execution Plan ==
Stage 1 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 2 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 3 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 4 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 5 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 6 : Operator
 content : Map
 ship_strategy : FORWARD

Stage 7 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 8 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 9 : Operator
 content : Map
 ship_strategy : FORWARD

 Stage 10 : Operator
 content : to: Row
 ship_strategy : FORWARD

Stage 11 : Data Source
 content : collect elements with CollectionInputFormat

 Stage 12 : Operator
 content : CsvTableSource(read fields: f1, f2)
 ship_strategy : FORWARD

 Stage 13 : Operator
 content : Map
 ship_strategy : FORWARD







