Allen George created SPARK-19981:
------------------------------------
Summary: Sort-Merge join inserts shuffles when joining dataframes
with aliased columns
Key: SPARK-19981
URL: https://issues.apache.org/jira/browse/SPARK-19981
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.0.2
Reporter: Allen George
Performing a sort-merge join with two dataframes - each of which has the join
column aliased - causes Spark to insert an unnecessary shuffle.
Consider the Scala test code below, which should be equivalent to the following
SQL.
{code:SQL}
SELECT * FROM
(SELECT number AS aliased from df1) t1
LEFT JOIN
(SELECT number AS aliased from df2) t2
ON t1.aliased = t2.aliased
{code}
{code:scala}
import org.apache.spark.sql.functions.col

private case class OneItem(number: Long)
private case class TwoItem(number: Long, value: String)

test("join with aliases should not trigger shuffle") {
  val df1 = sqlContext.createDataFrame(
    Seq(
      OneItem(0),
      OneItem(2),
      OneItem(4)
    )
  )

  // Hash-partition on the join column, then cache and materialize.
  val partitionedDf1 = df1.repartition(10, col("number"))
  partitionedDf1.createOrReplaceTempView("df1")
  partitionedDf1.cache()
  partitionedDf1.count()

  val df2 = sqlContext.createDataFrame(
    Seq(
      TwoItem(0, "zero"),
      TwoItem(2, "two"),
      TwoItem(4, "four")
    )
  )

  // Same partitioning (10 partitions, keyed on "number") as df1.
  val partitionedDf2 = df2.repartition(10, col("number"))
  partitionedDf2.createOrReplaceTempView("df2")
  partitionedDf2.cache()
  partitionedDf2.count()

  val fromDf1 = sqlContext.sql("SELECT number from df1")
  val fromDf2 = sqlContext.sql("SELECT number from df2")

  // Alias the join column on both sides before joining.
  val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased")
  val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased")

  aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer")
}
{code}
Both the SQL and the Scala code generate a query plan in which an extra exchange
is inserted before the sort-merge join. This exchange changes the
partitioning from {{HashPartitioning("number", 10)}} for each frame being
joined into {{HashPartitioning("aliased", 5)}}. I would have expected that,
since this is a simple column aliasing and both frames have exactly the same
partitioning, the join would reuse the partitioning of the initial frames.
{noformat}
*Project [args=[aliased#267L]][outPart=PartitioningCollection(5,
hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L,
5)%NONNULL)][outOrder=List(aliased#267L
ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)]
+- *SortMergeJoin [args=[aliased#267L], [aliased#270L],
Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L,
5)%NONNULL,hashpartitioning(aliased#270L,
5)%NONNULL)][outOrder=List(aliased#267L
ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL,
aliased#270:bigint%NONNULL)]
:- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5,
aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L
ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
: +- Exchange [args=hashpartitioning(aliased#267L,
5)%NONNULL][outPart=HashPartitioning(5,
aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
: +- *Project [args=[number#198L AS
aliased#267L]][outPart=HashPartitioning(10,
number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
: +- InMemoryTableScan
[args=[number#198L]][outPart=HashPartitioning(10,
number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)]
: : +- InMemoryRelation [number#198L], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas),
false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)]
: : : +- Exchange [args=hashpartitioning(number#198L,
10)%NONNULL][outPart=HashPartitioning(10,
number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
: : : +- LocalTableScan
[args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
+- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5,
aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L
ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
+- Exchange [args=hashpartitioning(aliased#270L,
5)%NONNULL][outPart=HashPartitioning(5,
aliased#270:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
+- *Project [args=[number#223L AS
aliased#270L]][outPart=HashPartitioning(10,
number#223:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
+- InMemoryTableScan
[args=[number#223L]][outPart=HashPartitioning(10,
number#223:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#223:bigint%NONNULL)]
: +- InMemoryRelation [number#223L, value#224], true, 10000,
StorageLevel(disk, memory, deserialized, 1 replicas),
false[Statistics(47,false)][output=List(number#223:bigint%NONNULL,
value#224:string%NULL)]
: : +- Exchange [args=hashpartitioning(number#223L,
10)%NONNULL][outPart=HashPartitioning(10,
number#223:bigint%NONNULL)][outOrder=List()][output=List(number#223:bigint%NONNULL,
value#224:string%NULL)]
: : +- LocalTableScan [args=[number#223L,
value#224]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#223:bigint%NONNULL,
value#224:string%NULL)]
{noformat}
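To illustrate why the plan above ends up with an extra Exchange, here is a minimal, Spark-free sketch. It is a toy model, not Spark's implementation: {{Attr}}, {{HashPartitioning}}, {{Project}} and {{satisfies}} are hypothetical stand-ins defined only for this example. It assumes the Project node reports its child's partitioning (keyed on {{number#198}}) unchanged, so the join's requirement on {{aliased#267}} is not recognized as already met. It ignores the partition-count mismatch (10 vs 5) and looks only at the key mismatch.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark's classes.
object AliasPartitioningSketch {
  // An attribute is identified by name and expression id, e.g. number#198.
  final case class Attr(name: String, id: Long)

  // Rows are hash-distributed on `keys` across `numPartitions` partitions.
  final case class HashPartitioning(keys: Seq[Attr], numPartitions: Int)

  // A projection that renames attributes: child attribute -> output attribute.
  final case class Project(aliases: Map[Attr, Attr], childPartitioning: HashPartitioning) {
    // Alias-unaware: report the child's partitioning unchanged.
    def naivePartitioning: HashPartitioning = childPartitioning

    // Alias-aware: rewrite the partitioning keys through the alias map.
    def aliasAwarePartitioning: HashPartitioning =
      childPartitioning.copy(keys = childPartitioning.keys.map(k => aliases.getOrElse(k, k)))
  }

  // The join needs each side clustered on its join keys. If this returns
  // false, a planner in this model would have to insert a shuffle.
  def satisfies(p: HashPartitioning, joinKeys: Seq[Attr]): Boolean =
    p.keys.nonEmpty && p.keys.forall(joinKeys.contains)

  def main(args: Array[String]): Unit = {
    val number  = Attr("number", 198L)
    val aliased = Attr("aliased", 267L)
    val project = Project(Map(number -> aliased), HashPartitioning(Seq(number), 10))

    // Keys are still number#198, so the requirement on aliased#267 is not met.
    println(satisfies(project.naivePartitioning, Seq(aliased)))      // false
    // After rewriting keys through the alias, the requirement is met.
    println(satisfies(project.aliasAwarePartitioning, Seq(aliased))) // true
  }
}
```

In this model, the shuffle would be avoided if the projection rewrote its partitioning keys through the alias map. Whether that matches how a fix would look in Spark itself is an assumption.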