[ 
https://issues.apache.org/jira/browse/SPARK-25071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581957#comment-16581957
 ] 

Ayush Anubhava commented on SPARK-25071:
----------------------------------------

Thanks for the information. Please consider the following scenario, in which we treat 
the number of rows (row count) as the CBO parameter. Please find the two 
scenarios below for comparison:
{code:java}
scala> spark.sql("CREATE TABLE small7 (c1 bigint) TBLPROPERTIES ('numRows'='2', 
'rawDataSize'='600','totalSize'='80000000000')")
18/08/16 12:38:14 WARN HiveMetaStore: Location: 
hdfs://hacluster/user/sparkhive/warehouse/jira.db/small7 specified for 
non-external table:small7
res89: org.apache.spark.sql.DataFrame = []

scala> spark.sql("CREATE TABLE big7 (c1 bigint, c2 bigint) TBLPROPERTIES 
('numRows'='1', 'rawDataSize'='60000000', 'totalSize'='800')")
18/08/16 12:38:14 WARN HiveMetaStore: Location: 
hdfs://hacluster/user/sparkhive/warehouse/jira.db/big7 specified for 
non-external table:big7
res90: org.apache.spark.sql.DataFrame = []

scala> val plan = spark.sql("select * from small7 t1 join big7 t2 on (t1.c1 = 
t2.c1)").queryExecution.executedPlan
plan: org.apache.spark.sql.execution.SparkPlan =
*(2) BroadcastHashJoin [c1#149L], [c1#150L], Inner, BuildRight
:- *(2) Filter isnotnull(c1#149L)
: +- HiveTableScan [c1#149L], HiveTableRelation `jira`.`small7`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#149L]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
+- *(1) Filter isnotnull(c1#150L)
+- HiveTableScan [c1#150L, c2#151L], HiveTableRelation `jira`.`big7`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#150L, c2#151L]

scala> val buildSide = 
plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight

scala> println(buildSide)
BuildRight
{code}
{code:java}
 
scala> spark.sql("CREATE TABLE small8 (c1 bigint) TBLPROPERTIES ('numRows'='2', 
'rawDataSize'='600','totalSize'='80000000000')") 18/08/16 12:39:09 WARN 
HiveMetaStore: Location: 
hdfs://hacluster/user/sparkhive/warehouse/jira.db/small8 specified for 
non-external table:small8 res92: org.apache.spark.sql.DataFrame = []
scala> spark.sql("CREATE TABLE big8 (c1 bigint, c2 string) TBLPROPERTIES 
('numRows'='1', 'rawDataSize'='60000000', 'totalSize'='800')") 18/08/16 
12:39:09 WARN HiveMetaStore: Location: 
hdfs://hacluster/user/sparkhive/warehouse/jira.db/big8 specified for 
non-external table:big8 res93: org.apache.spark.sql.DataFrame = []
scala> val plan = spark.sql("select * from small8 t1 join big8 t2 on (t1.c1 = 
t2.c1)").queryExecution.executedPlan plan: 
org.apache.spark.sql.execution.SparkPlan = *(2) BroadcastHashJoin [c1#155L], 
[c1#156L], Inner, BuildLeft :- BroadcastExchange 
HashedRelationBroadcastMode(List(input[0, bigint, false])) : +- *(1) Filter 
isnotnull(c1#155L) : +- HiveTableScan [c1#155L], HiveTableRelation 
`jira`.`small8`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#155L] 
+- *(2) Filter isnotnull(c1#156L) +- HiveTableScan [c1#156L, c2#157], 
HiveTableRelation `jira`.`big8`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#156L, c2#157]
scala> val buildSide = 
plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide buildSide: 
org.apache.spark.sql.execution.joins.BuildSide = BuildLeft 
scala> println(buildSide)
BuildLeft
{code}
As per the number-of-rows behavior, the build side in the second scenario should 
also have been chosen in accordance with scenario 1.

Any suggestions would be appreciated.

 

> BuildSide is coming not as expected with join queries
> -----------------------------------------------------
>
>                 Key: SPARK-25071
>                 URL: https://issues.apache.org/jira/browse/SPARK-25071
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.1
>         Environment: Spark 2.3.1 
> Hadoop 2.7.3
>            Reporter: Ayush Anubhava
>            Priority: Major
>
> *BuildSide is not coming as expected.*
> Pre-requisites:
> *CBO is set as true &  spark.sql.cbo.joinReorder.enabled= true.*
> *import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec*
> *Steps:*
> *Scenario 1:*
> spark.sql("CREATE TABLE small3 (c1 bigint) TBLPROPERTIES ('numRows'='2', 
> 'rawDataSize'='600','totalSize'='80000000000')")
>  spark.sql("CREATE TABLE big3 (c1 bigint) TBLPROPERTIES ('numRows'='2', 
> 'rawDataSize'='60000000', 'totalSize'='800')")
>  val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 = 
> t2.c1)").queryExecution.executedPlan
>  val buildSide = 
> plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
>  println(buildSide)
>  
> *Result 1:*
> scala> val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 = 
> t2.c1)").queryExecution.executedPlan
>  plan: org.apache.spark.sql.execution.SparkPlan =
>  *(2) BroadcastHashJoin [c1#0L|#0L], [c1#1L|#1L], Inner, BuildRight
>  :- *(2) Filter isnotnull(c1#0L)
>  : +- HiveTableScan [c1#0L|#0L], HiveTableRelation `default`.`small3`, 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#0L|#0L]
>  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> false]))
>  +- *(1) Filter isnotnull(c1#1L)
>  +- HiveTableScan [c1#1L|#1L], HiveTableRelation `default`.`big3`, 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#1L|#1L]
> scala> val buildSide = 
> plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
>  buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight
> scala> println(buildSide)
>  *BuildRight*
>  
> *Scenario 2:*
> spark.sql("CREATE TABLE small4 (c1 bigint) TBLPROPERTIES ('numRows'='2', 
> 'rawDataSize'='600','totalSize'='80')")
>  spark.sql("CREATE TABLE big4 (c1 bigint) TBLPROPERTIES ('numRows'='2', 
> 'rawDataSize'='60000000', 'totalSize'='800')")
>  val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = 
> t2.c1)").queryExecution.executedPlan
>  val buildSide = 
> plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
>  println(buildSide)
> *Result 2:*
> scala> val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = 
> t2.c1)").queryExecution.executedPlan
>  plan: org.apache.spark.sql.execution.SparkPlan =
>  *(2) BroadcastHashJoin [c1#4L|#4L], [c1#5L|#5L], Inner, BuildRight
>  :- *(2) Filter isnotnull(c1#4L)
>  : +- HiveTableScan [c1#4L|#4L], HiveTableRelation `default`.`small4`, 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#4L|#4L]
>  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> false]))
>  +- *(1) Filter isnotnull(c1#5L)
>  +- HiveTableScan [c1#5L|#5L], HiveTableRelation `default`.`big4`, 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#5L|#5L]
> scala> val buildSide = 
> plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide
>  buildSide: org.apache.spark.sql.execution.joins.BuildSide = *BuildRight*
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to