wangxiaoying commented on issue #423:
URL: https://github.com/apache/incubator-wayang/issues/423#issuecomment-2043243095

   > 1. The type of join that Spark SQL uses. Wayang's current join operator maps to the corresponding join in RDDs, which, if I'm not mistaken, is implemented as a hash join. Maybe Spark SQL uses a broadcast join, and thus the difference in the data transferred?
   
   Yes, I think the join algorithms executed by the two approaches are different. Below is the default physical plan generated by Spark:
   ```
   +- == Final Plan ==
      *(8) Sort [revenue#138 DESC NULLS LAST, o_orderdate#58 ASC NULLS FIRST], true, 0
      +- AQEShuffleRead coalesced
         +- ShuffleQueryStage 5
            +- Exchange rangepartitioning(revenue#138 DESC NULLS LAST, o_orderdate#58 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=448]
               +- *(7) HashAggregate(keys=[l_orderkey#0, o_orderdate#58, o_shippriority#61], functions=[sum((l_extendedprice#5 * (1 - l_discount#6)))])
                  +- *(7) HashAggregate(keys=[l_orderkey#0, o_orderdate#58, o_shippriority#61], functions=[partial_sum((l_extendedprice#5 * (1 - l_discount#6)))])
                     +- *(7) Project [o_orderdate#58, o_shippriority#61, l_orderkey#0, l_extendedprice#5, l_discount#6]
                        +- *(7) SortMergeJoin [o_orderkey#54], [l_orderkey#0], Inner
                           :- *(5) Sort [o_orderkey#54 ASC NULLS FIRST], false, 0
                           :  +- AQEShuffleRead coalesced
                           :     +- ShuffleQueryStage 4
                           :        +- Exchange hashpartitioning(o_orderkey#54, 200), ENSURE_REQUIREMENTS, [plan_id=341]
                           :           +- *(4) Project [o_orderkey#54, o_orderdate#58, o_shippriority#61]
                           :              +- *(4) BroadcastHashJoin [c_custkey#36], [o_custkey#55], Inner, BuildLeft, false
                           :                 :- BroadcastQueryStage 3
                           :                 :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=225]
                           :                 :     +- AQEShuffleRead local
                           :                 :        +- ShuffleQueryStage 0
                           :                 :           +- Exchange hashpartitioning(c_custkey#36, 200), ENSURE_REQUIREMENTS, [plan_id=132]
                           :                 :              +- *(1) Project [c_custkey#36]
                           :                 :                 +- *(1) Filter (staticinvoke(class org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils, StringType, readSidePadding, c_mktsegment#42, 10, true, false, true) = BUILDING  )
                           :                 :                    +- *(1) Scan JDBCRelation(public.customer) [numPartitions=1] [c_custkey#36,c_mktsegment#42] PushedFilters: [*IsNotNull(c_custkey)], ReadSchema: struct<c_custkey:int,c_mktsegment:string>
                           :                 +- AQEShuffleRead local
                           :                    +- ShuffleQueryStage 1
                           :                       +- Exchange hashpartitioning(o_custkey#55, 200), ENSURE_REQUIREMENTS, [plan_id=139]
                           :                          +- *(2) Scan JDBCRelation(public.orders) [numPartitions=1] [o_orderkey#54,o_custkey#55,o_orderdate#58,o_shippriority#61] PushedFilters: [*IsNotNull(o_orderdate), *LessThan(o_orderdate,1995-03-15), *IsNotNull(o_custkey), *IsNotNull(o_..., ReadSchema: struct<o_orderkey:int,o_custkey:int,o_orderdate:date,o_shippriority:int>
                           +- *(6) Sort [l_orderkey#0 ASC NULLS FIRST], false, 0
                              +- AQEShuffleRead coalesced
                                 +- ShuffleQueryStage 2
                                    +- Exchange hashpartitioning(l_orderkey#0, 200), ENSURE_REQUIREMENTS, [plan_id=150]
                                       +- *(3) Scan JDBCRelation(public.lineitem) [numPartitions=1] [l_orderkey#0,l_extendedprice#5,l_discount#6] PushedFilters: [*IsNotNull(l_shipdate), *GreaterThan(l_shipdate,1995-03-15), *IsNotNull(l_orderkey)], ReadSchema: struct<l_orderkey:int,l_extendedprice:decimal(15,2),l_discount:decimal(15,2)>
   ```
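   
   For completeness, a plan like this can be reproduced roughly as follows. This is a minimal sketch, not my exact setup: the JDBC URL and credentials are placeholders, and the query is standard TPC-H Q3, which the filters and group keys in the plan above correspond to.
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Sketch of how to print a plan like the one above.
   val spark = SparkSession.builder()
     .appName("tpch-q3-plan")
     .master("local[*]")
     .getOrCreate()
   
   // Register the three TPC-H tables as temp views backed by Postgres via JDBC.
   for (table <- Seq("customer", "orders", "lineitem")) {
     spark.read
       .format("jdbc")
       .option("url", "jdbc:postgresql://localhost:5432/tpch") // placeholder
       .option("dbtable", s"public.$table")
       .option("user", "postgres")     // placeholder
       .option("password", "postgres") // placeholder
       .load()
       .createOrReplaceTempView(table)
   }
   
   // Standard TPC-H Q3, matching the pushed filters and keys in the plan above.
   val q3 = spark.sql("""
     SELECT l_orderkey,
            SUM(l_extendedprice * (1 - l_discount)) AS revenue,
            o_orderdate, o_shippriority
     FROM customer, orders, lineitem
     WHERE c_mktsegment = 'BUILDING'
       AND c_custkey = o_custkey
       AND l_orderkey = o_orderkey
       AND o_orderdate < DATE '1995-03-15'
       AND l_shipdate > DATE '1995-03-15'
     GROUP BY l_orderkey, o_orderdate, o_shippriority
     ORDER BY revenue DESC, o_orderdate
   """)
   
   q3.collect()     // run first: with AQE the final plan is only known after execution
   q3.explain(true) // then explain includes the "== Final Plan ==" section
   ```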
   
   I tried to add the config `.config("spark.sql.join.preferSortMergeJoin", "false")` when building the Spark session, so that the `SortMergeJoin` above becomes a `ShuffledHashJoin`, but the performance does not change much. The `BroadcastHashJoin` is still used, though (see the sketch below).
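   
   For reference, both knobs can be combined; the sketch below also sets `spark.sql.autoBroadcastJoinThreshold` to `-1`, which is the standard Spark config for disabling broadcast joins entirely:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Sketch: steering Spark SQL's join strategy via standard configs.
   val spark = SparkSession.builder()
     .appName("join-strategy-test")
     // Prefer a shuffled hash join over sort-merge where possible.
     .config("spark.sql.join.preferSortMergeJoin", "false")
     // Disable broadcast joins entirely, so the customer-orders join
     // would also become a shuffle-based join.
     .config("spark.sql.autoBroadcastJoinThreshold", "-1")
     .getOrCreate()
   ```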
   
   > 2. I'm not very familiar with the views in Spark, but when one registers the temporary views, are they materialized in memory? If so, the timer you have would measure data accessed via memory. But again, I'm not sure how the temp views in Spark work. Maybe you could time the `registerviews` method to check this out.
   
   Spark uses lazy evaluation, so the view creation does not take much time (only some metadata is fetched). And as I showed above in the Postgres log, Spark does fetch the three tables (with projection and filter pushdown) at runtime, just like Wayang does.
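   
   A quick way to see this (a sketch, assuming the `spark` session from above; connection details are placeholders) is to time the view registration separately from an action that forces the fetch:
   
   ```scala
   // Sketch: view registration is metadata-only; the JDBC fetch happens at action time.
   val customer = spark.read
     .format("jdbc")
     .option("url", "jdbc:postgresql://localhost:5432/tpch") // placeholder
     .option("dbtable", "public.customer")
     .option("user", "postgres")     // placeholder
     .option("password", "postgres") // placeholder
     .load()
   
   var t0 = System.nanoTime()
   customer.createOrReplaceTempView("customer") // near-instant: no rows move
   println(s"register view: ${(System.nanoTime() - t0) / 1e6} ms")
   
   t0 = System.nanoTime()
   spark.sql("SELECT COUNT(*) FROM customer").collect() // triggers the actual JDBC scan
   println(s"count via view: ${(System.nanoTime() - t0) / 1e6} ms")
   ```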
   

