Hi Jacek,

I haven't played with 2.1.0 yet, so I'm not sure how much more optimized Window 
functions are compared to 1.6 and 2.0.


However, one thing I do see in the self-join plan is a broadcast. So the 
results of the groupBy will need to be broadcast out to the executors before 
the join can run. In both cases the data gets shuffled (for the groupBy or 
for the Window).


Have you tried running both queries to see? It would be interesting to test 
them on varying data volumes as well (e.g. what happens if there's no 
broadcast).
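
A rough sketch of such a test for spark-shell (paste it with :paste) might look 
like the following. The helper names makeDf, windowed, selfJoined and timeMs are 
hypothetical, the generated data is only a stand-in for mydf, and the byId3 
definition is my guess based on the window plan. Wall-clock timing like this is 
crude, so treat the numbers as a hint only.

```scala
// Sketch for spark-shell (Spark 2.x), where `spark` is already defined.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._

// Hypothetical stand-in for mydf, reusing the column names from the plans.
def makeDf(rows: Long): DataFrame =
  spark.range(rows).select(
    $"id",
    ($"id" * 2).as("multiplied"),
    ($"id" % 3).as("ID % 3"))

// Assumed definition of byId3: partition by "ID % 3", no ordering.
val byId3 = Window.partitionBy("ID % 3")

def windowed(df: DataFrame): DataFrame =
  df.withColumn("sum(id)", sum($"id") over byId3)

def selfJoined(df: DataFrame): DataFrame =
  df.join(df.groupBy("ID % 3").sum("id"), "ID % 3")

// Crude wall-clock timer. Going through .rdd makes Spark produce the full
// rows, so the sum column is actually computed before counting.
def timeMs(df: DataFrame): Long = {
  val start = System.nanoTime()
  df.rdd.count()
  (System.nanoTime() - start) / 1000000
}

for (rows <- Seq(10000L, 100000L, 1000000L)) {
  val df = makeDf(rows)
  println(s"rows=$rows window=${timeMs(windowed(df))} ms " +
    s"join=${timeMs(selfJoined(df))} ms")
}

// Disable automatic broadcast joins, then look at the join plan again.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
selfJoined(makeDf(1000000L)).explain
```

Running each measurement a few times and ignoring the first run should smooth 
out JVM warm-up effects.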


Thanks,

Silvio

________________________________
From: Jacek Laskowski <ja...@japila.pl>
Sent: Wednesday, November 9, 2016 7:36:47 AM
To: user
Subject: Physical plan for windows and joins - how to know which is faster?

Hi,

While playing around with Spark 2.1.0-SNAPSHOT (built today) and
explain'ing two queries with WindowSpec and inner join I found the
following plans and am wondering if you could help me to judge which
query could be faster.

What else would you ask for to be able to answer the question of one
being more efficient than the other?

Just by looking at Spark's "stack traces" of the queries one could
say that the windowed variant (first one) is gonna be faster (as there are
fewer physical operators), yet the top-level Window operator is not
codegened so it might be misleading.

I'd appreciate your help to get me better at reading such trees. Thanks!

scala> mydf.withColumn("sum(id)", sum('id) over byId3).explain
== Physical Plan ==
Window [sum(cast(id#15 as bigint)) windowspecdefinition(ID % 3#60, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum(id)#665L], [ID % 3#60]
+- *Sort [ID % 3#60 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(ID % 3#60, 200)
      +- LocalTableScan [id#15, multiplied#16, ID % 3#60]

scala> mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3").explain
== Physical Plan ==
*Project [ID % 3#60, id#15, multiplied#16, sum(id)#677L]
+- *BroadcastHashJoin [ID % 3#60], [ID % 3#681], Inner, BuildRight
   :- *Project [_1#12 AS id#15, _2#13 AS multiplied#16, (_1#12 % 3) AS ID % 3#60]
   :  +- *Filter isnotnull((_1#12 % 3))
   :     +- LocalTableScan [_1#12, _2#13]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *HashAggregate(keys=[ID % 3#681], functions=[sum(cast(id#15 as bigint))])
         +- Exchange hashpartitioning(ID % 3#681, 200)
            +- *HashAggregate(keys=[ID % 3#681], functions=[partial_sum(cast(id#15 as bigint))])
               +- *Project [_1#12 AS id#15, (_1#12 % 3) AS ID % 3#681]
                  +- *Filter isnotnull((_1#12 % 3))
                     +- LocalTableScan [_1#12, _2#13]

Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
