Re: Spark 1.6 Catalyst optimizer

2016-05-12 Thread Telmo Rodrigues
Thank you Takeshi.

After executing df3.explain(true), I realised that the Optimizer batches are
being run, including the predicate pushdown.

I think that only the Analyzer batches are executed when the DataFrame is
created by context.sql(query). The Optimizer batches seem to run only when
an action such as collect or explain takes place.

scala> d3.explain(true)
16/05/13 02:08:12 DEBUG Analyzer$ResolveReferences: Resolving 't.id to id#0L
16/05/13 02:08:12 DEBUG Analyzer$ResolveReferences: Resolving 'u.id to id#1L
16/05/13 02:08:12 DEBUG Analyzer$ResolveReferences: Resolving 't.id to id#0L
16/05/13 02:08:12 DEBUG SQLContext$$anon$1:
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
+- 'Filter ('t.id = 1)
   +- 'Join Inner, Some(('t.id = 'u.id))
      :- 'UnresolvedRelation `t`, None
      +- 'UnresolvedRelation `u`, None

== Analyzed Logical Plan ==
id: bigint, id: bigint
Project [id#0L,id#1L]
+- Filter (id#0L = cast(1 as bigint))
   +- Join Inner, Some((id#0L = id#1L))
      :- Subquery t
      :  +- Relation[id#0L] JSONRelation
      +- Subquery u
         +- Relation[id#1L] JSONRelation

== Optimized Logical Plan ==
Project [id#0L,id#1L]
+- Join Inner, Some((id#0L = id#1L))
   :- Filter (id#0L = 1)
   :  +- Relation[id#0L] JSONRelation
   +- Relation[id#1L] JSONRelation

== Physical Plan ==
Project [id#0L,id#1L]
+- BroadcastHashJoin [id#0L], [id#1L], BuildRight
   :- Filter (id#0L = 1)
   :  +- Scan JSONRelation[id#0L] InputPaths: file:/persons.json, PushedFilters: [EqualTo(id,1)]
   +- Scan JSONRelation[id#1L] InputPaths: file:/cars.json
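The lazy triggering described above can be sketched outside Spark with plain
Scala lazy vals, which is roughly how the plan phases get deferred until an
action forces them. The names QueryExecution/analyzed/optimized below echo
Spark's internals, but this is an illustration, not Spark's actual API:

```scala
// Each plan phase is a lazy val, forced on first access. Nothing runs when
// the "query" object is created; explain (like collect) forces the
// optimizer. Illustrative sketch only, not Spark code.
object LazyPlanDemo {
  class QueryExecution(sql: String) {
    val phasesRun = scala.collection.mutable.ArrayBuffer.empty[String]

    lazy val analyzed: String  = { phasesRun += "analyzer"; s"Analyzed($sql)" }
    lazy val optimized: String = { phasesRun += "optimizer"; s"Optimized($analyzed)" }

    // explain forces the optimized plan to be computed
    def explain(): String = optimized
  }

  def main(args: Array[String]): Unit = {
    val qe = new QueryExecution("select * from t join u on t.id = u.id")
    println(qe.phasesRun.toList)   // nothing has run yet
    qe.explain()
    println(qe.phasesRun.toList)   // analyzer and optimizer have now run
  }
}
```

This matches what the log shows: the Analyzer output appears when the
DataFrame is defined only because analysis is forced eagerly for schema
resolution, while the Optimizer waits for explain or an action.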


2016-05-12 16:34 GMT+01:00 Takeshi Yamamuro :

> Hi,
>
> What's the result of `df3.explain(true)`?
>
> // maropu
>
> On Thu, May 12, 2016 at 10:04 AM, Telmo Rodrigues <
> telmo.galante.rodrig...@gmail.com> wrote:
>
>> I'm building Spark from the branch-1.6 source with mvn -DskipTests package
>> and running the following code in the spark-shell.
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>
>> import sqlContext.implicits._
>>
>> val df = sqlContext.read.json("persons.json")
>>
>> val df2 = sqlContext.read.json("cars.json")
>>
>> df.registerTempTable("t")
>>
>> df2.registerTempTable("u")
>>
>> val d3 = sqlContext.sql("select * from t join u on t.id = u.id where t.id = 1")
>>
>> With the log4j root category level set to WARN, the last printed messages
>> refer to the execution of the Batch Resolution rules.
>>
>> === Result of Batch Resolution ===
>> !'Project [unresolvedalias(*)]               Project [id#0L,id#1L]
>> !+- 'Filter ('t.id = 1)                      +- Filter (id#0L = cast(1 as bigint))
>> !   +- 'Join Inner, Some(('t.id = 'u.id))       +- Join Inner, Some((id#0L = id#1L))
>> !      :- 'UnresolvedRelation `t`, None            :- Subquery t
>> !      +- 'UnresolvedRelation `u`, None            :  +- Relation[id#0L] JSONRelation
>> !                                                  +- Subquery u
>> !                                                     +- Relation[id#1L] JSONRelation
>>
>>
>> I think that only the analyser rules are being executed.
>>
>> Shouldn't the optimiser rules run in this case?
>>
>> 2016-05-11 19:24 GMT+01:00 Michael Armbrust :
>>
>>>
 logical plan after optimizer execution:

 Project [id#0L,id#1L]
 !+- Filter (id#0L = cast(1 as bigint))
 !   +- Join Inner, Some((id#0L = id#1L))
 !  :- Subquery t
 !  :  +- Relation[id#0L] JSONRelation
 !  +- Subquery u
 !     +- Relation[id#1L] JSONRelation

>>>
>>> I think you are mistaken.  If this was the optimized plan there would be
>>> no subqueries.
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Spark 1.6 Catalyst optimizer

2016-05-12 Thread Takeshi Yamamuro
Hi,

What's the result of `df3.explain(true)`?

// maropu

On Thu, May 12, 2016 at 10:04 AM, Telmo Rodrigues <
telmo.galante.rodrig...@gmail.com> wrote:

> I'm building Spark from the branch-1.6 source with mvn -DskipTests package
> and running the following code in the spark-shell.
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> import sqlContext.implicits._
>
> val df = sqlContext.read.json("persons.json")
>
> val df2 = sqlContext.read.json("cars.json")
>
> df.registerTempTable("t")
>
> df2.registerTempTable("u")
>
> val d3 = sqlContext.sql("select * from t join u on t.id = u.id where t.id = 1")
>
> With the log4j root category level set to WARN, the last printed messages
> refer to the execution of the Batch Resolution rules.
>
> === Result of Batch Resolution ===
> !'Project [unresolvedalias(*)]               Project [id#0L,id#1L]
> !+- 'Filter ('t.id = 1)                      +- Filter (id#0L = cast(1 as bigint))
> !   +- 'Join Inner, Some(('t.id = 'u.id))       +- Join Inner, Some((id#0L = id#1L))
> !      :- 'UnresolvedRelation `t`, None            :- Subquery t
> !      +- 'UnresolvedRelation `u`, None            :  +- Relation[id#0L] JSONRelation
> !                                                  +- Subquery u
> !                                                     +- Relation[id#1L] JSONRelation
>
>
> I think that only the analyser rules are being executed.
>
> Shouldn't the optimiser rules run in this case?
>
> 2016-05-11 19:24 GMT+01:00 Michael Armbrust :
>
>>
>>> logical plan after optimizer execution:
>>>
>>> Project [id#0L,id#1L]
>>> !+- Filter (id#0L = cast(1 as bigint))
>>> !   +- Join Inner, Some((id#0L = id#1L))
>>> !  :- Subquery t
>>> !  :  +- Relation[id#0L] JSONRelation
>>> !  +- Subquery u
>>> !     +- Relation[id#1L] JSONRelation
>>>
>>
>> I think you are mistaken.  If this was the optimized plan there would be
>> no subqueries.
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark 1.6 Catalyst optimizer

2016-05-11 Thread Telmo Rodrigues
I'm building Spark from the branch-1.6 source with mvn -DskipTests package
and running the following code in the spark-shell.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._

val df = sqlContext.read.json("persons.json")

val df2 = sqlContext.read.json("cars.json")

df.registerTempTable("t")

df2.registerTempTable("u")

val d3 = sqlContext.sql("select * from t join u on t.id = u.id where t.id = 1")

With the log4j root category level set to WARN, the last printed messages
refer to the execution of the Batch Resolution rules.

=== Result of Batch Resolution ===
!'Project [unresolvedalias(*)]               Project [id#0L,id#1L]
!+- 'Filter ('t.id = 1)                      +- Filter (id#0L = cast(1 as bigint))
!   +- 'Join Inner, Some(('t.id = 'u.id))       +- Join Inner, Some((id#0L = id#1L))
!      :- 'UnresolvedRelation `t`, None            :- Subquery t
!      +- 'UnresolvedRelation `u`, None            :  +- Relation[id#0L] JSONRelation
!                                                  +- Subquery u
!                                                     +- Relation[id#1L] JSONRelation


I think that only the analyser rules are being executed.

Shouldn't the optimiser rules run in this case?

2016-05-11 19:24 GMT+01:00 Michael Armbrust :

>
>> logical plan after optimizer execution:
>>
>> Project [id#0L,id#1L]
>> !+- Filter (id#0L = cast(1 as bigint))
>> !   +- Join Inner, Some((id#0L = id#1L))
>> !  :- Subquery t
>> !  :  +- Relation[id#0L] JSONRelation
>> !  +- Subquery u
>> !     +- Relation[id#1L] JSONRelation
>>
>
> I think you are mistaken.  If this was the optimized plan there would be
> no subqueries.
>


Re: Spark 1.6 Catalyst optimizer

2016-05-11 Thread Michael Armbrust
>
>
> logical plan after optimizer execution:
>
> Project [id#0L,id#1L]
> !+- Filter (id#0L = cast(1 as bigint))
> !   +- Join Inner, Some((id#0L = id#1L))
> !  :- Subquery t
> !  :  +- Relation[id#0L] JSONRelation
> !  +- Subquery u
> !     +- Relation[id#1L] JSONRelation
>

I think you are mistaken.  If this was the optimized plan there would be no
subqueries.


Re: Spark 1.6 Catalyst optimizer

2016-05-11 Thread Rishi Mishra
I will try with a JSON relation, but with Spark's temp tables (Spark version
1.6) I get an optimized plan as you have mentioned. It should not be much
different, though.

Query : "select t1.col2, t1.col3 from t1, t2 where t1.col1=t2.col1 and
t1.col3=7"

Plan :

Project [COL2#1,COL3#2]
+- Join Inner, Some((COL1#0 = COL1#3))
   :- Filter (COL3#2 = 7)
   :  +- LogicalRDD [col1#0,col2#1,col3#2], MapPartitionsRDD[4] at apply at Transformer.scala:22
   +- Project [COL1#3]
      +- LogicalRDD [col1#3,col2#4,col3#5], MapPartitionsRDD[5] at apply at Transformer.scala:22
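The benefit of the Filter sitting below the Join in the plan above can be
sketched with plain Scala collections standing in for relations. This is an
illustration of the rewrite only, not Catalyst code; the tuple layouts mirror
the (col1, col2, col3) schema of the query:

```scala
// Pushing a filter below a join leaves the result unchanged while the join
// sees fewer rows: both shapes below compute "select t1.col2, t1.col3 from
// t1, t2 where t1.col1 = t2.col1 and t1.col3 = 7" over tiny in-memory data.
object FilterPushdownDemo {
  val t1 = Seq((1, "a", 7), (2, "b", 7), (3, "c", 9))  // (col1, col2, col3)
  val t2 = Seq((1, "x"), (3, "y"))                     // (col1, col2)

  // Un-optimized shape: join everything, then filter.
  def joinThenFilter: Seq[(String, Int)] =
    for ((c1, c2, c3) <- t1; (d1, _) <- t2; if c1 == d1 && c3 == 7)
      yield (c2, c3)

  // Optimized shape: filter t1 first, then join the reduced relation.
  def filterThenJoin: Seq[(String, Int)] =
    for ((c1, c2, c3) <- t1.filter(_._3 == 7); (d1, _) <- t2; if c1 == d1)
      yield (c2, c3)
}
```

Both return the same rows; the optimized shape simply never feeds the
(3, "c", 9) tuple into the join, which is the whole point of the rewrite.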

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Wed, May 11, 2016 at 4:56 PM, Telmo Rodrigues <
telmo.galante.rodrig...@gmail.com> wrote:

> In this case, isn't it better to perform the filter as early as possible,
> even if there could be unhandled predicates?
>
> Telmo Rodrigues
>
> No dia 11/05/2016, às 09:49, Rishi Mishra 
> escreveu:
>
> It does push the predicate down. But as relations are generic and might or
> might not handle some of the predicates, it needs to apply a filter for the
> unhandled predicates.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>
> On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <
> telmo.galante.rodrig...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a question about the Catalyst optimizer in Spark 1.6.
>>
>> initial logical plan:
>>
>> !'Project [unresolvedalias(*)]
>> !+- 'Filter ('t.id = 1)
>> !   +- 'Join Inner, Some(('t.id = 'u.id))
>> !  :- 'UnresolvedRelation `t`, None
>> !  +- 'UnresolvedRelation `u`, None
>>
>>
>> logical plan after optimizer execution:
>>
>> Project [id#0L,id#1L]
>> !+- Filter (id#0L = cast(1 as bigint))
>> !   +- Join Inner, Some((id#0L = id#1L))
>> !  :- Subquery t
>> !  :  +- Relation[id#0L] JSONRelation
>> !  +- Subquery u
>> !     +- Relation[id#1L] JSONRelation
>>
>>
>> Shouldn't the optimizer push the predicates down to subquery t so that the
>> filter is executed before the join?
>>
>> Thanks
>>
>>
>>
>


Re: Spark 1.6 Catalyst optimizer

2016-05-11 Thread Telmo Rodrigues
In this case, isn't it better to perform the filter as early as possible,
even if there could be unhandled predicates?

Telmo Rodrigues

No dia 11/05/2016, às 09:49, Rishi Mishra  escreveu:

> It does push the predicate down. But as relations are generic and might or
> might not handle some of the predicates, it needs to apply a filter for the
> unhandled predicates.
> 
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
> 
> https://in.linkedin.com/in/rishiteshmishra
> 
>> On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues 
>>  wrote:
>> Hello,
>> 
>> I have a question about the Catalyst optimizer in Spark 1.6.
>> 
>> initial logical plan:
>> 
>> !'Project [unresolvedalias(*)]
>> !+- 'Filter ('t.id = 1)
>> !   +- 'Join Inner, Some(('t.id = 'u.id))
>> !  :- 'UnresolvedRelation `t`, None
>> !  +- 'UnresolvedRelation `u`, None
>> 
>> 
>> logical plan after optimizer execution:
>> 
>> Project [id#0L,id#1L]
>> !+- Filter (id#0L = cast(1 as bigint))
>> !   +- Join Inner, Some((id#0L = id#1L))
>> !  :- Subquery t
>> !  :  +- Relation[id#0L] JSONRelation
>> !  +- Subquery u
>> !     +- Relation[id#1L] JSONRelation
>> 
>> 
>> Shouldn't the optimizer push the predicates down to subquery t so that the
>> filter is executed before the join?
>> 
>> Thanks
> 


Re: Spark 1.6 Catalyst optimizer

2016-05-11 Thread Rishi Mishra
It does push the predicate down. But as relations are generic and might or
might not handle some of the predicates, it needs to apply a filter for the
unhandled predicates.
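This handled/unhandled split can be sketched in a few lines of plain Scala.
All the names below (Predicate, EqualTo, Custom, scan, execute) are made up
for illustration; this is not Spark's real data source API, just the shape of
the idea: the source evaluates the predicates it understands, and the engine
keeps a residual Filter on top for the rest.

```scala
// A "source" that can only evaluate EqualTo natively; anything else is
// re-applied by the engine as a residual filter above the scan.
object ResidualFilterDemo {
  sealed trait Predicate
  case class EqualTo(col: String, v: Int) extends Predicate
  case class Custom(f: Map[String, Int] => Boolean) extends Predicate

  // The source applies only the predicates it understands (EqualTo).
  def scan(rows: Seq[Map[String, Int]], pushed: Seq[Predicate]): Seq[Map[String, Int]] =
    rows.filter(r => pushed.collect { case EqualTo(c, v) => r(c) == v }.forall(identity))

  def unhandled(ps: Seq[Predicate]): Seq[Predicate] =
    ps.filterNot(_.isInstanceOf[EqualTo])

  // The engine: push everything down, then re-check the unhandled rest.
  def execute(rows: Seq[Map[String, Int]], ps: Seq[Predicate]): Seq[Map[String, Int]] = {
    val scanned = scan(rows, ps)
    unhandled(ps).foldLeft(scanned) {
      case (rs, Custom(f)) => rs.filter(f)
      case (rs, _)         => rs
    }
  }
}
```

In the explain output earlier in the thread this is exactly why the physical
plan shows both PushedFilters: [EqualTo(id,1)] on the scan and a Filter node
above it: the JSON source may not guarantee it enforces the pushed predicate,
so Spark 1.6 keeps the safety filter.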

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Wed, May 11, 2016 at 6:27 AM, Telmo Rodrigues <
telmo.galante.rodrig...@gmail.com> wrote:

> Hello,
>
> I have a question about the Catalyst optimizer in Spark 1.6.
>
> initial logical plan:
>
> !'Project [unresolvedalias(*)]
> !+- 'Filter ('t.id = 1)
> !   +- 'Join Inner, Some(('t.id = 'u.id))
> !  :- 'UnresolvedRelation `t`, None
> !  +- 'UnresolvedRelation `u`, None
>
>
> logical plan after optimizer execution:
>
> Project [id#0L,id#1L]
> !+- Filter (id#0L = cast(1 as bigint))
> !   +- Join Inner, Some((id#0L = id#1L))
> !  :- Subquery t
> !  :  +- Relation[id#0L] JSONRelation
> !  +- Subquery u
> !     +- Relation[id#1L] JSONRelation
>
>
> Shouldn't the optimizer push the predicates down to subquery t so that the
> filter is executed before the join?
>
> Thanks
>
>
>