[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns

2019-02-06 Thread Mitesh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16761535#comment-16761535
 ] 

Mitesh edited comment on SPARK-19981 at 2/6/19 7:12 PM:


Ping [~maropu], any updates here? This is still an issue in 2.3.2.

Also maybe a dupe of SPARK-19468


was (Author: masterddt):
Ping any updates here? This still is an issue in 2.3.2. 

Also maybe a dupe of SPARK-19468

> Sort-Merge join inserts shuffles when joining dataframes with aliased columns
> -
>
> Key: SPARK-19981
> URL: https://issues.apache.org/jira/browse/SPARK-19981
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Allen George
>Priority: Major
>
> Performing a sort-merge join with two dataframes - each of which has the join 
> column aliased - causes Spark to insert an unnecessary shuffle.
> Consider the Scala test code below, which should be equivalent to the 
> following SQL.
> {code:SQL}
> SELECT * FROM
>   (SELECT number AS aliased from df1) t1
> LEFT JOIN
>   (SELECT number AS aliased from df2) t2
> ON t1.aliased = t2.aliased
> {code}
> {code:scala}
> import org.apache.spark.sql.functions.col
>
> private case class OneItem(number: Long)
> private case class TwoItem(number: Long, value: String)
>
> test("join with aliases should not trigger shuffle") {
>   val df1 = sqlContext.createDataFrame(
>     Seq(
>       OneItem(0),
>       OneItem(2),
>       OneItem(4)
>     )
>   )
>   // Pre-partition on the join column, then cache and materialize the result.
>   val partitionedDf1 = df1.repartition(10, col("number"))
>   partitionedDf1.createOrReplaceTempView("df1")
>   partitionedDf1.cache()
>   partitionedDf1.count()
>
>   val df2 = sqlContext.createDataFrame(
>     Seq(
>       TwoItem(0, "zero"),
>       TwoItem(2, "two"),
>       TwoItem(4, "four")
>     )
>   )
>   // Same partitioning (10 partitions, hashed on "number") as df1.
>   val partitionedDf2 = df2.repartition(10, col("number"))
>   partitionedDf2.createOrReplaceTempView("df2")
>   partitionedDf2.cache()
>   partitionedDf2.count()
>
>   val fromDf1 = sqlContext.sql("SELECT number from df1")
>   val fromDf2 = sqlContext.sql("SELECT number from df2")
>   // Alias the join column on both sides before joining.
>   val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased")
>   val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased")
>   aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer")
> }
> {code}
> Both the SQL and the Scala code generate a query plan in which an extra 
> exchange is inserted before the sort-merge join. This exchange changes the 
> partitioning of each frame being joined from 
> {{HashPartitioning("number", 10)}} to {{HashPartitioning("aliased", 5)}}. I 
> would have expected that, since this is a simple column aliasing and both 
> frames have exactly the same partitioning, the partitioning of the initial 
> frames would be kept and no exchange would be added.
> {noformat}
> *Project [args=[aliased#267L]][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 5)%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)]
> +- *SortMergeJoin [args=[aliased#267L], [aliased#270L], Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 5)%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, aliased#270:bigint%NONNULL)]
>    :- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :  +- Exchange [args=hashpartitioning(aliased#267L, 5)%NONNULL][outPart=HashPartitioning(5, aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :     +- *Project [args=[number#198L AS aliased#267L]][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
>    :        +- InMemoryTableScan [args=[number#198L]][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)]
>    :           :  +- InMemoryRelation [number#198L], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas), false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)]
>    :           :     :  +- Exchange [args=hashpartitioning(number#198L, 10)%NONNULL][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>    :           :     :     +- LocalTableScan [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
>    +- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
>       +- Exchange 

[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns

2017-03-28 Thread Mitesh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945166#comment-15945166
 ] 

Mitesh edited comment on SPARK-19981 at 3/28/17 3:21 PM:
-

As I mentioned on the PR, maybe it's better to fix it here?

https://github.com/maropu/spark/blob/b5d1038ed5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41

Perhaps it should handle canonicalizing an alias so {{a#1}} is the same as 
{{a#1 as newA#2}}.

Otherwise you have a similar problem with sorting. Here is a sort example with 
1 partition. I believe the extra sort on {{newA}} is unnecessary.

{code}
scala> val df1 = Seq((1, 2), (3, 4)).toDF("a", "b").coalesce(1).sortWithinPartitions("a")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]

scala> val df2 = df1.selectExpr("a as newA", "b")
df2: org.apache.spark.sql.DataFrame = [newA: int, b: int]

scala> println(df1.join(df2, df1("a") === df2("newA")).queryExecution.executedPlan)

*SortMergeJoin [args=[a#37225], [newA#37232], Inner][outPart=PartitioningCollection(1, )][outOrder=List(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL, newA#37232:int%NONNULL, b#37243:int%NONNULL)]
:- *Sort [args=[a#37225 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:  +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:     +- LocalTableScan [args=[a#37225, b#37226]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
+- *Sort [args=[newA#37232 ASC], false, 0][outPart=SinglePartition][outOrder=List(newA#37232 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
   +- *Project [args=[a#37242 AS newA#37232, b#37243]][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
      +- *Sort [args=[a#37242 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
         +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
            +- LocalTableScan [args=[a#37242, b#37243]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
{code}
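
As a rough illustration of the alias idea above, here is a minimal sketch in plain Scala. It is not the code at the linked Canonicalize.scala and it does not use Spark's Catalyst classes; {{Attr}}, {{Alias}}, {{aliasMap}}, {{canonicalId}} and {{satisfies}} are hypothetical names invented for this example. It only shows how looking through an alias such as {{number#198L AS aliased#267L}} could let a required join key match a key the child is already partitioned (or sorted) on.

{code:scala}
// Toy model only: hypothetical stand-ins, not Spark's Catalyst expression classes.
sealed trait Expr
final case class Attr(name: String, id: Long) extends Expr
final case class Alias(child: Expr, name: String, id: Long) extends Expr

object AliasAwareMatch {
  // Map each alias's output id to the id of the attribute it renames.
  // Only aliases that wrap a bare attribute qualify; computed expressions are skipped.
  def aliasMap(projectList: Seq[Expr]): Map[Long, Long] =
    projectList.collect { case Alias(Attr(_, childId), _, aliasId) => aliasId -> childId }.toMap

  // Resolve an attribute id through the alias map to the id it came from.
  def canonicalId(id: Long, aliases: Map[Long, Long]): Long =
    aliases.getOrElse(id, id)

  // True when the required keys equal the provided keys once aliases are looked through.
  def satisfies(required: Seq[Long], provided: Seq[Long], aliases: Map[Long, Long]): Boolean =
    required.map(canonicalId(_, aliases)) == provided.map(canonicalId(_, aliases))
}

object Demo extends App {
  import AliasAwareMatch._

  // Mirrors `number#198L AS aliased#267L` from the plan in the issue description.
  val project = Seq(Alias(Attr("number", 198L), "aliased", 267L))
  val aliases = aliasMap(project)

  // Compared by raw id, the required key and the existing key look unrelated.
  println(Seq(267L) == Seq(198L))                   // false
  // With the alias resolved, the existing partitioning key is recognised.
  println(satisfies(Seq(267L), Seq(198L), aliases)) // true
}
{code}

The same lookup would apply to the sort example: the ordering on {{a#37242}} below the Project could be recognised as an ordering on {{newA#37232}}. This is an assumption about where such a check might live, not a statement about how Spark implements or fixes it.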


[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns

2017-03-28 Thread Mitesh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945166#comment-15945166
 ] 

Mitesh edited comment on SPARK-19981 at 3/28/17 1:34 PM:
-

As I mentioned on the PR, maybe a better to fix it here:

https://github.com/maropu/spark/blob/b5d1038ed5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41

Perhaps it should handle canonicalizing an alias so {{a#1}} is the same as 
{{a#1 as newA#2}}.

Otherwise you have a similar problem with sorting. Here is a sort example with 
1 partition. I believe the extra sort on {{newA}} is unnecessary.

{code}
scala> val df1 = Seq((1, 2), (3, 4)).toDF("a", "b").coalesce(1).sortWithinPartitions("a")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]

scala> val df2 = df1.selectExpr("a as newA", "b")
df2: org.apache.spark.sql.DataFrame = [newA: int, b: int]

scala> println(df1.join(df2, df1("a") === df2("newA")).queryExecution.executedPlan)
*SortMergeJoin [args=[a#37225], [newA#37232], Inner][outPart=PartitioningCollection(1, )][outOrder=List(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL, newA#37232:int%NONNULL, b#37243:int%NONNULL)]
:- *Sort [args=[a#37225 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:  +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:     +- LocalTableScan [args=[a#37225, b#37226]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
+- *Sort [args=[newA#37232 ASC], false, 0][outPart=SinglePartition][outOrder=List(newA#37232 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
   +- *Project [args=[a#37242 AS newA#37232, b#37243]][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
      +- *Sort [args=[a#37242 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
         +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
            +- LocalTableScan [args=[a#37242, b#37243]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
{code}
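
The reasoning behind "the extra sort is unnecessary" can be sketched on a toy model: if the {{Project}} only renames {{a#37242}} to {{newA#37232}}, the child's ordering on {{a}} carries over to {{newA}}. The names below ({{Col}}, {{SortKey}}, {{rewriteOrdering}}, {{satisfies}}) are hypothetical and this is not Spark's planner code; it also assumes every sort column survives the projection.

{code:scala}
// Toy model of ordering propagation through a renaming projection.
// All names are hypothetical; this is not Spark's planner API.
final case class Col(name: String, exprId: Long)
final case class SortKey(col: Col, ascending: Boolean)

object OrderingThroughProject {
  // aliases maps a child column to the column the projection exposes instead.
  // Assumption: every sort column is either aliased or passed through unchanged.
  def rewriteOrdering(childOrder: Seq[SortKey], aliases: Map[Col, Col]): Seq[SortKey] =
    childOrder.map(k => k.copy(col = aliases.getOrElse(k.col, k.col)))

  // A required ordering is satisfied if it is a prefix of the available ordering.
  def satisfies(available: Seq[SortKey], required: Seq[SortKey]): Boolean =
    available.startsWith(required)
}
{code}

In this model, rewriting the child ordering {{a#37242 ASC}} with the alias {{a#37242 -> newA#37232}} yields {{newA#37232 ASC}}, which already satisfies the join's required ordering, so no second sort would be needed.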


was (Author: masterddt):
As I mentioned on the PR, maybe a better way is to handle it here:

https://github.com/maropu/spark/blob/b5d1038ed5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41

Perhaps it should handle canonicalizing an alias so {{a#1}} is the same as 
{{a#1 as newA#2}}.

Otherwise you have a similar problem with sorting. Here is a sort example with 
1 partition. I believe the extra sort on {{newA}} is unnecessary.

{code}
scala> val df1 = Seq((1, 2), (3, 4)).toDF("a", "b").coalesce(1).sortWithinPartitions("a")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]

scala> val df2 = df1.selectExpr("a as newA", "b")
df2: org.apache.spark.sql.DataFrame = [newA: int, b: int]

scala> println(df1.join(df2, df1("a") === df2("newA")).queryExecution.executedPlan)
*SortMergeJoin [args=[a#37225], [newA#37232], Inner][outPart=PartitioningCollection(1, )][outOrder=List(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL, newA#37232:int%NONNULL, b#37243:int%NONNULL)]
:- *Sort [args=[a#37225 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:  +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:     +- LocalTableScan [args=[a#37225, b#37226]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
+- *Sort [args=[newA#37232 ASC], false, 0][outPart=SinglePartition][outOrder=List(newA#37232 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
   +- *Project [args=[a#37242 AS newA#37232, b#37243]][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
      +- *Sort [args=[a#37242 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
         +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
            +- LocalTableScan [args=[a#37242, b#37243]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37242:int%NONNULL,
 

[jira] [Comment Edited] (SPARK-19981) Sort-Merge join inserts shuffles when joining dataframes with aliased columns

2017-03-28 Thread Mitesh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945166#comment-15945166
 ] 

Mitesh edited comment on SPARK-19981 at 3/28/17 1:33 PM:
-

As I mentioned on the PR, this seems like it should be handled here:

https://github.com/maropu/spark/blob/b5d1038ed5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41

Perhaps it should handle canonicalizing an alias so {{a#1}} is the same as 
{{a#1 as newA#2}}.

Otherwise you have a similar problem with sorting. Here is a sort example with 
1 partition. I believe the extra sort on {{newA}} is unnecessary.

{code}
scala> val df1 = Seq((1, 2), (3, 4)).toDF("a", "b").coalesce(1).sortWithinPartitions("a")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]

scala> val df2 = df1.selectExpr("a as newA", "b")
df2: org.apache.spark.sql.DataFrame = [newA: int, b: int]

scala> println(df1.join(df2, df1("a") === df2("newA")).queryExecution.executedPlan)
*SortMergeJoin [args=[a#37225], [newA#37232], Inner][outPart=PartitioningCollection(1, )][outOrder=List(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL, newA#37232:int%NONNULL, b#37243:int%NONNULL)]
:- *Sort [args=[a#37225 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:  +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:     +- LocalTableScan [args=[a#37225, b#37226]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
+- *Sort [args=[newA#37232 ASC], false, 0][outPart=SinglePartition][outOrder=List(newA#37232 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
   +- *Project [args=[a#37242 AS newA#37232, b#37243]][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
      +- *Sort [args=[a#37242 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
         +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
            +- LocalTableScan [args=[a#37242, b#37243]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
{code}


was (Author: masterddt):
As I mentioned on the PR, this seems like it should be handled here:

https://github.com/maropu/spark/blob/b5d1038ed5d65a6ddec20ea6eef186d25fc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L41

Perhaps it should handle canonicalizing an alias so `a#1` is the same as `a#1 
as newA#2`.

Otherwise you have a similar problem with sorting. Here is a sort example with 
1 partition. I believe the extra sort on `newA` is unnecessary.

```
scala> val df1 = Seq((1, 2), (3, 4)).toDF("a", "b").coalesce(1).sortWithinPartitions("a")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]

scala> val df2 = df1.selectExpr("a as newA", "b")
df2: org.apache.spark.sql.DataFrame = [newA: int, b: int]

scala> println(df1.join(df2, df1("a") === df2("newA")).queryExecution.executedPlan)
*SortMergeJoin [args=[a#37225], [newA#37232], Inner][outPart=PartitioningCollection(1, )][outOrder=List(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL, newA#37232:int%NONNULL, b#37243:int%NONNULL)]
:- *Sort [args=[a#37225 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37225 ASC%NONNULL)][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:  +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
:     +- LocalTableScan [args=[a#37225, b#37226]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37225:int%NONNULL, b#37226:int%NONNULL)]
+- *Sort [args=[newA#37232 ASC], false, 0][outPart=SinglePartition][outOrder=List(newA#37232 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
   +- *Project [args=[a#37242 AS newA#37232, b#37243]][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=ArrayBuffer(newA#37232:int%NONNULL, b#37243:int%NONNULL)]
      +- *Sort [args=[a#37242 ASC], false, 0][outPart=SinglePartition][outOrder=ArrayBuffer(a#37242 ASC%NONNULL)][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
         +- Coalesce [args=1][outPart=SinglePartition][outOrder=List()][output=List(a#37242:int%NONNULL, b#37243:int%NONNULL)]
            +- LocalTableScan [args=[a#37242, b#37243]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(a#37242:int%NONNULL,