[ https://issues.apache.org/jira/browse/SPARK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16737964#comment-16737964 ]
John Zhuge commented on SPARK-26576:
------------------------------------

No issue on the master branch. Please note "rightHint=(broadcast)" on the Join in the Optimized Logical Plan.

{noformat}
scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(dateint))
:- SubqueryAlias `jzhuge`.`parquet_with_part`
:  +- Relation[val#34,dateint#35] parquet
+- ResolvedHint (broadcast)
   +- SubqueryAlias `jzhuge`.`parquet_with_part`
      +- Relation[val#40,dateint#41] parquet

== Analyzed Logical Plan ==
dateint: int, val: string, val: string
Project [dateint#35, val#34, val#40]
+- Join Inner, (dateint#35 = dateint#41)
   :- SubqueryAlias `jzhuge`.`parquet_with_part`
   :  +- Relation[val#34,dateint#35] parquet
   +- ResolvedHint (broadcast)
      +- SubqueryAlias `jzhuge`.`parquet_with_part`
         +- Relation[val#40,dateint#41] parquet

== Optimized Logical Plan ==
Project [dateint#35, val#34, val#40]
+- Join Inner, (dateint#35 = dateint#41), rightHint=(broadcast)
   :- Project [val#34, dateint#35]
   :  +- Filter isnotnull(dateint#35)
   :     +- Relation[val#34,dateint#35] parquet
   +- Project [val#40, dateint#41]
      +- Filter isnotnull(dateint#41)
         +- Relation[val#40,dateint#41] parquet

== Physical Plan ==
*(2) Project [dateint#35, val#34, val#40]
+- *(2) BroadcastHashJoin [dateint#35], [dateint#41], Inner, BuildRight
   :- *(2) FileScan parquet jzhuge.parquet_with_part[val#34,dateint#35] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#35)], PushedFilters: [], ReadSchema: struct<val:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
      +- *(1) FileScan parquet jzhuge.parquet_with_part[val#40,dateint#41] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#41)], PushedFilters: [], ReadSchema: struct<val:string>
{noformat}

From a quick look at the source, EliminateResolvedHint pulls the broadcast hint into the Join and eliminates the ResolvedHint node. It runs before PruneFileSourcePartitions, so the above code in PhysicalOperation.collectProjectsAndFilters is never called on the master branch.

Broadcast hint not applied to partitioned Parquet table
-------------------------------------------------------

                Key: SPARK-26576
                URL: https://issues.apache.org/jira/browse/SPARK-26576
            Project: Spark
         Issue Type: Bug
         Components: SQL
   Affects Versions: 2.3.2, 2.4.0
           Reporter: John Zhuge
           Priority: Major

The broadcast hint is not applied to a partitioned Parquet table. Below, "SortMergeJoin" is chosen incorrectly and "ResolvedHint (broadcast)" is removed in the Optimized Plan.

{noformat}
scala> spark.sql("CREATE TABLE jzhuge.parquet_with_part (val STRING) PARTITIONED BY (dateint INT) STORED AS parquet")
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => df.join(broadcast(df), "dateint").explain(true))

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(dateint))
:- SubqueryAlias `jzhuge`.`parquet_with_part`
:  +- Relation[val#28,dateint#29] parquet
+- ResolvedHint (broadcast)
   +- SubqueryAlias `jzhuge`.`parquet_with_part`
      +- Relation[val#32,dateint#33] parquet

== Analyzed Logical Plan ==
dateint: int, val: string, val: string
Project [dateint#29, val#28, val#32]
+- Join Inner, (dateint#29 = dateint#33)
   :- SubqueryAlias `jzhuge`.`parquet_with_part`
   :  +- Relation[val#28,dateint#29] parquet
   +- ResolvedHint (broadcast)
      +- SubqueryAlias `jzhuge`.`parquet_with_part`
         +- Relation[val#32,dateint#33] parquet

== Optimized Logical Plan ==
Project [dateint#29, val#28, val#32]
+- Join Inner, (dateint#29 = dateint#33)
   :- Project [val#28, dateint#29]
   :  +- Filter isnotnull(dateint#29)
   :     +- Relation[val#28,dateint#29] parquet
   +- Project [val#32, dateint#33]
      +- Filter isnotnull(dateint#33)
         +- Relation[val#32,dateint#33] parquet

== Physical Plan ==
*(5) Project [dateint#29, val#28, val#32]
+- *(5) SortMergeJoin [dateint#29], [dateint#33], Inner
   :- *(2) Sort [dateint#29 ASC NULLS FIRST], false, 0
   :  +- Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
   :     +- *(1) FileScan parquet jzhuge.parquet_with_part[val#28,dateint#29] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#29)], PushedFilters: [], ReadSchema: struct<val:string>
   +- *(4) Sort [dateint#33 ASC NULLS FIRST], false, 0
      +- ReusedExchange [val#32, dateint#33], Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
{noformat}

The broadcast hint is applied to a Parquet table without partitions. Below, "BroadcastHashJoin" is chosen as expected.
{noformat}
scala> spark.sql("CREATE TABLE jzhuge.parquet_no_part (val STRING, dateint INT) STORED AS parquet")
scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
scala> Seq(spark.table("jzhuge.parquet_no_part")).map(df => df.join(broadcast(df), "dateint").explain(true))

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(dateint))
:- SubqueryAlias `jzhuge`.`parquet_no_part`
:  +- Relation[val#44,dateint#45] parquet
+- ResolvedHint (broadcast)
   +- SubqueryAlias `jzhuge`.`parquet_no_part`
      +- Relation[val#50,dateint#51] parquet

== Analyzed Logical Plan ==
dateint: int, val: string, val: string
Project [dateint#45, val#44, val#50]
+- Join Inner, (dateint#45 = dateint#51)
   :- SubqueryAlias `jzhuge`.`parquet_no_part`
   :  +- Relation[val#44,dateint#45] parquet
   +- ResolvedHint (broadcast)
      +- SubqueryAlias `jzhuge`.`parquet_no_part`
         +- Relation[val#50,dateint#51] parquet

== Optimized Logical Plan ==
Project [dateint#45, val#44, val#50]
+- Join Inner, (dateint#45 = dateint#51)
   :- Filter isnotnull(dateint#45)
   :  +- Relation[val#44,dateint#45] parquet
   +- ResolvedHint (broadcast)
      +- Filter isnotnull(dateint#51)
         +- Relation[val#50,dateint#51] parquet

== Physical Plan ==
*(2) Project [dateint#45, val#44, val#50]
+- *(2) BroadcastHashJoin [dateint#45], [dateint#51], Inner, BuildRight
   :- *(2) Project [val#44, dateint#45]
   :  +- *(2) Filter isnotnull(dateint#45)
   :     +- *(2) FileScan parquet jzhuge.parquet_no_part[val#44,dateint#45] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
      +- *(1) Project [val#50, dateint#51]
         +- *(1) Filter isnotnull(dateint#51)
            +- *(1) FileScan parquet jzhuge.parquet_no_part[val#50,dateint#51] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
{noformat}

Observed a similar issue with a partitioned ORC table. SequenceFile is fine.
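The behavior above can also be checked programmatically rather than by reading the explain output. A minimal sketch, assuming a running spark-shell session (so `spark` is in scope) and the `jzhuge.parquet_with_part` table created in the steps above; `BroadcastHashJoinExec` is the physical operator printed as "BroadcastHashJoin" in the plans:

```scala
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

val df = spark.table("jzhuge.parquet_with_part")
val joined = df.join(broadcast(df), "dateint")

// Collect any BroadcastHashJoinExec nodes from the executed physical plan.
// On an affected version (2.3.2/2.4.0 with a partitioned table), the plan
// falls back to SortMergeJoin and this collection is empty; on master the
// hint survives and a BroadcastHashJoinExec node is present.
val usesBroadcast = joined.queryExecution.executedPlan.collect {
  case b: BroadcastHashJoinExec => b
}.nonEmpty

println(s"broadcast hint applied: $usesBroadcast")
```

With `spark.sql.autoBroadcastJoinThreshold` set to `-1` as in the reproduction, automatic broadcasting is disabled, so any `BroadcastHashJoinExec` found can only have come from the hint.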