[ https://issues.apache.org/jira/browse/SPARK-25071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581957#comment-16581957 ]
Ayush Anubhava commented on SPARK-25071: ---------------------------------------- Thanks for the information. Please consider the following scenario, in which the number of rows (row count) is used as the CBO parameter. Please find below two scenarios for comparison: {code:java} scala> spark.sql("CREATE TABLE small7 (c1 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='600','totalSize'='80000000000')") 18/08/16 12:38:14 WARN HiveMetaStore: Location: hdfs://hacluster/user/sparkhive/warehouse/jira.db/small7 specified for non-external table:small7 res89: org.apache.spark.sql.DataFrame = [] scala> spark.sql("CREATE TABLE big7 (c1 bigint, c2 bigint) TBLPROPERTIES ('numRows'='1', 'rawDataSize'='60000000', 'totalSize'='800')") 18/08/16 12:38:14 WARN HiveMetaStore: Location: hdfs://hacluster/user/sparkhive/warehouse/jira.db/big7 specified for non-external table:big7 res90: org.apache.spark.sql.DataFrame = [] scala> val plan = spark.sql("select * from small7 t1 join big7 t2 on (t1.c1 = t2.c1)").queryExecution.executedPlan plan: org.apache.spark.sql.execution.SparkPlan = *(2) BroadcastHashJoin [c1#149L], [c1#150L], Inner, BuildRight :- *(2) Filter isnotnull(c1#149L) : +- HiveTableScan [c1#149L], HiveTableRelation `jira`.`small7`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#149L] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) +- *(1) Filter isnotnull(c1#150L) +- HiveTableScan [c1#150L, c2#151L], HiveTableRelation `jira`.`big7`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#150L, c2#151L] scala> val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight scala> println(buildSide) BuildRight {code} {code:java} scala> spark.sql("CREATE TABLE small8 (c1 bigint) TBLPROPERTIES ('numRows'='2', 'rawDataSize'='600','totalSize'='80000000000')") 18/08/16 12:39:09 WARN HiveMetaStore: Location: 
hdfs://hacluster/user/sparkhive/warehouse/jira.db/small8 specified for non-external table:small8 res92: org.apache.spark.sql.DataFrame = [] scala> spark.sql("CREATE TABLE big8 (c1 bigint, c2 string) TBLPROPERTIES ('numRows'='1', 'rawDataSize'='60000000', 'totalSize'='800')") 18/08/16 12:39:09 WARN HiveMetaStore: Location: hdfs://hacluster/user/sparkhive/warehouse/jira.db/big8 specified for non-external table:big8 res93: org.apache.spark.sql.DataFrame = [] scala> val plan = spark.sql("select * from small8 t1 join big8 t2 on (t1.c1 = t2.c1)").queryExecution.executedPlan plan: org.apache.spark.sql.execution.SparkPlan = *(2) BroadcastHashJoin [c1#155L], [c1#156L], Inner, BuildLeft :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) : +- *(1) Filter isnotnull(c1#155L) : +- HiveTableScan [c1#155L], HiveTableRelation `jira`.`small8`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#155L] +- *(2) Filter isnotnull(c1#156L) +- HiveTableScan [c1#156L, c2#157], HiveTableRelation `jira`.`big8`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#156L, c2#157] scala> val buildSide = plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildLeft scala> println(buildSide) BuildLeft {code} Going by the number-of-rows behavior, the build side in the second scenario should also have matched scenario 1. Any suggestions would be appreciated. 
> BuildSide is coming not as expected with join queries > ----------------------------------------------------- > > Key: SPARK-25071 > URL: https://issues.apache.org/jira/browse/SPARK-25071 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.3.1 > Environment: Spark 2.3.1 > Hadoop 2.7.3 > Reporter: Ayush Anubhava > Priority: Major > > *BuildSide is not coming as expected.* > Pre-requisites: > *CBO is set as true & spark.sql.cbo.joinReorder.enabled= true.* > *import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec* > *Steps:* > *Scenario 1:* > spark.sql("CREATE TABLE small3 (c1 bigint) TBLPROPERTIES ('numRows'='2', > 'rawDataSize'='600','totalSize'='80000000000')") > spark.sql("CREATE TABLE big3 (c1 bigint) TBLPROPERTIES ('numRows'='2', > 'rawDataSize'='60000000', 'totalSize'='800')") > val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 = > t2.c1)").queryExecution.executedPlan > val buildSide = > plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide > println(buildSide) > > *Result 1:* > scala> val plan = spark.sql("select * from small3 t1 join big3 t2 on (t1.c1 = > t2.c1)").queryExecution.executedPlan > plan: org.apache.spark.sql.execution.SparkPlan = > *(2) BroadcastHashJoin [c1#0L|#0L], [c1#1L|#1L], Inner, BuildRight > :- *(2) Filter isnotnull(c1#0L) > : +- HiveTableScan [c1#0L|#0L], HiveTableRelation `default`.`small3`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#0L|#0L] > +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, > false])) > +- *(1) Filter isnotnull(c1#1L) > +- HiveTableScan [c1#1L|#1L], HiveTableRelation `default`.`big3`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#1L|#1L] > scala> val buildSide = > plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide > buildSide: org.apache.spark.sql.execution.joins.BuildSide = BuildRight > scala> println(buildSide) > *BuildRight* > > *Scenario 2:* > spark.sql("CREATE TABLE small4 (c1 bigint) 
TBLPROPERTIES ('numRows'='2', > 'rawDataSize'='600','totalSize'='80')") > spark.sql("CREATE TABLE big4 (c1 bigint) TBLPROPERTIES ('numRows'='2', > 'rawDataSize'='60000000', 'totalSize'='800')") > val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = > t2.c1)").queryExecution.executedPlan > val buildSide = > plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide > println(buildSide) > *Result 2:* > scala> val plan = spark.sql("select * from small4 t1 join big4 t2 on (t1.c1 = > t2.c1)").queryExecution.executedPlan > plan: org.apache.spark.sql.execution.SparkPlan = > *(2) BroadcastHashJoin [c1#4L|#4L], [c1#5L|#5L], Inner, BuildRight > :- *(2) Filter isnotnull(c1#4L) > : +- HiveTableScan [c1#4L|#4L], HiveTableRelation `default`.`small4`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#4L|#4L] > +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, > false])) > +- *(1) Filter isnotnull(c1#5L) > +- HiveTableScan [c1#5L|#5L], HiveTableRelation `default`.`big4`, > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#5L|#5L] > scala> val buildSide = > plan.children.head.asInstanceOf[BroadcastHashJoinExec].buildSide > buildSide: org.apache.spark.sql.execution.joins.BuildSide = *BuildRight* > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org