[ https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945166#comment-15945166 ]
Mitesh edited comment on SPARK-19981 at 3/28/17 1:33 PM:
---------------------------------------------------------

As I mentioned on the PR, this seems like it should be handled here: https://github.com/maropu/spark/blob/b5d1038edffff5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41

Perhaps it should handle canonicalizing an alias, so that {{a#1}} is treated the same as {{a#1 as newA#2}}. Otherwise you have a similar problem with sorting. Here is a sort example with 1 partition; I believe the extra sort on {{newA}} is unnecessary.

{code}
scala> val df1 = Seq((1, 2), (3, 4)).toDF("a", "b").coalesce(1).sortWithinPartitions("a")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]

scala> val df2 = df1.selectExpr("a as newA", "b")
df2: org.apache.spark.sql.DataFrame = [newA: int, b: int]

scala> println(df1.join(df2, df1("a") === df2("newA")).queryExecution.executedPlan)
*SortMergeJoin [args=[a#37225], [newA#37232], Inner][outPart=PartitioningCollection(1, )][outOrder=List(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL, newA#37232:int%NONNULL, b#37243:int%NONNULL)]
:- *Sort [args=[a#37225 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:  +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:     +- LocalTableScan [args=[a#37225, b#37226]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
+- *Sort [args=[newA#37232 ASC], false, 0][outPart=SinglePartition][outOrder=List(newA#37232 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
   +- *Project [args=[a#37242 AS newA#37232, b#37243]][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
      +- *Sort [args=[a#37242 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
         +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
            +- LocalTableScan [args=[a#37242, b#37243]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
{code}

> Sort-Merge join inserts shuffles when joining dataframes with aliased columns
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-19981
>                 URL: https://issues.apache.org/jira/browse/SPARK-19981
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: Allen George
>
> Performing a sort-merge join with two dataframes - each of which has the join column aliased - causes Spark to insert an unnecessary shuffle.
> Consider the Scala test code below, which should be equivalent to the following SQL.
> {code:SQL}
> SELECT * FROM
>   (SELECT number AS aliased FROM df1) t1
> LEFT JOIN
>   (SELECT number AS aliased FROM df2) t2
> ON t1.aliased = t2.aliased
> {code}
> {code:scala}
> private case class OneItem(number: Long)
> private case class TwoItem(number: Long, value: String)
>
> test("join with aliases should not trigger shuffle") {
>   val df1 = sqlContext.createDataFrame(
>     Seq(
>       OneItem(0),
>       OneItem(2),
>       OneItem(4)
>     )
>   )
>   val partitionedDf1 = df1.repartition(10, col("number"))
>   partitionedDf1.createOrReplaceTempView("df1")
>   partitionedDf1.cache()
>   partitionedDf1.count()
>
>   val df2 = sqlContext.createDataFrame(
>     Seq(
>       TwoItem(0, "zero"),
>       TwoItem(2, "two"),
>       TwoItem(4, "four")
>     )
>   )
>   val partitionedDf2 = df2.repartition(10, col("number"))
>   partitionedDf2.createOrReplaceTempView("df2")
>   partitionedDf2.cache()
>   partitionedDf2.count()
>
>   val fromDf1 = sqlContext.sql("SELECT number from df1")
>   val fromDf2 = sqlContext.sql("SELECT number from df2")
>   val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased")
>   val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased")
>   aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer")
> }
> {code}
> Both the SQL and the Scala code generate a query plan where an extra exchange is inserted before performing the sort-merge join. This exchange changes the partitioning from {{HashPartitioning("number", 10)}} for each frame being joined into {{HashPartitioning("aliased", 5)}}. I would have expected that, since it's a simple column aliasing and both frames have exactly the same partitioning as the initial frames, no extra exchange would be inserted.
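The alias-canonicalization idea raised in the comment above can be illustrated with a small, self-contained sketch. This is a toy expression tree, not Spark's actual Catalyst classes: the names ({{Expr}}, {{AttributeRef}}, {{Alias}}, {{exprId}}) only mirror Catalyst, and the rule shown (strip {{Alias}} wrappers and cosmetic names before comparing) is one hypothetical way canonicalization could treat {{a#1}} and {{a#1 as newA#2}} as equal, which would let an ordering or partitioning on {{a}} satisfy a requirement on {{newA}} without an extra sort or exchange.

```scala
// Toy model of Catalyst-style expressions (NOT Spark's real classes).
sealed trait Expr
case class AttributeRef(name: String, exprId: Long) extends Expr
case class Alias(child: Expr, name: String, exprId: Long) extends Expr

object ToyCanonicalize {
  // Hypothetical rule: drop Alias wrappers and erase cosmetic names, keeping
  // only the exprId, so `a#1` and `a#1 AS newA#2` canonicalize identically.
  def canonicalize(e: Expr): Expr = e match {
    case Alias(child, _, _)  => canonicalize(child)      // unwrap the alias
    case AttributeRef(_, id) => AttributeRef("none", id) // name is cosmetic
  }
}

object Demo extends App {
  val a      = AttributeRef("a", 1) // a#1
  val asNewA = Alias(a, "newA", 2)  // a#1 AS newA#2
  println(ToyCanonicalize.canonicalize(a) == ToyCanonicalize.canonicalize(asNewA)) // true
}
```

Under this rule the two join keys in the plans above would compare equal after canonicalization, so the planner could reuse the child's existing sort order and partitioning instead of inserting a new {{Sort}} or {{Exchange}}.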
> {noformat}
> *Project [args=[aliased#267L]][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 5)%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)]
> +- *SortMergeJoin [args=[aliased#267L], [aliased#270L], Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 5)%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, aliased#270:bigint%NONNULL)]
>    :- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :  +- Exchange [args=hashpartitioning(aliased#267L, 5)%NONNULL][outPart=HashPartitioning(5, aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :     +- *Project [args=[number#198L AS aliased#267L]][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :        +- InMemoryTableScan [args=[number#198L]][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)]
>    :           :  +- InMemoryRelation [number#198L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)]
>    :           :     +- Exchange [args=hashpartitioning(number#198L, 10)%NONNULL][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>    :           :        +- LocalTableScan [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>    +- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>       +- Exchange [args=hashpartitioning(aliased#270L, 5)%NONNULL][outPart=HashPartitioning(5, aliased#270:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>          +- *Project [args=[number#223L AS aliased#270L]][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>             +- InMemoryTableScan [args=[number#223L]][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#223:bigint%NONNULL)]
>                :  +- InMemoryRelation [number#223L, value#224], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), false[Statistics(47,false)][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
>                :     +- Exchange [args=hashpartitioning(number#223L, 10)%NONNULL][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
>                :        +- LocalTableScan [args=[number#223L, value#224]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)